etcd 教程

1. 核心概念

etcd 是一个开源的分布式键值存储系统,由 CoreOS 公司开发,现在是 CNCF(云原生计算基金会)的毕业项目。它专为分布式系统设计,提供了强一致性、高可用性、可靠性和安全性的键值存储服务。

1.1 主要特点

  • 强一致性:基于 Raft 共识算法,确保数据的一致性
  • 高可用性:支持集群部署,自动故障转移
  • 可靠性:数据持久化存储,支持快照和 WAL 日志
  • 安全性:支持 TLS 加密和身份认证
  • 简单易用:提供 HTTP/gRPC API 和命令行工具
  • 服务发现:内置服务发现机制
  • 配置管理:适合存储配置信息
  • 分布式锁:支持分布式锁和领导者选举

1.2 架构组件

etcd 架构由以下核心组件组成:

  • etcd server:etcd 服务器,处理客户端请求
  • Raft:共识算法,确保数据一致性
  • WAL:预写式日志,记录所有操作
  • Snapshot:数据快照,用于加速恢复
  • MVCC:多版本并发控制,支持事务和历史版本查询
  • API:HTTP/gRPC API,用于与 etcd 交互
  • Client:etcd 客户端库,支持多种编程语言

1.3 核心概念

  • Cluster:etcd 集群,由多个 etcd 服务器组成
  • Member:etcd 集群中的单个服务器节点
  • Leader:Raft 集群中的领导者节点,负责处理所有写请求
  • Follower:Raft 集群中的跟随者节点,复制领导者的日志
  • Candidate:选举过程中的候选节点
  • Term:Raft 选举的任期
  • Index:日志条目的索引
  • Key-Value:etcd 中存储的数据单元
  • Lease:键值对的租约,用于自动过期
  • Watch:监视键值变化的机制
  • Transaction:事务操作,支持原子性的多键操作

2. 安装配置

2.1 安装 etcd

2.1.1 二进制安装

# 下载 etcd
curl -L https://github.com/etcd-io/etcd/releases/download/v3.5.13/etcd-v3.5.13-linux-amd64.tar.gz -o etcd-v3.5.13-linux-amd64.tar.gz

# 解压
tar xzvf etcd-v3.5.13-linux-amd64.tar.gz

# 移动到 PATH 目录
cd etcd-v3.5.13-linux-amd64
sudo mv etcd etcdctl /usr/local/bin/

# 验证安装
etcd --version
etcdctl version

2.1.2 Docker 安装

# 运行单个 etcd 容器
docker run -d --name etcd -p 2379:2379 -p 2380:2380 \
  -e ETCD_NAME=etcd0 \
  -e ETCD_DATA_DIR=/etcd-data \
  -e ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380 \
  -e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
  -e ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 \
  -e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
  -e ETCD_INITIAL_CLUSTER=etcd0=http://0.0.0.0:2380 \
  -e ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-1 \
  -e ETCD_INITIAL_CLUSTER_STATE=new \
  gcr.io/etcd-development/etcd:v3.5.13

# 验证安装
docker exec -it etcd etcd --version
docker exec -it etcd etcdctl version

2.1.3 Kubernetes 安装

使用 Helm 安装 etcd 到 Kubernetes 集群:

# 添加 Helm 仓库
helm repo add bitnami https://charts.bitnami.com/bitnami

# 更新仓库
helm repo update

# 安装 etcd
helm install etcd bitnami/etcd --set replicaCount=3

# 验证安装
kubectl get pods
kubectl exec -it etcd-0 -- etcd --version

2.2 配置 etcd

2.2.1 单节点配置

创建 etcd 配置文件 etcd.conf.yml

# 基本配置
name: etcd0
data-dir: /var/lib/etcd

# 监听地址
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://localhost:2379

# 集群配置
listen-peer-urls: http://0.0.0.0:2380
initial-advertise-peer-urls: http://localhost:2380
initial-cluster: etcd0=http://localhost:2380
initial-cluster-token: etcd-cluster-1
initial-cluster-state: new

