分布式中件间-zookeeper使用

Zookeeper 是一个开源的分布式协调框架,基于ZAB协议来确保在分布式环境下的数据一致性和可靠性,实现了一个高可用的、小型的、树形结构(类似文件系统)的数据存储。通常用于分布式系统中的配置管理、同步服务、命名服务等,Zookeeper 主要用于以下几种场景:

  • 分布式锁:通过 Zookeeper 提供的节点机制,可以实现分布式环境中的锁机制。
  • 配置管理:Zookeeper 用作分布式系统的配置中心,客户端可以从 Zookeeper 获取共享的配置信息。
  • 命名服务:Zookeeper 可以作为一个高效的命名服务,提供唯一的命名空间。
  • 集群管理:Zookeeper 可以用来管理分布式系统中节点的健康状况和成员变更。

zookeeper客户端

在 Java 生态中,有多种 ZooKeeper 客户端可供选择。主要包括 ZooKeeper 官方原生客户端、以及两个流行的第三方开源客户端 ZkClient 和 Apache Curator。目前在生产环境中最被推荐和广泛使用的是 Apache Curator。

客户端 优点 缺点
官方原生客户端:Zookeeper 官方支持,最基础的 API API 复杂、功能简单,需要手动处理连接丢失、Watcher(观察者)一次性注册等问题,不推荐在生产环境直接使用
ZkClient 对原生 API 进行了封装,提供了更简洁的 API 社区不活跃,文档不完善,异常处理简化(抛出 RuntimeException),重试机制较难用
Apache Curator 简化了 ZooKeeper 的复杂性,提供了高级 API、连接管理、重试机制、各种分布式场景的抽象封装(如分布式锁、领导选举) 学习曲线相对复杂(相比 ZkClient)

官方原生客户端:Zookeeper

ZooKeeper 的官方原生 Java 客户端库org.apache.zookeeper虽然功能基础,但提供了所有与 ZooKeeper 服务端交互的核心接口和类,实现了建立、管理连接和执行所有数据操作。下面是ZookKeeper的其中一个构造方法:

1
2
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher){}

  • connectString: 集群地址列表(如 “host1:2181,host2:2181”)。
  • sessionTimeout: 会话超时时间(毫秒),ZooKeeper 依赖心跳来维持会话,超时后会话失效,临时节点将被删除。
  • watcher: 默认的全局 Watcher,用于处理连接状态变化等事件。

ZooKeeper增删改的核心方法如下:

  • create:用于创建节点,可以指定节点路径、节点数据、节点的访问权限、节点类型

  • delete:删除节点,每个节点都有一个版本,删除时可指定删除的版本,类似乐观锁。设置-1,则就直接删除节点。

  • exists:节点存不存在,若存在返回节点Stat信息,否则返回null。

  • getChildren:获取子节点。

  • getData/setData:获取/设置节点数据。

  • getACL/setACL:获取节点访问权限列表,每个节点都可以设置访问权限,指定只有特定的客户端才能访问和操作节点。ACL说明:

    • Ids.CREATOR_ALL_ACL:只有创建节点的客户端才有所有权限
    • Ids.OPEN_ACL_UNSAFE:这是一个完全开放的权限,所有客户端都有权限
    • Ids.READ_ACL_UNSAFE:所有客户端只有读取的
  • close:关闭连接并终止会话。

Watcher 接口(事件监听):org.apache.zookeeper.Watcher 是处理 ZooKeeper 客户端事件的核心接口。ZooKeeper 的设计原则是“一次性通知”(One-time trigger),即一个 Watcher 只能监听一次事件,事件触发后就需要重新注册。

1
2
// 监控所有被触发的事件处理
void process(WatchedEvent event);

在使用时可以通过实现 process(WatchedEvent event) 来做一些自定义处理逻辑与Watcher重新注册。WatchedEvent 包含事件的所有信息:

  • EventType: 事件类型(如 NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged)。
  • KeeperState: 客户端连接状态(如 SyncConnected, Disconnected, Expired)。
  • getPath(): 发生事件的 ZNode 路径。

