分布式中件间-基于zookeeper实现配置中心

概述

在微服务架构中,通常是将一个大型的单体应用拆解为多个独立的服务,这对于一个服务不同节点的配置管理上的新挑战,如果没有配置中心,这些挑战会显著增加系统的复杂性和维护成本。

  1. 配置分散,难以管理
    在单体应用中,所有配置都集中在一个配置文件(如 application.properties)中,易于管理。在微服务架构中,每个服务都有自己的配置,一个大型系统可能包含几十甚至上百个微服务,配置也随之分散。如果没有配置中心,开发和运维团队将需要手动管理每个服务的配置文件,这在多环境(开发、测试、生产)部署时尤其困难。
  2. 动态配置,无需重启
    在单体应用中,修改配置通常需要重新打包和部署整个应用,这在微服务架构中是不可接受的。配置中心支持配置的动态刷新,允许开发者在不重启微服务的情况下更新配置,例如调整日志级别、修改数据库连接、或启用/关闭某个功能。这对于快速响应和线上运维至关重要。
  3. 环境隔离,保持一致
    不同的部署环境(开发、测试、生产)需要不同的配置。例如,数据库连接、第三方API密钥等都不同。配置中心可以根据环境(如通过 application-dev.ymlapplication-prod.yml 等)存储和管理不同版本的配置,并确保每个服务在启动时都能加载正确的配置,避免因配置错误导致的环境问题。
  4. 安全管理,隔离敏感信息
    数据库密码、API密钥等敏感信息不应硬编码在代码中。配置中心可以安全地存储这些敏感信息,并提供加密和权限控制,确保只有授权的服务才能访问。
  5. 版本控制,方便回滚
    配置中心通常与Git等版本控制系统集成,可以对配置进行版本管理。这使得配置的变更历史清晰可查,如果新的配置出现问题,可以快速回滚到之前的版本,大大提高了系统的健壮性。
  6. 提升开发运维效率
    配置中心将配置从应用代码中分离,实现了配置与代码的解耦。这使得开发团队可以专注于业务逻辑的开发,而运维团队可以专注于配置的集中管理和动态调整,显著提升了工作效率。

总之:配置中心在微服务架构中扮演着“中央大脑”的角色,通过集中化、动态化、版本化的方式管理所有微服务的配置。它解决了传统静态配置文件带来的维护难题,提高了系统的灵活性、可维护性和安全性,是构建和管理大规模微服务系统的关键基础设施。

常见的实现

在选择配置中心时,需要综合考虑功能需求、高可用性、易用性、多语言支持和生态成熟度等因素。目前主流的配置中心包括 Apollo、Nacos、Consul 和 Spring Cloud Config。

Apollo

Apollo是携程开源的分布式配置管理中心,能够集中化管理应用在不同环境、不同集群的配置,并支持配置修改后实时推送到应用端。解决了微服务架构下配置管理分散、配置变更不及时、配置版本管理混乱、缺乏权限和审计等问题。Apollo支持不同维度的配置:

  • 应用(AppId): 唯一的应用程序标识。
  • 环境(Environment): 例如DEV、FAT、UAT、PRO等。
  • 集群(Cluster): 例如某个机房或特定的部署集群。
  • 命名空间(Namespace): 独立的配置管理单元,应用可以通过命名空间共享配置或拥有私有配置。

Apollo 架构是一个典型的分布式微服务架构,它由多个独立的服务组件构成,协同工作以提供完整的配置管理功能,这种设计确保了系统的高可用性、可扩展性和容错性:

  • Portal: 配置管理界面,供用户管理应用、环境、集群和命名空间。
  • Config Service: 提供配置读取、推送和管理接口。
  • Admin Service: 提供配置的增删改查接口,供Portal调用。
  • Meta Server: 负责服务发现,提供Config Service和Admin Service的地址信息。
  • Client: 应用嵌入的SDK,用于获取和监听配置。
  • Eureka: 用于服务注册和发现。

Apollo核心逻辑

