NIO|NIO


一、说明 关于IO模型,请参考《IO模型》

说明:文中Java IO或传统IO简称IO。

二、NIO简介 NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。
传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
NIO的原理见《IO模型》中IO多路复用部分,地址如下:
http://blog.csdn.net/wuzhengfei1112/article/details/78242004

三、Java IO VS NIO 1.流 VS 缓冲区
IO是面向流的,NIO是面向缓冲区的。
Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
NIO的数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

2.阻塞 VS 非阻塞
Java IO的各种流是阻塞的。当一个线程调用read()或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
NIO的非阻塞模式。例如:一个线程从某channel读取数据时,如果有数据已经存在缓冲去了,那么直接读取,如果没有就不获取,线程不会被阻塞,还可以去做其他的事情。写操作也是如此。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。


四、核心组件 1.通道Channel
IO中的 Stream是单向的,如InputStream, OutputStream。NIO中的Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel分两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel,其的主要实现有:
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

2.缓冲区Buffer
缓冲区实质上是一个数组,NIO中的缓冲区提供了对数组接过话访问以及维护了其读写信息。在NIO库中,所有数据都是用缓冲区处理的,在读取数据时,它是直接读到缓冲区中的;在写入数据时,它也是写入到缓冲区中的。
1)NIO中的关键Buffer实现 ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
分别对应基本数据类型: byte, char, double, float, int,long, short。
另外还有:MappedByteBuffer, HeapByteBuffer,DirectByteBuffer等。


2)常用方法: allocate():分配一块缓冲区
put():向缓冲区写数据
get():向缓冲区读数据
filp():将缓冲区从写模式切换到读模式
clear():从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;
compact():从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
mark():对position做出标记,配合reset使用
reset():将position置为标记值
3)缓冲区的属性 capacity:缓冲区大小,无论是读模式还是写模式,此属性值不会变;
position:写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1切换到读模式时,position会被置为0,表示当前读的位置
limit:写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。

3.多路复用器Selector
多路复用器提供选择已经就绪任务的能力。简单来说:Selector会不断轮询注册在其上的Channel,如果某个Channel上发生读或写事件,这个Channel就处于就绪状态,就会被Selector轮询出来,然后通过SelectionKey就可以获取就绪的Channel集合,接着就可以进行或许的读写操作。
一个多路复用器可以同时轮询多个Channel,由于JDK使用了epool()代替传统的Select实现,所以他没有最大连接句柄1024/2048的限制,这意味着只需要一个线程负责Selector伦旭,就可以接入成千上万的客户端。

1)Selector支持的事件

SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE


如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
可使用以下方法获取已就绪事件,返回值为boolean:

selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();


可以将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法如下:

selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();


可以通过选择器的select方法获取是否有就绪的通道;

int select() int select(long timeout) int selectNow()


返回值表示上次执行select之后,就绪通道的个数。
可以通过selectedKeySet获取已就绪的通道。返回值是SelectionKey的集合,处理完相应的通道之后,需要removed因为Selector不会自己removed。select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select那么执行完wakeup后下一个执行select就会立即返回。调用close() 方法关闭selector。


五、NIO(IO多路复用) Java 1.4中引入NIO的概念,本节内容主要讲述基于此版本(即IO多路复用模型)NIO实现,其使用的IO模型,请参考《IO模型》
1.优点
客户端发起的连接操作是一步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
SocketChannel的读写操作都是异步的,如果没有可读写的数据,他不会等待直接返回,这样IO同学线程就可以处理其他的链路,不需要等待这个链路可用。
由于JDK的Selector在Linux等主流操作系统上通过epool实现,他没有连接句柄的限制(指受限于操作系统的最大句柄数或者对单个现成的句柄限制),这意味着一个Selector可以同时处理成千上万个客户端连接,而且性能不会随客户端的增加而线性下降。它适合做高性能、高负载的网络服务器。



2.NIO服务端序列图
NIO|NIO
文章图片

3.NIO服务端序列分析
1)打开ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();


2)绑定监听地址InetSocketAddress
serverSocketChannel.socket().bind(newInetSocketAddress(port), 1024); serverSocketChannel.configureBlocking(false);