# 安全配置
trusted-ca-file: /etc/etcd/ssl/ca.pem
cert-file: /etc/etcd/ssl/server.pem
key-file: /etc/etcd/ssl/server-key.pem
peer-trusted-ca-file: /etc/etcd/ssl/ca.pem
peer-cert-file: /etc/etcd/ssl/peer.pem
peer-key-file: /etc/etcd/ssl/peer-key.pem

# 日志配置
debug: false
log-level: info
# 使用配置文件启动 etcd
etcd --config-file=etcd.conf.yml

2.2.2 集群配置

在三个不同的服务器上分别启动 etcd 节点:

节点 1

etcd --name etcd0 \
  --data-dir /var/lib/etcd \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://192.168.1.100:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://192.168.1.100:2380 \
  --initial-cluster etcd0=http://192.168.1.100:2380,etcd1=http://192.168.1.101:2380,etcd2=http://192.168.1.102:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster-state new

节点 2

etcd --name etcd1 \
  --data-dir /var/lib/etcd \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://192.168.1.101:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://192.168.1.101:2380 \
  --initial-cluster etcd0=http://192.168.1.100:2380,etcd1=http://192.168.1.101:2380,etcd2=http://192.168.1.102:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster-state new

节点 3

etcd --name etcd2 \
  --data-dir /var/lib/etcd \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://192.168.1.102:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://192.168.1.102:2380 \
  --initial-cluster etcd0=http://192.168.1.100:2380,etcd1=http://192.168.1.101:2380,etcd2=http://192.168.1.102:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster-state new

3. 基本使用

3.1 键值操作

3.1.1 设置键值

# 设置单个键值
etcdctl put /message "Hello, etcd!"

# 设置带租约的键值
etcdctl lease create 60 # 创建 60 秒的租约
ETCDCTL_API=3 etcdctl put /temp/key "temporary value" --lease=<lease-id>

# 设置带前缀的键值
etcdctl put /config/database/host "db.example.com"
etcdctl put /config/database/port "5432"
etcdctl put /config/redis/host "redis.example.com"
etcdctl put /config/redis/port "6379"

3.1.2 获取键值

# 获取单个键值
etcdctl get /message

# 获取带前缀的所有键值
etcdctl get --prefix /config

# 获取键值的详细信息
etcdctl get --print-value-only /message

# 获取键值的历史版本
etcdctl get --rev=2 /message

3.1.3 删除键值

# 删除单个键值
etcdctl del /message

# 删除带前缀的所有键值
etcdctl del --prefix /config

# 删除指定范围的键值
etcdctl del /config/a /config/z

3.2 集群管理

3.2.1 查看集群状态

# 查看集群成员
etcdctl member list

# 查看集群健康状态
etcdctl endpoint health

# 查看集群状态
etcdctl endpoint status

# 查看领导者信息
etcdctl endpoint status --write-out=json | jq '.[] | select(.Status.leader == true)'

3.2.2 添加集群成员

# 添加新成员
etcdctl member add etcd3 --peer-urls=http://192.168.1.103:2380

# 在新服务器上启动 etcd 实例
etcd --name etcd3 \
  --data-dir /var/lib/etcd \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://192.168.1.103:2379 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-advertise-peer-urls http://192.168.1.103:2380 \
  --initial-cluster etcd0=http://192.168.1.100:2380,etcd1=http://192.168.1.101:2380,etcd2=http://192.168.1.102:2380,etcd3=http://192.168.1.103:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster-state existing

3.2.3 移除集群成员

# 移除成员
etcdctl member remove <member-id>

3.3 监视操作

3.3.1 监视键值变化

# 监视单个键的变化
etcdctl watch /message

# 监视带前缀的所有键的变化
etcdctl watch --prefix /config

# 监视指定范围的键的变化
etcdctl watch /config/a /config/z

# 从指定版本开始监视
etcdctl watch --rev=5 /message

3.3.2 监视操作示例

在一个终端中启动监视:

etcdctl watch /message

在另一个终端中修改键值:

etcdctl put /message "Hello, etcd!"
etcdctl put /message "Hello, world!"
etcdctl del /message

3.4 事务操作

3.4.1 基本事务

# 执行事务
etcdctl txn --interactive
# 输入以下内容:
# cmp /message = "Hello, etcd!"
# then
# put /message "Hello, world!"
# else
# put /message "Hello, etcd!"
# end

