#|NIO通信示例

【#|NIO通信示例】读者可以将代码拷贝到编译器上跑一跑,如果看了注释还有不懂的地方,可以参考:
NIO基础(一)之Buffer
NIO基础(二)之Channel
NIO基础(三)之Selector
第三篇尤为重要对看懂大体框架有很大意义。

public class Const { public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; public static String response(String msg){ return "Hello,"+msg+",Now is "+new java.util.Date( System.currentTimeMillis()).toString() ; } }

/** * 类说明:nio通信客户端 */ public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Server").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } }

/** * 类说明:nio通信客户端处理器 */ public class NioClientHandle implements Runnable{ private String host; private int port; private volatile boolean started; private Selector selector; private SocketChannel socketChannel; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { /*创建选择器*/ this.selector = Selector.open(); /*打开监听通道*/ socketChannel = SocketChannel.open(); /*如果为 true,则此通道将被置于阻塞模式; * 如果为 false,则此通道将被置于非阻塞模式 * 缺省为true*/ socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } public void stop(){ started = false; }@Override public void run() { //连接服务器 try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(-1); } /*循环遍历selector*/ while(started){ try { /*阻塞方法,当至少一个注册的事件发生的时候就会继续*/ selector.select(); /*获取当前有哪些事件可以使用*/ Set> keys = selector.selectedKeys(); /*转换为迭代器*/ Iterator> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。 如果我们没有删除处理过的键,那么它仍然会在事件集合中以一个激活 的键出现,这会导致我们尝试再次处理它。*/ it.remove(); try { handleInput(key); } catch (Exception e) { if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } }} catch (IOException e) { e.printStackTrace(); System.exit(-1); } }if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } }}/*具体的事件处理方法*/ private void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ /*获得关心当前事件的channel*/ SocketChannel sc =(SocketChannel)key.channel(); /*处理连接就绪事件 * 但是三次握手未必就成功了,所以需要等待握手完成和判断握手是否成功*/ if(key.isConnectable()){ /*finishConnect的主要作用就是确认通道连接已建立, 方便后续IO操作(读写)不会因连接没建立而 导致NotYetConnectedException异常。*/ if(sc.finishConnect()){ /*连接既然已经建立,当然就需要注册读事件, 写事件一般是不需要注册的。*/ socketChannel.register(selector,SelectionKey.OP_READ); }else System.exit(-1); }/*处理读事件,也就是当前有数据可读*/ if(key.isReadable()){ /*创建ByteBuffer,并开辟一个1k的缓冲区*/ ByteBuffer buffer = ByteBuffer.allocate(1024); /*将通道的数据读取到缓冲区,read方法返回读取到的字节数*/ int readBytes = sc.read(buffer); if(readBytes>0){ buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("客户端收到消息:"+result); } /*链路已经关闭,释放资源*/ else if(readBytes<0){ key.cancel(); sc.close(); }} } }/*进行连接*/ private void doConnect() throws IOException { /*如果此通道处于非阻塞模式,则调用此方法将启动非阻塞连接操作。 如果连接马上建立成功,则此方法返回true。 否则,此方法返回false, 因此我们必须关注连接就绪事件, 并通过调用finishConnect方法完成连接操作。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){ /*连接成功,关注读事件*/ socketChannel.register(selector,SelectionKey.OP_READ); } else{ socketChannel.register(selector,SelectionKey.OP_CONNECT); } }/*写数据对外暴露的API*/ public void sendMsg(String msg) throws IOException { doWrite(socketChannel,msg); }private void doWrite(SocketChannel sc,String request) throws IOException { byte[] bytes = request.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } }

/** * 类说明:nio通信服务端 */ public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } }

/** * 类说明:nio通信服务端处理器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 构造方法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try{ //创建选择器 selector = Selector.open(); //打开监听通道 serverChannel = ServerSocketChannel.open(); //如果为 true,则此通道将被置于阻塞模式; // 如果为 false,则此通道将被置于非阻塞模式 serverChannel.configureBlocking(false); //开启非阻塞模式 serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); //标记服务器已开启 started = true; System.out.println("服务器已启动,端口号:" + port); }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; }@Override public void run() { //循环遍历selector while(started){ try{ //阻塞,只有当至少一个注册的事件发生的时候才会继续. selector.select(); Set> keys = selector.selectedKeys(); Iterator> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //处理新接入的请求消息 if(key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); SocketChannel sc = ssc.accept(); System.out.println("=======建立连接==="); sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); }//读消息 if(key.isReadable()){ System.out.println("======socket channel 数据准备完成," + "可以去读==读取======="); SocketChannel sc = (SocketChannel) key.channel(); //创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服务器收到消息:" + message); //处理数据 String result = response(message) ; //发送应答消息 doWrite(sc,result); } //链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送应答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //将消息编码为字节数组 byte[] bytes = response.getBytes(); //根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } }

    推荐阅读