Stat元数据类:org.apache.zookeeper.data.Stat 类用于存储 ZNode 的所有元数据(Metadata),例如版本号、创建/修改时间等。在调用 getData() 或 exists() 方法时,该对象会被填充。重要的属性包括:

  • czxid: 创建 ZNode 的事务 ID。
  • mzxid: 最后修改 ZNode 数据的事务 ID。
  • version: 数据的版本号。
  • cversion: 子节点列表的版本号。
  • dataLength: 数据长度。

AsyncCallback 接口(异步操作回调):原生客户端支持同步(Synchronous)和异步(Asynchronous)两种 API。当使用异步方法时(方法名通常以 Async 结尾),需要实现 AsyncCallback 接口来处理操作完成后的结果。常见的子接口:

  • DataCallback: 用于 getData 异步操作。
  • StatCallback: 用于 exists 异步操作。
  • VoidCallback: 用于 delete 异步操作。
  • StringCallback: 用于 create 异步操作。
  • ChildrenCallback: 用于 getChildren 异步操作。

基于原生客户端实现同步队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* 使用原生 zookeeper api
*/
public class Sync implements Watcher {

private static final Logger logger = LoggerFactory.getLogger(Sync.class);
protected final ZooKeeper zookeeper;
protected final String root;
protected static final Object mutex = new Object();


Sync(String addr, String root) throws IOException {
this.zookeeper = new ZooKeeper(addr, 3000, this);
this.root = root;
}

@Override
synchronized public void process(WatchedEvent watchedEvent) {
logger.info(watchedEvent.toString());
synchronized (mutex) {
mutex.notify();
}
}

static public class Queue extends Sync {

Queue(String address, String root) throws IOException, InterruptedException, KeeperException {
super(address, root);
if (zookeeper == null) {
throw new RuntimeException("zookeeper is null");
}

Stat stat = zookeeper.exists(root, false);
if (stat == null) {
zookeeper.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}

boolean produce(int i) throws InterruptedException, KeeperException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zookeeper.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}

int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;

while (true) {
synchronized (mutex) {
List<String> list = zookeeper.getChildren(root, true);
if (list.isEmpty()) {
logger.info("consumer going to wait");
mutex.wait();
} else {
int min = Integer.parseInt(list.get(0).substring(7));
String minNode = list.get(0);
for (String s : list) {
int tempValue = Integer.parseInt(s.substring(7));
if (tempValue < min) {
min = tempValue;
minNode = s;
}
}
logger.info("Temporary value: {}/{}", root, minNode);
byte[] b = zookeeper.getData(root + "/" + minNode, false, stat);
zookeeper.delete(root + "/" + minNode, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
}
}

原生客户端的使用流程通常如下:

  • 创建 ZooKeeper 实例,传入全局 Watcher。
  • 等待连接建立(SyncConnected 状态)。
  • 执行操作(create, getData 等),并根据需要注册临时的 Watcher。
  • 处理 KeeperException 异常。
  • 处理 Watcher 事件(并在需要时重新注册 Watcher)。
  • 调用 close() 结束会话。

由于需要手动管理连接状态和 Watcher 的重复注册,原生客户端使用起来相对繁琐,这也是 Apache Curator 等第三方客户端流行的原因。

Apache Curator5.7

Apache Curator 是 Netflix 开源并贡献给 Apache 基金会的 ZooKeeper 客户端框架,它极大地简化了 ZooKeeper 的使用,提供了许多强大的特性来解决原生客户端的痛点。使用原生Zookeeper客户端时存在着如下几个问题:

  • 连接丢失(ConnectionLossException)后不会自动重连。
  • 会话过期(Session Expired)后,需要手动关闭旧客户端、重新创建新客户端、并恢复所有瞬时(EPHEMERAL)节点和 Watcher。
  • 开发者必须手动编写复杂的重试逻辑。
  • Watcher 是“一次性”的(One-time trigger)。事件触发后立即失效,如果需要持续监听,必须在处理完事件后立即重新注册 Watcher。这个过程容易出错,且在并发场景下可能导致遗漏事件。
  • 原生 API 方法参数多,使用复杂,代码冗长。例如,create(path, data, acl, mode) 需要同时指定 ACL 和模式,不够直观。
  • 原生客户端只提供最基本的原语(如创建节点、删除节点),实现分布式锁、领导选举等高级功能需要开发者自己设计复杂的算法,并手动处理各种竞争条件和边界情况。

Curator 框架解决了原生客户端的使用问题,其核心特性可以概括为以下几点:

  • 健壮的连接管理和重试机制:

    • 连接状态监听: Curator 提供了一个 ConnectionStateListener,可以方便地监听连接状态的变化(CONNECTED, SUSPENDED, RECONNECTED, LOST)。

    • 自动重连与重试策略: 内置了强大的重试机制(Retry Policy),如 ExponentialBackoffRetry(指数退避重试)。当出现连接丢失、操作失败等情况时,Curator 会自动根据策略进行重试,极大地简化了错误处理。重试策略:

      • ExponentialBackoffRetry:指数退避重试(推荐)。
      • RetryNTimes:重试固定次数。
      • RetryUntilStopped:一直重试直到停止。
    • 会话管理: 自动处理会话过期后的恢复逻辑,透明地重建临时节点和重新注册 Watcher。

  • 简化的 API 和 Fluent 风格:Curator 使用了现代化的链式调用(Fluent API)设计模式,使得代码更简洁、可读性更高,相比原生客户端冗长的方法签名,Curator 提供了更直观的接口。

  • 高级 Watcher 封装(缓存机制):Curator 解决了原生客户端 Watcher“一次性触发”的痛点,提供了持久化和缓存功能。

    • 持久化 Watcher(Persistent Watcher): Curator 提供了高级接口,这些接口会自动管理 Watcher 的注册和重新注册。
    • 事件缓存: *Cache 等组件不仅监听事件,还会缓存节点数据,使得应用程序能够及时获取最新的节点状态,而无需每次都去 ZooKeeper 服务端拉取数据。
  • 内置的 Recipes(分布式协调组件):这是 Curator 最受欢迎的特性,它将常见的 ZooKeeper 使用场景抽象成了可以直接使用的组件。

    • 分布式锁(Distributed Locks):InterProcessMutex、InterProcessSemaphoreMutex、InterProcessReadWriteLock。
    • 领导选举(Leader Election): LeaderSelector 组件,允许多个客户端竞争领导权,并在当前领导者宕机时自动进行新的选举。
    • 分布式计数器(Distributed Counter): 提供了线程安全的分布式计数器实现。
    • 服务发现(Service Discovery): 提供了构建服务注册与发现系统的框架和工具。
    • 分布式队列/屏障: DistributedQueue、DistributedBarrier 等。
  • Zookeeper 2 风格的集成(Testing Server):Curator 提供了 TestingServer 和 TestingCluster 类,使得在单元测试和集成测试中启动一个嵌入式的 ZooKeeper 实例变得非常简单,极大地提高了开发和测试效率。

综上,Curator 是目前 Java 生态中最成熟、最稳定的 ZooKeeper 客户端,它将复杂的底层交互封装成了一套易于使用、且经过生产验证的 API 和组件。

Curator 核心组件

Curator 框架的核心设计围绕几个关键组件和接口展开,它们协同工作,提供了健壮且易于使用的 ZooKeeper 客户端功能。Curator 的核心组件主要有:

  • CuratorFramework (核心接口):CuratorFramework 是 Curator 框架的主要接口,代表了客户端与 ZooKeeper 集群的连接会话,是所有操作的起点。 它是与 ZooKeeper 服务端通信的门面(Facade),封装了连接管理、重试机制、会话状态监听以及数据操作的所有逻辑。通常使用 CuratorFrameworkFactory 的构建者模式来创建实例。

  • RetryPolicy (重试策略):RetryPolicy 是一个接口,定义了当 ZooKeeper 操作失败时(例如网络波动导致连接丢失)客户端应如何重试的规则,实现了自动化的错误恢复机制,避免开发者手动编写复杂的重试逻辑。

  • ConnectionStateListener (连接状态监听器):这个监听器用于监控 CuratorFramework 实例与 ZooKeeper 集群的连接状态。使用时应用程序可以对连接状态的变化做出反应,例如会话挂起、重新连接或完全丢失。关键状态有:

    • CONNECTED:首次成功连接或重新连接成功。
    • SUSPENDED:连接丢失,但会话可能仍然有效(在超时时间内)。
    • RECONNECTED:连接从挂起状态恢复。
    • LOST:会话超时,连接永久丢失(临时节点会被删除),需要重新建立一个新的 CuratorFramework 实例。
  • CuratorCache (原 TreeCache 等缓存组件):Curator 3.x 以后引入了 CuratorCache,整合了早期版本中的 NodeCache、PathChildrenCache 和 TreeCache 的功能。实现了本地缓存和持久化 Watcher 功能,在本地维护 ZNode 结构和数据的最新副本,并自动处理 Watcher 的重新注册,确保不错过任何事件。使用时可以直接从本地缓存获取数据,减少与 ZooKeeper 服务端的网络往返,同时简化了事件监听的复杂性。

  • Recipes (高级功能组件):Recipes 并非一个单一的接口或类,而是一系列预构建的高级协调组件,基于 ZooKeeper 原语(如临时有序节点)实现分布式系统的通用模式,为复杂的分布式协调任务提供了即插即用的解决方案。

Curator核心方法

客户端生命周期管理:

  • start(): 启动客户端实例并连接到 ZooKeeper 服务端。这是执行任何操作之前的必须步骤。
  • close(): 关闭连接,释放所有资源,并终止会话。
1
2
3
4
5
CuratorFramework client = CuratorFrameworkFactory
.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));
client.start();

client.close();

数据操作核心方法:

  • create():用于在 ZooKeeper 中创建 ZNode。可以通过链式方法指定节点的类型(持久、临时、有序)和数据。

    1
    2
    3
    4
    client.create()
    .withMode(CreateMode.EPHEMERAL) // 指定节点类型(如 CreateMode.EPHEMERAL 临时节点)。
    .compress(compressor) // 压缩数据(可选)。
    .forPath("/path/to/ephemeral/node", "data".getBytes()); //指定路径和要存储的数据,执行操作。
  • getData():开始获得ZNode节点数据的操作,可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode。

    1
    2
    3
    4
    byte[] data = client.getData()
    .storingStatsIn(stat) // 将节点的元数据存储到提供的 Stat 对象中(可选)。
    .watched() // 注册一个一次性的 Watcher 或者使用:usingWatcher(Watcher watcher),通常使用缓存替代
    .forPath("/path/to/node"); // 指定路径和新数据,执行操作。
  • setData():用于更新 ZNode 中的数据。可以通过指定版本号实现乐观锁,防止并发修改。

    1
    2
    3
    client.setData()
    .withVersion(-1) // 定期望的数据版本号(可选,默认为 -1,表示匹配任何版本)。
    .forPath("/path/to/node", "newData".getBytes()); // 指定路径和新数据,执行操作。
  • delete():用于删除 ZNode。可以指定版本号,也可以递归删除子节点。

