Java 中的 Reactor 模型是一种基于事件驱动的设计模式,常用于处理并发 I/O 操作。它特别适用于高并发、网络服务开发(如 Web 服务器、数据库连接池、消息队列等),是现代 Java NIO(非阻塞 I/O)编程的核心设计思想之一。Reactor 模型的核心思想是:将 I/O 事件注册到一个事件循环中,由事件循环监听这些事件并分发给相应的处理器进行处理。
这种模型可以高效地管理大量并发连接,而不需要为每个连接创建一个线程,从而避免了传统多线程模型中线程切换和资源消耗的问题。
Reactor模型组成:
Reactor:负责监听和分发事件。通常使用 Selector来监控多个通道(Channel)上的 I/O 事件(如读、写)。
Acceptor:当监听到客户端连接请求时,由 Acceptor 接收连接,并将其注册到 Reactor 中。
Handler:处理具体的 I/O 事件。每个 Channel 对应一个 Handler,负责处理该 Channel 上发生的事件(如连接建立、数据读取、写入等)。
1 2 3 4 5 6 7 8 9 10
| Client --> ServerSocketChannel (OP_ACCEPT) | v Reactor (Selector) | +-----------v--------------+ | | Acceptor(SocketChannel ) ... (多个连接) | Handler (读/写)
|
单Reactor单线程模式
所有操作都在一个线程中完成:事件监听、事件分发、业务处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey key = serverSocket.register(selector, OP_ACCEPT); key.attach(new Acceptor()); }
public void run() { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { dispatch(iterator.next()); iterator.remove(); } } } void dispatch(SelectionKey key) { Object handler = key.attachment(); if (handler instanceof Runnable) { ((Runnable) handler).run(); } }
class Acceptor implements Runnable { public void run() { try { SocketChannel clientSocket = serverSocket.accept(); if (clientSocket != null){ new Handler(clientSocket, selector); } } catch (IOException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6
| public interface ChannelHandler extends Runnable {
void handleRead(SelectionKey key) throws IOException;
void handleWrite(SelectionKey key) throws IOException; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class EchoServerHandler implements ChannelHandler { @Override public void run() { try { if (state == ChannelState.READING) { handleRead(selectionKey); } else if (state == ChannelState.SENDING) { handleWrite(selectionKey); } } catch (IOException e) { log.error("read or write error", e); } }
@Override public void handleRead(SelectionKey selectionKey) throws IOException { }
@Override public void handleWrite(SelectionKey selectionKey) throws IOException { } }
|
单Reactor多线程模型实现
Reactor 作用就是要迅速的触发 Handler ,在单线程 Reactor 中,Handler 与 Reactor 处于同一线程,Handler 进行业务处理的过程会导致 Reactor 变慢。根据上面分而治之的优化思想,可以将业务处理过程从 Reactor 线程中拆出来,到单独的 Handler 线程池中处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class EchoServerHandler implements ChannelHandler { @Override public void run() { try { if (state == ChannelState.READING) { executor.submit(()->handleRead(selectionKey)); } else if (state == ChannelState.SENDING) { executor.submit(()->handleWrite(selectionKey)); } } catch (IOException e) { log.error("read or write error", e); } }
@Override public void handleRead(SelectionKey selectionKey) throws IOException { }
@Override public void handleWrite(SelectionKey selectionKey) throws IOException { } }
|
主从Reactor模型
单 Reactor 多线程的情况下,Handler 线程池中业务可能处理很快,大部分的时间都花在 Reactor 线程处理 I/O 上,导致 CPU 闲置,降低了响应速度。主从Reactor模型主要就是把IO连接事事件处理交给主Reactor,从Reactor处理实际的 I/O(OP_READ、OP_WRITE)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public abstract class Reactor implements Runnable { private final Logger logger = LoggerFactory.getLogger(Reactor.class); protected final Selector selector;
public Reactor() throws IOException { selector = Selector.open(); }
@Override public void run() { while (true) { try { if (selector.select() == 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (!selectionKey.isValid()) { continue; } try { dispatch(selectionKey); } catch (IOException e) { logger.error("Error handling key: " + selectionKey, e); selectionKey.cancel(); try { selectionKey.channel().close(); } catch (IOException ex) { logger.error("Error closing channel", ex); } } } } catch (Exception e) { logger.error("selector error", e); } } }
protected abstract void dispatch(SelectionKey selectionKey) throws IOException; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MainReactor extends Reactor {
private final ServerSocketChannel serverSocketChannel; private final ServerBootstrapAcceptor acceptor;
@Override protected void dispatch(SelectionKey key) { if (key.isAcceptable()) { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { acceptor.handleAccept(socketChannel); } } catch (IOException e) { log.error("get socket channel error", e); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class SubReactor extends Reactor { @Override protected synchronized void dispatch(SelectionKey selectionKey) throws IOException { executor.execute(() -> { try { if (!selectionKey.isValid()) { return; } ChannelHandler handler = (ChannelHandler) selectionKey.attachment(); if (selectionKey.isReadable()) { handler.handleRead(selectionKey); } else if (selectionKey.isWritable()) { handler.handleWrite(selectionKey); } } catch (IOException e) { log.error("Handler processing error", e); try { selectionKey.cancel(); selectionKey.channel().close(); } catch (IOException ex) { log.error("Error closing channel", ex); } } }); } }
|
1 2 3 4 5 6 7 8 9 10 11
| public class EchoServerHandler implements ChannelHandler { @Override public void handleRead(SelectionKey selectionKey) throws IOException { }
@Override public void handleWrite(SelectionKey selectionKey) throws IOException { } }
|