多路复用器java代码 多路复用器java代码怎么写( 二 )


##Reactor模式
Reactor模式用于解决事件分发处理的问题,Handler把自己的channel和关注的事件注册到Selector中 , 当对应的事件发生在自己的channel上时,对应的handler就会得到通知并进行处理 。
- 单线程的Reactor
消息的分发、读写、处理都在一个线程中处理 , 是Reactor最简单的实现方式,如果消息的处理需要较长时间,会影响效率 。
```java
//Reactor类,负责分发事件并调用对应的handler
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
//Reactor初始化
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false); //必须配置为非阻塞
//Acceptor会在Reactor初始化时就注册到Selector中,用于接受connect请求
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor()); //attach callback object, Acceptor
}
//分发消息并调用对应的handler
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
【多路复用器java代码 多路复用器java代码怎么写】Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
if (r != null)
r.run();
}
//Acceptor也是一个handler,负责创建socket并把新建的socket也注册到selector中
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
//Concrete Handler:用于收发和处理消息 。
//在当前的实现中,使用Runnable接口作为每个具体Handler的统一接口
//如果在处理时需要参数和返回值,也可以为Handler另外声明一个统一接口来代替Runnable接口
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c; c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this); //将Handler作为callback对象
sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key
}
}
//上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断
//我们可以用State-Object pattern来更优雅的实现

推荐阅读