Apollo通过长轮询(Long Polling)机制结合消息队列和本地缓存,实现了配置的实时更新和高可用性。整个流程可以分为配置发布(服务端)和客户端感知(客户端)两个部分。

  1. 服务端配置发布流程:当管理员在Apollo Portal发布配置时,会触发以下流程:
  • 发布配置: 管理员在Portal上修改并发布某个应用、环境和命名空间的配置。
  • Admin Service: Portal调用Admin Service的接口,将配置变更持久化到数据库中。
  • 发送消息: Admin Service在数据库中保存配置变更的同时,会发送一个ReleaseMessage(发布消息)到消息队列(通常是InMemory队列,但可以扩展为Kafka等)。
  • Config Service监听: Config Service会监听消息队列,当收到ReleaseMessage时,表示有新的配置发布。
  1. 应用服务配置感知与更新流程:应用服务在启动后,会以长轮询的方式与Config Service保持通信,等待配置更新。
  • 客户端长轮询

    • 发起长轮询: 客户端会异步地向Config Service的notifications/v2接口发起HTTP长轮询请求。
    • 服务暂挂: Config Service接收到客户端的请求后,并不会立即响应。如果此时没有配置更新,Config Service会利用异步Servlet(如Spring的DeferredResult)将该请求挂起,最长等待60秒。
  • 配置有更新:

    • 当Config Service从消息队列中接收到配置发布消息后,会通知所有被挂起的客户端请求。
    • 所有相关客户端的请求立即返回,响应体中包含更新通知。
  • 配置无更新:

    • 如果60秒超时,但期间没有任何配置更新,Config Service会返回一个空响应给客户端。
    • 客户端收到响应后,会立即发起下一次长轮询请求,继续等待。
  • 客户端拉取最新配置

    • 收到通知:客户端从长轮询请求中得知有配置更新后,会主动向Config Service的configfiles接口发起短连接请求,拉取最新的配置内容。
    • 更新本地缓存:客户端会将最新配置写入本地缓存文件,以防止网络或服务器故障,确保应用重启时能够恢复。
    • 通知应用:客户端的监听器(ConfigChangeListener)会被触发,将最新的配置通知到应用程序,从而实现配置热更新。

关键机制

  • 长轮询(Long Polling):客户端发起请求后,服务器端保持连接,直到有新数据或者超时。这种方式比传统短轮询更高效,延迟更低,但又不像WebSocket那样需要维持大量长连接。
  • 消息队列:服务端使用消息队列实现了配置发布和通知机制的解耦,使得Config Service可以及时收到配置变更通知,并通知客户端。
  • 本地缓存:客户端本地会缓存配置,保障了服务的可用性。即使Apollo服务器宕机,应用重启后仍能使用本地缓存的配置。
  • 异步Servlet:Config Service使用异步Servlet(如DeferredResult),在处理长轮询请求时不会阻塞线程,从而可以处理大量客户端的连接,提高系统并发能力。

Nacos

Nacos 配置中心的原理主要围绕 配置的拉取(pull)和动态推送(push) 机制展开,其核心是巧妙地利用 长轮询(Long Polling) 来高效地实现配置的实时更新。

Nacos核心逻辑

  1. 客户端启动拉取
    • 当微服务应用启动时,客户端会通过 HTTP 请求从 Nacos Server 拉取当前应用所需的配置信息,并加载到本地内存。
    • 这个过程通常在应用的 bootstrap 阶段完成,确保应用在启动之初就拥有正确的配置。
  2. 配置动态监听与推送
    • 为了实现配置的动态刷新,Nacos 采用了长轮询的机制,而不是传统的短轮询或 WebSocket。
    • 客户端:在获取配置后,客户端会开启一个后台线程,每隔一段时间(例如 30 秒)向 Nacos Server 发送一个特殊的 HTTP 长轮询请求,以询问配置是否有变更。
    • 服务端
      • 当客户端的请求到达服务端时,服务端并不会立即返回,而是将该请求“挂起”。
      • 服务端会持有一个监听器,用于监听数据库中的配置变更。
      • 如果配置在挂起期间发生了变更,服务端会立即响应客户端的请求,并告知客户端配置已更新。
      • 如果挂起时间超过设定的超时时间(例如 29.5 秒),服务端也会返回一个响应(可能是一个空响应),客户端收到响应后会立即发起下一个长轮询请求。
  3. 客户端配置更新
    • 客户端收到配置变更的通知后,会立即重新发起一次 HTTP 请求,从 Nacos Server 拉取最新的配置内容。
    • 客户端会根据新配置和旧配置的 MD5 值进行比对,以确保配置确实发生了变更。
    • 配置更新后,如果使用了 @RefreshScope 等注解,Spring 容器会刷新相关的 Bean,使得应用能够无需重启就应用新的配置。

