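Apache ZooKeeper - Distributed Coordination Service

Overview

Apache ZooKeeper is an open-source distributed coordination service that provides consistency services to distributed applications. It was originally developed at Yahoo! to solve coordination problems in distributed systems and later became a top-level project of the Apache Software Foundation.

ZooKeeper's design goal is to wrap complex, error-prone distributed consistency services into an efficient, reliable set of primitives and expose them through a small set of simple interfaces. It is a foundational component of many big-data and distributed systems: Hadoop, HBase, and Kafka (in its traditional, ZooKeeper-based deployments), for example, rely on ZooKeeper for distributed coordination.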

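Key Features

  • Sequential consistency: transaction requests from the same client are applied in the order in which they were sent
  • Atomicity: a transaction is applied consistently across the cluster, either on every server or on none
  • Single system image: a client sees the same data model no matter which ZooKeeper server it connects to
  • Reliability: once a transaction has been applied, its resulting state persists until a later update changes it
  • Timeliness: a client's view of the data is guaranteed to be up to date within a bounded time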

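How It Works

Data Model

ZooKeeper's data model is a hierarchical namespace similar to a file system. Each node in the tree is called a znode; it is addressed by an absolute path, can hold a small amount of data, and can have children: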

/
├── zookeeper
│   ├── config
│   └── quota
├── app1
│   ├── config
│   │   ├── database
│   │   └── cache
│   └── servers
│       ├── server1
│       ├── server2
│       └── server3
└── app2
    ├── locks
    │   ├── lock1
    │   └── lock2
    └── queue
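The tree above can be modeled with a toy in-memory class to show two rules of the namespace: every znode is addressed by an absolute path, and a node can only be created once its parent exists. The `ZnodeTree` class below is a hypothetical sketch, not the ZooKeeper client API. Its `getChildren` returns child names rather than full paths, which matches how the real `getChildren` behaves; the real call makes no ordering promise, whereas this sketch returns sorted names only because it stores paths in a `TreeMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of the ZooKeeper namespace, not the real client API
class ZnodeTree {
    // Absolute path -> data; a TreeMap keeps paths sorted, so a node's descendants are contiguous
    private final TreeMap<String, byte[]> nodes = new TreeMap<>();

    ZnodeTree() {
        nodes.put("/", new byte[0]); // the root znode always exists
    }

    static String parentOf(String path) {
        int slash = path.lastIndexOf('/');
        return slash == 0 ? "/" : path.substring(0, slash);
    }

    // As in ZooKeeper, creation fails if the node already exists or its parent does not
    void create(String path, byte[] data) {
        if (nodes.containsKey(path)) {
            throw new IllegalStateException("node exists: " + path);
        }
        if (!nodes.containsKey(parentOf(path))) {
            throw new IllegalStateException("no parent for: " + path);
        }
        nodes.put(path, data);
    }

    // Returns the names (not full paths) of direct children only
    List<String> getChildren(String path) {
        String prefix = path.equals("/") ? "/" : path + "/";
        List<String> children = new ArrayList<>();
        for (String candidate : nodes.tailMap(prefix, false).keySet()) {
            if (!candidate.startsWith(prefix)) {
                break; // past the last descendant of `path`
            }
            String rest = candidate.substring(prefix.length());
            if (!rest.contains("/")) {
                children.add(rest); // skip grandchildren and deeper descendants
            }
        }
        return children;
    }

    public static void main(String[] args) {
        ZnodeTree tree = new ZnodeTree();
        tree.create("/app1", new byte[0]);
        tree.create("/app1/servers", new byte[0]);
        tree.create("/app1/servers/server1", new byte[0]);
        tree.create("/app1/servers/server2", new byte[0]);
        System.out.println(tree.getChildren("/app1/servers")); // [server1, server2]
    }
}
```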

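ZNode Types

Persistent node (PERSISTENT)

  • Exists from creation until it is explicitly deleted
  • Suited to configuration data, service registration records, and similar information

Persistent sequential node (PERSISTENT_SEQUENTIAL)

  • Same basic behavior as a persistent node
  • At creation time, ZooKeeper appends a monotonically increasing sequence number to the path (a 10-digit, zero-padded counter kept by the parent node, e.g. lock-0000000001)

Ephemeral node (EPHEMERAL)

  • Its lifetime is bound to the client session that created it
  • When that session ends or expires, the node is deleted automatically
  • Suited to service discovery, distributed locks, and similar scenarios

Ephemeral sequential node (EPHEMERAL_SEQUENTIAL)

  • Same basic behavior as an ephemeral node
  • At creation time, ZooKeeper appends a monotonically increasing sequence number to the path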
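The two properties that distinguish the four types, the sequence suffix and the session binding, can be illustrated with a small simulation. The class `ZnodeTypesDemo` and its methods are hypothetical. The sketch assumes ZooKeeper's 10-digit, zero-padded suffix format and simplifies the counter so that it only counts sequential creates under each parent; the real counter is derived from the parent's child-change version, so it can also advance when other children are created or deleted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical simulation of sequential naming and ephemeral cleanup, not the ZooKeeper API
class ZnodeTypesDemo {
    private final Map<String, Integer> nextSeq = new HashMap<>();     // simplified per-parent counter
    private final Map<String, Long> ephemeralOwner = new HashMap<>(); // path -> owning session id
    private final List<String> nodes = new ArrayList<>();

    // sequential=true mimics *_SEQUENTIAL; a non-null sessionId mimics EPHEMERAL*
    String create(String parent, String name, boolean sequential, Long sessionId) {
        String path = parent + "/" + name;
        if (sequential) {
            int seq = nextSeq.merge(parent, 1, Integer::sum) - 1;   // 0, 1, 2, ...
            path += String.format(Locale.ROOT, "%010d", seq);       // 10-digit, zero-padded suffix
        }
        nodes.add(path);
        if (sessionId != null) {
            ephemeralOwner.put(path, sessionId);
        }
        return path;
    }

    // When a session ends, every ephemeral node it owns is removed automatically
    void expireSession(long sessionId) {
        nodes.removeIf(p -> Long.valueOf(sessionId).equals(ephemeralOwner.get(p)));
        ephemeralOwner.values().removeIf(owner -> owner == sessionId);
    }

    List<String> paths() {
        return new ArrayList<>(nodes);
    }

    public static void main(String[] args) {
        ZnodeTypesDemo zk = new ZnodeTypesDemo();
        String a = zk.create("/locks", "lock-", true, 1L); // EPHEMERAL_SEQUENTIAL, session 1
        String b = zk.create("/locks", "lock-", true, 2L); // EPHEMERAL_SEQUENTIAL, session 2
        zk.create("/config", "db", false, null);           // PERSISTENT
        System.out.println(a + " " + b); // /locks/lock-0000000000 /locks/lock-0000000001
        zk.expireSession(1L);
        System.out.println(zk.paths());  // [/locks/lock-0000000001, /config/db]
    }
}
```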

Cluster Architecture

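ZooKeeper cluster
├── Leader
│   ├── Handles all write requests
│   ├── Initiates proposals and decides (commits) them
│   └── Updates the system state
├── Follower
│   ├── Serves read requests
│   ├── Votes in leader elections and acknowledges write proposals
│   └── Forwards write requests to the Leader
└── Observer
    ├── Serves read requests
    ├── Does not vote, either in elections or on write proposals
    └── Scales out the cluster's read throughput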
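Only the Leader and the Followers count toward the voting majority, so the number of voting servers determines how many failures the ensemble survives. This arithmetic sketch (the class name `QuorumMath` is hypothetical) shows why odd ensemble sizes are the usual choice: a fourth voter raises the quorum from 2 to 3 without tolerating any additional failure. Observers do not enter the calculation.

```java
// Ensemble sizing arithmetic; voters = Leader + Followers (Observers are excluded)
class QuorumMath {
    // A write commits, and an election succeeds, once a strict majority of voters agree
    static int quorumSize(int voters) {
        return voters / 2 + 1;
    }

    // How many voting servers may fail while a quorum can still form
    static int toleratedFailures(int voters) {
        return voters - quorumSize(voters);
    }

    public static void main(String[] args) {
        for (int voters = 3; voters <= 6; voters++) {
            System.out.printf("voters=%d quorum=%d tolerated=%d%n",
                    voters, quorumSize(voters), toleratedFailures(voters));
        }
    }
}
```

Running `main` prints one line per ensemble size; note that 3 and 4 voters both tolerate a single failure, and 5 and 6 both tolerate two.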

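The ZAB Protocol

ZooKeeper uses ZAB (ZooKeeper Atomic Broadcast), an atomic broadcast protocol, to keep data consistent across the cluster:

Two basic modes

  1. Crash recovery mode: when the cluster starts or the Leader fails, a new Leader is elected and the other servers synchronize their data with it
  2. Message broadcast mode: the Leader turns each client write into a proposal, broadcasts it to the Followers, and commits it once a majority of voting servers has acknowledged it

Guarantees of ZAB

  • A transaction that has been committed by a Leader is eventually committed by all servers
  • A transaction that was only proposed by a Leader, and never committed, is discarded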
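Two details make the broadcast mode and its guarantees concrete. Every transaction carries a 64-bit zxid whose high 32 bits are the Leader's epoch and whose low 32 bits are a counter within that epoch, so any proposal from a newer Leader orders after everything from an older one. And a proposal is committed only once a majority of voting servers has acknowledged it. The sketch below models both in plain Java; `ZabSketch` and `Proposal` are hypothetical names, and counting the Leader's own acknowledgement toward the majority is a simplifying assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of ZAB bookkeeping, not ZooKeeper's implementation
class ZabSketch {
    // zxid layout: high 32 bits = leader epoch, low 32 bits = counter within the epoch
    static long zxid(int epoch, int counter) {
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    static int epochOf(long zxid) {
        return (int) (zxid >>> 32);
    }

    static int counterOf(long zxid) {
        return (int) zxid; // keeps only the low 32 bits
    }

    // Leader-side record of who has acknowledged a proposal
    static final class Proposal {
        final long zxid;
        final Set<Integer> acks = new HashSet<>();

        Proposal(long zxid, int leaderId) {
            this.zxid = zxid;
            acks.add(leaderId); // assumption: the Leader counts its own acknowledgement
        }

        // Returns true once a strict majority of voters has acknowledged, i.e. it may commit
        boolean ack(int serverId, int voters) {
            acks.add(serverId);
            return acks.size() > voters / 2;
        }
    }

    public static void main(String[] args) {
        long z = zxid(2, 5);
        System.out.println(Long.toHexString(z)); // 200000005
        Proposal p = new Proposal(z, 1);         // Leader is server 1 in a 5-voter ensemble
        System.out.println(p.ack(2, 5));         // false: 2 of 5
        System.out.println(p.ack(3, 5));         // true: 3 of 5 is a majority
    }
}
```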

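Use Cases

1. Configuration management

Centrally manage the configuration of a distributed system and push updates to clients dynamically.

2. Service discovery

Service providers register their instance information, and service consumers discover the available instances.

3. Distributed locks

Implement exclusive locks and read-write locks across a distributed environment.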
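For the read-write lock, the ZooKeeper recipes documentation describes clients creating sequential children named `read-` or `write-` and watching only one earlier node: a reader waits for the closest earlier writer, while a writer waits for the closest earlier node of either kind. The sketch below isolates that decision with no ZooKeeper I/O; `ReadWriteLockRule` and `nodeToWatch` are hypothetical names. It sorts by the numeric suffix rather than the full name, because sorting the names as plain strings would place every `read-` node before every `write-` node.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Decision logic of the read/write lock recipe, without any ZooKeeper I/O
class ReadWriteLockRule {
    // Sequential names end in a 10-digit counter, e.g. "read-0000000003"
    static int seq(String node) {
        return Integer.parseInt(node.substring(node.length() - 10));
    }

    // Returns null if `me` holds the lock, otherwise the child name it should watch
    static String nodeToWatch(List<String> children, String me) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(ReadWriteLockRule::seq));
        boolean isReader = me.startsWith("read-");
        String watch = null;
        for (String node : sorted) {
            if (node.equals(me)) {
                break;
            }
            // A reader only waits for writers; a writer waits for any earlier node
            if (!isReader || node.startsWith("write-")) {
                watch = node; // keeps the closest qualifying predecessor
            }
        }
        return watch;
    }

    public static void main(String[] args) {
        List<String> kids = List.of("write-0000000002", "read-0000000001",
                "read-0000000003", "write-0000000004");
        System.out.println(nodeToWatch(kids, "read-0000000001"));  // null: lock held
        System.out.println(nodeToWatch(kids, "read-0000000003"));  // write-0000000002
        System.out.println(nodeToWatch(kids, "write-0000000004")); // read-0000000003
    }
}
```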

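4. Cluster management

Track the liveness of cluster members, typically through ephemeral nodes, to detect failures and trigger automatic recovery.

5. Distributed queues

Implement FIFO queues and priority queues across a distributed environment.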
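A queue follows the same pattern as the lock: producers create sequential children, and a consumer lists them, takes the child with the lowest sequence number, and claims it by deleting it; if another consumer deleted it first, the delete fails and the consumer can move on to the next child. The ZooKeeper recipes documentation turns this into a priority queue by putting the priority in the name prefix. The sketch below covers only the consumer's ordering; the name layout (`queue-` + 2-digit priority + 10-digit sequence) and the class `QueueOrder` are assumptions for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Consumer-side ordering for a ZooKeeper-backed queue. Child names are assumed to look like
// "queue-" + 2-digit priority + 10-digit sequence, e.g. "queue-050000000012" (hypothetical layout)
class QueueOrder {
    static int priority(String node) {
        return Integer.parseInt(node.substring("queue-".length(), node.length() - 10));
    }

    static long seq(String node) {
        return Long.parseLong(node.substring(node.length() - 10));
    }

    // Lower priority value first, then FIFO by sequence number
    static Optional<String> nextToConsume(List<String> children) {
        return children.stream()
                .min(Comparator.comparingInt(QueueOrder::priority)
                        .thenComparingLong(QueueOrder::seq));
    }

    public static void main(String[] args) {
        List<String> children = List.of("queue-050000000001", "queue-010000000007", "queue-050000000000");
        System.out.println(nextToConsume(children).orElse("empty")); // queue-010000000007
    }
}
```

With a single priority value for every item, the same ordering reduces to a plain FIFO queue.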

Case Studies

Case 1: Distributed lock implementation

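import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock implements Watcher {
    private final ZooKeeper zk;
    private final String lockPath = "/distributed-lock";
    private final CountDownLatch connectedSignal = new CountDownLatch(1);
    private String currentPath;                  // this client's ephemeral sequential node
    private volatile String waitPath;            // predecessor node currently being watched
    private volatile CountDownLatch lockSignal;  // a fresh latch for every wait

    public DistributedLock(String connectString) throws Exception {
        zk = new ZooKeeper(connectString, 5000, this);
        connectedSignal.await();

        // Create the persistent lock root; another client may be doing the same
        if (zk.exists(lockPath, false) == null) {
            try {
                zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Already created by another client
            }
        }
    }

    public void lock() throws Exception {
        // Create an ephemeral sequential node such as /distributed-lock/lock-0000000003
        currentPath = zk.create(lockPath + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String currentNode = currentPath.substring(lockPath.length() + 1);

        while (true) {
            // All children share the "lock-" prefix and a zero-padded suffix,
            // so string order equals creation order
            List<String> children = zk.getChildren(lockPath, false);
            Collections.sort(children);
            int index = children.indexOf(currentNode);

            if (index < 0) {
                throw new IllegalStateException("Lock node is gone (session expired?): " + currentPath);
            }
            if (index == 0) {
                // Smallest node: the lock is ours
                System.out.println("Lock acquired: " + currentPath);
                return;
            }

            // Watch only the immediate predecessor to avoid a herd effect
            lockSignal = new CountDownLatch(1);
            waitPath = lockPath + "/" + children.get(index - 1);
            Stat stat = zk.exists(waitPath, true);
            if (stat != null) {
                System.out.println("Waiting for lock release: " + waitPath);
                lockSignal.await();
            }
            // The predecessor is gone; loop and re-check, because it may have been
            // a waiter whose session ended rather than the lock holder
        }
    }

    public void unlock() throws Exception {
        // Version -1 deletes the node whatever its current version
        zk.delete(currentPath, -1);
        System.out.println("Lock released: " + currentPath);
    }

    public void close() throws InterruptedException {
        // Closing the session also removes any ephemeral node it still owns
        zk.close();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
        // getPath() is null for connection-state events, so compare from waitPath's side
        String watched = waitPath;
        CountDownLatch signal = lockSignal;
        if (event.getType() == Event.EventType.NodeDeleted
                && watched != null && watched.equals(event.getPath()) && signal != null) {
            signal.countDown();
        }
    }

    // Usage example
    public static void main(String[] args) throws Exception {
        DistributedLock lock = new DistributedLock("localhost:2181");
        try {
            lock.lock();
            try {
                // Business logic that requires mutual exclusion
                System.out.println("Running business logic...");
                Thread.sleep(5000);
            } finally {
                lock.unlock();
            }
        } finally {
            lock.close();
        }
    }
}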

Case 2: Configuration management service

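import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class ConfigurationManager implements Watcher {
    private final ZooKeeper zk;
    private final String configPath = "/app-config";
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, Object> configCache = new ConcurrentHashMap<>();
    private final List<ConfigChangeListener> listeners = new CopyOnWriteArrayList<>();
    // A field counted down in process(); a local latch that nobody counts down would block forever
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    // Configuration change listener
    public interface ConfigChangeListener {
        void onConfigChanged(String key, Object oldValue, Object newValue);
    }

    public ConfigurationManager(String connectString) throws Exception {
        zk = new ZooKeeper(connectString, 5000, this);

        // Wait until the session is established
        connectedSignal.await();

        // Initialize the config root path
        if (zk.exists(configPath, false) == null) {
            try {
                zk.create(configPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Created concurrently by another client
            }
        }

        // Load the initial configuration
        loadConfiguration();
    }

    public void addConfigChangeListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }

    public void setConfig(String key, Object value) throws Exception {
        String path = configPath + "/" + key;
        byte[] data = objectMapper.writeValueAsBytes(value);

        Stat stat = zk.exists(path, false);
        if (stat == null) {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            // Conditional update: fails with BadVersionException if another client wrote in between
            zk.setData(path, data, stat.getVersion());
        }

        // Cache right away so an immediate getConfig sees the new value; listeners are driven
        // by watch events and may not fire for writes made through this same instance
        configCache.put(key, value);
        System.out.println("Config updated: " + key + " = " + value);
    }

    public <T> T getConfig(String key, Class<T> type) {
        Object value = configCache.get(key);
        if (value == null) {
            return null;
        }
        return objectMapper.convertValue(value, type);
    }

    // Reads every config node; watch=true re-arms the one-shot child and data watches
    private synchronized void loadConfiguration() throws Exception {
        List<String> children = zk.getChildren(configPath, true);
        Map<String, Object> fresh = new HashMap<>();

        for (String child : children) {
            try {
                byte[] data = zk.getData(configPath + "/" + child, true, null);
                if (data != null && data.length > 0) {
                    Object value = objectMapper.readValue(data, Object.class);
                    if (value != null) {
                        fresh.put(child, value);
                    }
                }
            } catch (KeeperException.NoNodeException ignored) {
                // Deleted between getChildren and getData; the child watch will fire again
            }
        }

        // Notify listeners about added, changed, and removed keys, then swap in the new snapshot
        Set<String> keys = new HashSet<>(configCache.keySet());
        keys.addAll(fresh.keySet());
        for (String key : keys) {
            Object oldValue = configCache.get(key);
            Object newValue = fresh.get(key);
            if (!Objects.equals(oldValue, newValue)) {
                for (ConfigChangeListener listener : listeners) {
                    listener.onConfigChanged(key, oldValue, newValue);
                }
            }
        }
        configCache.keySet().retainAll(fresh.keySet());
        configCache.putAll(fresh);

        System.out.println("Configuration loaded: " + configCache.size() + " entries");
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }

        if (event.getType() == Event.EventType.NodeChildrenChanged ||
                event.getType() == Event.EventType.NodeDataChanged) {
            try {
                loadConfiguration();
                System.out.println("Configuration change detected; configuration reloaded");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Usage example
    public static void main(String[] args) throws Exception {
        ConfigurationManager configManager = new ConfigurationManager("localhost:2181");
        configManager.addConfigChangeListener((key, oldValue, newValue) ->
                System.out.println("Config changed: " + key + ": " + oldValue + " -> " + newValue));

        // Write configuration
        configManager.setConfig("database.url", "jdbc:mysql://localhost:3306/testdb");
        configManager.setConfig("database.maxConnections", 100);
        configManager.setConfig("cache.enabled", true);

        // Read configuration
        String dbUrl = configManager.getConfig("database.url", String.class);
        Integer maxConn = configManager.getConfig("database.maxConnections", Integer.class);
        Boolean cacheEnabled = configManager.getConfig("cache.enabled", Boolean.class);

        System.out.println("Database URL: " + dbUrl);
        System.out.println("Max Connections: " + maxConn);
        System.out.println("Cache Enabled: " + cacheEnabled);

        // Keep running to receive configuration changes
        Thread.sleep(Long.MAX_VALUE);
    }
}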
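`setConfig` passes `stat.getVersion()` to `setData`, which turns the update into a compare-and-set: ZooKeeper rejects it with `BadVersionException` if the node changed after it was read, while a version of -1 skips the check. The in-memory stand-in below (`VersionedStore` and its `BadVersion` exception are hypothetical, not ZooKeeper classes) shows what happens when two writers read the same version. As with a real znode, the data version of a newly created node starts at 0 and increases by one on each successful write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for znode data + version, showing the compare-and-set
// semantics behind setData(path, data, expectedVersion)
class VersionedStore {
    static final class BadVersion extends RuntimeException {
        BadVersion(String message) {
            super(message);
        }
    }

    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Integer> version = new HashMap<>();

    void create(String path, String value) {
        data.put(path, value);
        version.put(path, 0); // a new node starts at version 0
    }

    String get(String path) {
        return data.get(path);
    }

    int getVersion(String path) {
        return version.get(path);
    }

    // expectedVersion == -1 means "any version"; returns the new version
    int setData(String path, String value, int expectedVersion) {
        int current = version.get(path);
        if (expectedVersion != -1 && expectedVersion != current) {
            throw new BadVersion(path + ": expected v" + expectedVersion + " but was v" + current);
        }
        data.put(path, value);
        version.put(path, current + 1);
        return current + 1;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        String path = "/app-config/cache.enabled";
        store.create(path, "true");
        int seenByA = store.getVersion(path);
        int seenByB = store.getVersion(path);
        store.setData(path, "false", seenByA); // succeeds; version becomes 1
        try {
            store.setData(path, "true", seenByB); // stale version, rejected
        } catch (BadVersion e) {
            System.out.println("retry needed: " + e.getMessage());
        }
    }
}
```

A caller that hits the version conflict would re-read the node and decide whether to retry, rather than silently overwriting the other writer's change.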

Case 3: Service discovery implementation

import org.apache.zookeeper.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ServiceDiscovery implements Watcher {
    private final ZooKeeper zk;
    private final String servicePath = "/services";
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, List<ServiceInstance>> serviceCache = new ConcurrentHashMap<>();
    // Counted down by process() once the session is connected
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    public static class ServiceInstance {
        private String host;
        private int port;
        private Map<String, String> metadata = new HashMap<>();

        // Jackson needs the no-arg constructor plus the getters and setters below
        public ServiceInstance() {}

        public ServiceInstance(String host, int port) {
            this.host = host;
            this.port = port;
        }

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
        public Map<String, String> getMetadata() { return metadata; }
        public void setMetadata(Map<String, String> metadata) { this.metadata = metadata; }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    public ServiceDiscovery(String connectString) throws Exception {
        zk = new ZooKeeper(connectString, 5000, this);
        connectedSignal.await();

        // Create the service root path
        createIfAbsent(servicePath);
    }

    // Creates a persistent node, tolerating a concurrent create by another client
    private void createIfAbsent(String path) throws Exception {
        if (zk.exists(path, false) == null) {
            try {
                zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Another client created it first
            }
        }
    }

    // Register a service instance
    public void registerService(String serviceName, ServiceInstance instance) throws Exception {
        String serviceTypePath = servicePath + "/" + serviceName;
        createIfAbsent(serviceTypePath);

        // Ephemeral sequential node: it disappears automatically when this session ends
        String instancePath = serviceTypePath + "/" + instance.getHost() + ":" + instance.getPort() + "-";
        byte[] data = objectMapper.writeValueAsBytes(instance);

        String actualPath = zk.create(instancePath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("Service registered: " + serviceName + " -> " + actualPath);
    }

    // Discover instances; watch=true re-arms the one-shot children watch
    public List<ServiceInstance> discoverServices(String serviceName) throws Exception {
        String serviceTypePath = servicePath + "/" + serviceName;

        if (zk.exists(serviceTypePath, false) == null) {
            return Collections.emptyList();
        }

        List<String> children = zk.getChildren(serviceTypePath, true);
        List<ServiceInstance> instances = new ArrayList<>();

        for (String child : children) {
            try {
                byte[] data = zk.getData(serviceTypePath + "/" + child, false, null);
                if (data != null) {
                    instances.add(objectMapper.readValue(data, ServiceInstance.class));
                }
            } catch (KeeperException.NoNodeException ignored) {
                // The instance went away between getChildren and getData
            }
        }

        serviceCache.put(serviceName, instances);
        return instances;
    }

    // Pick an instance with the given load balancer
    public ServiceInstance selectInstance(String serviceName, LoadBalancer loadBalancer) throws Exception {
        List<ServiceInstance> instances = serviceCache.get(serviceName);

        if (instances == null || instances.isEmpty()) {
            instances = discoverServices(serviceName);
        }

        if (instances.isEmpty()) {
            throw new IllegalStateException("No available instance for service: " + serviceName);
        }

        return loadBalancer.select(instances);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }

        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // The watched path looks like /services/<serviceName>
            String serviceName = event.getPath().substring(servicePath.length() + 1);

            try {
                discoverServices(serviceName);
                System.out.println("Service change detected: " + serviceName);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Load balancer interface
    public interface LoadBalancer {
        ServiceInstance select(List<ServiceInstance> instances);
    }

    // Round-robin load balancer
    public static class RoundRobinLoadBalancer implements LoadBalancer {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public ServiceInstance select(List<ServiceInstance> instances) {
            // floorMod keeps the index non-negative after the counter overflows
            int index = Math.floorMod(counter.getAndIncrement(), instances.size());
            return instances.get(index);
        }
    }

    // Usage example
    public static void main(String[] args) throws Exception {
        ServiceDiscovery discovery = new ServiceDiscovery("localhost:2181");

        // Register services
        ServiceInstance instance1 = new ServiceInstance("192.168.1.10", 8080);
        instance1.getMetadata().put("version", "1.0");
        discovery.registerService("user-service", instance1);

        ServiceInstance instance2 = new ServiceInstance("192.168.1.11", 8080);
        instance2.getMetadata().put("version", "1.0");
        discovery.registerService("user-service", instance2);

        // Discover services
        Thread.sleep(1000);
        List<ServiceInstance> instances = discovery.discoverServices("user-service");
        System.out.println("Discovered instances: " + instances);

        // Pick instances with round-robin load balancing
        LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
        for (int i = 0; i < 5; i++) {
            ServiceInstance selected = discovery.selectInstance("user-service", loadBalancer);
            System.out.println("Selected instance: " + selected);
        }
    }
}
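A counter-based round robin like `RoundRobinLoadBalancer` derives its index from an `AtomicInteger` that keeps counting forever. After `Integer.MAX_VALUE` calls the counter wraps to a negative number, and Java's `%` then returns a negative remainder, which is an invalid list index. This standalone sketch (the class `RoundRobinIndex` is hypothetical) starts the counter at the wrap-around point to show that `Math.floorMod` keeps the index in range.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone illustration of why a counter-based round robin should use Math.floorMod
class RoundRobinIndex {
    private final AtomicInteger counter;

    RoundRobinIndex(int start) {
        counter = new AtomicInteger(start);
    }

    <T> T select(List<T> items) {
        // floorMod stays in [0, size) even after the counter overflows to negative values
        return items.get(Math.floorMod(counter.getAndIncrement(), items.size()));
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("192.168.1.10:8080", "192.168.1.11:8080");
        RoundRobinIndex rr = new RoundRobinIndex(Integer.MAX_VALUE); // start at the overflow point
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.select(hosts));
        }
        int wrapped = Integer.MIN_VALUE + 1;
        System.out.println(wrapped % hosts.size()); // -1: plain % would yield an invalid index
    }
}
```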

Case 4: Cluster monitoring and management

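#!/bin/bash
# ZooKeeper cluster monitoring script
# Since ZooKeeper 3.5.3 only "srvr" is enabled by default; ruok, stat and mntr must be
# listed in 4lw.commands.whitelist in each server's zoo.cfg

ZK_HOSTS="localhost:2181,localhost:2182,localhost:2183"

echo "=== ZooKeeper cluster status ==="

# Check that each server responds (a healthy server replies "imok")
echo "1. Connectivity:"
for host in $(echo "$ZK_HOSTS" | tr "," " "); do
  echo -n "Checking $host: "
  echo ruok | nc -w 2 "${host%:*}" "${host#*:}" 2>/dev/null
  echo  # the "imok" reply has no trailing newline
done

# Check each server's role (leader, follower, observer or standalone)
echo -e "\n2. Server roles:"
for host in $(echo "$ZK_HOSTS" | tr "," " "); do
  echo -n "$host: "
  echo stat | nc -w 2 "${host%:*}" "${host#*:}" 2>/dev/null | grep Mode | awk '{print $2}'
done

# List the root znodes (zkCli.sh runs a single command given after the options)
echo -e "\n3. Root znodes:"
zkCli.sh -server "$ZK_HOSTS" ls /

# Performance metrics
echo -e "\n4. Performance metrics:"
for host in $(echo "$ZK_HOSTS" | tr "," " "); do
  echo "=== $host ==="
  echo mntr | nc -w 2 "${host%:*}" "${host#*:}" 2>/dev/null | grep -E "(outstanding|latency|connections)"
done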

Best Practices

1. Cluster configuration

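# zoo.cfg
# Basic time unit in milliseconds
tickTime=2000
# Ticks a Follower may take to connect to and sync with the Leader (10 x 2000 ms = 20 s)
initLimit=10
# Ticks a Follower may lag behind the Leader before it is dropped (5 x 2000 ms = 10 s)
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181

# Cluster members: server.N=host:quorumPort:electionPort
# Each server also needs a file dataDir/myid containing its own N
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

# Performance tuning
# Maximum concurrent connections from a single client IP
maxClientCnxns=60
# Automatic purge: keep the 3 most recent snapshots, run every 24 hours
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

# Four-letter-word commands used by the monitoring script
4lw.commands.whitelist=ruok,stat,mntr,srvr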
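The timing settings in this zoo.cfg are multiples of `tickTime`: `initLimit` bounds how long a Follower may take to connect to and sync with the Leader, and `syncLimit` bounds how far it may fall behind. Unless `minSessionTimeout` and `maxSessionTimeout` are set, the server also limits client session timeouts to between 2 and 20 ticks and clamps a requested value into that range. The sketch below only does that arithmetic for the values above (the class `ZkTiming` is hypothetical); the 5000 ms session timeout used by the Java examples in this article falls inside the range.

```java
// Derived timing values for the zoo.cfg above, all in milliseconds
class ZkTiming {
    static final int TICK_TIME = 2000, INIT_LIMIT = 10, SYNC_LIMIT = 5;

    // Defaults that apply when minSessionTimeout / maxSessionTimeout are not set
    static int minSessionTimeout() {
        return 2 * TICK_TIME;
    }

    static int maxSessionTimeout() {
        return 20 * TICK_TIME;
    }

    // The server clamps the session timeout a client requests into [min, max]
    static int negotiatedTimeout(int requested) {
        return Math.max(minSessionTimeout(), Math.min(requested, maxSessionTimeout()));
    }

    public static void main(String[] args) {
        System.out.println("initial sync limit: " + INIT_LIMIT * TICK_TIME + " ms"); // 20000 ms
        System.out.println("sync lag limit: " + SYNC_LIMIT * TICK_TIME + " ms");     // 10000 ms
        System.out.println("requested 5000 -> " + negotiatedTimeout(5000));          // 5000
        System.out.println("requested 1000 -> " + negotiatedTimeout(1000));          // 4000
    }
}
```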

2. Client connection pool

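import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Each ZooKeeper handle is a separate server session, and all sessions from one host
// count toward the server's per-IP maxClientCnxns limit (60 in the zoo.cfg above)
public class ZooKeeperPool {
    private static final int MAX_CONNECTIONS = 20;
    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;

    private final BlockingQueue<ZooKeeper> pool = new LinkedBlockingQueue<>();

    public ZooKeeperPool() throws Exception {
        for (int i = 0; i < MAX_CONNECTIONS; i++) {
            pool.offer(createConnection());
        }
    }

    private ZooKeeper createConnection() throws Exception {
        CountDownLatch connectedSignal = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectedSignal.countDown();
            }
        });
        // Fail fast instead of blocking forever when the ensemble is unreachable
        if (!connectedSignal.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            zk.close();
            throw new IllegalStateException("Timed out connecting to " + CONNECT_STRING);
        }
        return zk;
    }

    public ZooKeeper borrowConnection() throws InterruptedException {
        return pool.take();
    }

    public void returnConnection(ZooKeeper zk) throws Exception {
        if (zk == null) {
            return;
        }
        if (zk.getState() == ZooKeeper.States.CONNECTED) {
            pool.offer(zk);
        } else {
            // Replace a broken handle so the pool does not shrink until take() blocks forever
            zk.close();
            pool.offer(createConnection());
        }
    }

    public void shutdown() throws InterruptedException {
        ZooKeeper zk;
        while ((zk = pool.poll()) != null) {
            zk.close();
        }
    }
}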

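3. Monitoring metrics

Key ZooKeeper metrics, as reported by the mntr command:

  • zk_avg_latency: average request latency (ms)
  • zk_max_latency: maximum request latency (ms)
  • zk_min_latency: minimum request latency (ms)
  • zk_packets_received: number of packets received
  • zk_packets_sent: number of packets sent
  • zk_num_alive_connections: number of active client connections
  • zk_outstanding_requests: number of queued requests not yet processed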
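These metrics come from the `mntr` four-letter command used in the monitoring script, which prints one metric per line as a key and a value separated by a tab. A monitoring agent written in Java could parse that text roughly as follows; the `MntrParser` class, its `healthy` check, and the threshold values are illustrative assumptions rather than recommended limits.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Parses `mntr` output: one "key<TAB>value" pair per line, e.g. "zk_avg_latency\t0"
class MntrParser {
    static Map<String, String> parse(String mntrOutput) {
        Map<String, String> metrics = new LinkedHashMap<>();
        for (String line : mntrOutput.split("\n")) {
            int tab = line.indexOf('\t');
            if (tab > 0) {
                metrics.put(line.substring(0, tab), line.substring(tab + 1).trim());
            }
        }
        return metrics;
    }

    // Illustrative threshold check; latency may be an integer or a decimal depending on version
    static boolean healthy(Map<String, String> m, long maxOutstanding, double maxAvgLatencyMs) {
        long outstanding = Long.parseLong(m.getOrDefault("zk_outstanding_requests", "0"));
        double avgLatency = Double.parseDouble(m.getOrDefault("zk_avg_latency", "0"));
        return outstanding <= maxOutstanding && avgLatency <= maxAvgLatencyMs;
    }

    public static void main(String[] args) {
        // Sample values made up for illustration
        String sample = "zk_avg_latency\t2\nzk_max_latency\t35\n"
                + "zk_num_alive_connections\t12\nzk_outstanding_requests\t0\n";
        Map<String, String> m = parse(sample);
        System.out.println(m.get("zk_num_alive_connections")); // 12
        System.out.println(healthy(m, 10, 50.0));              // true
    }
}
```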

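As the coordination core of many distributed systems, Apache ZooKeeper's high availability and consistency guarantees make it a cornerstone for building reliable distributed applications. With sound design and configuration, ZooKeeper can provide stable, dependable coordination for complex distributed systems.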

All rights reserved. If any content infringes your rights, please contact me.