    1
    2
    3
    4
    5
    client.delete()
    .withVersion(int version) // 指定版本号(可选)
    .guaranteed() // 保证删除成功。即使客户端连接丢失,Curator 也会在后台持续重试,直到节点被删除。
    .deletingChildrenIfNeeded() // 如果节点有子节点,则递归删除(原生客户端不支持)。
    .forPath("/path/to/node"); // 指定路径,执行操作。
  • getChildren():用于获取某个 ZNode 下的所有子节点名称列表。

    1
    List<String> children = client.getChildren().forPath("/path/to/parent");
  • checkExists():检查节点是否存在,返回 Stat 对象(存在)或 null(不存在)。

  • inTransaction():开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交

Curator事件监听

在 Curator 中,事件监听主要通过 CuratorCache 缓存组件实现,这些组件内部封装了对 ZooKeeper 原生 Watcher 的使用,并提供了更友好、更强大的事件模型,解决了Watcher一次性注册、触发即失效的问题。在Curator中事件监听的两种方式:

1
2
3
4
5
6
7
// 使用:CuratorListener
client.getCuratorListenable()
.addListener((curatorFramework, curatorEvent) -> System.out.println("事件: " + curatorEvent));

// 使用:Watcher
client.getChildren()
.usingWatcher((Watcher) watchedEvent -> System.out.println("监听: " + watchedEvent)).forPath("/cache");

CuratorListenable中CuratorEventType触发返回的数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
事件类型         事件返回数据
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
SYNC getResultCode(), getStat()
GET_ACL getResultCode(), getACLList()
SET_ACL getResultCode()
TRANSACTION getResultCode(), getOpResults()
WATCHED getWatchedEvent()
GET_CONFIG getResultCode(), getData()
RECONFIG getResultCode(), getData()

Curator提供了如下几个策略:

  • ExponentialBackoffRetry:重试一定次数,每次重试sleep更多的时间
  • RetryNTimes:重试N次
  • RetryOneTime:重试一次
  • RetryUntilElapsed:重试一定的时间

服务注册与发现 - curator-x-discovery

curator-x-discovery 是 Apache Curator 库的一个扩展组件,专门用于在分布式系统中实现服务注册与发现功能。它构建在 ZooKeeper 之上,提供了一套高级 API,简化了使用 ZooKeeper 进行服务管理的复杂性。

curator-x-discovery 核心功能:
服务注册 (Service Registration):服务注册是指将一个服务实例的信息(如服务名、地址、端口、健康状态等)发布到 Zookeeper 集群中。Curator-X-Discovery 利用 Zookeeper 的节点来存储这些信息,每个服务实例在注册时会创建一个临时节点(ephemeral node)。这些节点只要对应的服务实例还在运行,就会存在;当实例下线或不可用时,Zookeeper 会自动删除这些节点。服务注册信息通常包含 服务名称(例如 service_name)、服务地址(如 IP 或主机名)和其他元数据。使用方法:discovery.registerService(instance)。

服务发现 (Service Discovery):服务发现是指让客户端能够查询到已注册的服务实例的位置,从而进行调用。Curator-X-Discovery 使用 Zookeeper 中的目录结构(路径)来组织服务。例如,/services/ 下的每个节点代表一个服务实例。通过监听 Zookeeper 节点的变化来实现服务的动态发现。当一个服务实例被注册或移除时,客户端能够接收到通知,自动更新可用服务列表。discovery.queryForInstances(serviceName);

负载均衡策略 (Provider Strategies):Curator-X-Discovery 还可以与负载均衡机制配合使用,确保客户端能够均匀地分配请求到不同的服务实例。通过 ServiceInstance(服务实例)类,Curator 可以将多个服务节点的信息提供给客户端,客户端可以选择不同的服务实例进行负载均衡。内置的负载均衡策略:

  • RandomStrategy
  • RoundRobinStrategy
  • StickyStrategy

健康检查 (Health Checking):为了确保服务的健康状态,Curator-X-Discovery 可以通过 Zookeeper 节点的 TTL(生存时间)机制来实现健康检查。同时,通过设置心跳机制或定期检查服务实例的健康状态,确保服务实例是可用的。当服务失效时,相关的临时节点会被清除,系统会自动剔除不可用的服务。

使用示例

服务注册与发现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ZkRegistry implements Registry {

private final ServiceDiscovery<ServiceMeta> discovery;

public ZkRegistry(RegistryProperties properties) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(properties.getRegisterAddress(),
new ExponentialBackoffRetry(properties.getSleepTime(), properties.getMaxRetries()));
client.start();

JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
this.discovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)
.client(client)
.serializer(serializer)
.basePath("/" + properties.getNamespace())
.build();
this.discovery.start();
}

@Override
public void register(ServiceMeta serviceMeta) throws Exception {
ServiceInstance<ServiceMeta> instance = ServiceInstance.<ServiceMeta>builder()
.name(buildNamespace(serviceMeta.getGroup(), serviceMeta.getApplication(),
serviceMeta.getVersion()))
.address(serviceMeta.getHost())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
discovery.registerService(instance);
}

@Override
public void unregister(ServiceMeta serviceMeta) throws Exception {
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance
.<ServiceMeta>builder()
.name(buildNamespace(serviceMeta.getGroup(), serviceMeta.getApplication(),
serviceMeta.getVersion()))
.address(serviceMeta.getHost())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
discovery.unregisterService(serviceInstance);
}

@Override
public ServiceMeta lookup(String group, String serviceName,
String version, int hashCode) throws Exception {

Collection<ServiceInstance<ServiceMeta>> instances = discovery
.queryForInstances(buildNamespace(group, serviceName, version));

ServiceInstance<ServiceMeta> instance = new ConsistentHashLoadBalancer()
.select((List<ServiceInstance<ServiceMeta>>) instances, hashCode);

if (instance != null) {
return instance.getPayload();
}
return null;
}

private String buildNamespace(String group, String application, String version) {
return group + "#" + application + "#" + version;
}
}

