基于LinkedTransferQueue实现连接池

概述

连接池是一种用空间换时间、并确保资源可控性的重要设计模式。 它将昂贵的“连接创建”操作转变为廉价的“连接借用”操作,是构建高性能、高可用的数据库驱动应用不可或缺的基础设施。

  • 创建连接开销巨大: 建立一个数据库连接是一个昂贵且耗时的操作。它涉及到网络通信(三次握手)、数据库服务器的认证、权限验证等一系列步骤。这个过程可能需要几百毫秒甚至几秒钟的时间。
  • 连接池复用连接: 连接池在应用程序启动时预先创建一定数量的连接,并将它们存储在一个池中。当应用程序需要访问数据库时,它不是创建一个新连接,而是从池中借用一个已有的、空闲的连接。操作完成后,将连接归还到池中,而不是关闭它。
  • 消除创建/销毁的延迟: 通过复用连接,应用程序消除了每次请求时的连接创建和销毁开销,从而大大缩短了数据库操作的响应时间,提高了系统的整体吞吐量(Throughput)。

常见框架

Apache Commons Pool 2:
Apache Commons Pool 2 是一个通用的、高度可配置的对象池框架。它的实现原理基于一套清晰的接口和协作组件,旨在提供灵活且健壮的对象生命周期管理。 核心原理是通过将对象池 (GenericObjectPool) 的通用逻辑与特定对象的生命周期管理 (PooledObjectFactory) 解耦,实现了一个高度灵活、可配置且功能全面的通用对象池框架。它使用同步机制和后台维护线程来确保在高并发环境下的正确性和稳定性。

HikariCP:
HikariCP(简称 Hikari)之所以能成为目前 Java 生态中最快、最受欢迎的连接池,其核心实现原理在于极致的性能优化和精巧的设计。它避免了其他连接池中常见的性能瓶颈,主要通过以下几个方面实现:

  • 核心数据结构——ConcurrentBag。Hikari 没有使用传统的 BlockingQueue 或其他标准并发集合来存储连接,而是实现了一个名为 ConcurrentBag(并发背包)的自定义并发集合,这是其高性能的关键之一。ConcurrentBag 旨在最大程度地减少线程争用(contention),它内部使用了一些无锁(lock-free)技术(如 CAS 操作)来管理连接的分配和回收,避免了昂贵的锁操作。ConcurrentBag 内部利用 ThreadLocal 存储了一个小的、线程私有的连接缓存。当一个线程请求连接时,它首先会检查自己的 ThreadLocal 缓存。如果能直接从缓存中获取,就可以完全避免跨线程同步的开销,实现极快的连接获取速度。只有在私有缓存为空时,才会涉及到底层的共享池同步操作。

  • 连接代理:
    当应用程序从 Hikari 中获取连接时,它实际上并没有拿到物理数据库连接本身,而是拿到了一个实现了 Connection 接口的代理对象 (ConnectionProxy)。这个代理对象拦截了应用程序调用的 close() 方法。当应用调用 close() 时,代理并不会真正关闭底层的物理连接,而是将连接的状态重置(清理事务状态、恢复默认配置等)后,将其归还到连接池的 ConcurrentBag 中,并将连接标记为可用状态。代理负责管理连接在“空闲(IDLE)”和“使用中(IN_USE)”之间的状态转换。

  • 轻量级连接验证:
    许多传统的连接池在每次借出连接前都会执行一个 connection test query(例如 SELECT 1)来验证连接的有效性,这会带来额外的网络延迟和数据库负载。

    • 惰性验证: Hikari 采取了更智能的验证策略,它默认只在连接空闲超过 500 毫秒时进行验证。如果连接一直处于被频繁借出和归还的状态(高负载场景),它会跳过验证步骤,极大地提高了高并发下的性能。
    • 快速失败: Hikari 依赖于 JDBC 驱动本身的快速失败机制。它假定连接在大多数时候是健康的,只有在真正发生网络错误或数据库故障时才处理异常,而不是主动去探测连接的健康状况。
  • 异步连接创建与维护 (Housekeeper)
    Hikari 使用一个名为 Housekeeper(管家)的后台线程来异步管理连接池的生命周期。

    • 连接维护: Housekeeper 定期运行(默认每 30 秒),负责执行以下任务:
      • 检查并关闭空闲时间过长(超过 idleTimeout)的连接,以缩减连接池规模,如果当前连接数大于 minimumIdle。
      • 检查并关闭生存时间过长,超过 maxLifetime)的连接,确保连接定期刷新,避免数据库或中间件的强制断开。
      • 在需要时异步创建新连接,以维持 minimumIdle 要求的空闲连接数,避免在高峰期同步创建连接导致延迟。
    • 泄露检测: Housekeeper 还会检查连接是否发生泄露,即借出后长时间未归还,并在配置了 leakDetectionThreshold 时记录警告日志。