3)创建Selector,启动线程
selector = Selector.open(); //新建线程启动Server new Thread(new NIOServer(), "NIO-Server").start();



4)将ServerSocketChannel注册到Selector、监听
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);



5)Selector轮询就绪的Key
while (true) { selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); //处理IO时间 handleInput(key); } }



6)handlerAcceptor()处理新的客户端接入
// Accept the new connection ServerSocketChannelssc = (ServerSocketChannel) key.channel(); SocketChannelsc = ssc.accept();




7)设置新客户端连接的Socket参数
sc.configureBlocking(false);


8)向Selector注册监听读操作SelectionKey.OP_Read
// Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ);



9)handlerRead()异步读取请求信息到ByteBuffer
SocketChannelsc = (SocketChannel) key.channel(); ByteBufferreadBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);


10) decode请求消息
if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body= new String(bytes,"UTF-8"); }


11) 异步写ByteBuffer到SocketChannel
byte[] bytes = response.getBytes(); ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer);



4.NIO客户端序列图

NIO|NIO
文章图片

5.NIO客户端序列分析
1)打开SocketChannel
socketChannel = SocketChannel.open();



2)设置SocketChannel为非阻塞模式,同时设置TCP参数
socketChannel.configureBlocking(false);




3)异步连接服务器
socketChannel.connect(newInetSocketAddress(host, port))



4)判断连接结果,如果连接成功,跳到10,否则到5
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if ( connected ) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else { socketChannel.register(selector,SelectionKey.OP_CONNECT); }



5)向Reactor线程的多路复用器注册OP_CONNECT事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);



6)创建Selector,启动线程
selector = Selector.open(); TimeClientHandle client = new TimeClientHandle("127.0.0.1", port); new Thread(client, "TimeClient-001").start();


7)Selector轮询就绪的key
while (!stop) { selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); SelectionKey key= null; while (it.hasNext()) { key =it.next(); it.remove(); handleInput(key); } }


8)如果是CONNECT事件,则handlerConnect()
SocketChannelsc = (SocketChannel) key.channel(); if (key.isConnectable()) { // connect }



9)判断连接是否完成,完成则执行10
if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); }



10) 向多路复用器注册读事件 OPEN_READ
sc.register(selector, SelectionKey.OP_READ);



11) handRead()异步渡请求消息到ByteBuffer
ByteBufferreadBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer);



12) 读取并decode请求消息
if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body= new String(bytes,"UTF-8"); }




13) 异步写ByteBuffer到SocketChannel
byte[] req = "HELLOWORLD ".getBytes(); ByteBufferwriteBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()){ System.out.println("Send2 server succeed."); }