关键机制

  • 长轮询(Long Polling)

    • 优点
      • 效率高:避免了短轮询带来的大量无效请求和网络开销。
      • 实时性好:当配置变更时,能立即通知客户端,不像短轮询那样有延迟。
    • 实现:Nacos 在客户端和服务端都维护了长轮询的逻辑。客户端发送请求,服务端挂起连接,直到配置变更或超时。这种模式既保障了实时性,又有效地利用了网络资源。
  • MD5 值比对

    • 作用:为了减少网络传输和处理开销,客户端在发起长轮询请求时,会将本地配置的 MD5 值发送给服务端。
    • 过程
      • 客户端发起长轮询请求时,携带 configIdmd5 等参数。
      • 服务端接收请求,将请求挂起。
      • 当配置发生变更时,服务端会对比新的 MD5 和客户端传入的 MD5,如果不同,就立即响应客户端。
      • 这种方式避免了每次都传输完整的配置内容,提高了效率。
  • 本地缓存

    • 作用:为了在网络故障或 Nacos Server 宕机时,客户端仍然能够正常运行。
    • 实现:客户端在成功获取配置后,会将其缓存到本地文件。如果下次启动时无法连接到 Nacos Server,它会加载本地缓存的配置。

基于zookeeper实现配置中心

使用Curator基于zookeeper实现配置中心的核心,是利用 ZooKeeper 的 ZNode 数据模型和 Watcher 事件通知机制,再通过 Curator 自身的 Cache 组件(如 NodeCache)对原生 Watcher 的“一次性”缺陷进行封装和优化,从而实现配置的持久化存储和客户端的自动、持续监听。

实现机制

ZNode 数据模型:ZooKeeper 提供了一个类似文件系统的树形结构(ZNode Tree),每个 ZNode(节点)可以存储一小块数据(通常小于1MB)。同时ZooKeeper集群可以保证数据的强一致性,所有写操作都会在集群的多数节点上达成一致后才返回成功。这确保了所有客户端读取到的配置数据是最新、一致的。在Znode中创建根目录/config持久化节点(PERSISTENT),然后在该路径下创建子节点来存储不同的配置项:

  • 配置项:/config/app/dev/database_url -> jdbc:mysql://localhost:3306/mydb。
  • snapshot作为快照:/config/application/dev/snapshot/releases_id,用来保存具体配置项快照。
  • releases_meta作为其他信息,如操作信息、历史版本、回滚版本等

Leader写入:利用ZooKeeper的Leader 选举机制,确保配置的写入操作只由集群中的一个实例(Leader)执行,从而避免了多个实例同时写入导致的冲突。

ZooKeeper Watcher 与配置变更通知:对一个 ZNode 设置一个 Watcher,当该 ZNode 的数据发生变更(NodeDataChanged)、被删除(NodeDeleted)或其子节点列表发生变化时,ZooKeeper 服务器会向设置 Watcher 的客户端发送一个事件通知。Curator

具体实现

  • CuratorClientFactory用于创建CuratorFramework
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

public class CuratorClientFactory {

public static CuratorFramework createCuratorClient(CuratorProperties properties) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(properties.getAddress())
.sessionTimeoutMs(properties.getSessionTimeout())
.connectionTimeoutMs(properties.getConnectionTimeout())
.retryPolicy(
new ExponentialBackoffRetry(properties.getSleepTime(), properties.getMaxRetries()));

if (properties.getAuthDigest() == null || properties.getAuthDigest().isBlank()) {
CuratorFramework client = builder.build();
client.start();
return client;
}

// authDigest 格式: "user:password"
builder = builder.authorization("digest", properties.getAuthDigest().getBytes());
// 提供 ACLProvider,让新创建的节点有 digest ACL(管理服务可以写)
builder = builder.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
List<ACL> acls = new ArrayList<>();
// grant all to the digest auth we provided
acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.CREATOR_ALL_ACL.get(0).getId()));
// grant read to anyone (可改为更严格)
acls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
return acls;
}

@Override
public List<ACL> getAclForPath(String path) {
return getDefaultAcl();
}
});

