Reactor单线程模型详解与实现


Reactor单线程模型详解

  • 目录
    • Reactor模式原理
    • Reactor实现
    • 总结

目录 Reactor模式原理 【Reactor单线程模型详解与实现】1.原理说明
1.Reactor模型是相对传统IO机构来说的,也就是NIO模型, NIO模型之所以可以优化,得益于 它是基于事件,基于异步,不像传统IO,是阻塞的,很难做架构上的改变 2.Reactor模型分为几个组件,分别是Reactor,Acceptor,Handler 3.Reactor组件负责分发事件,如果是连接,那么交给Acceptor,如果是读写事件,那么交给Handler 4.Acceptor负责处理连接事件(获取新连接,注册到Selector上,注册读写事件,绑定Handler) 5.Handler负责处理读写事件(使用channel进行读写)

Reactor实现
package com.example.nio.netty; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * 单Reactor单线程模型 */ class ReactorModel1 {private Reactor reactor; private Integer port; public ReactorModel1(Integer port) throws IOException {this.port = port; this.reactor=new Reactor(port); }public void start(){Thread thread = new Thread(this.reactor); thread.start(); }}/** * 组件Reactor(事件分发) */ class Reactor implements Runnable{/** * 端口 */ private Integer prot; /** * 用于接受连接的ServerChannel */ private ServerSocketChannel serverSocketChannel; /** * 管理连接的选择器 */ private Selector selector; public Reactor(Integer prot) throws IOException {this.prot = prot; this.serverSocketChannel = ServerSocketChannel.open(); this.selector = Selector.open(); //绑定端口 this.serverSocketChannel.bind(new InetSocketAddress(prot)); //设置为非阻塞 this.serverSocketChannel.configureBlocking(false); //注册到选择器上面 this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }@Override public void run() {while (true){try {int select = selector.select(); Set> selectionKeys = selector.selectedKeys(); Iterator> iterator = selectionKeys.iterator(); while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next(); //分发事件 dispatch(selectionKey); iterator.remove(); } } catch (IOException e) {e.printStackTrace(); } }}/** * 进行事件分发 * @param selectionKey */ private void dispatch(SelectionKey selectionKey){if (selectionKey.isAcceptable()) {//连接事件 new Acceptor(serverSocketChannel,selector).run(); } else{//读写事件 new Handler(selectionKey).run(); }}}/** * accepor(处理读写) */ class Acceptor implements Runnable{/** * 用于接受连接的ServerChannel */ private ServerSocketChannel serverSocketChannel; /** * 管理连接的选择器 */ private Selector selector; public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {this.serverSocketChannel = serverSocketChannel; this.selector = selector; }@Override public void run() {//serverSocketChannel的连接事件 SocketChannel accept = null; try {accept = this.serverSocketChannel.accept(); //设置为非阻塞 accept.configureBlocking(false); //注册到selector, 注册读写事件 accept.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE); System.out.println("新连接:"+accept.getRemoteAddress()); } catch (IOException e) {e.printStackTrace(); }} }class Handler implements Runnable{private SelectionKey selectionKey; public Handler(SelectionKey selectionKey) {this.selectionKey = selectionKey; }@Override public void run() {try {if(selectionKey.isReadable()){//读事件处理 this.read(); }else {//写事件处理 this.write(); } }catch (Exception e){}}/** * 处理读事件 * @throws IOException */ private void read() throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer allocate = ByteBuffer.allocate(1024); int read = channel.read(allocate); if(read>0) {System.out.println("接收到消息:" + new String(allocate.array(), 0,read)); } }/** * 处理写事件 * @throws IOException */ private void write() throws IOException {} }

总结 单Reactor单线程模式虽然在架构上区分了组件, Reactor负责事件分发,Acceptor负责处理接受新连接事件,Handle负责读写事件处理, 但是总体上来看,还是一个Reactor线程处理,很容易产生故障,而且一个线程处理也是有限的. 但是Reactor,Acceptor,Handle都实现了Runnable接口,单独抽离出来成为一个组件,我们可以基于线程池无限扩展,这还得得益于NIO是基于事件(一个Selector响应多个Channel的事件), 不像BIO是阻塞,无法单独抽离,也就扩展起来很难.

    推荐阅读