LinkedTransferQueue

LinkedTransferQueue 是 Java util.concurrent 包下提供的一个高性能的并发队列实现,它实现了 TransferQueue 接口。它的设计目标是在生产者和消费者之间实现高效的直接切换(handoff)。

1
2
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {}

主要特性 (Characteristics)

  • 无界(Unbounded): 理论上,LinkedTransferQueue 是一个无界队列。只要内存允许,你可以向其中添加任意多的元素。
  • 高性能的无锁(Lock-Free)实现: 它基于 CAS(Compare-And-Swap)操作实现了一个高效的无锁数据结构。在大多数并发场景下,它的性能通常优于基于锁的阻塞队列(如 LinkedBlockingQueue)。
  • 直接传递(Handoff)机制: 这是它最显著的特性。当一个生产者线程调用 transfer() 方法时,它会阻塞,直到一个消费者线程准备好接收该元素,然后元素会直接从生产者传递给消费者,省去了元素在队列中排队的过程。
  • 公平性(Fairness): LinkedTransferQueue 保证了等待的生产者和消费者线程的公平性。等待时间最长的线程将首先被匹配和唤醒。
  • 实现了 TransferQueue 接口: 它继承了 AbstractQueue 并实现了 TransferQueue 接口,提供了比标准 BlockingQueue 更丰富的同步方法。

LinkedTransferQueue 提供了标准队列操作(如 put, offer, take, poll)以及特有的 transfer 操作:

  • 核心 Transfer 方法 (直接传递):

    • V transfer(V e) throws InterruptedException:尝试立即将元素 e 传递给一个等待接收的消费者。阻塞,如果当前没有消费者在等待接收元素,则当前调用线程会阻塞,直到有一个消费者出现并接收了该元素。
    • boolean tryTransfer(V e):尝试立即将元素 e 传递给一个等待接收的消费者。非阻塞/立即返回,如果有等待的消费者,立即传递并返回 true。如果没有等待的消费者,则立即返回 false,元素不会被放入队列。
    • boolean tryTransfer(V e, long timeout, TimeUnit unit) throws InterruptedException:尝试在指定的超时时间内将元素 e 传递给一个消费者。超时阻塞,如果在超时时间内成功匹配消费者,则返回 true;如果超时仍未匹配到消费者,则返回 false(元素不会进入队列)。
  • 标准 Queue/BlockingQueue 方法 (排队/阻塞)

    • void put(V e): 将指定的元素插入队列的尾部。非阻塞,由于队列是无界的,此方法不会阻塞,它总是立即返回。
    • boolean offer(V e): 相当于 put(e),用于将元素添加到队列。非阻塞,总是返回 true。
    • V take() throws InterruptedException: 检索并移除此队列的头部元素。阻塞,如果队列当前为空,则当前线程会阻塞,直到有元素可用。
    • V poll(long timeout, TimeUnit unit) throws InterruptedException: 检索并移除队列的头部,使用超时等待。超时阻塞,如果在指定时间内队列仍然为空,则返回 null。
    • V poll(): 检索并移除队列的头部。非阻塞,如果队列为空,立即返回 null。

LinkedTransferQueue 非常适合实现高性能的“任务分发中心”或连接池等场景,其中生产者希望在有消费者准备好处理数据时立即将数据移交给消费者,最大限度地减少延迟和排队时间。

实现

使用 LinkedTransferQueue (LTQ) 来实现对象池是一个高性能的选择,尤其是在高并发场景下。LTQ 的核心优势在于其 transfer() 和 tryTransfer() 方法,它们支持“直接交接”,当一个线程归还对象时(生产者),它可以尝试 tryTransfer(),如果此时有另一个线程正在 poll() 或 take() 等待获取对象(消费者),LTQ 会将该对象直接从归还线程交给等待线程,而无需将对象放入队列中再取出。这极大地减少了锁竞争和上下文切换,提高了吞吐量。