1.示例
以下示例代码来自《Netty权威指南》的一个例子,仅供参考。
1)TimeServer
public class TimeServer { public static voidmain(String[] args) throwsIOException { int port = 8080; if (args != null&& args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } MultiplexerTimeServer timeServer= new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }





2)MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable { privateSelector selector; privateServerSocketChannel servChannel; private volatile boolean stop; publicMultiplexerTimeServer(int port) { try{ selector= Selector.open(); servChannel= ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port),1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Thetime server is start in port : " + port); } catch(IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while(!stop) { try{ selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iteratorit = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e){ if (key != null) { key.cancel(); if (key.channel()!= null) key.channel().close(); } } } } catch(Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if(selector != null) try{ selector.close(); } catch(IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throwsIOException { if(key.isValid()) { // 处理新接入的请求消息 if(key.isAcceptable()) { // Acceptthe new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add thenew connection to the selector sc.register(selector, SelectionKey.OP_READ); } if(key.isReadable()) { // Read thedata SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = newString(bytes, "UTF-8"); System.out.println("Thetime server receive order : " + body); String currentTime = "QUERYTIME ORDER".equalsIgnoreCase(body) ? newjava.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } elseif (readBytes< 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if(response != null&& response.trim().length() > 0){ byte[]bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }





3)TimeClient
public class TimeClient { public staticvoid main(String[] args){ int port = 8080; if (args != null&& args.length> 0) { try{ port= Integer.valueOf(args[0]); } catch(NumberFormatException e) { // 采用默认值 } } newThread(new TimeClientHandle("127.0.0.1", port),"TimeClient-001") .start(); } }





4)TimeClientHandle
public class TimeClientHandle implements Runnable { privateString host; private int port; privateSelector selector; privateSocketChannel socketChannel; private volatile boolean stop; publicTimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1": host; this.port = port; try{ selector= Selector.open(); socketChannel= SocketChannel.open(); socketChannel.configureBlocking(false); } catch(IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try{ doConnect(); } catch(IOException e) { e.printStackTrace(); System.exit(1); } while(!stop) { try{ selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iteratorit = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e){ if (key != null) { key.cancel(); if (key.channel()!= null) key.channel().close(); } } } } catch(Exception e) { e.printStackTrace(); System.exit(1); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if(selector != null) try{ selector.close(); } catch(IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throwsIOException { if(key.isValid()) { // 判断是否连接成功 SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()) { if(sc.finishConnect()) { sc.register(selector,SelectionKey.OP_READ); doWrite(sc); } else System.exit(1); // 连接失败,进程退出 } if(key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = newString(bytes, "UTF-8"); System.out.println("Nowis : " + body); this.stop = true; } elseif (readBytes< 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } private void doConnect() throwsIOException { // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if(socketChannel.connect(new InetSocketAddress(host,port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector,SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[]req = "QUERYTIME ORDER".getBytes(); ByteBuffer writeBuffer= ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()) System.out.println("Sendorder 2 server succeed."); } }








一、NIO(AIO) JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,此版本正式提供了异步文件IO操作,即AIO。

1.NIO服务端序列
NIO|NIO
文章图片

2.NIO服务端序列分析
1)打开AsynchronousServerSocketChannel
asynServerSocketChannel = AsynchronousServerSocketChannel.open();


2)绑定监听地址InetSocketAddress
asynServerSocketChannel.bind(new InetSocketAddress(port));



3)创建线程并启动
AsyncServerHandlertimeServer = newAsyncServerHandler(port); new Thread(timeServer, "AIOServerHandler").start();



4)注册接收数据的Handler
asynServerSocketChannel.accept(this, new ServerAcceptCompletionHandler());



5)接收数据,实现ServerAcceptCompletionHandler的completed、failed方法
public voidcompleted(AsynchronousSocketChannel channel,AsyncServerHandler attachment) { /** * 为什么需要再次调用accept方法? * 因为如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的complete方法,表示新的客户端接入成功 * 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用他的accept方法, * 接收其他客户端连接,最终形成一个循环。每当接收一个客户连接成功后,再异步接收新的客户端连接 * */ attachment.asynServerSocketChannel.accept(attachment, this); ByteBuffer buffer= ByteBuffer.allocate(1024); /** * ByteBuffer:接收缓冲区,用于从异步的Channel中读取数据包 * Attachment:异步Channel携带的附件,通知回调的时候作为入参使用 * CompletionHandler:接收通知回调的业务Handler */ channel.read(buffer, buffer,new ServerReadCompletionHandler(channel)); } public void failed(Throwable exc, AsyncServerHandler attachment){ exc.printStackTrace(); attachment.latch.countDown(); }





6)读取数据,实现ServerReadCompletionHandler的Complete、faild方法
public void completed(Integer result, ByteBuffer attachment){ // handler with data } public void failed(Throwable exc, ByteBuffer attachment){ this.channel.close(); }



7)decode数据
attachment.flip(); byte[] body = newbyte[attachment.remaining()]; attachment.get(body); Stringreq = newString(body, "UTF-8");




8)异步写数据到Channel
byte[] bytes = (response).getBytes(); ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, newCompletionHandler() { @Override public void completed(Integer result,ByteBuffer buffer) { // 如果没有发送完成,继续发送 if (buffer.hasRemaining()) channel.write(buffer, buffer,this); } @Override public void failed(Throwable exc,ByteBuffer attachment) { try { channel.close(); } catch(IOException e) { // ingnoreon close } } });




3.NIO客户端序列
NIO|NIO
文章图片

4.NIO客户端序列分析
1)打开AsynchronousSocketChannel
asynSocketChannel = AsynchronousSocketChannel.open();



2)异步连接服务器
asynSocketChannel.connect(newInetSocketAddress(host, port), this, this);



