reactor模型原理与实现

Java 中的 Reactor 模型是一种基于事件驱动的设计模式,常用于处理并发 I/O 操作。它特别适用于高并发、网络服务开发(如 Web 服务器、数据库连接池、消息队列等),是现代 Java NIO(非阻塞 I/O)编程的核心设计思想之一。Reactor 模型的核心思想是:将 I/O 事件注册到一个事件循环中,由事件循环监听这些事件并分发给相应的处理器进行处理。
这种模型可以高效地管理大量并发连接,而不需要为每个连接创建一个线程,从而避免了传统多线程模型中线程切换和资源消耗的问题。

Reactor模型组成:
Reactor:负责监听和分发事件。通常使用 Selector来监控多个通道(Channel)上的 I/O 事件(如读、写)。
Acceptor:当监听到客户端连接请求时,由 Acceptor 接收连接,并将其注册到 Reactor 中。
Handler:处理具体的 I/O 事件。每个 Channel 对应一个 Handler,负责处理该 Channel 上发生的事件(如连接建立、数据读取、写入等)。

1
2
3
4
5
6
7
8
9
10
Client --> ServerSocketChannel (OP_ACCEPT)
|
v
Reactor (Selector)
|
+-----------v--------------+
| |
Acceptor(SocketChannel ) ... (多个连接)
|
Handler (读/写)

单Reactor单线程模式

所有操作都在一个线程中完成:事件监听、事件分发、业务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;

Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// 注册keys
SelectionKey key = serverSocket.register(selector, OP_ACCEPT);
key.attach(new Acceptor());
}

public void run() {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
dispatch(iterator.next());
iterator.remove();
}
}
}
// 事件分发: 将就绪通道的注册键关联的处理器取出并执行
void dispatch(SelectionKey key) {
Object handler = key.attachment();
if (handler instanceof Runnable) {
((Runnable) handler).run();
}
}

class Acceptor implements Runnable {
public void run() {
try {
SocketChannel clientSocket = serverSocket.accept();
if (clientSocket != null){
new Handler(clientSocket, selector);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
  • ChannelHandler接口
1
2
3
4
5
6
public interface ChannelHandler extends Runnable {

void handleRead(SelectionKey key) throws IOException;

void handleWrite(SelectionKey key) throws IOException;
}
  • EchoServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EchoServerHandler implements ChannelHandler {
@Override
public void run() {
try {
if (state == ChannelState.READING) {
handleRead(selectionKey);
} else if (state == ChannelState.SENDING) {
handleWrite(selectionKey);
}
} catch (IOException e) {
log.error("read or write error", e);
}
}

@Override
public void handleRead(SelectionKey selectionKey) throws IOException {
// read ....
}

@Override
public void handleWrite(SelectionKey selectionKey) throws IOException {
// write ....
}
}

单Reactor多线程模型实现

Reactor 作用就是要迅速的触发 Handler ,在单线程 Reactor 中,Handler 与 Reactor 处于同一线程,Handler 进行业务处理的过程会导致 Reactor 变慢。根据上面分而治之的优化思想,可以将业务处理过程从 Reactor 线程中拆出来,到单独的 Handler 线程池中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EchoServerHandler implements ChannelHandler {
@Override
public void run() {
try {
if (state == ChannelState.READING) {
executor.submit(()->handleRead(selectionKey));
} else if (state == ChannelState.SENDING) {
executor.submit(()->handleWrite(selectionKey));
}
} catch (IOException e) {
log.error("read or write error", e);
}
}

@Override
public void handleRead(SelectionKey selectionKey) throws IOException {
// read ....
}

@Override
public void handleWrite(SelectionKey selectionKey) throws IOException {
// write ....
}
}

主从Reactor模型

单 Reactor 多线程的情况下,Handler 线程池中业务可能处理很快,大部分的时间都花在 Reactor 线程处理 I/O 上,导致 CPU 闲置,降低了响应速度。主从Reactor模型主要就是把IO连接事事件处理交给主Reactor,从Reactor处理实际的 I/O(OP_READ、OP_WRITE)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public abstract class Reactor implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Reactor.class);
protected final Selector selector;

public Reactor() throws IOException {
selector = Selector.open();
}

@Override
public void run() {
while (true) {
try {
if (selector.select() == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 必须先移除,避免重复处理
iterator.remove();
if (!selectionKey.isValid()) {
continue;
}
try {
dispatch(selectionKey);
} catch (IOException e) {
logger.error("Error handling key: " + selectionKey, e);
selectionKey.cancel();
try {
selectionKey.channel().close();
} catch (IOException ex) {
logger.error("Error closing channel", ex);
}
}
}
} catch (Exception e) {
logger.error("selector error", e);
}
}
}

protected abstract void dispatch(SelectionKey selectionKey) throws IOException;
}
  • MainReactor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MainReactor extends Reactor {

private final ServerSocketChannel serverSocketChannel;
private final ServerBootstrapAcceptor acceptor;

@Override
protected void dispatch(SelectionKey key) {
if (key.isAcceptable()) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
acceptor.handleAccept(socketChannel);
}
} catch (IOException e) {
log.error("get socket channel error", e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SubReactor extends Reactor {
@Override
protected synchronized void dispatch(SelectionKey selectionKey) throws IOException {
executor.execute(() -> {
try {
if (!selectionKey.isValid()) {
return;
}
ChannelHandler handler = (ChannelHandler) selectionKey.attachment();
if (selectionKey.isReadable()) {
handler.handleRead(selectionKey);
} else if (selectionKey.isWritable()) {
handler.handleWrite(selectionKey);
}
} catch (IOException e) {
log.error("Handler processing error", e);
try {
selectionKey.cancel();
selectionKey.channel().close();
} catch (IOException ex) {
log.error("Error closing channel", ex);
}
}
});
}
}
  • EchoServerHandler
1
2
3
4
5
6
7
8
9
10
11
public class EchoServerHandler implements ChannelHandler {
@Override
public void handleRead(SelectionKey selectionKey) throws IOException {
// read ....
}

@Override
public void handleWrite(SelectionKey selectionKey) throws IOException {
// write ....
}
}

reactor模型原理与实现
http://example.com/2025/06/14/network-reactor模型原理与实现/
作者
ares
发布于
2025年6月14日
许可协议