3.4.2 原子性操作

# 原子性地设置多个键值
etcdctl txn <<EOF
cmp /lock = ""
then
put /lock "locked"
put /status "updating"
else
put /status "already locked"
end
EOF

4. 高级特性

4.1 分布式锁

4.1.1 使用 etcd 实现分布式锁

# 创建分布式锁
etcdctl lock mylock

# 使用租约创建分布式锁
etcdctl lease create 60 > lease.txt
LEASE_ID=$(grep -o '[0-9]*' lease.txt)
etcdctl put --lease=$LEASE_ID /locks/mylock "locked"

# 释放分布式锁
etcdctl del /locks/mylock

4.1.2 领导者选举

# 实现领导者选举
etcdctl elect myelection leader1

# 监听领导者变化
etcdctl elect --watch myelection

4.2 租约管理

4.2.1 创建和管理租约

# 创建租约
etcdctl lease create 60

# 查看租约信息
etcdctl lease timetolive <lease-id>

# 续约
etcdctl lease keep-alive <lease-id>

# 撤销租约
etcdctl lease revoke <lease-id>

4.2.2 自动过期键值

# 创建带租约的键值,60 秒后自动过期
LEASE_ID=$(etcdctl lease create 60 | grep -o '[0-9]*')
etcdctl put /temp/key "temporary value" --lease=$LEASE_ID

# 验证键值是否存在
etcdctl get /temp/key

# 60 秒后再次验证
sleep 61
etcdctl get /temp/key

4.3 快照和备份

4.3.1 创建快照

# 创建快照
etcdctl snapshot save snapshot.db

# 查看快照信息
etcdctl snapshot status snapshot.db

4.3.2 从快照恢复

# 停止 etcd 服务
systemctl stop etcd