CuratorFramework client = builder.build();
client.start();
return client;
}
}
  • LeaderService 用于应用服务Leader选举:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class LeaderService extends LeaderSelectorListenerAdapter implements AutoCloseable {

private final LeaderSelector selector;
private final AtomicBoolean isLeader = new AtomicBoolean(false);

public LeaderService(CuratorFramework client) throws Exception {
this.selector = new LeaderSelector(client, "/config/leader", this);
InetAddress address = InetAddress.getLocalHost();
String id = address.getHostAddress();
this.selector.setId(id);
// 参加选举后,当释放领导权,会自动重新加入
this.selector.autoRequeue();
this.selector.start();
}

@Override
public void takeLeadership(CuratorFramework client) throws Exception {
// 线程获得领导权时会调用此方法,直到方法返回领导权才会交出
isLeader.set(true);
try {
// Block until we are interrupted (release leadership when interrupted)
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// lost leadership or closing
} finally {
isLeader.set(false);
}
}

public <T> T returnIfLeader(Supplier<T> supplier) throws Exception {
if (!checkLeader()) {
throw new RuntimeException("Leader is not leader");
}
return supplier.get();
}

public boolean checkLeader() {
return isLeader.get();
}


@Override
public void close() throws Exception {
selector.close();
}
}
  • ConfigService配置核心逻辑,实现了配置发布与回滚:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
public class ConfigService {

private static final Logger logger = LoggerFactory.getLogger(ConfigService.class);
private final CuratorFramework client;
private final ObjectMapper mapper;
private final SnowflakeIdGenerator idGenerator;
private final String configRootPath = "/config";

public ConfigService(CuratorFramework client, String applicationName) {
this.client = client;
String watchPath = configRootPath + "/" + applicationName;

CuratorCache cache = CuratorCache.build(client, watchPath);
CuratorCacheListener listener = CuratorCacheListener
.builder().forAll((type, oldData, newData) -> {
String path = newData != null ? newData.getPath()
: (oldData != null ? oldData.getPath() : "unknown");
logger.info("[CuratorCache] event={} path={}}", type, path);
}).build();
cache.listenable().addListener(listener);
cache.start();
this.mapper = new ObjectMapper();
this.idGenerator = new SnowflakeIdGenerator();
}

// 获取正式值(读取 cache first, fallback to zk)
public Optional<String> getRelease(String app, String env, String key) throws Exception {
String path = keyPath(app, env, key);
byte[] data = client.getData().forPath(path);
return Optional.ofNullable(data).map(b -> new String(b, StandardCharsets.UTF_8));
}

// 创建或更新正式值(由 leader 执行):先保存快照,再写新值,并在 releases_meta 记录 releaseId
public String putRelease(String app, String env, Map<String, String> kvs, String author)
throws Exception {
String releaseId = String.valueOf(idGenerator.nextId());
List<CuratorOp> ops = new ArrayList<>();

String releasesPath = String.format("%s/%s/%s/snapshot/%s", configRootPath, app, env,
releaseId);
ensurePathExists(releasesPath);

// 为每个 key 做两个操作:
// 1) 备份当前值到 snapshot/{releaseId}/{key}(如果存在)
// 2) setData 到正式路径(create if not exist)
for (Map.Entry<String, String> kv : kvs.entrySet()) {
String key = kv.getKey();
String newValue = kv.getValue();
String targetPath = keyPath(app, env, key);
String snapshotPath = releaseSnapshotPath(app, env, releaseId, key);
byte[] newBytes = newValue.getBytes(StandardCharsets.UTF_8);

// 创建 snapshot node: 获取原值(若不存在则写空)
byte[] old = null;
if (checkPathExists(targetPath)) {
old = client.getData().forPath(targetPath);
} else {
old = new byte[0];
}
byte[] snapshotBytes = old;

CuratorOp opCreateSnapshot = client.transactionOp()
.create()
.forPath(snapshotPath, snapshotBytes);
ops.add(opCreateSnapshot);

// 处理新配置
ensurePathExists(String.format("%s/%s/%s", configRootPath, app, env));
if (checkPathExists(targetPath)) {
CuratorOp setOp = client.transactionOp().setData().forPath(targetPath, newBytes);
ops.add(setOp);
} else {
// node 不存在 -> create op
CuratorOp createOp = client.transactionOp().create().withMode(CreateMode.PERSISTENT)
.forPath(targetPath, newBytes);
ops.add(createOp);
}
}

// 添加 release meta
String metaPath = releasesMetaPath(app, env, releaseId);
Map<String, Object> meta = new HashMap<>();
meta.put("id", releaseId);
meta.put("author", author);
meta.put("time", Instant.now().toString());
byte[] metaBytes = mapper.writeValueAsBytes(meta);

ensurePathExists(metaPath);
CuratorOp metaOp = client.transactionOp().setData().forPath(metaPath, metaBytes);
ops.add(metaOp);

// 执行事务 (atomic): create snapshots + setData/create targets + create meta
client.transaction().forOperations(ops.toArray(new CuratorOp[0]));
return releaseId;
}

// 回滚到某个 releaseId(将 releases/{releaseId} 快照写回正式路径)
public void rollbackToRelease(String app, String env, String releaseId, String author)
throws Exception {
String releaseBase = String.format("%s/%s/%s/snapshot/%s", configRootPath, app, env, releaseId);
List<String> keys = client.getChildren().forPath(releaseBase);

if (keys == null || keys.isEmpty()) {
throw new IllegalArgumentException("release snapshot empty");
}

List<CuratorOp> ops = new ArrayList<>();
for (String key : keys) {
String snapshotPath = releaseBase + "/" + key;
byte[] snap = client.getData().forPath(snapshotPath);
String targetPath = keyPath(app, env, key);
ensurePathExists(String.format("%s/%s/%s", configRootPath, app, env));
if (checkPathExists(targetPath)) {
CuratorOp setOp = client.transactionOp().setData().forPath(targetPath, snap);
ops.add(setOp);
} else {
CuratorOp createOp = client.transactionOp().create().forPath(targetPath, snap);
ops.add(createOp);
}
}

// add rollback meta entry
String rbId = "rollback-" + idGenerator.nextId();
String metaPath = releasesMetaPath(app, env, rbId);
Map<String, Object> meta = new HashMap<>();
meta.put("id", rbId);
meta.put("author", author);
meta.put("time", Instant.now().toString());
meta.put("rollbackFrom", releaseId);
byte[] metaBytes = mapper.writeValueAsBytes(meta);

ensurePathExists(metaPath);
CuratorOp metaOp = client.transactionOp().setData().forPath(metaPath, metaBytes);
ops.add(metaOp);

client.transaction().forOperations(ops.toArray(new CuratorOp[0]));
}

// path 存在返回 true
private boolean checkPathExists(String path) throws Exception {
return client.checkExists().forPath(path) != null;
}

private void ensurePathExists(String path) throws Exception {
if (!checkPathExists(path)) {
System.out.println("path = " + path);
client.create().creatingParentsIfNeeded().forPath(path, new byte[0]);
}
}

private String keyPath(String app, String env, String key) {
return String.format("%s/%s/%s/%s", configRootPath, app, env, key);
}

private String releaseSnapshotPath(String app, String env, String releaseId, String key) {
return String.format("%s/%s/%s/snapshot/%s/%s", configRootPath, app, env, releaseId, key);
}

private String releasesMetaPath(String app, String env, String releaseId) {
return String.format("%s/%s/%s/releases_meta/%s", configRootPath, app, env, releaseId);
}
}