核心设计思路

  • PooledFactory<T> 接口: 定义如何创建、销毁和验证池中对象。这是池化框架(如 Apache Commons Pool)的标准实践。
    PooledProperties 类: 存放配置,例如最大池大小 maxSize 和获取超时 acquireTimeout等。

  • TransferQueuePool<T> : 核心数据结构,用于存放空闲的对象。

  • tryAcquire() (获取对象):

    • 首先尝试 pool.poll() 非阻塞地获取一个空闲对象。
    • 如果获取到,验证(validate)对象。如果有效,返回;如果无效,销毁(destroy)对象,总数(totalSize)减一,然后重试。
    • 如果没有空闲对象,检查 total。 如果 totalSize < maxSize,尝试使用 AtomicInteger.compareAndSet() 增加 total 并创建一个新对象(factory.create())。 如果 totalSize >= maxSize(池已满),则调用 pool.poll(timeout, unit) 阻塞等待,直到有其他线程归还对象。
  • release(T obj) (归还对象):

    • 首先验证对象。如果无效,直接销毁,total 减一,不归还到池中。
    • 核心: 调用 pool.tryTransfer(obj)。 如果 tryTransfer 返回 true,意味着对象已成功“直接交接”给一个正在等待的线程。如果返回 false(没有线程在等待),则调用 idleObjects.offer(obj) 将对象放入空闲队列中。

代码实现

PooledFactory:通用工厂接口

1
2
3
4
5
6
7
8
public interface PooledFactory<T> {

T create() throws Exception;

void destroy(T obj);

boolean validate(T obj);
}

PooledProperties:配置类

1
2
3
4
5
6
7
8
9
10
public class PooledProperties {

int maximumPoolSize = 10;
int minimumIdle = 5;
long idleInterval = 60L;
long idleTimeout = 3000L;
long maxLifetime = 60 * 1000L;
long acquireTimeout = 5000L;
TimeUnit acquireTimeunit = TimeUnit.MILLISECONDS;
}

PooledWrapper:资源封装类

1
2
3
4
5
6
7
8
9
10
11
12
public class PooledWrapper<T> {

private final T resource;
private final long creationTime;
private final long lastAccessTime;

PooledWrapper(T resource) {
this.resource = resource;
this.creationTime = System.currentTimeMillis();
this.lastAccessTime = this.creationTime;
}
}

TransferQueuePool:具体实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
public class TransferQueuePool<T> implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(TransferQueuePool.class);
private final PooledProperties properties;
private final LinkedTransferQueue<PooledWrapper<T>> pool;
private final PooledFactory<T> factory;
private final ScheduledExecutorService scheduler;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicInteger total = new AtomicInteger(0);
private final AtomicInteger idle = new AtomicInteger(0);

public TransferQueuePool(PooledProperties properties, PooledFactory<T> factory) throws Exception {
this.properties = properties;
this.factory = factory;
this.pool = new LinkedTransferQueue<>();
this.scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("transfer-pool-scheduler-" + t.getId());
return t;
});
((ScheduledThreadPoolExecutor) this.scheduler).setRemoveOnCancelPolicy(true);
initializePool();
startCheckTask();
}

private void initializePool() throws Exception {
for (int i = 0; i < properties.getMinimumIdle(); i++) {
createPoolWrapper();
}
}

private void createPoolWrapper() throws Exception {
logger.info("create pool wrapper......................");
T wrapper = factory.create();
PooledWrapper<T> pooledWrapper = new PooledWrapper<>(wrapper);
pool.offer(pooledWrapper);
total.incrementAndGet();
idle.incrementAndGet();
}

private void startCheckTask() {
scheduler.scheduleAtFixedRate(
this::checkIdleTask,
properties.getIdleInterval(),
properties.getIdleInterval(),
TimeUnit.SECONDS);
}

private void checkIdleTask() {
long now = System.currentTimeMillis();
logger.info("check idle task,runtime:{}", now);
try {
pool.removeIf(wrapper -> {
boolean removed = false;
// 检查是否过期
if (now - wrapper.getCreationTime() > properties
.getMaxLifetime()) {
removed = true;
} else if (!factory.validate(wrapper.getResource())) {
removed = true;
}

if (removed) {
factory.destroy(wrapper.getResource());
total.decrementAndGet();
idle.decrementAndGet();
}
return removed;
});

// 确保维持最小连接数
while (total.get() < properties.getMinimumIdle()) {
createPoolWrapper();
}
} catch (Exception e) {
logger.error("check idle task error", e);
}
}

public T tryAcquire() throws Exception {
return tryAcquire(properties.getAcquireTimeout(), TimeUnit.SECONDS);
}

public T tryAcquire(long timeout, TimeUnit unit) throws Exception {
if (closed.get()) {
throw new IllegalStateException("pool is closed!!!");
}
long deadline = System.nanoTime() + unit.toNanos(timeout);
while (true) {
long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
throw new TimeoutException("Timeout waiting for object from pool.");
}

PooledWrapper<T> wrapper = pool.poll();
if (wrapper != null) {
if (factory.validate(wrapper.getResource())) {
idle.decrementAndGet();
return wrapper.getResource();
} else {
factory.destroy(wrapper.getResource());
total.decrementAndGet();
}
}
// 队列为空,检查是否可以创建新对象
if (total.get() < properties.getMaximumPoolSize()) {
synchronized (pool) {
if (total.get() < properties.getMaximumPoolSize()) {
createPoolWrapper();
wrapper = pool.poll();
if (wrapper != null) {
idle.decrementAndGet();
return wrapper.getResource();
}
}
}
}

// 队列已满,必须阻塞等待
wrapper = pool.poll(timeout, unit);
if (wrapper != null) {
idle.decrementAndGet();
return wrapper.getResource();
}
throw new TimeoutException("resource acquisition timed out");
}
}

