计算机基础|NIO案例详解

一、
NIO解释:NIO官方的解释是NEW I/O,意思是相对与BIO(同步阻塞I/O)来说是全新的,但是大部分人更喜欢叫他no-block I/O(同步非阻塞I/O),因为他相对于BIO的最大改变就是非阻塞的,我们知道传统的Socket下的流都是阻塞I/O,也就是客户端的一个请求对应服务端的一个线程,并且这个线程是一直阻塞的直到请求结束,对于线程这种宝贵的资源来说,如果并发量太大同时处理线程又阻塞时,那么势必会导致服务端资源耗尽,拖垮整个系统,就算我们用线程池代替传统的手动创建线程(这样避免了频繁的新建、销毁线程,因为线程创建也是消耗资源的,达到重复利用线程的目的,同时通过线程池也能控制客户端的并发数)但是这并没有解决一个请求对应一个线程的问题,所提他本质上还是BIO,此时NIO便产生了,NIO的最大改变就是改变了一请求一线程的状况,只要只要一个Selector就可以监听注册到其上的所有Channel,通过轮询准备好的Channel实现单线程处理多客户端请求;
接下来我们通过一个例子来看看BIO:这个例子就是一个client像server发送一个请求,server像client端返回当前时间;
TimeServer:

package com.ck.prefix.nio; import java.io.IOException; public class TimeServer {public static void main(String[] args) throws IOException { MultiplexerTimeServer m = new MultiplexerTimeServer(8081); //起一个单独的线程来处理来自客户端的请求 new Thread(m,"NIO-MultiplexerTimeServer").start(); } }


MultiplexerTimeServer:
package com.ck.prefix.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Date; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { //多路复用选择器 private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean isStop; /** * 初始化多路复用器,绑定接口 * * @param port * @throws IOException */ public MultiplexerTimeServer(int port) throws IOException { //初始化多路复用器 selector = Selector.open(); //初始化ServerChannel serverChannel = ServerSocketChannel.open(); //设置为未阻塞模式 serverChannel.configureBlocking(false); //绑定ip、port,设置backlog serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", port), 1024); //1、将serverChannel注册到selector上,并监听accept事件(就是客户端的connect请求) serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server初始化完成"); }public void run() { //服务端单线程一直循环监听来自客户端的请求 while (!isStop) { try { //延迟1s,不管有没有客户端请求都阻塞1s然后往下执行,如果不设置那么会一直阻塞直到有客户端的请求过来 selector.select(1000); //循环取SelectionKey Set selectionKeys = selector.selectedKeys(); Iterator it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { //处理取到的SelectionKey handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable e) { e.printStackTrace(); } }if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }public void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { //4、处理客户端的接入的请求信息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); //设置为非阻塞 sc.configureBlocking(false); //5、这时候会像客户端发送一个连接请求,并同时注册监听一个读事件 sc.register(this.selector, SelectionKey.OP_READ); } //7、服务端监听到来自客户端的写请求(客户端的写对应着就是服务端的读),服务端开始读数据 //并处理结束后像客户端发送写请求 if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(byteBuffer); if (readBytes > 0) { byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("the Server receive body = " + body); if (body.equals("query time")) { doWrite(sc); } } else if (readBytes < 0) { key.cancel(); sc.close(); } else { } } } }public void doWrite(SocketChannel channel) throws IOException { byte [] request = new Date().toString().getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(request.length); writeBuffer.put(request); writeBuffer.flip(); channel.write(writeBuffer); if (!writeBuffer.hasRemaining()){ System.out.println("client send message success"); } } }

ClientServer:
package com.ck.prefix.nio; public class ClientServer {public static void main(String[] args) { new Thread(new TimeClientHandle("127.0.0.1",8081)).start(); } }

TimeClientHandle :
package com.ck.prefix.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable {private String host; private int port; private SocketChannel socketChannel; private Selector selector; private volatile boolean isStop = false; public TimeClientHandle (String host, int port){ this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); System.out.println("client端初始化完成"); } catch (IOException e) { e.printStackTrace(); System.exit(1); } }public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!isStop){ try { selector.select(1000); Set selectionKey =selector.selectedKeys(); Iterator keys= selectionKey.iterator(); while (keys.hasNext()){ SelectionKey key = keys.next(); keys.remove(); try { handleInput(key); }catch (Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } }if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }public void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ //判断连接是否成功 SocketChannel sc = (SocketChannel)key.channel(); //6、监听到来自服务端的连接请求,连接成功后,客户端开始往服务端发送数据,并监听读请求 if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else { System.exit(1); } } //8、客户端接受来自服务端的写请求(同理也是客户端的读请求)后,开始处理 if(key.isReadable()){ ByteBuffer byteBuffer =ByteBuffer.allocate(1024); int readL =sc.read(byteBuffer); if( readL > 0){ byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("the client receive body = " + body); this.isStop =true; }else if(readL <0){ key.cancel(); sc.close(); }else {} } } }public void doConnect() throws IOException { //2、首先尝试连接server端,如果连接成功,监听读请求 boolean connect = socketChannel.connect(new InetSocketAddress(this.host,this.port)); if(connect){ socketChannel.register(this.selector, SelectionKey.OP_READ); doWrite(socketChannel); }else { //3、注册监听一个连接事件,接受来自服务端的连接监听 socketChannel.register(this.selector, SelectionKey.OP_CONNECT); } }public void doWrite(SocketChannel sc) throws IOException { byte [] request = "query time".getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(request.length); byteBuffer.put(request); byteBuffer.flip(); sc.write(byteBuffer); if (!byteBuffer.hasRemaining()){ System.out.println("client send message success"); } } }

执行步骤:
1、初始化服务端的ServerSocketChannel,并监听来自客户端的accept请求:
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

2、客户端SocketChannel,并发送accept请求,服务端接受到accept请求后会像客户端发送connect连接请求,并同时监听read请求:
//2、首先尝试连接server端,如果连接成功,监听读请求 boolean connect = socketChannel.connect(new InetSocketAddress(this.host,this.port));


//4、处理客户端的接入的请求信息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); //设置为非阻塞 sc.configureBlocking(false); //5、这时候会像客户端发送一个连接请求,并同时注册监听一个读事件 sc.register(this.selector, SelectionKey.OP_READ); }

计算机基础|NIO案例详解
文章图片


3、客户端监听来自服务端的连接请求:
//3、注册监听一个连接事件,接受来自服务端的连接监听 socketChannel.register(this.selector, SelectionKey.OP_CONNECT);

计算机基础|NIO案例详解
文章图片

4、接下来此时客户端会监听到来自服务端的connect请求后,发现已经建立了连接,接下来就会像服务端发送数据,并监听read请求(这是为了监听服务端的返回);
if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else { System.exit(1); } }

5、服务端接收到读请求后,处理完成后返回数据给客户端:
//7、服务端监听到来自客户端的写请求(客户端的写对应着就是服务端的读),服务端开始读数据 //并处理结束后像客户端发送写请求 if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(byteBuffer); if (readBytes > 0) { byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("the Server receive body = " + body); if (body.equals("query time")) { doWrite(sc); } } else if (readBytes < 0) { key.cancel(); sc.close(); } else { } }


6、客户端监听到来自服务端的返回,处理返回;
//8、客户端接受来自服务端的写请求(同理也是客户端的读请求)后,开始处理 if(key.isReadable()){ ByteBuffer byteBuffer =ByteBuffer.allocate(1024); int readL =sc.read(byteBuffer); if( readL > 0){ byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("the client receive body = " + body); this.isStop =true; }else if(readL <0){ key.cancel(); sc.close(); }else {} }

【计算机基础|NIO案例详解】 这就是个人觉得NIO请求的完整流程了,纯属于个人理解,不对之处希望大牛指出,非常感谢!

    推荐阅读