# 清空数据目录
rm -rf /var/lib/etcd/*

# 从快照恢复
etcdctl snapshot restore snapshot.db \
  --data-dir=/var/lib/etcd \
  --name=etcd0 \
  --initial-cluster=etcd0=http://192.168.1.100:2380 \
  --initial-cluster-token=etcd-cluster-1 \
  --initial-advertise-peer-urls=http://192.168.1.100:2380

# 启动 etcd 服务
systemctl start etcd

4.4 安全配置

4.4.1 启用 TLS 加密

  1. 生成证书
# 生成 CA 证书
cfssl gencert -initca ca-csr.json | cfssljson -bare ca

# 生成服务器证书
cfssl gencert \
  -ca=ca.pem \
  -ca-key=ca-key.pem \
  -config=ca-config.json \
  -profile=server \
  server-csr.json | cfssljson -bare server

# 生成客户端证书
cfssl gencert \
  -ca=ca.pem \
  -ca-key=ca-key.pem \
  -config=ca-config.json \
  -profile=client \
  client-csr.json | cfssljson -bare client
  1. 配置 etcd 使用 TLS
etcd --name etcd0 \
  --data-dir /var/lib/etcd \
  --listen-client-urls https://0.0.0.0:2379 \
  --advertise-client-urls https://192.168.1.100:2379 \
  --listen-peer-urls https://0.0.0.0:2380 \
  --initial-advertise-peer-urls https://192.168.1.100:2380 \
  --initial-cluster etcd0=https://192.168.1.100:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster-state new \
  --trusted-ca-file=/etc/etcd/ssl/ca.pem \
  --cert-file=/etc/etcd/ssl/server.pem \
  --key-file=/etc/etcd/ssl/server-key.pem \
  --peer-trusted-ca-file=/etc/etcd/ssl/ca.pem \
  --peer-cert-file=/etc/etcd/ssl/peer.pem \
  --peer-key-file=/etc/etcd/ssl/peer-key.pem
  1. 使用 TLS 连接 etcd
etcdctl --cacert=/etc/etcd/ssl/ca.pem \
  --cert=/etc/etcd/ssl/client.pem \
  --key=/etc/etcd/ssl/client-key.pem \
  --endpoints=https://192.168.1.100:2379 \
  get /message

4.5 配额管理

4.5.1 配置配额

# 查看当前配额
etcdctl --endpoints=http://localhost:2379 get /registry/quota/max-disk

# 设置磁盘配额(单位:字节)
etcdctl --endpoints=http://localhost:2379 put /registry/quota/max-disk "8589934592" # 8GB

# 查看配额使用情况
etcdctl --endpoints=http://localhost:2379 get /registry/quota/used-disk

4.5.2 处理配额超限

# 压缩历史版本
etcdctl compact 1000

# 整理碎片
etcdctl defrag

# 清理过期键值
etcdctl del --prefix /temp

5. 最佳实践

5.1 部署最佳实践

  • 使用奇数个节点:推荐 3 或 5 个节点,确保高可用性
  • 分布部署:将节点部署在不同的可用区或物理机器上
  • 配置适当的资源:为 etcd 节点分配足够的 CPU、内存和磁盘资源
  • 使用 SSD 存储:提高 WAL 日志和快照的读写性能
  • 配置备份策略:定期创建快照,防止数据丢失
  • 启用 TLS 加密:保护节点间和客户端通信
  • 配置防火墙:限制 etcd 端口的访问,只允许必要的网络流量

5.2 性能优化

  • 调整 Raft 配置:根据集群规模调整选举超时和心跳间隔
  • 调整 WAL 配置:设置适当的 WAL 缓冲区大小
  • 启用压缩:定期压缩历史版本,减少存储空间
  • 使用批量操作:对于大量键值操作,使用批量 API 减少网络开销
  • 优化客户端请求:减少频繁的小请求,合并为批量请求
  • 使用 gRPC API:对于性能要求高的场景,使用 gRPC API 代替 HTTP API
  • 调整并发连接数:根据服务器性能调整最大并发连接数

5.3 安全最佳实践

  • 启用 TLS:启用并正确配置 TLS,保护所有通信
  • 使用客户端证书:要求客户端使用证书进行身份认证
  • 配置访问控制:使用 etcd 的 RBAC 功能,限制对 API 的访问
  • 使用最小权限原则:为不同的客户端分配最小必要的权限
  • 定期轮换证书:确保证书及时轮换,避免过期
  • 监控安全事件:设置安全相关的监控和告警

5.4 操作最佳实践

  • 使用前缀组织键值:为不同的服务和环境使用不同的前缀,如 services/web/production
  • 使用租约管理临时数据:对于临时数据,使用租约自动过期
  • 实现优雅的键值管理:定期清理不再需要的键值
  • 监控集群健康:设置集群健康状态的监控和告警
  • 制定灾难恢复计划:准备好应对集群故障的恢复方案
  • 测试故障转移:定期测试集群的故障转移能力

5.5 应用集成最佳实践

  • 使用客户端库:使用官方或社区维护的客户端库,而非直接调用 API
  • 实现重试机制:处理网络故障和节点不可用的情况
  • 使用连接池:复用客户端连接,减少连接建立开销
  • 监控客户端性能:监控客户端的请求延迟和错误率
  • 实现配置缓存:缓存配置信息,减少对 etcd 的频繁访问
  • 使用 watch 机制:利用 watch 机制及时响应配置变更

6. 实用案例

6.1 服务发现

场景:使用 etcd 实现服务发现,自动发现和注册服务实例。

配置示例

  1. 注册服务
# 注册服务实例
service_id=$(uuidgen)
service_addr="192.168.1.100:8080"

# 创建带租约的服务注册
etcdctl lease create 60 > lease.txt
lease_id=$(grep -o '[0-9]*' lease.txt)

# 注册服务
etcdctl put --lease=$lease_id /services/web/$service_id "$service_addr"

# 定期续约
while true; do
  etcdctl lease keep-alive $lease_id > /dev/null 2>&1
  sleep 45
done
  1. 发现服务
# 发现所有 web 服务实例
etcdctl get --prefix /services/web

# 监视服务变化
etcdctl watch --prefix /services/web
  1. 使用 Go 客户端实现服务发现
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.etcd.io/etcd/client/v3"
)

func main() {
    // 连接 etcd
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // 发现服务
    ctx := context.Background()
    rsp, err := cli.Get(ctx, "/services/web", clientv3.WithPrefix())
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Available web services:")
    for _, kv := range rsp.Kvs {
        fmt.Printf("%s: %s\n", kv.Key, kv.Value)
    }

    // 监视服务变化
    wch := cli.Watch(ctx, "/services/web", clientv3.WithPrefix())
    for wresp := range wch {
        for _, ev := range wresp.Events {
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

6.2 配置管理

场景:使用 etcd 管理应用配置,实现配置的集中管理和动态更新。

配置示例

  1. 存储配置
# 存储应用配置
etcdctl put /config/app/database/host "db.example.com"
etcdctl put /config/app/database/port "5432"
etcdctl put /config/app/database/name "app_db"
etcdctl put /config/app/redis/host "redis.example.com"
etcdctl put /config/app/redis/port "6379"
etcdctl put /config/app/server/port "8080"
etcdctl put /config/app/server/timeout "30s"
  1. 使用配置
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "go.etcd.io/etcd/client/v3"
)

// Config 应用配置结构
type Config struct {
    Database struct {
        Host string `json:"host"`
        Port string `json:"port"`
        Name string `json:"name"`
    } `json:"database"`
    Redis struct {
        Host string `json:"host"`
        Port string `json:"port"`
    } `json:"redis"`
    Server struct {
        Port    string `json:"port"`
        Timeout string `json:"timeout"`
    } `json:"server"`
}

func main() {
    // 连接 etcd
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    ctx := context.Background()

    // 读取配置
    config := loadConfig(cli, ctx)
    fmt.Printf("Database host: %s\n", config.Database.Host)
    fmt.Printf("Redis host: %s\n", config.Redis.Host)
    fmt.Printf("Server port: %s\n", config.Server.Port)

    // 监视配置变化
    wch := cli.Watch(ctx, "/config/app", clientv3.WithPrefix())
    for wresp := range wch {
        fmt.Println("Configuration updated:")
        config = loadConfig(cli, ctx)
        fmt.Printf("Database host: %s\n", config.Database.Host)
        fmt.Printf("Redis host: %s\n", config.Redis.Host)
        fmt.Printf("Server port: %s\n", config.Server.Port)
    }
}

// loadConfig 从 etcd 加载配置
func loadConfig(cli *clientv3.Client, ctx context.Context) Config {
    rsp, err := cli.Get(ctx, "/config/app", clientv3.WithPrefix())
    if err != nil {
        log.Fatal(err)
    }

    config := Config{}
    data := make(map[string]string)

    for _, kv := range rsp.Kvs {
        key := string(kv.Key)
        value := string(kv.Value)
        data[key] = value
    }

    // 构建配置
    config.Database.Host = data["/config/app/database/host"]
    config.Database.Port = data["/config/app/database/port"]
    config.Database.Name = data["/config/app/database/name"]
    config.Redis.Host = data["/config/app/redis/host"]
    config.Redis.Port = data["/config/app/redis/port"]
    config.Server.Port = data["/config/app/server/port"]
    config.Server.Timeout = data["/config/app/server/timeout"]

    return config
}

6.3 分布式锁实现

场景:使用 etcd 实现分布式锁,确保多个进程对共享资源的互斥访问。

配置示例

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"

    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    // 连接 etcd
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            acquireLock(cli, id)
        }(i)
    }

    wg.Wait()
    fmt.Println("All goroutines completed")
}

// acquireLock 尝试获取分布式锁并执行临界区操作
func acquireLock(cli *clientv3.Client, id int) {
    ctx := context.Background()

    // 创建会话
    sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
    if err != nil {
        log.Printf("Goroutine %d: Failed to create session: %v", id, err)
        return
    }
    defer sess.Close()

    // 创建分布式锁
    mutex := concurrency.NewMutex(sess, "/locks/resource")

    // 尝试获取锁
    fmt.Printf("Goroutine %d: Trying to acquire lock...\n", id)
    if err := mutex.Lock(ctx); err != nil {
        log.Printf("Goroutine %d: Failed to acquire lock: %v", id, err)
        return
    }

    // 临界区操作
    fmt.Printf("Goroutine %d: Acquired lock, performing critical section...\n", id)
    time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
    fmt.Printf("Goroutine %d: Released lock\n", id)

    // 释放锁
    if err := mutex.Unlock(ctx); err != nil {
        log.Printf("Goroutine %d: Failed to release lock: %v", id, err)
    }
}

6.4 领导者选举

场景:使用 etcd 实现领导者选举,确保在分布式系统中只有一个进程执行特定任务。

配置示例

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    // 连接 etcd
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // 启动多个候选者
    for i := 0; i < 3; i++ {
        go func(id int) {
            campaignForLeadership(cli, id)
        }(i)
    }

    // 运行 60 秒
    time.Sleep(60 * time.Second)
    fmt.Println("Simulation completed")
}

// campaignForLeadership 参与领导者选举
func campaignForLeadership(cli *clientv3.Client, id int) {
    ctx := context.Background()

    for {
        // 创建会话
        sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
        if err != nil {
            log.Printf("Candidate %d: Failed to create session: %v", id, err)
            time.Sleep(2 * time.Second)
            continue
        }

        // 参与选举
        fmt.Printf("Candidate %d: Campaigning for leadership...\n", id)
        leaderKey, err := concurrency.Campaign(ctx, sess, "/election/leader", fmt.Sprintf("candidate-%d", id))
        if err != nil {
            log.Printf("Candidate %d: Failed to campaign: %v", id, err)
            sess.Close()
            time.Sleep(2 * time.Second)
            continue
        }

        // 成为领导者
        fmt.Printf("Candidate %d: Elected as leader!\n", id)

        // 执行领导者任务
        for {
            // 检查会话是否仍然有效
            if sess == nil || !sess.Active() {
                fmt.Printf("Candidate %d: Session expired, stepping down...\n", id)
                break
            }

            // 执行领导者任务
            fmt.Printf("Candidate %d: Performing leader tasks...\n", id)
            time.Sleep(2 * time.Second)
        }

        // 退出选举
        if err := concurrency.Resign(ctx, sess, leaderKey); err != nil {
            log.Printf("Candidate %d: Failed to resign: %v", id, err)
        }

        sess.Close()
        fmt.Printf("Candidate %d: Stepped down, waiting to campaign again...\n", id)
        time.Sleep(2 * time.Second)
    }
}

7. 总结

etcd 是一个功能强大的分布式键值存储系统,为分布式系统提供了强一致性、高可用性、可靠性和安全性的存储服务。通过本教程的学习,您应该已经掌握了 etcd 的核心概念、安装配置、基本使用和高级特性,可以根据实际需求灵活应用 etcd 构建可靠、安全、可扩展的分布式系统。

7.1 关键要点回顾

  • etcd 基于 Raft 共识算法,提供强一致性的分布式键值存储
  • 掌握 etcd 的核心组件和概念,包括集群、成员、领导者、跟随者等
  • 熟悉 etcd 的安装配置和基本使用,包括键值操作、集群管理、监视操作、事务操作等
  • 了解 etcd 的高级特性,如分布式锁、领导者选举、租约管理、快照和备份等
  • 遵循最佳实践,优化 etcd 的部署、性能和安全性
  • 根据实际场景灵活应用 etcd,如服务发现、配置管理、分布式锁实现等

7.2 后续学习建议

  • 深入学习 Raft 算法:了解 Raft 共识算法的内部实现,掌握分布式一致性的原理
  • 探索 etcd 源码:研究 etcd 的内部实现,特别是 Raft 共识、WAL 日志和快照机制
  • 实践大规模部署:在生产环境中部署和管理大规模 etcd 集群
  • 学习 etcd 生态系统:探索 etcd 与其他工具的集成,如 Kubernetes、Prometheus 等
  • 参与社区:关注 etcd 社区动态,参与贡献和讨论
  • 学习其他分布式存储:比较 etcd 与 Zookeeper、Consul 等其他分布式存储系统的异同

通过不断学习和实践,您将能够充分发挥 etcd 的强大功能,为您的分布式系统提供可靠的键值存储服务。

« 上一篇 Consul 教程 - 服务发现与配置管理 下一篇 » ZooKeeper 教程 - 分布式协调服务