一、说明 关于IO模型,请参考《IO模型》
说明:文中Java IO或传统IO简称IO。
二、NIO简介 NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。
传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
NIO的原理见《IO模型》中IO多路复用部分,地址如下:
http://blog.csdn.net/wuzhengfei1112/article/details/78242004
三、Java IO VS NIO
1.流 VS 缓冲区
IO是面向流的,NIO是面向缓冲区的。
Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
NIO的数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
2.阻塞 VS 非阻塞
Java IO的各种流是阻塞的。当一个线程调用read()或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
NIO的非阻塞模式。例如:一个线程从某channel读取数据时,如果有数据已经存在缓冲去了,那么直接读取,如果没有就不获取,线程不会被阻塞,还可以去做其他的事情。写操作也是如此。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
四、核心组件
1.通道Channel
IO中的 Stream是单向的,如InputStream, OutputStream。NIO中的Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel分两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel,其的主要实现有:
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
2.缓冲区Buffer
缓冲区实质上是一个数组,NIO中的缓冲区提供了对数组接过话访问以及维护了其读写信息。在NIO库中,所有数据都是用缓冲区处理的,在读取数据时,它是直接读到缓冲区中的;在写入数据时,它也是写入到缓冲区中的。
1)NIO中的关键Buffer实现 ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
分别对应基本数据类型: byte, char, double, float, int,long, short。
另外还有:MappedByteBuffer, HeapByteBuffer,DirectByteBuffer等。
2)常用方法: allocate():分配一块缓冲区
put():向缓冲区写数据
get():向缓冲区读数据
filp():将缓冲区从写模式切换到读模式
clear():从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;
compact():从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
mark():对position做出标记,配合reset使用
reset():将position置为标记值
3)缓冲区的属性 capacity:缓冲区大小,无论是读模式还是写模式,此属性值不会变;
position:写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1切换到读模式时,position会被置为0,表示当前读的位置
limit:写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。
3.多路复用器Selector
多路复用器提供选择已经就绪任务的能力。简单来说:Selector会不断轮询注册在其上的Channel,如果某个Channel上发生读或写事件,这个Channel就处于就绪状态,就会被Selector轮询出来,然后通过SelectionKey就可以获取就绪的Channel集合,接着就可以进行或许的读写操作。
一个多路复用器可以同时轮询多个Channel,由于JDK使用了epool()代替传统的Select实现,所以他没有最大连接句柄1024/2048的限制,这意味着只需要一个线程负责Selector伦旭,就可以接入成千上万的客户端。
1)Selector支持的事件
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
可使用以下方法获取已就绪事件,返回值为boolean:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
可以将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
可以通过选择器的select方法获取是否有就绪的通道;
int select()
int select(long timeout)
int selectNow()
返回值表示上次执行select之后,就绪通道的个数。
可以通过selectedKeySet获取已就绪的通道。返回值是SelectionKey的集合,处理完相应的通道之后,需要removed因为Selector不会自己removed。select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select那么执行完wakeup后下一个执行select就会立即返回。调用close() 方法关闭selector。
五、NIO(IO多路复用) Java 1.4中引入NIO的概念,本节内容主要讲述基于此版本(即IO多路复用模型)NIO实现,其使用的IO模型,请参考《IO模型》
1.优点
客户端发起的连接操作是一步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
SocketChannel的读写操作都是异步的,如果没有可读写的数据,他不会等待直接返回,这样IO同学线程就可以处理其他的链路,不需要等待这个链路可用。
由于JDK的Selector在Linux等主流操作系统上通过epool实现,他没有连接句柄的限制(指受限于操作系统的最大句柄数或者对单个现成的句柄限制),这意味着一个Selector可以同时处理成千上万个客户端连接,而且性能不会随客户端的增加而线性下降。它适合做高性能、高负载的网络服务器。
2.NIO服务端序列图
文章图片
3.NIO服务端序列分析
1)打开ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
2)绑定监听地址InetSocketAddress
serverSocketChannel.socket().bind(newInetSocketAddress(port), 1024);
serverSocketChannel.configureBlocking(false);
3)创建Selector,启动线程
selector = Selector.open();
//新建线程启动Server
new Thread(new NIOServer(), "NIO-Server").start();
4)将ServerSocketChannel注册到Selector、监听
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
5)Selector轮询就绪的Key
while (true) {
selector.select(1000);
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
//处理IO时间
handleInput(key);
}
}
6)handlerAcceptor()处理新的客户端接入
// Accept the new connection
ServerSocketChannelssc = (ServerSocketChannel) key.channel();
SocketChannelsc = ssc.accept();
7)设置新客户端连接的Socket参数
sc.configureBlocking(false);
8)向Selector注册监听读操作SelectionKey.OP_Read
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
9)handlerRead()异步读取请求信息到ByteBuffer
SocketChannelsc = (SocketChannel) key.channel();
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
10) decode请求消息
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body= new String(bytes,"UTF-8");
}
11) 异步写ByteBuffer到SocketChannel
byte[] bytes = response.getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
4.NIO客户端序列图
文章图片
5.NIO客户端序列分析
1)打开SocketChannel
socketChannel = SocketChannel.open();
2)设置SocketChannel为非阻塞模式,同时设置TCP参数
socketChannel.configureBlocking(false);
3)异步连接服务器
socketChannel.connect(newInetSocketAddress(host, port))
4)判断连接结果,如果连接成功,跳到10,否则到5
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if ( connected ) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else {
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
5)向Reactor线程的多路复用器注册OP_CONNECT事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
6)创建Selector,启动线程
selector = Selector.open();
TimeClientHandle client = new TimeClientHandle("127.0.0.1", port);
new Thread(client, "TimeClient-001").start();
7)Selector轮询就绪的key
while (!stop) {
selector.select(1000);
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
SelectionKey key= null;
while (it.hasNext()) {
key =it.next();
it.remove();
handleInput(key);
}
}
8)如果是CONNECT事件,则handlerConnect()
SocketChannelsc = (SocketChannel) key.channel();
if (key.isConnectable()) {
// connect
}
9)判断连接是否完成,完成则执行10
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}
10) 向多路复用器注册读事件 OPEN_READ
sc.register(selector, SelectionKey.OP_READ);
11) handRead()异步渡请求消息到ByteBuffer
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
12) 读取并decode请求消息
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body= new String(bytes,"UTF-8");
}
13) 异步写ByteBuffer到SocketChannel
byte[] req = "HELLOWORLD ".getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()){
System.out.println("Send2 server succeed.");
}
1.示例
以下示例代码来自《Netty权威指南》的一个例子,仅供参考。
1)TimeServer
public class TimeServer {
public static voidmain(String[] args) throwsIOException {
int port = 8080;
if (args != null&& args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
MultiplexerTimeServer timeServer= new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
2)MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable {
privateSelector selector;
privateServerSocketChannel servChannel;
private volatile boolean stop;
publicMultiplexerTimeServer(int port) {
try{
selector= Selector.open();
servChannel= ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port),1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Thetime server is start in port : " + port);
} catch(IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while(!stop) {
try{
selector.select(1000);
Set selectedKeys = selector.selectedKeys();
Iteratorit = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e){
if (key != null) {
key.cancel();
if (key.channel()!= null)
key.channel().close();
}
}
}
} catch(Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null)
try{
selector.close();
} catch(IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throwsIOException {
if(key.isValid()) {
// 处理新接入的请求消息
if(key.isAcceptable()) {
// Acceptthe new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add thenew connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
// Read thedata
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = newString(bytes, "UTF-8");
System.out.println("Thetime server receive order : " + body);
String currentTime = "QUERYTIME ORDER".equalsIgnoreCase(body)
? newjava.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc, currentTime);
} elseif (readBytes< 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
;
// 读到0字节,忽略
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if(response != null&& response.trim().length() > 0){
byte[]bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
3)TimeClient
public class TimeClient {
public staticvoid main(String[] args){
int port = 8080;
if (args != null&& args.length> 0) {
try{
port= Integer.valueOf(args[0]);
} catch(NumberFormatException e) {
// 采用默认值
}
}
newThread(new TimeClientHandle("127.0.0.1", port),"TimeClient-001")
.start();
}
}
4)TimeClientHandle
public class TimeClientHandle implements Runnable {
privateString host;
private int port;
privateSelector selector;
privateSocketChannel socketChannel;
private volatile boolean stop;
publicTimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1": host;
this.port = port;
try{
selector= Selector.open();
socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
} catch(IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try{
doConnect();
} catch(IOException e) {
e.printStackTrace();
System.exit(1);
}
while(!stop) {
try{
selector.select(1000);
Set selectedKeys = selector.selectedKeys();
Iteratorit = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e){
if (key != null) {
key.cancel();
if (key.channel()!= null)
key.channel().close();
}
}
}
} catch(Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null)
try{
selector.close();
} catch(IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throwsIOException {
if(key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()) {
if(sc.finishConnect()) {
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);
// 连接失败,进程退出
}
if(key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = newString(bytes, "UTF-8");
System.out.println("Nowis : " + body);
this.stop = true;
} elseif (readBytes< 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
;
// 读到0字节,忽略
}
}
}
private void doConnect() throwsIOException {
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if(socketChannel.connect(new InetSocketAddress(host,port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
private void doWrite(SocketChannel sc) throws IOException {
byte[]req = "QUERYTIME ORDER".getBytes();
ByteBuffer writeBuffer= ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining())
System.out.println("Sendorder 2 server succeed.");
}
}
一、NIO(AIO) JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,此版本正式提供了异步文件IO操作,即AIO。
1.NIO服务端序列
文章图片
2.NIO服务端序列分析
1)打开AsynchronousServerSocketChannel
asynServerSocketChannel = AsynchronousServerSocketChannel.open();
2)绑定监听地址InetSocketAddress
asynServerSocketChannel.bind(new InetSocketAddress(port));
3)创建线程并启动
AsyncServerHandlertimeServer = newAsyncServerHandler(port);
new Thread(timeServer, "AIOServerHandler").start();
4)注册接收数据的Handler
asynServerSocketChannel.accept(this, new ServerAcceptCompletionHandler());
5)接收数据,实现ServerAcceptCompletionHandler的completed、failed方法
public voidcompleted(AsynchronousSocketChannel channel,AsyncServerHandler attachment) {
/**
* 为什么需要再次调用accept方法?
* 因为如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的complete方法,表示新的客户端接入成功
* 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用他的accept方法,
* 接收其他客户端连接,最终形成一个循环。每当接收一个客户连接成功后,再异步接收新的客户端连接
*
*/
attachment.asynServerSocketChannel.accept(attachment, this);
ByteBuffer buffer= ByteBuffer.allocate(1024);
/**
* ByteBuffer:接收缓冲区,用于从异步的Channel中读取数据包
* Attachment:异步Channel携带的附件,通知回调的时候作为入参使用
* CompletionHandler:接收通知回调的业务Handler
*/
channel.read(buffer, buffer,new ServerReadCompletionHandler(channel));
}
public void failed(Throwable exc, AsyncServerHandler attachment){
exc.printStackTrace();
attachment.latch.countDown();
}
6)读取数据,实现ServerReadCompletionHandler的Complete、faild方法
public void completed(Integer result, ByteBuffer attachment){
// handler with data
}
public void failed(Throwable exc, ByteBuffer attachment){
this.channel.close();
}
7)decode数据
attachment.flip();
byte[] body = newbyte[attachment.remaining()];
attachment.get(body);
Stringreq = newString(body, "UTF-8");
8)异步写数据到Channel
byte[] bytes = (response).getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, newCompletionHandler() {
@Override
public void completed(Integer result,ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer,this);
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
try {
channel.close();
} catch(IOException e) {
// ingnoreon close
}
}
});
3.NIO客户端序列
文章图片
4.NIO客户端序列分析
1)打开AsynchronousSocketChannel
asynSocketChannel = AsynchronousSocketChannel.open();
2)异步连接服务器
asynSocketChannel.connect(newInetSocketAddress(host, port), this, this);
3)创建线程并启动
AsyncClientHandler asyncClientHandler= new AsyncClientHandler("127.0.0.1", port);
new Thread(asyncClientHandler, "AIOClientHandler").start();
4)注册连接Server成功的Handler
ClientConnectCompletionHandlerconnectCompletionHandler = new ClientConnectCompletionHandler(asynSocketChannel, latch);
asynSocketChannel.connect(newInetSocketAddress(host, port), connectCompletionHandler, connectCompletionHandler);
5)连接成功后,注册向Server写数据的Handler,实现Completed、Failed方法
public void completed(Void result, AsyncClientHandler attachment) {
byte[] req = "timestemp".getBytes();
ByteBuffer writeBuffer= ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
ClientWriteCompletionHandler writeCompletionHandler = newClientWriteCompletionHandler(
attachment.asynSocketChannel, attachment.latch);
attachment.asynSocketChannel.write(writeBuffer, writeBuffer, writeCompletionHandler);
}
@Override
public void failed(Throwable exc, AsyncClientHandler attachment){
exc.printStackTrace();
try {
attachment.asynSocketChannel.close();
attachment.latch.countDown();
} catch(IOException e) {
e.printStackTrace();
}
}
6)向Server发送数据,实现CompletionHandler的Completed、faild方法
public void completed(Integer result, ByteBuffer buffer){
if (buffer.hasRemaining()) {
asynSocketChannel.write(buffer, buffer, this);
}
}
public void failed(Throwable exc, ByteBuffer attachment){
try {
asynSocketChannel.close();
latch.countDown();
} catch(IOException e) {
// ingnore on close
}
}
7)注册读数据的Handler,实现completed、failed方法
asynSocketChannel.read(readBuffer, readBuffer, newCompletionHandler() {
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[]bytes = newbyte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body= new String(bytes,"UTF-8");
System.out.println("Now is : " + body);
latch.countDown();
} catch(UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
try {
asynSocketChannel.close();
latch.countDown();
} catch(IOException e) {
// ingnoreon close
}
}
});
9)读取并decode数据
buffer.flip();
byte[] bytes = newbyte[buffer.remaining()];
buffer.get(bytes);
Stringbody = newString(bytes, "UTF-8");
5.示例
1)TimeServer 【NIO|NIO】
public class TimeServer {
public static voidmain(String[] args) throwsIOException {
int port = 8080;
AsyncServerHandler timeServer = newAsyncServerHandler(port);
newThread(timeServer, "AIOServerHandler").start();
}
}
2)AsyncServerHandler
public class AsyncServerHandler implements Runnable {
CountDownLatch latch;
AsynchronousServerSocketChannel asynServerSocketChannel;
publicAsyncServerHandler(int port) {
try {
asynServerSocketChannel= AsynchronousServerSocketChannel.open();
asynServerSocketChannel.bind(new InetSocketAddress(port));
} catch(IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch= new CountDownLatch(1);
asynServerSocketChannel.accept(this, newServerAcceptCompletionHandler());
try {
latch.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
3)ServerAcceptCompletionHandler
public class ServerAcceptCompletionHandlerimplementsCompletionHandler {
@Override
public void completed(AsynchronousSocketChannel channel, AsyncServerHandler attachment) {
/**
* 为什么需要再次调用accept方法?
* 因为如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的complete方法,表示新的客户端接入成功
* 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用他的accept方法,
* 接收其他客户端连接,最终形成一个循环。每当接收一个客户连接成功后,再异步接收新的客户端连接
*
*/
attachment.asynServerSocketChannel.accept(attachment, this);
ByteBuffer buffer= ByteBuffer.allocate(1024);
/**
* ByteBuffer:接收缓冲区,用于从异步的Channel中读取数据包
* Attachment:异步Channel携带的附件,通知回调的时候作为入参使用
* CompletionHandler:接收通知回调的业务Handler
*/
channel.read(buffer, buffer,new ServerReadCompletionHandler(channel));
}
@Override
public void failed(Throwable exc,AsyncServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
4)ServerReadCompletionHandler
public class ServerReadCompletionHandler implements CompletionHandler {
privateAsynchronousSocketChannel channel;
publicServerReadCompletionHandler(AsynchronousSocketChannel channel){
if (this.channel ==null) {
this.channel = channel;
}
}
@Override
public void completed(Integer result,ByteBuffer attachment) {
attachment.flip();
byte[]body = newbyte[attachment.remaining()];
attachment.get(body);
try {
String req= new String(body,"UTF-8");
String currentTime= "timestemp".equalsIgnoreCase(req) ? System.currentTimeMillis() + "" : "BADORDER";
doWrite(currentTime);
} catch(UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String response){
if (response != null&& response.trim().length() > 0){
byte[]bytes = (response).getBytes();
ByteBuffer writeBuffer= ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,new CompletionHandler() {
@Override
publicvoid completed(Integer result, ByteBuffer buffer){
// 如果没有发送完成,继续发送
if(buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
publicvoid failed(Throwable exc, ByteBuffer attachment){
try{
channel.close();
} catch(IOException e) {
// ingnoreon close
}
}
});
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
try {
this.channel.close();
} catch(IOException e) {
e.printStackTrace();
}
}
}
5)TimeClient
public class TimeClient {
public static voidmain(String[] args) {
int port = 8080;
AsyncClientHandler asyncClientHandler = newAsyncClientHandler("127.0.0.1", port);
newThread(asyncClientHandler, "AIOClientHandler").start();
}
}
6)AsyncClientHandler
public class AsyncClientHandler implements Runnable {
AsynchronousSocketChannel asynSocketChannel;
privateString host;
private int port;
CountDownLatch latch;
publicAsyncClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
asynSocketChannel= AsynchronousSocketChannel.open();
} catch(IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch= new CountDownLatch(1);
ClientConnectCompletionHandler connectCompletionHandler = new ClientConnectCompletionHandler();
asynSocketChannel.connect(new InetSocketAddress(host,port), this,connectCompletionHandler);
try {
latch.await();
asynSocketChannel.close();
} catch(Exception e) {
e.printStackTrace();
}
}
}
7)ClientConnectCompletionHandler
public classClientConnectCompletionHandler implementsCompletionHandler {
@Override
public void completed(Void result,AsyncClientHandler attachment) {
byte[]req = "timestemp".getBytes();
ByteBuffer writeBuffer= ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
ClientWriteCompletionHandler writeCompletionHandler = newClientWriteCompletionHandler(
attachment.asynSocketChannel, attachment.latch);
attachment.asynSocketChannel.write(writeBuffer, writeBuffer, writeCompletionHandler);
}
@Override
public void failed(Throwable exc,AsyncClientHandler attachment) {
exc.printStackTrace();
try {
attachment.asynSocketChannel.close();
attachment.latch.countDown();
} catch(IOException e) {
e.printStackTrace();
}
}
}
8)ClientWriteCompletionHandler
public class ClientWriteCompletionHandler implements CompletionHandler {
privateAsynchronousSocketChannel asynSocketChannel;
privateCountDownLatch latch;
publicClientWriteCompletionHandler(AsynchronousSocketChannel asynSocketChannel,CountDownLatch latch) {
super();
this.asynSocketChannel = asynSocketChannel;
this.latch = latch;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
if (buffer.hasRemaining()) {
asynSocketChannel.write(buffer, buffer, this);
} else{
ByteBuffer readBuffer= ByteBuffer.allocate(1024);
asynSocketChannel.read(readBuffer, readBuffer,new CompletionHandler() {
@Override
publicvoid completed(Integer result, ByteBuffer buffer){
buffer.flip();
byte[]bytes = newbyte[buffer.remaining()];
buffer.get(bytes);
String body;
try{
body = new String(bytes, "UTF-8");
System.out.println("Nowis : " + body);
latch.countDown();
} catch(UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
publicvoid failed(Throwable exc, ByteBuffer attachment){
try{
asynSocketChannel.close();
latch.countDown();
} catch(IOException e) {
// ingnoreon close
}
}
});
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
try {
asynSocketChannel.close();
latch.countDown();
} catch(IOException e) {
// ingnoreon close
}
}
}