总结

  • 配置存储(层次化 ZNode)

    • ZooKeeper 提供一个类似文件系统的树形结构命名空间,每个节点称为一个 ZNode。
    • 可以将配置项作为数据存储在特定的 ZNode 中。例如,可以为不同的应用、不同的环境和不同的配置项创建对应的 ZNode 路径,如 /config/application/dev/database.url。
    • 每个 ZNode 都可以存储少量数据(如配置值、JSON 或 Properties 文件内容),并且有元数据(如版本号、ACL 等)。
  • 客户端初始化与读取配置

    • 应用客户端在启动时,连接 ZooKeeper 集群。
    • 客户端根据应用的需要,读取相应的 ZNode 数据,并将配置加载到本地内存(如 Java 的 Properties 或其他配置管理对象)。
  • 动态更新(Watcher 机制)

    • Watcher是实现配置中心的关键机制。客户端在读取 ZNode 数据的同时,会向该 ZNode 注册一个 watcher 监听器。
    • 一旦该 ZNode 的数据发生变化(例如,配置被修改),ZooKeeper 服务器会主动通知注册了 watcher 的客户端。客户端收到通知后,会重新去 ZooKeeper 拉取最新的配置数据,并更新本地内存中的配置值,从而实现配置的动态刷新,无需重启应用。
  • 高可用性与一致性

    • ZooKeeper 通常以集群(Ensemble)模式部署,推荐使用奇数节点(如 3、5 个节点),只要集群中大多数节点(Quorum)正常工作,服务就可用,从而避免单点故障。
    • ZooKeeper 通过 ZAB(ZooKeeper Atomic Broadcast)协议保证数据的一致性,确保所有客户端在同一时刻看到的数据是一致的(强一致性)。

分布式中件间-基于zookeeper实现配置中心
http://example.com/2025/08/07/分布式中间件-基于zookeeper实现配置中心/
作者
ares
发布于
2025年8月7日
许可协议