多线程Reactor模式


目录

  • 1.1 主服务器
  • 2.1 IO请求handler+线程池
  • 3.1 客户端

【多线程Reactor模式】多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。
让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度
1.1 主服务器
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; class MultiThreadEchoServerReactor { ServerSocketChannel serverSocket; AtomicInteger next = new AtomicInteger(0); Selector bossSelector = null; Reactor bossReactor = null; //selectors集合,引入多个selector选择器 //多个选择器可以更好的提高通道的并发量 Selector[] workSelectors = new Selector[2]; //引入多个子反应器 //如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。 //每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量 Reactor[] workReactors = null; MultiThreadEchoServerReactor() throws IOException {bossSelector = Selector.open(); //初始化多个selector选择器 workSelectors[0] = Selector.open(); workSelectors[1] = Selector.open(); serverSocket = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); serverSocket.socket().bind(address); //非阻塞 serverSocket.configureBlocking(false); //第一个selector,负责监控新连接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新连接处理handler处理器到SelectionKey(选择键) sk.attach(new AcceptorHandler()); //处理新连接的反应器 bossReactor = new Reactor(bossSelector); //第一个子反应器,一子反应器负责一个选择器 Reactor subReactor1 = new Reactor(workSelectors[0]); //第二个子反应器,一子反应器负责一个选择器 Reactor subReactor2 = new Reactor(workSelectors[1]); workReactors = new Reactor[]{subReactor1, subReactor2}; }private void startService() { new Thread(bossReactor).start(); // 一子反应器对应一条线程 new Thread(workReactors[0]).start(); new Thread(workReactors[1]).start(); }//反应器 class Reactor implements Runnable { //每条线程负责一个选择器的查询 final Selector selector; public Reactor(Selector selector) { this.selector = selector; }public void run() { try { while (!Thread.interrupted()) { //单位为毫秒 //每隔一秒列出选择器感应列表 selector.select(1000); Set selectedKeys = selector.selectedKeys(); if (null == selectedKeys || selectedKeys.size() == 0) { //如果列表中的通道注册事件没有发生那就继续执行 continue; } Iterator it = selectedKeys.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 SelectionKey sk = it.next(); dispatch(sk); } //清楚掉已经处理过的感应事件,防止重复处理 selectedKeys.clear(); } } catch (IOException ex) { ex.printStackTrace(); } }void dispatch(SelectionKey sk) { Runnable handler = (Runnable) sk.attachment(); //调用之前attach绑定到选择键的handler处理器对象 if (handler != null) { handler.run(); } } }// Handler:新连接处理器 class AcceptorHandler implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); Logger.info("接收到一个新的连接"); if (channel != null) { int index = next.get(); Logger.info("选择器的编号:" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel); } } catch (IOException e) { e.printStackTrace(); } if (next.incrementAndGet() == workSelectors.length) { next.set(0); } } }public static void main(String[] args) throws IOException { MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor(); server.startService(); }}

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。
这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。
//第一个selector,负责监控新连接事件 SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT); //附加新连接处理handler处理器到SelectionKey(选择键) sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。
int index = next.get(); Logger.info("选择器的编号:" + index); Selector selector = workSelectors[index]; new MultiThreadEchoHandler(selector, channel);

2.1 IO请求handler+线程池
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MultiThreadEchoHandler implements Runnable { final SocketChannel channel; final SelectionKey sk; final ByteBuffer byteBuffer = ByteBuffer.allocate(1024); static final int RECIEVING = 0, SENDING = 1; int state = RECIEVING; //引入线程池 static ExecutorService pool = Executors.newFixedThreadPool(4); MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException { channel = c; channel.configureBlocking(false); //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss selector.wakeup(); //仅仅取得选择键,后设置感兴趣的IO事件 sk = channel.register(selector, 0); //将本Handler作为sk选择键的附件,方便事件dispatch sk.attach(this); //向sk选择键注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); //唤醒选择,是的OP_READ生效 selector.wakeup(); Logger.info("新的连接 注册完成"); }public void run() { //异步任务,在独立的线程池中执行 pool.execute(new AsyncTask()); }//异步任务,不在Reactor线程中执行 public synchronized void asyncRun() { try { if (state == SENDING) { //写入通道 channel.write(byteBuffer); //写完后,准备开始从通道读,byteBuffer切换成写模式 byteBuffer.clear(); //写完后,注册read就绪事件 sk.interestOps(SelectionKey.OP_READ); //写完后,进入接收的状态 state = RECIEVING; } else if (state == RECIEVING) { //从通道读 int length = 0; while ((length = channel.read(byteBuffer)) > 0) { Logger.info(new String(byteBuffer.array(), 0, length)); } //读完后,准备开始写入通道,byteBuffer切换成读模式 byteBuffer.flip(); //读完后,注册write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); //读完后,进入发送的状态 state = SENDING; } //处理结束了, 这里不能关闭select key,需要重复使用 //sk.cancel(); } catch (IOException ex) { ex.printStackTrace(); } }//异步任务的内部类 class AsyncTask implements Runnable { public void run() { MultiThreadEchoHandler.this.asyncRun(); } }}

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。
3.1 客户端
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Dateutil; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * create by 尼恩 @ 疯狂创客圈 **/ public class EchoClient {public void start() throws IOException {InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); // 1、获取通道(channel) SocketChannel socketChannel = SocketChannel.open(address); Logger.info("客户端连接成功"); // 2、切换成非阻塞模式 socketChannel.configureBlocking(false); //不断的自旋、等待连接完成,或者做一些其他的事情 while (!socketChannel.finishConnect()) {} Logger.tcfo("客户端启动成功!"); //启动接受线程 Processer processer = new Processer(socketChannel); new Thread(processer).start(); }static class Processer implements Runnable { final Selector selector; final SocketChannel channel; Processer(SocketChannel channel) throws IOException { //Reactor初始化 selector = Selector.open(); this.channel = channel; channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); }public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); if (sk.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); Scanner scanner = new Scanner(System.in); Logger.tcfo("请输入发送内容:"); if (scanner.hasNext()) { SocketChannel socketChannel = (SocketChannel) sk.channel(); String next = scanner.next(); buffer.put((Dateutil.getNow() + " >>" + next).getBytes()); buffer.flip(); // 操作三:发送数据 socketChannel.write(buffer); buffer.clear(); }} if (sk.isReadable()) { // 若选择键的IO事件是“可读”事件,读取数据 SocketChannel socketChannel = (SocketChannel) sk.channel(); //读取数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0; while ((length = socketChannel.read(byteBuffer)) > 0) { byteBuffer.flip(); Logger.info("server echo:" + new String(byteBuffer.array(), 0, length)); byteBuffer.clear(); }} //处理结束了, 这里不能关闭select key,需要重复使用 //selectionKey.cancel(); } selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } }public static void main(String[] args) throws IOException { new EchoClient().start(); } }

    推荐阅读