负载均衡策略:这里自定义实现ConsistentHash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ConsistentHashLoadBalancer implements LoadBalancer<ServiceInstance<ServiceMeta>> {

private final static String VIRTUAL_NODE_SPLIT = "#";
private final static int VIRTUAL_NODE_SIZE = 10;

@Override
public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> instances,
int hashCode) {
if (instances == null || instances.isEmpty()) {
return null;
}
TreeMap<Integer, ServiceInstance<ServiceMeta>> circle = buildConsistentHashRing(instances);
return allocateNode(circle, hashCode);
}

private ServiceInstance<ServiceMeta> allocateNode(
TreeMap<Integer, ServiceInstance<ServiceMeta>> circle, int hashCode) {
Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = circle.ceilingEntry(hashCode);
if (entry == null) {
entry = circle.firstEntry();
}
return entry.getValue();
}

private TreeMap<Integer, ServiceInstance<ServiceMeta>> buildConsistentHashRing(
List<ServiceInstance<ServiceMeta>> servers) {
TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();
for (ServiceInstance<ServiceMeta> instance : servers) {
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
ring.put((buildServiceInstanceKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);
}
}
return ring;
}

private String buildServiceInstanceKey(ServiceInstance<ServiceMeta> instance) {
ServiceMeta payload = instance.getPayload();
return String.join(":", payload.getHost(), String.valueOf(payload.getPort()));
}
}

上面更标准的做法是实现 curator-x-discovery内置 ProviderStrategy 接口:

1
2
3
4
5
6
7
public class ConsistentHashStrategy<T> implements ProviderStrategy<T> {
@Override
public ServiceInstance<T> getInstance(InstanceProvider<T> instance) throws Exception {
// 这里实现
return null;
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
RegistryProperties properties = new RegistryProperties();
properties.setNamespace("test");
properties.setMaxRetries(5);
properties.setSleepTime(1000);
properties.setRegisterAddress("127.0.0.1:2181");

ZkRegistry registry = new ZkRegistry(properties);

for (int i = 1; i < 11; i++) {
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setApplication("order-service");
serviceMeta.setGroup("A");
serviceMeta.setVersion("1.0");
serviceMeta.setHost("192.168.0." + i);
serviceMeta.setPort(8080);
registry.register(serviceMeta);
}

ServiceMeta meta = registry.lookup("A", "order-service", "1.0", 2126277029);
System.out.println(meta.getApplication() + "#" + meta.getHost() + ":" + meta.getPort());
ServiceMeta meta1 = registry.lookup("A", "order-service", "1.0", 113466687);
System.out.println(meta1.getApplication() + "#" + meta1.getHost() + ":" + meta1.getPort());
}

小结:
Curator-X-Discovery 提供了一种基于 Zookeeper 的服务注册与发现机制,简化了与 Zookeeper 的交互。它的工作原理是通过 Zookeeper 的临时节点、目录结构和监听机制来实现服务的动态注册、发现以及健康检查。通过使用 Curator 库,开发者可以更容易地实现一个高可用、动态可扩展的分布式服务发现系统。

分布式锁

Apache Curator 提供的分布式锁实现主要依赖于 ZooKeeper 的 临时顺序节点(Ephemeral Sequential Nodes) 特性,其核心原理是利用 ZooKeeper 保证数据一致性、强顺序性和高可用的特点,模拟出一个分布式环境下的“公平锁”。以下两个特性是实现分布式锁的关键点:

  • 临时节点 (Ephemeral): 锁持有者与 ZooKeeper 的会话断开后,该节点会自动删除,从而避免死锁。
  • 顺序节点 (Sequential): 每次创建的子节点都会有一个全局唯一的、单调递增的序号,这保证了锁的公平性(先到先得)。

Curator 提供了如下几种类型的分布式锁:

  • InterProcessMutex:可重入的排他锁(最常用)
  • InterProcessSemaphoreMutex:不可重入的排他锁
  • InterProcessReadWriteLock:读写锁(包含读锁和写锁)
  • InterProcessMultiLock:组合多个锁为一个逻辑锁

加锁过程

  • 创建锁根节点: Curator 会指定一个持久节点作为所有锁的父节点(例如 /locks/mylock),这个节点需要预先存在。
  • 创建临时顺序子节点: 客户端 A 在父节点下创建一个子节点,例如 /locks/mylock/lock-0000000001。
  • 获取子节点列表: 客户端 A 读取父节点 /locks/mylock 下的所有当前子节点列表,并按序号升序排序。
  • 判断是否为最小节点: 客户端 A 检查自己创建的节点是否是当前列表中序号最小的那个节点。
    • 如果是: 说明客户端 A 成功获取到锁。它可以执行临界区代码。
    • 如果不是: 说明前面有其他客户端正在等待或持有锁。客户端 A 不能获取锁。
  • 监听前一个节点: 客户端 A 找到列表中位于它前面的那个节点(例如 /locks/mylock/lock-0000000000),并给这个节点设置一个 ZooKeeper Watcher(监听器),然后进入等待状态。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public interface DistributeLock extends AutoCloseable{
boolean tryLock(long timeout, TimeUnit unit) throws Exception;
}

public class ZkDistributeLock implements DistributeLock {

private final AtomicBoolean locked = new AtomicBoolean(false);
private final InterProcessMutex lock;

public ZkDistributeLock(CuratorProperties properties) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(properties.getAddress())
.connectionTimeoutMs(properties.getConnectionTimeout())
.sessionTimeoutMs(properties.getSessionTimeout())
.retryPolicy(
new ExponentialBackoffRetry(properties.getSleepTime(), properties.getMaxRetries()))
.build();
client.start();
this.lock = new InterProcessMutex(client, properties.getBasePath() + "/inter-mutex");
}

@Override
public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
if (timeout > 0) {
boolean acquired = lock.acquire(timeout, unit);
if (acquired) {
locked.compareAndSet(false, true);
}
} else {
lock.acquire();
locked.compareAndSet(false, true);
}
return locked.get();
}

@Override
public void releaseLock() throws Exception {
if (locked.compareAndSet(true, false)) {
lock.release();
}
}

@Override
public void close() throws Exception {
releaseLock();
}

public boolean isLocked() {
return locked.get();
}
}

客户端使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ZkLockExample {
public static void main(String[] args) throws Exception {
CuratorProperties properties = new CuratorProperties();
properties.setBasePath("/locks");
properties.setAddress("127.0.0.1:2181");
properties.setSessionTimeout(10000);
properties.setConnectionTimeout(1000);
properties.setMaxRetries(3);
properties.setSleepTime(3000);
// try-with-resource 自动 release
try (ZkDistributeLock lock = new ZkDistributeLock(properties)) {
lock.tryLock(3000, TimeUnit.MILLISECONDS);
}

// 手动release
ZkDistributeLock lock = new ZkDistributeLock(properties);
try {
lock.tryLock(3000, TimeUnit.MILLISECONDS);
} finally {
lock.releaseLock();
}
}
}

Curator 提供的分布式锁实现基于 ZooKeeper 的临时顺序节点,通过监听节点删除事件实现锁的获取与释放,保证了分布式环境下的可靠性和公平性。不过,需要注意的是,ZooKeeper 的性能限制可能影响系统的扩展性,尤其是在高并发场景下。


分布式中件间-zookeeper使用
http://example.com/2025/08/02/分布式中件间-zookeeper使用/
作者
ares
发布于
2025年8月2日
许可协议