Zookeeper 是一个开源的分布式协调框架,基于ZAB协议来确保在分布式环境下的数据一致性和可靠性,实现了一个高可用的、小型的、树形结构(类似文件系统)的数据存储。通常用于分布式系统中的配置管理、同步服务、命名服务等,Zookeeper 主要用于以下几种场景:
- 分布式锁:通过 Zookeeper 提供的节点机制,可以实现分布式环境中的锁机制。
- 配置管理:Zookeeper 用作分布式系统的配置中心,客户端可以从 Zookeeper 获取共享的配置信息。
- 命名服务:Zookeeper 可以作为一个高效的命名服务,提供唯一的命名空间。
- 集群管理:Zookeeper 可以用来管理分布式系统中节点的健康状况和成员变更。
zookeeper客户端
在 Java 生态中,有多种 ZooKeeper 客户端可供选择。主要包括 ZooKeeper 官方原生客户端、以及两个流行的第三方开源客户端 ZkClient 和 Apache Curator。目前在生产环境中最被推荐和广泛使用的是 Apache Curator。
| 客户端 |
优点 |
缺点 |
| 官方原生客户端:Zookeeper |
官方支持,最基础的 API |
API 复杂、功能简单,需要手动处理连接丢失、Watcher(观察者)一次性注册等问题,不推荐在生产环境直接使用 |
| ZkClient |
对原生 API 进行了封装,提供了更简洁的 API |
社区不活跃,文档不完善,异常处理简化(抛出 RuntimeException),重试机制较难用 |
| Apache Curator |
简化了 ZooKeeper 的复杂性,提供了高级 API、连接管理、重试机制、各种分布式场景的抽象封装(如分布式锁、领导选举) |
学习曲线相对复杂(相比 ZkClient) |
官方原生客户端:Zookeeper
ZooKeeper 的官方原生 Java 客户端库org.apache.zookeeper虽然功能基础,但提供了所有与 ZooKeeper 服务端交互的核心接口和类,实现了建立、管理连接和执行所有数据操作。下面是ZookKeeper的其中一个构造方法:
1 2
| public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher){}
|
- connectString: 集群地址列表(如 “host1:2181,host2:2181”)。
- sessionTimeout: 会话超时时间(毫秒),ZooKeeper 依赖心跳来维持会话,超时后会话失效,临时节点将被删除。
- watcher: 默认的全局 Watcher,用于处理连接状态变化等事件。
ZooKeeper增删改的核心方法如下:
create:用于创建节点,可以指定节点路径、节点数据、节点的访问权限、节点类型
delete:删除节点,每个节点都有一个版本,删除时可指定删除的版本,类似乐观锁。设置-1,则就直接删除节点。
exists:节点存不存在,若存在返回节点Stat信息,否则返回null。
getChildren:获取子节点。
getData/setData:获取/设置节点数据。
getACL/setACL:获取节点访问权限列表,每个节点都可以设置访问权限,指定只有特定的客户端才能访问和操作节点。ACL说明:
- Ids.CREATOR_ALL_ACL:只有创建节点的客户端才有所有权限
- Ids.OPEN_ACL_UNSAFE:这是一个完全开放的权限,所有客户端都有权限
- Ids.READ_ACL_UNSAFE:所有客户端只有读取的
close:关闭连接并终止会话。
Watcher 接口(事件监听):org.apache.zookeeper.Watcher 是处理 ZooKeeper 客户端事件的核心接口。ZooKeeper 的设计原则是“一次性通知”(One-time trigger),即一个 Watcher 只能监听一次事件,事件触发后就需要重新注册。
1 2
| void process(WatchedEvent event);
|
在使用时可以通过实现 process(WatchedEvent event) 来做一些自定义处理逻辑与Watcher重新注册。WatchedEvent 包含事件的所有信息:
- EventType: 事件类型(如 NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged)。
- KeeperState: 客户端连接状态(如 SyncConnected, Disconnected, Expired)。
- getPath(): 发生事件的 ZNode 路径。
Stat元数据类:org.apache.zookeeper.data.Stat 类用于存储 ZNode 的所有元数据(Metadata),例如版本号、创建/修改时间等。在调用 getData() 或 exists() 方法时,该对象会被填充。重要的属性包括:
- czxid: 创建 ZNode 的事务 ID。
- mzxid: 最后修改 ZNode 数据的事务 ID。
- version: 数据的版本号。
- cversion: 子节点列表的版本号。
- dataLength: 数据长度。
AsyncCallback 接口(异步操作回调):原生客户端支持同步(Synchronous)和异步(Asynchronous)两种 API。当使用异步方法时(方法名通常以 Async 结尾),需要实现 AsyncCallback 接口来处理操作完成后的结果。常见的子接口:
- DataCallback: 用于 getData 异步操作。
- StatCallback: 用于 exists 异步操作。
- VoidCallback: 用于 delete 异步操作。
- StringCallback: 用于 create 异步操作。
- ChildrenCallback: 用于 getChildren 异步操作。
基于原生客户端实现同步队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
public class Sync implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(Sync.class); protected final ZooKeeper zookeeper; protected final String root; protected static final Object mutex = new Object();
Sync(String addr, String root) throws IOException { this.zookeeper = new ZooKeeper(addr, 3000, this); this.root = root; }
@Override synchronized public void process(WatchedEvent watchedEvent) { logger.info(watchedEvent.toString()); synchronized (mutex) { mutex.notify(); } }
static public class Queue extends Sync {
Queue(String address, String root) throws IOException, InterruptedException, KeeperException { super(address, root); if (zookeeper == null) { throw new RuntimeException("zookeeper is null"); }
Stat stat = zookeeper.exists(root, false); if (stat == null) { zookeeper.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } }
boolean produce(int i) throws InterruptedException, KeeperException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zookeeper.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; }
int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null;
while (true) { synchronized (mutex) { List<String> list = zookeeper.getChildren(root, true); if (list.isEmpty()) { logger.info("consumer going to wait"); mutex.wait(); } else { int min = Integer.parseInt(list.get(0).substring(7)); String minNode = list.get(0); for (String s : list) { int tempValue = Integer.parseInt(s.substring(7)); if (tempValue < min) { min = tempValue; minNode = s; } } logger.info("Temporary value: {}/{}", root, minNode); byte[] b = zookeeper.getData(root + "/" + minNode, false, stat); zookeeper.delete(root + "/" + minNode, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } }
|
原生客户端的使用流程通常如下:
- 创建 ZooKeeper 实例,传入全局 Watcher。
- 等待连接建立(SyncConnected 状态)。
- 执行操作(create, getData 等),并根据需要注册临时的 Watcher。
- 处理 KeeperException 异常。
- 处理 Watcher 事件(并在需要时重新注册 Watcher)。
- 调用 close() 结束会话。
由于需要手动管理连接状态和 Watcher 的重复注册,原生客户端使用起来相对繁琐,这也是 Apache Curator 等第三方客户端流行的原因。
Apache Curator5.7
Apache Curator 是 Netflix 开源并贡献给 Apache 基金会的 ZooKeeper 客户端框架,它极大地简化了 ZooKeeper 的使用,提供了许多强大的特性来解决原生客户端的痛点。使用原生Zookeeper客户端时存在着如下几个问题:
- 连接丢失(ConnectionLossException)后不会自动重连。
- 会话过期(Session Expired)后,需要手动关闭旧客户端、重新创建新客户端、并恢复所有瞬时(EPHEMERAL)节点和 Watcher。
- 开发者必须手动编写复杂的重试逻辑。
- Watcher 是“一次性”的(One-time trigger)。事件触发后立即失效,如果需要持续监听,必须在处理完事件后立即重新注册 Watcher。这个过程容易出错,且在并发场景下可能导致遗漏事件。
- 原生 API 方法参数多,使用复杂,代码冗长。例如,create(path, data, acl, mode) 需要同时指定 ACL 和模式,不够直观。
- 原生客户端只提供最基本的原语(如创建节点、删除节点),实现分布式锁、领导选举等高级功能需要开发者自己设计复杂的算法,并手动处理各种竞争条件和边界情况。
Curator 框架解决了原生客户端的使用问题,其核心特性可以概括为以下几点:
健壮的连接管理和重试机制:
连接状态监听: Curator 提供了一个 ConnectionStateListener,可以方便地监听连接状态的变化(CONNECTED, SUSPENDED, RECONNECTED, LOST)。
自动重连与重试策略: 内置了强大的重试机制(Retry Policy),如 ExponentialBackoffRetry(指数退避重试)。当出现连接丢失、操作失败等情况时,Curator 会自动根据策略进行重试,极大地简化了错误处理。重试策略:
- ExponentialBackoffRetry:指数退避重试(推荐)。
- RetryNTimes:重试固定次数。
- RetryUntilStopped:一直重试直到停止。
会话管理: 自动处理会话过期后的恢复逻辑,透明地重建临时节点和重新注册 Watcher。
简化的 API 和 Fluent 风格:Curator 使用了现代化的链式调用(Fluent API)设计模式,使得代码更简洁、可读性更高,相比原生客户端冗长的方法签名,Curator 提供了更直观的接口。
高级 Watcher 封装(缓存机制):Curator 解决了原生客户端 Watcher“一次性触发”的痛点,提供了持久化和缓存功能。
- 持久化 Watcher(Persistent Watcher): Curator 提供了高级接口,这些接口会自动管理 Watcher 的注册和重新注册。
- 事件缓存: *Cache 等组件不仅监听事件,还会缓存节点数据,使得应用程序能够及时获取最新的节点状态,而无需每次都去 ZooKeeper 服务端拉取数据。
内置的 Recipes(分布式协调组件):这是 Curator 最受欢迎的特性,它将常见的 ZooKeeper 使用场景抽象成了可以直接使用的组件。
- 分布式锁(Distributed Locks):InterProcessMutex、InterProcessSemaphoreMutex、InterProcessReadWriteLock。
- 领导选举(Leader Election): LeaderSelector 组件,允许多个客户端竞争领导权,并在当前领导者宕机时自动进行新的选举。
- 分布式计数器(Distributed Counter): 提供了线程安全的分布式计数器实现。
- 服务发现(Service Discovery): 提供了构建服务注册与发现系统的框架和工具。
- 分布式队列/屏障: DistributedQueue、DistributedBarrier 等。
Zookeeper 2 风格的集成(Testing Server):Curator 提供了 TestingServer 和 TestingCluster 类,使得在单元测试和集成测试中启动一个嵌入式的 ZooKeeper 实例变得非常简单,极大地提高了开发和测试效率。
综上,Curator 是目前 Java 生态中最成熟、最稳定的 ZooKeeper 客户端,它将复杂的底层交互封装成了一套易于使用、且经过生产验证的 API 和组件。
Curator 核心组件
Curator 框架的核心设计围绕几个关键组件和接口展开,它们协同工作,提供了健壮且易于使用的 ZooKeeper 客户端功能。Curator 的核心组件主要有:
CuratorFramework (核心接口):CuratorFramework 是 Curator 框架的主要接口,代表了客户端与 ZooKeeper 集群的连接会话,是所有操作的起点。 它是与 ZooKeeper 服务端通信的门面(Facade),封装了连接管理、重试机制、会话状态监听以及数据操作的所有逻辑。通常使用 CuratorFrameworkFactory 的构建者模式来创建实例。
RetryPolicy (重试策略):RetryPolicy 是一个接口,定义了当 ZooKeeper 操作失败时(例如网络波动导致连接丢失)客户端应如何重试的规则,实现了自动化的错误恢复机制,避免开发者手动编写复杂的重试逻辑。
ConnectionStateListener (连接状态监听器):这个监听器用于监控 CuratorFramework 实例与 ZooKeeper 集群的连接状态。使用时应用程序可以对连接状态的变化做出反应,例如会话挂起、重新连接或完全丢失。关键状态有:
- CONNECTED:首次成功连接或重新连接成功。
- SUSPENDED:连接丢失,但会话可能仍然有效(在超时时间内)。
- RECONNECTED:连接从挂起状态恢复。
- LOST:会话超时,连接永久丢失(临时节点会被删除),需要重新建立一个新的 CuratorFramework 实例。
CuratorCache (原 TreeCache 等缓存组件):Curator 3.x 以后引入了 CuratorCache,整合了早期版本中的 NodeCache、PathChildrenCache 和 TreeCache 的功能。实现了本地缓存和持久化 Watcher 功能,在本地维护 ZNode 结构和数据的最新副本,并自动处理 Watcher 的重新注册,确保不错过任何事件。使用时可以直接从本地缓存获取数据,减少与 ZooKeeper 服务端的网络往返,同时简化了事件监听的复杂性。
Recipes (高级功能组件):Recipes 并非一个单一的接口或类,而是一系列预构建的高级协调组件,基于 ZooKeeper 原语(如临时有序节点)实现分布式系统的通用模式,为复杂的分布式协调任务提供了即插即用的解决方案。
Curator核心方法
客户端生命周期管理:
- start(): 启动客户端实例并连接到 ZooKeeper 服务端。这是执行任何操作之前的必须步骤。
- close(): 关闭连接,释放所有资源,并终止会话。
1 2 3 4 5
| CuratorFramework client = CuratorFrameworkFactory .newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3)); client.start();
client.close();
|
数据操作核心方法:
create():用于在 ZooKeeper 中创建 ZNode。可以通过链式方法指定节点的类型(持久、临时、有序)和数据。
1 2 3 4
| client.create() .withMode(CreateMode.EPHEMERAL) .compress(compressor) .forPath("/path/to/ephemeral/node", "data".getBytes());
|
getData():开始获得ZNode节点数据的操作,可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode。
1 2 3 4
| byte[] data = client.getData() .storingStatsIn(stat) .watched() .forPath("/path/to/node");
|
setData():用于更新 ZNode 中的数据。可以通过指定版本号实现乐观锁,防止并发修改。
1 2 3
| client.setData() .withVersion(-1) .forPath("/path/to/node", "newData".getBytes());
|
delete():用于删除 ZNode。可以指定版本号,也可以递归删除子节点。
1 2 3 4 5
| client.delete() .withVersion(int version) .guaranteed() .deletingChildrenIfNeeded() .forPath("/path/to/node");
|
getChildren():用于获取某个 ZNode 下的所有子节点名称列表。
1
| List<String> children = client.getChildren().forPath("/path/to/parent");
|
checkExists():检查节点是否存在,返回 Stat 对象(存在)或 null(不存在)。
inTransaction():开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交
Curator事件监听
在 Curator 中,事件监听主要通过 CuratorCache 缓存组件实现,这些组件内部封装了对 ZooKeeper 原生 Watcher 的使用,并提供了更友好、更强大的事件模型,解决了Watcher一次性注册、触发即失效的问题。在Curator中事件监听的两种方式:
1 2 3 4 5 6 7
| client.getCuratorListenable() .addListener((curatorFramework, curatorEvent) -> System.out.println("事件: " + curatorEvent));
client.getChildren() .usingWatcher((Watcher) watchedEvent -> System.out.println("监听: " + watchedEvent)).forPath("/cache");
|
CuratorListenable中CuratorEventType触发返回的数据如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 事件类型 事件返回数据 CREATE getResultCode() and getPath() DELETE getResultCode() and getPath() EXISTS getResultCode(), getPath() and getStat() GET_DATA getResultCode(), getPath(), getStat() and getData() SET_DATA getResultCode(), getPath() and getStat() CHILDREN getResultCode(), getPath(), getStat(), getChildren() SYNC getResultCode(), getStat() GET_ACL getResultCode(), getACLList() SET_ACL getResultCode() TRANSACTION getResultCode(), getOpResults() WATCHED getWatchedEvent() GET_CONFIG getResultCode(), getData() RECONFIG getResultCode(), getData()
|
Curator提供了如下几个策略:
- ExponentialBackoffRetry:重试一定次数,每次重试sleep更多的时间
- RetryNTimes:重试N次
- RetryOneTime:重试一次
- RetryUntilElapsed:重试一定的时间
服务注册与发现 - curator-x-discovery
curator-x-discovery 是 Apache Curator 库的一个扩展组件,专门用于在分布式系统中实现服务注册与发现功能。它构建在 ZooKeeper 之上,提供了一套高级 API,简化了使用 ZooKeeper 进行服务管理的复杂性。
curator-x-discovery 核心功能:
服务注册 (Service Registration):服务注册是指将一个服务实例的信息(如服务名、地址、端口、健康状态等)发布到 Zookeeper 集群中。Curator-X-Discovery 利用 Zookeeper 的节点来存储这些信息,每个服务实例在注册时会创建一个临时节点(ephemeral node)。这些节点只要对应的服务实例还在运行,就会存在;当实例下线或不可用时,Zookeeper 会自动删除这些节点。服务注册信息通常包含 服务名称(例如 service_name)、服务地址(如 IP 或主机名)和其他元数据。使用方法:discovery.registerService(instance)。
服务发现 (Service Discovery):服务发现是指让客户端能够查询到已注册的服务实例的位置,从而进行调用。Curator-X-Discovery 使用 Zookeeper 中的目录结构(路径)来组织服务。例如,/services/ 下的每个节点代表一个服务实例。通过监听 Zookeeper 节点的变化来实现服务的动态发现。当一个服务实例被注册或移除时,客户端能够接收到通知,自动更新可用服务列表。discovery.queryForInstances(serviceName);
负载均衡策略 (Provider Strategies):Curator-X-Discovery 还可以与负载均衡机制配合使用,确保客户端能够均匀地分配请求到不同的服务实例。通过 ServiceInstance(服务实例)类,Curator 可以将多个服务节点的信息提供给客户端,客户端可以选择不同的服务实例进行负载均衡。内置的负载均衡策略:
- RandomStrategy
- RoundRobinStrategy
- StickyStrategy
健康检查 (Health Checking):为了确保服务的健康状态,Curator-X-Discovery 可以通过 Zookeeper 节点的 TTL(生存时间)机制来实现健康检查。同时,通过设置心跳机制或定期检查服务实例的健康状态,确保服务实例是可用的。当服务失效时,相关的临时节点会被清除,系统会自动剔除不可用的服务。
使用示例
服务注册与发现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class ZkRegistry implements Registry {
private final ServiceDiscovery<ServiceMeta> discovery;
public ZkRegistry(RegistryProperties properties) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(properties.getRegisterAddress(), new ExponentialBackoffRetry(properties.getSleepTime(), properties.getMaxRetries())); client.start();
JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class); this.discovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class) .client(client) .serializer(serializer) .basePath("/" + properties.getNamespace()) .build(); this.discovery.start(); }
@Override public void register(ServiceMeta serviceMeta) throws Exception { ServiceInstance<ServiceMeta> instance = ServiceInstance.<ServiceMeta>builder() .name(buildNamespace(serviceMeta.getGroup(), serviceMeta.getApplication(), serviceMeta.getVersion())) .address(serviceMeta.getHost()) .port(serviceMeta.getPort()) .payload(serviceMeta) .build(); discovery.registerService(instance); }
@Override public void unregister(ServiceMeta serviceMeta) throws Exception { ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance .<ServiceMeta>builder() .name(buildNamespace(serviceMeta.getGroup(), serviceMeta.getApplication(), serviceMeta.getVersion())) .address(serviceMeta.getHost()) .port(serviceMeta.getPort()) .payload(serviceMeta) .build(); discovery.unregisterService(serviceInstance); }
@Override public ServiceMeta lookup(String group, String serviceName, String version, int hashCode) throws Exception {
Collection<ServiceInstance<ServiceMeta>> instances = discovery .queryForInstances(buildNamespace(group, serviceName, version));
ServiceInstance<ServiceMeta> instance = new ConsistentHashLoadBalancer() .select((List<ServiceInstance<ServiceMeta>>) instances, hashCode);
if (instance != null) { return instance.getPayload(); } return null; }
private String buildNamespace(String group, String application, String version) { return group + "#" + application + "#" + version; } }
|
负载均衡策略:这里自定义实现ConsistentHash
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class ConsistentHashLoadBalancer implements LoadBalancer<ServiceInstance<ServiceMeta>> {
private final static String VIRTUAL_NODE_SPLIT = "#"; private final static int VIRTUAL_NODE_SIZE = 10;
@Override public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> instances, int hashCode) { if (instances == null || instances.isEmpty()) { return null; } TreeMap<Integer, ServiceInstance<ServiceMeta>> circle = buildConsistentHashRing(instances); return allocateNode(circle, hashCode); }
private ServiceInstance<ServiceMeta> allocateNode( TreeMap<Integer, ServiceInstance<ServiceMeta>> circle, int hashCode) { Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = circle.ceilingEntry(hashCode); if (entry == null) { entry = circle.firstEntry(); } return entry.getValue(); }
private TreeMap<Integer, ServiceInstance<ServiceMeta>> buildConsistentHashRing( List<ServiceInstance<ServiceMeta>> servers) { TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>(); for (ServiceInstance<ServiceMeta> instance : servers) { for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) { ring.put((buildServiceInstanceKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance); } } return ring; }
private String buildServiceInstanceKey(ServiceInstance<ServiceMeta> instance) { ServiceMeta payload = instance.getPayload(); return String.join(":", payload.getHost(), String.valueOf(payload.getPort())); } }
|
上面更标准的做法是实现 curator-x-discovery内置 ProviderStrategy 接口:
1 2 3 4 5 6 7
| public class ConsistentHashStrategy<T> implements ProviderStrategy<T> { @Override public ServiceInstance<T> getInstance(InstanceProvider<T> instance) throws Exception { return null; } }
|
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) throws Exception { RegistryProperties properties = new RegistryProperties(); properties.setNamespace("test"); properties.setMaxRetries(5); properties.setSleepTime(1000); properties.setRegisterAddress("127.0.0.1:2181");
ZkRegistry registry = new ZkRegistry(properties);
for (int i = 1; i < 11; i++) { ServiceMeta serviceMeta = new ServiceMeta(); serviceMeta.setApplication("order-service"); serviceMeta.setGroup("A"); serviceMeta.setVersion("1.0"); serviceMeta.setHost("192.168.0." + i); serviceMeta.setPort(8080); registry.register(serviceMeta); }
ServiceMeta meta = registry.lookup("A", "order-service", "1.0", 2126277029); System.out.println(meta.getApplication() + "#" + meta.getHost() + ":" + meta.getPort()); ServiceMeta meta1 = registry.lookup("A", "order-service", "1.0", 113466687); System.out.println(meta1.getApplication() + "#" + meta1.getHost() + ":" + meta1.getPort()); }
|
小结:
Curator-X-Discovery 提供了一种基于 Zookeeper 的服务注册与发现机制,简化了与 Zookeeper 的交互。它的工作原理是通过 Zookeeper 的临时节点、目录结构和监听机制来实现服务的动态注册、发现以及健康检查。通过使用 Curator 库,开发者可以更容易地实现一个高可用、动态可扩展的分布式服务发现系统。
分布式锁
Apache Curator 提供的分布式锁实现主要依赖于 ZooKeeper 的 临时顺序节点(Ephemeral Sequential Nodes) 特性,其核心原理是利用 ZooKeeper 保证数据一致性、强顺序性和高可用的特点,模拟出一个分布式环境下的“公平锁”。以下两个特性是实现分布式锁的关键点:
- 临时节点 (Ephemeral): 锁持有者与 ZooKeeper 的会话断开后,该节点会自动删除,从而避免死锁。
- 顺序节点 (Sequential): 每次创建的子节点都会有一个全局唯一的、单调递增的序号,这保证了锁的公平性(先到先得)。
Curator 提供了如下几种类型的分布式锁:
- InterProcessMutex:可重入的排他锁(最常用)
- InterProcessSemaphoreMutex:不可重入的排他锁
- InterProcessReadWriteLock:读写锁(包含读锁和写锁)
- InterProcessMultiLock:组合多个锁为一个逻辑锁
加锁过程
- 创建锁根节点: Curator 会指定一个持久节点作为所有锁的父节点(例如 /locks/mylock),这个节点需要预先存在。
- 创建临时顺序子节点: 客户端 A 在父节点下创建一个子节点,例如 /locks/mylock/lock-0000000001。
- 获取子节点列表: 客户端 A 读取父节点 /locks/mylock 下的所有当前子节点列表,并按序号升序排序。
- 判断是否为最小节点: 客户端 A 检查自己创建的节点是否是当前列表中序号最小的那个节点。
- 如果是: 说明客户端 A 成功获取到锁。它可以执行临界区代码。
- 如果不是: 说明前面有其他客户端正在等待或持有锁。客户端 A 不能获取锁。
- 监听前一个节点: 客户端 A 找到列表中位于它前面的那个节点(例如 /locks/mylock/lock-0000000000),并给这个节点设置一个 ZooKeeper Watcher(监听器),然后进入等待状态。
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public interface DistributeLock extends AutoCloseable{ boolean tryLock(long timeout, TimeUnit unit) throws Exception; }
public class ZkDistributeLock implements DistributeLock {
private final AtomicBoolean locked = new AtomicBoolean(false); private final InterProcessMutex lock;
public ZkDistributeLock(CuratorProperties properties) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(properties.getAddress()) .connectionTimeoutMs(properties.getConnectionTimeout()) .sessionTimeoutMs(properties.getSessionTimeout()) .retryPolicy( new ExponentialBackoffRetry(properties.getSleepTime(), properties.getMaxRetries())) .build(); client.start(); this.lock = new InterProcessMutex(client, properties.getBasePath() + "/inter-mutex"); }
@Override public boolean tryLock(long timeout, TimeUnit unit) throws Exception { if (timeout > 0) { boolean acquired = lock.acquire(timeout, unit); if (acquired) { locked.compareAndSet(false, true); } } else { lock.acquire(); locked.compareAndSet(false, true); } return locked.get(); }
@Override public void releaseLock() throws Exception { if (locked.compareAndSet(true, false)) { lock.release(); } }
@Override public void close() throws Exception { releaseLock(); }
public boolean isLocked() { return locked.get(); } }
|
客户端使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class ZkLockExample { public static void main(String[] args) throws Exception { CuratorProperties properties = new CuratorProperties(); properties.setBasePath("/locks"); properties.setAddress("127.0.0.1:2181"); properties.setSessionTimeout(10000); properties.setConnectionTimeout(1000); properties.setMaxRetries(3); properties.setSleepTime(3000); try (ZkDistributeLock lock = new ZkDistributeLock(properties)) { lock.tryLock(3000, TimeUnit.MILLISECONDS); }
ZkDistributeLock lock = new ZkDistributeLock(properties); try { lock.tryLock(3000, TimeUnit.MILLISECONDS); } finally { lock.releaseLock(); } } }
|
Curator 提供的分布式锁实现基于 ZooKeeper 的临时顺序节点,通过监听节点删除事件实现锁的获取与释放,保证了分布式环境下的可靠性和公平性。不过,需要注意的是,ZooKeeper 的性能限制可能影响系统的扩展性,尤其是在高并发场景下。