ZooKeeper 教程
1. 核心概念
ZooKeeper 是 Apache 基金会的开源项目,是一个分布式协调服务,为分布式应用提供了一致性、可靠性和有序性的保证。它专为分布式系统设计,提供了服务发现、配置管理、分布式锁、领导者选举等功能。
1.1 主要特点
- 简单易用:提供简单的文件系统接口和编程模型
- 高可靠性:支持集群部署,自动故障转移
- 顺序一致性:保证客户端操作的顺序一致性
- 原子性:所有操作要么成功,要么失败
- 实时性:保证在一定时间内,客户端能够看到最新的数据
- 可靠性:数据持久化存储,支持快照和事务日志
- 可扩展性:支持水平扩展,适应大规模分布式系统
1.2 架构组件
ZooKeeper 架构由以下核心组件组成:
- ZooKeeper Server:ZooKeeper 服务器,处理客户端请求
- Leader:集群中的领导者节点,负责处理所有写请求
- Follower:集群中的跟随者节点,处理读请求并复制领导者的事务
- Observer:观察者节点,只处理读请求,不参与投票
- Client:ZooKeeper 客户端,与服务器交互
- Quorum:法定人数,决定集群的投票结果
- ZAB:ZooKeeper Atomic Broadcast,ZooKeeper 原子广播协议
- Data Tree:数据树,存储 ZooKeeper 的数据
- Session:客户端与服务器之间的会话
1.3 核心概念
- Data Model:数据模型,类似文件系统的树形结构
- Node:数据节点,ZooKeeper 中的基本数据单元
- znode:ZooKeeper 节点,分为持久节点、临时节点和顺序节点
- Watch:监视机制,用于监听节点变化
- Session:客户端会话,维护客户端与服务器的连接
- Version:节点版本,用于乐观锁
- ACL:访问控制列表,用于控制对节点的访问
- Quorum:法定人数,确保集群的一致性
- Epoch:选举纪元,确保领导者选举的正确性
2. 安装配置
2.1 安装 ZooKeeper
2.1.1 二进制安装
# 下载 ZooKeeper
curl -O https://dlcdn.apache.org/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1-bin.tar.gz
# 解压
tar xzvf apache-zookeeper-3.9.1-bin.tar.gz
# 移动到合适的目录
sudo mv apache-zookeeper-3.9.1-bin /opt/zookeeper
# 创建软链接
sudo ln -s /opt/zookeeper /opt/zookeeper/current2.1.2 Docker 安装
# 运行单个 ZooKeeper 容器
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper:3.9.1
# 验证安装
docker exec -it zookeeper zkServer.sh status2.1.3 Kubernetes 安装
使用 Helm 安装 ZooKeeper 到 Kubernetes 集群:
# 添加 Helm 仓库
helm repo add bitnami https://charts.bitnami.com/bitnami
# 更新仓库
helm repo update
# 安装 ZooKeeper
helm install zookeeper bitnami/zookeeper --set replicaCount=3
# 验证安装
kubectl get pods
kubectl exec -it zookeeper-0 -- zkServer.sh status2.2 配置 ZooKeeper
2.2.1 单机配置
创建 ZooKeeper 配置文件 conf/zoo.cfg:
# 基本配置
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 日志配置
dataLogDir=/var/log/zookeeper
# 客户端连接限制
maxClientCnxns=60
# 自动清理快照和日志
autopurge.snapRetainCount=3
autopurge.purgeInterval=12.2.2 集群配置
在三个不同的服务器上分别配置 ZooKeeper:
服务器 1 (conf/zoo.cfg):
# 基本配置
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 集群配置
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888服务器 2 (conf/zoo.cfg):
# 基本配置
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 集群配置
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888服务器 3 (conf/zoo.cfg):
# 基本配置
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10
syncLimit=5
# 集群配置
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888创建 myid 文件:
在每个服务器的 dataDir 目录中创建 myid 文件,内容为服务器的 ID:
# 服务器 1
echo "1" > /var/lib/zookeeper/myid
# 服务器 2
echo "2" > /var/lib/zookeeper/myid
# 服务器 3
echo "3" > /var/lib/zookeeper/myid2.3 启动 ZooKeeper
2.3.1 启动单机模式
# 启动 ZooKeeper
/opt/zookeeper/bin/zkServer.sh start
# 停止 ZooKeeper
/opt/zookeeper/bin/zkServer.sh stop
# 查看状态
/opt/zookeeper/bin/zkServer.sh status2.3.2 启动集群模式
在每个服务器上执行:
# 启动 ZooKeeper 服务器
/opt/zookeeper/bin/zkServer.sh start
# 查看集群状态
/opt/zookeeper/bin/zkServer.sh status3. 基本使用
3.1 客户端操作
3.1.1 连接 ZooKeeper
# 连接本地 ZooKeeper
/opt/zookeeper/bin/zkCli.sh
# 连接远程 ZooKeeper
/opt/zookeeper/bin/zkCli.sh -server zookeeper1:2181,zookeeper2:2181,zookeeper3:21813.1.2 创建节点
# 创建持久节点
create /app "application"
# 创建临时节点
create -e /temp "temporary"
# 创建顺序节点
create -s /seq "sequence"
# 创建临时顺序节点
create -e -s /temp-seq "temp-sequence"
# 创建带 ACL 的节点
create /secure "secret" world:anyone:r3.1.3 读取节点
# 读取节点数据
get /app
# 读取节点数据并设置监视
get /app watch
# 查看节点状态
stat /app
# 列出子节点
ls /
# 列出子节点并设置监视
ls /app watch3.1.4 更新节点
# 更新节点数据
set /app "updated application"
# 基于版本更新节点数据
set /app "versioned update" 13.1.5 删除节点
# 删除节点
delete /app
# 基于版本删除节点
delete /app 2
# 递归删除节点
rmr /app3.2 集群管理
3.2.1 查看集群状态
# 查看服务器状态
/opt/zookeeper/bin/zkServer.sh status
# 查看集群健康状态
echo stat | nc localhost 2181
# 查看详细的集群状态
echo mntr | nc localhost 2181
# 查看集群配置
echo conf | nc localhost 21813.2.2 客户端会话管理
# 查看会话信息
echo cons | nc localhost 2181
# 关闭客户端连接
echo kill <session-id> | nc localhost 21813.3 监视机制
3.3.1 设置监视
# 监视节点数据变化
get /app watch
# 监视子节点变化
ls /app watch
# 监视节点创建和删除
ls / watch3.3.2 触发监视
# 触发数据变化监视
set /app "new value"
# 触发子节点变化监视
create /app/child "child node"
# 触发节点删除监视
delete /app4. 高级特性
4.1 分布式锁
4.1.1 基于临时顺序节点的分布式锁
# 创建锁节点
create -e -s /locks/lock- "lock"
# 获取所有锁节点
ls /locks
# 检查是否获得锁
# 如果创建的节点是序列中的第一个,则获得锁
# 否则,监视前一个节点的删除
# 释放锁
delete /locks/lock-<sequence>4.1.2 使用 Curator 实现分布式锁
// 使用 Curator 客户端实现分布式锁
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
// 创建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
"zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 创建分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/resource");
// 尝试获取锁
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
// 执行临界区操作
System.out.println("Acquired lock, performing critical section operation");
} finally {
// 释放锁
lock.release();
System.out.println("Released lock");
}
}
// 关闭客户端
client.close();4.2 领导者选举
4.2.1 基于临时顺序节点的领导者选举
# 创建选举节点
create -e -s /election/candidate- "candidate"
# 获取所有选举节点
ls /election
# 检查是否成为领导者
# 如果创建的节点是序列中的第一个,则成为领导者
# 否则,监视前一个节点的删除
# 领导者退出
delete /election/candidate-<sequence>4.2.2 使用 Curator 实现领导者选举
// 使用 Curator 客户端实现领导者选举
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
// 创建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(
"zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// 创建领导者选择器
LeaderSelector leaderSelector = new LeaderSelector(
client,
"/election/leader",
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
// 成为领导者
System.out.println("Elected as leader, performing leader tasks");
// 保持领导权,直到方法返回
Thread.sleep(60000);
System.out.println("Releasing leadership");
}
}
);
// 自动重新加入选举
leaderSelector.autoRequeue();
// 开始选举
leaderSelector.start();
// 运行一段时间后关闭
Thread.sleep(300000);
leaderSelector.close();
client.close();4.3 配置管理
4.3.1 存储配置
# 存储应用配置
create /config/app "application config"
create /config/app/database "db config"
create /config/app/database/host "db.example.com"
create /config/app/database/port "5432"
create /config/app/redis "redis config"
create /config/app/redis/host "redis.example.com"
create /config/app/redis/port "6379"4.3.2 监视配置变化
# 监视配置变化
get /config/app watch
ls /config/app watch
# 更新配置
set /config/app/database/host "new-db.example.com"4.4 服务发现
4.4.1 注册服务
# 注册服务实例
create -e /services/web/server1 "192.168.1.100:8080"
create -e /services/web/server2 "192.168.1.101:8080"
create -e /services/api/server1 "192.168.1.100:8081"
create -e /services/api/server2 "192.168.1.101:8081"4.4.2 发现服务
# 发现所有 web 服务实例
ls /services/web
# 发现单个服务实例
get /services/web/server1
# 监视服务变化
ls /services/web watch4.5 ACL 权限控制
4.5.1 设置 ACL
# 创建带 ACL 的节点
create /secure "secret" world:anyone:r
# 设置节点 ACL
setAcl /secure auth:user:password:rwcd
# 查看节点 ACL
getAcl /secure4.5.2 添加认证用户
# 添加认证用户
addauth digest user:password
# 使用认证用户创建节点
create /auth "protected" auth:user:password:rwcd5. 最佳实践
5.1 部署最佳实践
- 使用奇数个节点:推荐 3 或 5 个节点,确保高可用性
- 分布部署:将节点部署在不同的可用区或物理机器上
- 配置适当的资源:为 ZooKeeper 节点分配足够的 CPU、内存和磁盘资源
- 使用 SSD 存储:提高事务日志和快照的读写性能
- 配置备份策略:定期备份 ZooKeeper 数据,防止数据丢失
- 启用日志轮换:配置适当的日志轮换策略,避免日志文件过大
- 配置防火墙:限制 ZooKeeper 端口的访问,只允许必要的网络流量
5.2 性能优化
- 调整 JVM 参数:根据服务器性能调整 JVM 堆大小和 GC 策略
- 优化网络配置:调整操作系统的网络参数,如 TCP 缓冲区大小
- 使用批量操作:对于大量节点操作,使用批量 API 减少网络开销
- 优化客户端请求:减少频繁的小请求,合并为批量请求
- 使用连接池:复用客户端连接,减少连接建立开销
- 调整会话超时:设置合理的会话超时时间,避免频繁的会话过期
- 使用观察者节点:对于读多写少的场景,添加观察者节点提高读性能
5.3 安全最佳实践
- 启用认证:配置 ZooKeeper 认证,防止未授权访问
- 使用 SSL:启用 SSL 加密,保护客户端与服务器之间的通信
- 配置适当的 ACL:为不同的节点设置适当的 ACL,限制访问权限
- 使用最小权限原则:为不同的客户端分配最小必要的权限
- 定期轮换密码:确保认证密码及时轮换
- 监控安全事件:设置安全相关的监控和告警
5.4 操作最佳实践
- 使用命名空间:为不同的应用使用不同的命名空间,如
/app1、/app2 - 使用临时节点:对于临时数据,使用临时节点自动清理
- 实现优雅的节点管理:定期清理不再需要的节点
- 监控集群健康:设置集群健康状态的监控和告警
- 制定灾难恢复计划:准备好应对集群故障的恢复方案
- 测试故障转移:定期测试集群的故障转移能力
5.5 应用集成最佳实践
- 使用客户端库:使用官方或社区维护的客户端库,而非直接调用命令行工具
- 实现重试机制:处理网络故障和节点不可用的情况
- 使用连接池:复用客户端连接,减少连接建立开销
- 监控客户端性能:监控客户端的请求延迟和错误率
- 实现配置缓存:缓存配置信息,减少对 ZooKeeper 的频繁访问
- 使用监视机制:利用监视机制及时响应配置变更和服务发现
6. 实用案例
6.1 分布式锁实现
场景:使用 ZooKeeper 实现分布式锁,确保多个进程对共享资源的互斥访问。
实现示例:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock implements Watcher {
private ZooKeeper zk;
private String lockPath;
private String currentLock;
private String waitPath;
private CountDownLatch latch;
private String lockName;
private boolean isLocked = false;
public DistributedLock(ZooKeeper zk, String lockPath, String lockName) {
this.zk = zk;
this.lockPath = lockPath;
this.lockName = lockName;
try {
// 检查锁路径是否存在
Stat stat = zk.exists(lockPath, false);
if (stat == null) {
zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() throws Exception {
if (tryLock()) {
System.out.println(Thread.currentThread().getName() + " 获取锁成功");
isLocked = true;
return;
}
waitForLock(waitPath);
}
public boolean tryLock() throws Exception {
String lockNode = lockPath + "/" + lockName + "-";
currentLock = zk.create(lockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 创建锁节点: " + currentLock);
List<String> locks = zk.getChildren(lockPath, false);
Collections.sort(locks);
int index = locks.indexOf(currentLock.substring(lockPath.length() + 1));
if (index == 0) {
// 获得锁
return true;
} else {
// 监听前一个锁节点
waitPath = lockPath + "/" + locks.get(index - 1);
return false;
}
}
private void waitForLock(String path) throws Exception {
Stat stat = zk.exists(path, this);
if (stat != null) {
System.out.println(Thread.currentThread().getName() + " 等待锁: " + path);
latch = new CountDownLatch(1);
latch.await();
System.out.println(Thread.currentThread().getName() + " 获得锁");
isLocked = true;
}
}
public void unlock() {
if (isLocked) {
try {
System.out.println(Thread.currentThread().getName() + " 释放锁: " + currentLock);
zk.delete(currentLock, -1);
currentLock = null;
isLocked = false;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void process(WatchedEvent event) {
if (latch != null && event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
latch.countDown();
}
}
}6.2 领导者选举实现
场景:使用 ZooKeeper 实现领导者选举,确保在分布式系统中只有一个进程执行特定任务。
实现示例:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class LeaderElection implements Watcher {
private ZooKeeper zk;
private String electionPath;
private String currentNode;
private String leaderNode;
private CountDownLatch latch;
private boolean isLeader = false;
public LeaderElection(ZooKeeper zk, String electionPath) {
this.zk = zk;
this.electionPath = electionPath;
try {
// 检查选举路径是否存在
Stat stat = zk.exists(electionPath, false);
if (stat == null) {
zk.create(electionPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void startElection() throws Exception {
// 创建临时顺序节点
String node = electionPath + "/candidate-";
currentNode = zk.create(node, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 创建选举节点: " + currentNode);
// 进行选举
electLeader();
}
private void electLeader() throws Exception {
List<String> candidates = zk.getChildren(electionPath, false);
Collections.sort(candidates);
String smallestNode = electionPath + "/" + candidates.get(0);
if (currentNode.equals(smallestNode)) {
// 成为领导者
System.out.println(Thread.currentThread().getName() + " 成为领导者");
isLeader = true;
leaderNode = currentNode;
} else {
// 监听前一个节点
String previousNode = electionPath + "/" + candidates.get(candidates.indexOf(currentNode.substring(electionPath.length() + 1)) - 1);
System.out.println(Thread.currentThread().getName() + " 监听节点: " + previousNode);
Stat stat = zk.exists(previousNode, this);
if (stat != null) {
latch = new CountDownLatch(1);
latch.await();
// 重新选举
electLeader();
} else {
// 前一个节点不存在,重新选举
electLeader();
}
}
}
public boolean isLeader() {
return isLeader;
}
@Override
public void process(WatchedEvent event) {
if (latch != null && event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
}
}6.3 配置管理实现
场景:使用 ZooKeeper 管理应用配置,实现配置的集中管理和动态更新。
实现示例:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class ConfigManager implements Watcher {
private ZooKeeper zk;
private String configPath;
private String config;
private CountDownLatch latch;
public ConfigManager(ZooKeeper zk, String configPath) {
this.zk = zk;
this.configPath = configPath;
try {
// 加载初始配置
loadConfig();
} catch (Exception e) {
e.printStackTrace();
}
}
public void loadConfig() throws Exception {
Stat stat = zk.exists(configPath, this);
if (stat != null) {
byte[] data = zk.getData(configPath, this, stat);
config = new String(data);
System.out.println("加载配置: " + config);
} else {
System.out.println("配置节点不存在: " + configPath);
}
}
public String getConfig() {
return config;
}
public void updateConfig(String newConfig) throws Exception {
Stat stat = zk.exists(configPath, false);
if (stat != null) {
zk.setData(configPath, newConfig.getBytes(), stat.getVersion());
} else {
zk.create(configPath, newConfig.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
System.out.println("更新配置: " + newConfig);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged && event.getPath().equals(configPath)) {
try {
loadConfig();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}6.4 服务发现实现
场景:使用 ZooKeeper 实现服务发现,自动发现和注册服务实例。
实现示例:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ServiceDiscovery implements Watcher {
private ZooKeeper zk;
private String servicePath;
private List<String> services;
private CountDownLatch latch;
public ServiceDiscovery(ZooKeeper zk, String servicePath) {
this.zk = zk;
this.servicePath = servicePath;
try {
// 检查服务路径是否存在
Stat stat = zk.exists(servicePath, false);
if (stat == null) {
zk.create(servicePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 加载初始服务列表
loadServices();
} catch (Exception e) {
e.printStackTrace();
}
}
public void registerService(String serviceName, String serviceAddress) throws Exception {
String serviceNode = servicePath + "/" + serviceName;
zk.create(serviceNode, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("注册服务: " + serviceName + " -> " + serviceAddress);
}
public void loadServices() throws Exception {
services = zk.getChildren(servicePath, this);
System.out.println("发现服务: " + services);
for (String service : services) {
String serviceNode = servicePath + "/" + service;
byte[] data = zk.getData(serviceNode, false, null);
System.out.println("服务地址: " + service + " -> " + new String(data));
}
}
public List<String> getServices() {
return services;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged && event.getPath().equals(servicePath)) {
try {
loadServices();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}7. 总结
ZooKeeper 是一个功能强大的分布式协调服务,为分布式应用提供了一致性、可靠性和有序性的保证。通过本教程的学习,您应该已经掌握了 ZooKeeper 的核心概念、安装配置、基本使用和高级特性,可以根据实际需求灵活应用 ZooKeeper 构建可靠、安全、可扩展的分布式系统。
7.1 关键要点回顾
- ZooKeeper 提供了简单易用的分布式协调服务,支持服务发现、配置管理、分布式锁、领导者选举等功能
- 掌握 ZooKeeper 的核心组件和概念,包括领导者、跟随者、观察者、数据树、znode 等
- 熟悉 ZooKeeper 的安装配置和基本使用,包括客户端操作、集群管理、监视机制等
- 了解 ZooKeeper 的高级特性,如分布式锁、领导者选举、配置管理、服务发现等
- 遵循最佳实践,优化 ZooKeeper 的部署、性能和安全性
- 根据实际场景灵活应用 ZooKeeper,如分布式锁实现、领导者选举实现、配置管理实现、服务发现实现等
7.2 后续学习建议
- 深入学习 ZAB 协议:了解 ZooKeeper 原子广播协议的内部实现
- 探索 ZooKeeper 源码:研究 ZooKeeper 的内部实现,特别是领导者选举和数据同步机制
- 实践大规模部署:在生产环境中部署和管理大规模 ZooKeeper 集群
- 学习 ZooKeeper 生态系统:探索 ZooKeeper 与其他工具的集成,如 Hadoop、Kafka、HBase 等
- 参与社区:关注 ZooKeeper 社区动态,参与贡献和讨论
- 学习其他分布式协调服务:比较 ZooKeeper 与 etcd、Consul 等其他分布式协调服务的异同
通过不断学习和实践,您将能够充分发挥 ZooKeeper 的强大功能,为您的分布式系统提供可靠的协调服务。