public void release(T resource) {
logger.info("release resource {}", resource);
if (resource == null) {
return;
}

if (!closed.get() && factory.validate(resource)) {
PooledWrapper<T> wrapper = new PooledWrapper<>(resource);
if (!pool.tryTransfer(wrapper)) {
pool.offer(wrapper);
idle.incrementAndGet();
}
} else {
factory.destroy(resource);
total.decrementAndGet();
idle.decrementAndGet();
}
}

@Override
public void close() throws Exception {
logger.info("starting close pool..........................");
if (closed.compareAndSet(false, true)) {
scheduler.shutdown();
try {
// 如果等待超时则强制关闭
if (!scheduler.awaitTermination(properties.getAcquireTimeout(), TimeUnit.MILLISECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
PooledWrapper<T> wrapper;
while ((wrapper = pool.poll()) != null) {
factory.destroy(wrapper.getResource());
total.decrementAndGet();
idle.decrementAndGet();
}
}
}
}

使用

实现具体的工厂类——DbConnectionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DbConnectionFactory implements PooledFactory<MockDbConnection> {

@Override
public MockDbConnection create() throws Exception {
return new MockDbConnection();
}

@Override
public void destroy(MockDbConnection obj) {
if (obj != null) {
obj.close();
}
}

@Override
public boolean validate(MockDbConnection obj) {
// 在借出和归还时检查连接是否“存活”
return obj != null && !obj.isClosed();
}
}

实现具体的资源类——MockDbConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class MockDbConnection {
private static final AtomicInteger idCounter = new AtomicInteger(0);
private final int id;
private boolean closed = false;

public MockDbConnection() {
this.id = idCounter.incrementAndGet();
System.out.println("创建连接 #" + id);
}

public void executeQuery(String query) {
if (closed) {
throw new RuntimeException("连接 #" + id + " 已关闭。");
}
System.out.println("线程 " + Thread.currentThread().getName() + " 使用连接 #" + id + " 执行查询: " + query);
// 模拟耗时
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public boolean isClosed() {
return closed;
}

public void close() {
this.closed = true;
System.out.println("销毁连接 #" + id);
}

@Override
public String toString() {
return "MockDbConnection[id=" + id + "]";
}

}

客户端使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MockDbExample {
private static final Logger logger = LoggerFactory.getLogger(MockDbExample.class);

public static void main(String[] args) throws Exception {
// 1.配置
PooledProperties properties = new PooledProperties();
properties.setMaximumPoolSize(10);
properties.setAcquireTimeout(2000);
properties.setAcquireTimeunit(TimeUnit.MILLISECONDS);

// 2.创建池
try (TransferQueuePool<MockDbConnection> pool = new TransferQueuePool<>(properties,
new DbConnectionFactory())) {

// 3. 创建线程池来模拟并发请求
int numTasks = 15;
ExecutorService executor = Executors.newFixedThreadPool(numTasks);

logger.info("--- 启动 %d 个并发任务,池大小为 %d ---\n", numTasks,
properties.getMaximumPoolSize());

for (int i = 0; i < numTasks; i++) {
final int taskId = i;
executor.submit(() -> {
MockDbConnection connection = null;
try {
// 4. 借用连接
logger.info("任务 " + taskId + " 尝试借用连接...");
connection = pool.tryAcquire();

// 5. 使用连接
connection.executeQuery("SELECT * FROM users WHERE id=" + taskId);

} catch (Exception e) {
logger.info("任务 " + taskId + " 获取连接失败: " + e.getMessage());
} finally {
if (connection != null) {
// 6. 归还连接
logger.info("任务 " + taskId + " 归还连接 " + connection);
pool.release(connection);
}
}
});
}

// 7. 关闭
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 超时则强制终止线程
}
}
}
}

总结

上面实现了一个简化的通用的池,利用了 LinkedTransferQueue 的高性能特性,同时通过 AtomicInteger 和 CAS 操作来管理池的边界,实现了一个健壮、高性能的通用对象池。


基于LinkedTransferQueue实现连接池
http://example.com/2025/07/19/java-juc-基于LinkedTransferQueue实现连接池/
作者
ares
发布于
2025年7月19日
许可协议