3)创建线程并启动
AsyncClientHandler asyncClientHandler= new AsyncClientHandler("127.0.0.1", port); new Thread(asyncClientHandler, "AIOClientHandler").start();



4)注册连接Server成功的Handler
ClientConnectCompletionHandlerconnectCompletionHandler = new ClientConnectCompletionHandler(asynSocketChannel, latch); asynSocketChannel.connect(newInetSocketAddress(host, port), connectCompletionHandler, connectCompletionHandler);







5)连接成功后,注册向Server写数据的Handler,实现Completed、Failed方法

public void completed(Void result, AsyncClientHandler attachment) { byte[] req = "timestemp".getBytes(); ByteBuffer writeBuffer= ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); ClientWriteCompletionHandler writeCompletionHandler = newClientWriteCompletionHandler( attachment.asynSocketChannel, attachment.latch); attachment.asynSocketChannel.write(writeBuffer, writeBuffer, writeCompletionHandler); } @Override public void failed(Throwable exc, AsyncClientHandler attachment){ exc.printStackTrace(); try { attachment.asynSocketChannel.close(); attachment.latch.countDown(); } catch(IOException e) { e.printStackTrace(); } }



6)向Server发送数据,实现CompletionHandler的Completed、faild方法
public void completed(Integer result, ByteBuffer buffer){ if (buffer.hasRemaining()) { asynSocketChannel.write(buffer, buffer, this); } } public void failed(Throwable exc, ByteBuffer attachment){ try { asynSocketChannel.close(); latch.countDown(); } catch(IOException e) { // ingnore on close } }



7)注册读数据的Handler,实现completed、failed方法
asynSocketChannel.read(readBuffer, readBuffer, newCompletionHandler() { @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[]bytes = newbyte[buffer.remaining()]; buffer.get(bytes); String body; try { body= new String(bytes,"UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch(UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { try { asynSocketChannel.close(); latch.countDown(); } catch(IOException e) { // ingnoreon close } } });




9)读取并decode数据
buffer.flip(); byte[] bytes = newbyte[buffer.remaining()]; buffer.get(bytes); Stringbody = newString(bytes, "UTF-8");



5.示例

1)TimeServer 【NIO|NIO】
public class TimeServer { public static voidmain(String[] args) throwsIOException { int port = 8080; AsyncServerHandler timeServer = newAsyncServerHandler(port); newThread(timeServer, "AIOServerHandler").start(); } }





2)AsyncServerHandler
public class AsyncServerHandler implements Runnable { CountDownLatch latch; AsynchronousServerSocketChannel asynServerSocketChannel; publicAsyncServerHandler(int port) { try { asynServerSocketChannel= AsynchronousServerSocketChannel.open(); asynServerSocketChannel.bind(new InetSocketAddress(port)); } catch(IOException e) { e.printStackTrace(); } } @Override public void run() { latch= new CountDownLatch(1); asynServerSocketChannel.accept(this, newServerAcceptCompletionHandler()); try { latch.await(); } catch(InterruptedException e) { e.printStackTrace(); } } }





3)ServerAcceptCompletionHandler
public class ServerAcceptCompletionHandlerimplementsCompletionHandler { @Override public void completed(AsynchronousSocketChannel channel, AsyncServerHandler attachment) { /** * 为什么需要再次调用accept方法? * 因为如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的complete方法,表示新的客户端接入成功 * 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用他的accept方法, * 接收其他客户端连接,最终形成一个循环。每当接收一个客户连接成功后,再异步接收新的客户端连接 * */ attachment.asynServerSocketChannel.accept(attachment, this); ByteBuffer buffer= ByteBuffer.allocate(1024); /** * ByteBuffer:接收缓冲区,用于从异步的Channel中读取数据包 * Attachment:异步Channel携带的附件,通知回调的时候作为入参使用 * CompletionHandler:接收通知回调的业务Handler */ channel.read(buffer, buffer,new ServerReadCompletionHandler(channel)); } @Override public void failed(Throwable exc,AsyncServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }




4)ServerReadCompletionHandler
public class ServerReadCompletionHandler implements CompletionHandler { privateAsynchronousSocketChannel channel; publicServerReadCompletionHandler(AsynchronousSocketChannel channel){ if (this.channel ==null) { this.channel = channel; } } @Override public void completed(Integer result,ByteBuffer attachment) { attachment.flip(); byte[]body = newbyte[attachment.remaining()]; attachment.get(body); try { String req= new String(body,"UTF-8"); String currentTime= "timestemp".equalsIgnoreCase(req) ? System.currentTimeMillis() + "" : "BADORDER"; doWrite(currentTime); } catch(UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String response){ if (response != null&& response.trim().length() > 0){ byte[]bytes = (response).getBytes(); ByteBuffer writeBuffer= ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer,new CompletionHandler() { @Override publicvoid completed(Integer result, ByteBuffer buffer){ // 如果没有发送完成,继续发送 if(buffer.hasRemaining()) channel.write(buffer, buffer, this); } @Override publicvoid failed(Throwable exc, ByteBuffer attachment){ try{ channel.close(); } catch(IOException e) { // ingnoreon close } } }); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { try { this.channel.close(); } catch(IOException e) { e.printStackTrace(); } } }




5)TimeClient
public class TimeClient { public static voidmain(String[] args) { int port = 8080; AsyncClientHandler asyncClientHandler = newAsyncClientHandler("127.0.0.1", port); newThread(asyncClientHandler, "AIOClientHandler").start(); } }






6)AsyncClientHandler
public class AsyncClientHandler implements Runnable { AsynchronousSocketChannel asynSocketChannel; privateString host; private int port; CountDownLatch latch; publicAsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { asynSocketChannel= AsynchronousSocketChannel.open(); } catch(IOException e) { e.printStackTrace(); } } @Override public void run() { latch= new CountDownLatch(1); ClientConnectCompletionHandler connectCompletionHandler = new ClientConnectCompletionHandler(); asynSocketChannel.connect(new InetSocketAddress(host,port), this,connectCompletionHandler); try { latch.await(); asynSocketChannel.close(); } catch(Exception e) { e.printStackTrace(); } } }







7)ClientConnectCompletionHandler
public classClientConnectCompletionHandler implementsCompletionHandler { @Override public void completed(Void result,AsyncClientHandler attachment) { byte[]req = "timestemp".getBytes(); ByteBuffer writeBuffer= ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); ClientWriteCompletionHandler writeCompletionHandler = newClientWriteCompletionHandler( attachment.asynSocketChannel, attachment.latch); attachment.asynSocketChannel.write(writeBuffer, writeBuffer, writeCompletionHandler); } @Override public void failed(Throwable exc,AsyncClientHandler attachment) { exc.printStackTrace(); try { attachment.asynSocketChannel.close(); attachment.latch.countDown(); } catch(IOException e) { e.printStackTrace(); } } }




8)ClientWriteCompletionHandler
public class ClientWriteCompletionHandler implements CompletionHandler { privateAsynchronousSocketChannel asynSocketChannel; privateCountDownLatch latch; publicClientWriteCompletionHandler(AsynchronousSocketChannel asynSocketChannel,CountDownLatch latch) { super(); this.asynSocketChannel = asynSocketChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { if (buffer.hasRemaining()) { asynSocketChannel.write(buffer, buffer, this); } else{ ByteBuffer readBuffer= ByteBuffer.allocate(1024); asynSocketChannel.read(readBuffer, readBuffer,new CompletionHandler() { @Override publicvoid completed(Integer result, ByteBuffer buffer){ buffer.flip(); byte[]bytes = newbyte[buffer.remaining()]; buffer.get(bytes); String body; try{ body = new String(bytes, "UTF-8"); System.out.println("Nowis : " + body); latch.countDown(); } catch(UnsupportedEncodingException e) { e.printStackTrace(); } } @Override publicvoid failed(Throwable exc, ByteBuffer attachment){ try{ asynSocketChannel.close(); latch.countDown(); } catch(IOException e) { // ingnoreon close } } }); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { try { asynSocketChannel.close(); latch.countDown(); } catch(IOException e) { // ingnoreon close } } }










    推荐阅读