网络编程|BIO和NIO

开始学习网络编程,刚入门NIO,还有很多概念没清楚,codeing还在提升,有看到我的友友可以给我些学习的建议吗 。。。
目录
一、什么是阻塞和非阻塞
二、BIO
2.1 初始BIO
2.2 多线程 BIO
2.3 线程池
2.4 BIO总结
三、NIO
3.1 初始NIO FIleChannel
3.2 NIO下实现CS通信
3.3 黏包和半包
3.4 小整合(CS通信 + 消息边界问题 + buffer容量)
3.4.1 log4j日志
3.4.2 CS通信
3.4.3 完整代码
3.5 服务端大数据传输
3.6 多线程环境下的NIO
3.6.1 初始版
3.6.2 方案一
3.6.3 方案二: 使用阻塞队列
3.7 NIO总结

一、什么是阻塞和非阻塞 阻塞:对于每个线程,只能处理一个请求;在数据接受前和accept前会陷入阻塞状态
如:每个顾客都有一名专门的服务生接待
非阻塞:对于每个线程,可以同时管理多个客户端的请求;但是在(Buffer + Channel)NIO中,服务端与某一个客户端建立连接后,仍不能去处理其他请求;需要Selector的配合

我在学习NIO后在稍微懂点这两个概念。。。
二、BIO BIO:阻塞IO, 传统使用的IO流都是BIO, 当使用accept()和read()的时候都会陷入阻塞状态,直到数据准备完成。
2.1 初始BIO 目标:完成初始版的server 和 client 通信
server:
public class Server {public static void main(String[] args) { try { System.out.println("我是服务端,我开启了"); // 1. 创建服务端的Socket连接 ServerSocket serverSocket = new ServerSocket(9999); // 2. 等待接受客户端信息 Socket socket = serverSocket.accept(); // 3.通过socket获取输入流 InputStream is = socket.getInputStream(); // 4.将字节流封装成字符缓冲流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); // 5.获取数据 String res = null; /* 这里使用if,ServerSocket和Socket在BIO模式下是共死的 当Client的数据发送完,服务端也会自动刷新了解,如果使用 while会一直等待数据,而Client却已经关闭,读取失败 */ if ((res = br.readLine()) != null) { System.out.println(res); } } catch (IOException e) { e.printStackTrace(); } }}

client:
public class Client {public static void main(String[] args) { try { // 1.创建客户端的Socket连接 Socket socket = new Socket("127.0.0.1", 9999); // 2.获取字符输出流 OutputStream os = socket.getOutputStream(); String res = "我是客户端发送的数据"; // 3.将字节流封装成字符打印流,速度更快 PrintWriter pw = new PrintWriter(os); pw.println(res); pw.flush(); } catch (IOException e) { e.printStackTrace(); } }}

问题:初始版中,只能处理一个服务端一个客户端,效率低
2.2 多线程 BIO 使用多线程的方式,定义单独的线程处理数据的读写
server:
/** *实现一台服务器和多台客户端进行通信 *解决方案:使用多线程的方式 * *通过多线程的方式能够达到一对多的场景,但是当并发量高的时候,线程 *数量增多,很容易导致资源泄露 */ public class Server {public static void main(String[] args) { try { System.out.println("客户端已经启动。。。。"); ServerSocket serverSocket = new ServerSocket(9999); while (true) { // 等待接受客户端的连接 Socket socket = serverSocket.accept(); System.out.println("创建连接"); // 将接受的客户端socket交给线程执行 new HandlerSocket(socket).start(); }} catch (IOException e) { e.printStackTrace(); } }}class HandlerSocket extends Thread {private Socket socket; public HandlerSocket(Socket socket) { this.socket = socket; }@Override public void run() { try { InputStream is = socket.getInputStream(); // 封装成字符缓冲区 BufferedReader bf = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = bf.readLine()) != null) { // 读取数据 System.out.println("客户端" + socket.getLocalAddress() + ":" + msg); } } catch (IOException e) { e.printStackTrace(); } } }

client:
public class Client {public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1", 9999); OutputStream os = socket.getOutputStream(); // 将输出流封装成打印流 PrintWriter pw = new PrintWriter(os); Scanner in = new Scanner(System.in); while (true) { System.out.println("请说:"); String msg = in.next(); pw.println(msg); pw.flush(); }} catch (IOException e) { e.printStackTrace(); } }}

问题:每个客户端由单个线程进行执行,当请求过多的时候,需要等量的线程进行处理,资源占有率极高,容易造成服务器宕机。
2.3 线程池 为了解决2.2线程过多的问题,考虑通过线程池控制创建线程的数量
handlerThreadPool:
/** *线程池工具类 */ public class HandlerSocketPool {// 创建一个线程池 private static ExecutorService executorService; /* 线程池参数: ThreadPoolExecutor(int corePoolSize,// 在线程池中保持活跃线程的最大数量 int maximumPoolSize,// 线程池中最多创建的数量 long keepAliveTime,// 线程存活时间 TimeUnit unit, // 单位 BlockingQueue workQueue)//阻塞队列 */ public static ExecutorService createPool(int maximumPoolSize, int queueSize) { executorService = new ThreadPoolExecutor(3 , maximumPoolSize , 120, TimeUnit.SECONDS , new ArrayBlockingQueue(queueSize)); return executorService; }public static void executeTask(Runnable myRunnable) { executorService.execute(myRunnable); }}

server:
public class Server {public static void main(String[] args) { System.out.println("服务端启动。。。。"); try { ServerSocket serverSocket = new ServerSocket(9999); // 创建一个线程池 HandlerSocketPool.createPool(3, 5); while (true) { Socket socket = serverSocket.accept(); // 创建Runnable HandlerSocketPool.executeTask(new MyRunnable(socket)); }} catch (IOException e) { e.printStackTrace(); } } }class MyRunnable implements Runnable {private Socket socket; public MyRunnable(Socket socket) { this.socket = socket; }public void run() { try { InputStream is = socket.getInputStream(); BufferedReader bf = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = bf.readLine()) != null) { System.out.println("客户端" + socket.getLocalAddress() + ":" + msg); }} catch (IOException e) { e.printStackTrace(); } } }

client:
public class Client { public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1", 9999); OutputStream os = socket.getOutputStream(); // 将输出流封装成打印流 PrintWriter pw = new PrintWriter(os); Scanner in = new Scanner(System.in); while (true) { System.out.println("请说:"); String msg = in.next(); pw.println(msg); pw.flush(); }} catch (IOException e) { e.printStackTrace(); } } }

2.4 BIO总结
1.BIO模式,是一种阻塞IO的方式,也是最传统的IO; 使用Socket进行通信,每一个客户端和服务端正常只能建立一次连接 这种方式导致服务端与连接方建立连接后,服务端会一直等待客户端的信息(阻塞),一旦 客户端的发送完信息断开连接,服务端立马抛出异常(殉情)2.为了解决一个服务端只能和一个客户端通信,所以可以使用线程的方式,将不同的客户端socket 交由不同的线程执行;同时,为了避免线程过多导致资源消耗殆尽,可以使用线程池的方式进行管理2.使用BIO进行文件上传,使用DataXXXStream系列可以实现数据分段发送;在客户端发送信息结束后, 应该使用socket.shutdown()通知服务端信息已经发送完成

三、NIO 非阻塞IO,可以实现一个线程解决多个客户端的请求,核心组件:Buffer, Channel,Selector
Buffer是数据的载体,数据流向是双向的;相比于BIO数据单项流动,使用Buffer在Channel(通道)中进行传递。Channel的类型:FileChannel, DatagramChannel,ServerSocketChannel, SocketChannel。Selector的作用用于管理多个管道,通过selector()进行阻塞,当某一通道的数据准备好(可连接,可读,可写),那么就执行相应的回调函数
3.1 初始NIO FIleChannel
public class TestReadFIle {@Test public void testRead() { try { // 1.通过传统IO获取得到文件输入流 FileInputStream is = new FileInputStream("D:\\IDEA_Work\\MavenProject\\9_BIO_NIO\\src\\main\\java\\com\\righteye\\nio\\com\\righteye\\demo01\\test.txt"); // 2.获取管道 FileChannel channel = is.getChannel(); // 3.创建缓冲区allocate:Allocates a new byte buffer. ByteBuffer buffer = ByteBuffer.allocate(1024); // 4.将数据存入缓冲区 channel.read(buffer); // 6.将文件变为读模式Flips this buffer. // The limit is set to the current position and then the position is set to zero. buffer.flip(); // 5.封装成String String res = new String(buffer.array(), 0, buffer.remaining()); // 打印结果 System.out.println(res); } catch (Exception e) { e.printStackTrace(); } }}

1.FileChannel是用于文件读写的管道
2. Buffer中的常见API: 1.hashRemaining, 判断缓冲器是否有数据 2.mark() 设置标记 3.remaining, 返回position和limit之间的元素个数 4.flip(),将当前位置设置为limit, 然后当前下标归零,表示可读状态 5.clear() 进入写模式 操作数据的方法: get和put

3.2 NIO下实现CS通信 server:
public class Server {public static void main(String[] args) {try { System.out.println("服务端等待连接。。。"); // 1.创建服务端的管道,用于接收客户端的请求 // open(): Opens a server-socket channel. ServerSocketChannel ssChannel = ServerSocketChannel.open(); // 2.将通道转换为非阻塞模式 ssChannel.configureBlocking(false); // 3.将当前通道绑定一个端口 ssChannel.bind(new InetSocketAddress(9999)); // 4.获取选择器Selector Selector selector = Selector.open(); // 5.在通道上绑定Selector,监听连接事件 ssChannel.register(selector, SelectionKey.OP_ACCEPT); // 6.当连接建立成功,说明Selector中会触发相应事件 // select()方法, 阻塞直到某一通道中数据准备好,否则进入轮询状态 while (selector.select() > 0) {// 7.获取Selector中所有准备好的事件 Iterator it = selector.selectedKeys().iterator(); // 8.遍历准备好的事件 while (it.hasNext()) {// 9.获取当前要执行的事件 SelectionKey sk = it.next(); // 10. 如果当前事件是建立连接 if (sk.isAcceptable()) { // 11.获取客户端的管道,用于获取数据 SocketChannel scChannel = ssChannel.accept(); // 12.将管道设置为非阻塞 scChannel.configureBlocking(false); // 13.监听客户端写数据,在服务端部分用于读数据 scChannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) {// 可以读数据 System.out.println("测试开始。。。"); // 14. 获取客户端的数据管道 SocketChannel scChannel = (SocketChannel) sk.channel(); // 15.创建缓冲区,NIO中数据的读取在缓冲区中 ByteBuffer buffer = ByteBuffer.allocate(1024); try { int len = 0; while ((len = scChannel.read(buffer)) > 0) {// 这块不能写 != -1 // 设置读模式 buffer.flip(); System.out.println(new String(buffer.array(), 0, len)); buffer.clear(); } } catch (IOException e) { System.out.println("有人下线了"); sk.cancel(); scChannel.close(); e.printStackTrace(); } } // 16.事件处理完进行清除,不去除事件下一次容易发生空指针异常 it.remove(); } } } catch (IOException e) { e.printStackTrace(); } } }

// 8.遍历准备好的事件 while (it.hasNext()) {// 。。。。 // 16.事件处理完进行清除,不去除事件下一次容易发生空指针异常 it.remove(); }

这里的 it.remove不要忘记加,因为所有的事件在注册后会加入到 selectionKey集合中,在NIO中,所有的事件要么被执行,如果是未执行的事件会在下次继续加入到集合中,而在下一次重新进行遍历,此时会进入 isAcceptable()分支:
网络编程|BIO和NIO
文章图片


SocketChannel scChannel = ssChannel.accept(); // 没有接受返回值为NULLscChannel.configureBlocking(false); // 空指针异常

client:
public class Client {public static void main(String[] args) { try { // 1.获取客户端的通道 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999)); // 2.将通道设置为非阻塞 //socketChannel.configureBlocking(false); // 3.创建缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner in = new Scanner(System.in); while (true) { System.out.println("请说:"); String msg = in.nextLine(); // 4.向缓冲区中写数据 buffer.put(("我:" + msg).getBytes()); // 5.将buffer中的数据变为可读态 buffer.flip(); socketChannel.write(buffer); buffer.clear(); } //socketChannel.shutdownOutput(); } catch (IOException e) { e.printStackTrace(); }} }

3.3 黏包和半包 问题描述:在网络传输中,比如有:“Helloworld\nI`m zhangsan\nHo” 和 “w are you\n”
由于为了通信的效率,传输的数据报中的内容可能会不同于自己发送的初衷。
如:"Helloworld\nI`m zhangsan\n" 为两段话糅合在一起,称为黏包
而: “Ho” 和 “w are you\n” 是完整的一段话,但是在传输的过程被分开传输了,称为半包
解决方案:(消息边界问题)
处理消息边界的方法: -》 黏包 半包的状况 1). 所有客户端统一一个最长的ByteBuffer容量浪费带宽 2). 在每段消息的后面添加 标志, 然后进行特殊处理按1bit进行遍历,效率低 3). 类比Http协议,数据包分为两部分,第一部分长度固定表示内容长度,第二部分表示内容 如:Http2.0 使用LTV格式,代表长度,类型,内容

代码实现:
public void split(ByteBuffer source) { // 进入读模式 source.flip(); for (int i = 0; i < source.remaining(); i++) { // 遇到标识表示已经接受一段话 if (source.get(i) == '\n') { int len = i + 1 - source.position(); ByteBuffer target = ByteBuffer.allocate(len); for (int j = 0; j < len; j++) { // 使用get(),一个字节的进行读取,复制到新的缓冲区 target.put(source.get()); } System.out.println(new String(target.array(), 0, len)); } }logger.debug("split() end..."); // 每次划分之后可能还有剩余的数据,所以进行压缩,用于下次接受新数据 source.compact(); }

3.4 小整合(CS通信 + 消息边界问题 + buffer容量) 问题描述:
1. 在满足服务端和客户端能通信的前提下,模拟客户端的(非)正常退出
2.验证当客户端发送的数据超过缓冲区大小的时候发生的黏包,半包现象;
3. buffer容量问题,客户端发送消息不能保证满足buffer的最大容量,需要考虑扩容问题
3.4.1 log4j日志
使用日志进行控制台的显示,便于观察
pom依赖:
org.slf4j slf4j-api 1.6.1 org.slf4j slf4j-log4j12 1.6.1

log4j.properties
log4j.rootLogger=DEBUG,consoleAppender,logfilelog4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout log4j.appender.consoleAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%p] %m%nlog4j.appender.logfile=org.apache.log4j.DailyRollingFileAppender log4j.appender.logfile.File=/home/admin/demo/logs/demo.log log4j.appender.logfile.Append = true log4j.appender.logfile.DatePattern='.'yyyy-MM-dd #log4j.appender.logfile=org.apache.log4j.RollingFileAppender #log4j.appender.logfile.File=/home/admin/demo/logs/demo.log #log4j.appender.logfile.Append = true #log4j.appender.logfile.MaxFileSize = 10MB #log4j.appender.logfile.MaxBackupIndex = 20 log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

3.4.2 CS通信
定义成员变量:
private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class); private Selector selector; // 用于管理Channel private ServerSocketChannel ssc; // 服务端通道 private static final int PORT = 9999;

构造器进行初始化
// 初始化 public TestSelectorINfoRange() throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); // 设置非阻塞模式 ssc.configureBlocking(false); // 为通道绑定端口 ssc.bind(new InetSocketAddress(PORT)); // 绑定监听连接事件 ssc.register(selector, SelectionKey.OP_ACCEPT); }

Acceptable事件监听:
if (sk.isAcceptable()) { SocketChannel sc = ssc.accept(); // 获取客户端连接 sc.configureBlocking(false); // 用于读取客户端中的buffer // 为了实现同一次事件中使用的Buffer是相同的,所以要将buffer与通道绑定, register中的第三个参数,作为attachment附件存在 ByteBuffer buffer = ByteBuffer.allocate(16); SelectionKey clientSK = sc.register(selector, 0, buffer); // 为客户端的事件添加监听 clientSK.interestOps(SelectionKey.OP_READ); }

register()函数说明:
SelectionKey clientSK = sc.register(selector, 0, buffer);
Registers this channel with the given selector, returning a selection key.

返回值是SelectionKey, 表示当前触发的事件
第三个参数表示:参数如果不为空,那么就将这个Object作为附件绑定到Channel中
If the att argument is not null then the key's attachment will have been set to that value.

Readable事件监听:
else if (sk.isReadable()) { // ByteBuffer buffer = ByteBuffer.allocate(16); 修改前 SocketChannel sc = (SocketChannel) sk.channel(); // 从事件中获取附件 buffer ByteBuffer buffer = (ByteBuffer) sk.attachment(); try { int len = sc.read(buffer); // 客户端连接断开,没有数据了;一次连接close也表示一次读请求 if (len == -1) { // 从selectionKey集合中删除事件 sk.cancel(); } else { // 开启读模式 //buffer.flip(); //System.out.println(new String(buffer.array(), 0, buffer.limit())); split(buffer); // 说明buffer不够,需要进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 更新附件 sk.attach(newBuffer); } } } catch (IOException e) { sk.cancel(); // 异常删除事件 } }

if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 更新附件 sk.attach(newBuffer); }

扩容操作,当客户端发送的数据超过buffer的容量的时候,处理黏包,半包的split()方法因为无法检测到结尾标识所以无法进行输出,从而进行下一次的数据读写;导致数据会产生丢失
3.4.3 完整代码
client:
public class Client implements AutoCloseable{ private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class); private SocketChannel sc; private static final int PORT = 9999; private static final String url = "127.0.0.1"; public Client() throws IOException { sc = SocketChannel.open(new InetSocketAddress(url, PORT)); sc.configureBlocking(false); logger.debug("client initial finish..."); }public static void main(String[] args) { try (Client client = new Client()){client.sendInfo(); } catch (Exception e) { e.printStackTrace(); } }private void sendInfo() { try { while (true) { Scanner in = new Scanner(System.in); System.out.println("请说:"); String msg = in.nextLine(); sc.write(Charset.defaultCharset().encode(msg + "\n")); logger.debug("client info finish..."); }} catch (IOException e) { e.printStackTrace(); } }@Override public void close() throws Exception { if (sc != null) { sc.close(); } } }

server: (CS通信 + Split函数) split的书写在上面有实例
public class TestSelectorINfoRange implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(TestSelectorINfoRange.class); private Selector selector; // 用于管理Channel private ServerSocketChannel ssc; // 服务端通道 private static final int PORT = 9999; // 初始化 public TestSelectorINfoRange() throws IOException { selector = Selector.open(); ssc = ServerSocketChannel.open(); // 设置非阻塞模式 ssc.configureBlocking(false); // 为通道绑定端口 ssc.bind(new InetSocketAddress(PORT)); // 绑定监听连接事件 ssc.register(selector, SelectionKey.OP_ACCEPT); logger.debug("初始化服务端完成。。。。"); }public void close() { if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } if (ssc != null) { try { ssc.close(); } catch (IOException e) { e.printStackTrace(); } } }public static void main(String[] args) { try (TestSelectorINfoRange testSelectorINfoRange = new TestSelectorINfoRange()){// 开启监听事件 testSelectorINfoRange.listen(); } catch (IOException e) { e.printStackTrace(); } }private void listen() {while (true) { try { logger.debug("select before ..."); selector.select(); // 阻塞等待事件发生// 获取事件集并进行迭代 Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); // 处理事件 if (sk.isAcceptable()) { SocketChannel sc = ssc.accept(); // 获取客户端连接 sc.configureBlocking(false); // 用于读取客户端中的buffer// 为了实现同一次事件中使用的Buffer是相同的,所以要将buffer与通道绑定, register中的第三个参数,作为attachment附件存在 ByteBuffer buffer = ByteBuffer.allocate(16); SelectionKey clientSK = sc.register(selector, 0, buffer); // 为客户端的事件添加监听 clientSK.interestOps(SelectionKey.OP_READ); logger.debug("client socket: {}", clientSK); } else if (sk.isReadable()) { logger.debug("read listener ..."); // ByteBuffer buffer = ByteBuffer.allocate(16); 修改前SocketChannel sc = (SocketChannel) sk.channel(); // 从事件中获取附件 buffer ByteBuffer buffer = (ByteBuffer) sk.attachment(); try { int len = sc.read(buffer); // 客户端连接断开,没有数据了;一次连接close也表示一次读请求 if (len == -1) { // 从selectionKey集合中删除事件 sk.cancel(); logger.debug("client socket formal exit: {}", sk); } else { // 开启读模式 //buffer.flip(); //System.out.println(new String(buffer.array(), 0, buffer.limit())); split(buffer); // 说明buffer不够,需要进行扩容 if (buffer.position() == buffer.limit()) { logger.debug("buffer Expansion..."); ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); // 更新附件 sk.attach(newBuffer); } } } catch (IOException e) { logger.error("client socket informal exit: {}", sk); sk.cancel(); } }// 每一次迭代都带将执行完的事件从集合中移除,避免下次再次分配进入集合 it.remove(); }} catch (IOException e) { e.printStackTrace(); logger.error("连接终止。。。"); } }}// 解决消息边界问题 /* 当客户端发送的数据超过buffer, 进行二倍扩容 */ public void split(ByteBuffer source) { // 进入读模式 source.flip(); for (int i = 0; i < source.remaining(); i++) { // 遇到标识表示已经接受一段话 if (source.get(i) == '\n') { int len = i + 1 - source.position(); ByteBuffer target = ByteBuffer.allocate(len); for (int j = 0; j < len; j++) { // 使用get(),一个字节的进行读取,复制到新的缓冲区 target.put(source.get()); } System.out.println(new String(target.array(), 0, len)); } }logger.debug("split() end..."); // 每次划分之后可能还有剩余的数据,所以进行压缩,用于下次接受新数据 source.compact(); } }

运行结果:
server:
网络编程|BIO和NIO
文章图片

client:
网络编程|BIO和NIO
文章图片


3.5 服务端大数据传输 问题描述:在服务端向数据库写数据的时候,会使用通道中的buffer写入数据,但是buffer也是有最大容量的,当缓冲区满了服务端写入的数据就会是0, 由于轮询操作会导致大量无用的写操作产生。
解决方案:不在一次性写入所有数据,而是在buffer可以写的情况下触发 写事件,由selector进行监听
public class Server { private static final Logger logger = LoggerFactory.getLogger(Server.class); public static void main(String[] args) { try (Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open()) {ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(9999)); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if (sk.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); SelectionKey skClient = sc.register(selector, 0, null); skClient.interestOps(SelectionKey.OP_READ); logger.debug("connect finish..."); StringBuilder sb = new StringBuilder(); // 给客户端发送消息 for (int i = 0; i < 250000000; i++) sb.append("a"); ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); int count = sc.write(buffer); logger.debug("当前打印数量:" + count); // 避免缓冲区满导致无用的执行 if (buffer.hasRemaining()) { skClient.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE); skClient.attach(buffer); // 添加一个附件 } logger.debug("print finish..."); } else if (sk.isWritable()) {// 核心操作 SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = (ByteBuffer) sk.attachment(); int count = sc.write(buffer); System.out.println(count); // 如果没有数据了,删除写事件 while (!buffer.hasRemaining()) { sk.attach(null); sk.interestOps(SelectionKey.OP_READ); } } } }} catch (IOException e) { e.printStackTrace(); } } }

3.6 多线程环境下的NIO 主要处理某一个事件占用时间过长,导致selector监听事件效率下降
解决方案:配置多个线程,由一个boss线程负责连接创建,若干worker线程负责读写操作

3.6.1 初始版
worker线程:
static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private boolean flag = false; // 标志当前worker是否是第一次创建 public Worker(String name) { this.name = name; } // 初始化 public void register() throws IOException { if (!flag) { thread = new Thread(this, name); selector = Selector.open(); thread.start(); flag = true; } } // 具体worker进行工作 @Override public void run() { while (true) { try { selector.select(); // 阻塞事件Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if (sk.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) sk.channel(); channel.read(buffer); buffer.flip(); log.debug("print data: {}", new String(buffer.array(), 0, buffer.remaining())); } } } catch (IOException e) { e.printStackTrace(); } } } }

主线程:
public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); Selector boss = Selector.open()) {Thread.currentThread().setName("boss"); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(9999)); ssc.register(boss, SelectionKey.OP_ACCEPT); // boss线程负责连接创建/* 创建工作线程,给工作线程中的selector注册写事件 不能再每一次连接创建就创建一个worker节点,如果这样做和传统线程池没什么区别 */ Worker worker = new Worker("worker-0"); while (boss.select() > 0) { Iterator it = boss.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if (sk.isAcceptable()) { log.debug("connect wait ..."); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); worker.register(); sc.register(worker.selector, SelectionKey.OP_READ); } } }} catch (IOException e) { e.printStackTrace(); } }

启动客户端和服务端后发送消息:
网络编程|BIO和NIO
文章图片

原因:
主线程中:
worker.register(); // 执行后启动了 worker线程的run方法,selector阻塞线程
sc.register(worker.selector, SelectionKey.OP_READ); // 事件没有被注册
3.6.2 方案一
worker.register(sc); worker.selector.wakeup(); // 2.方案一:唤醒一次selector,保证事件注册sc.register(worker.selector, SelectionKey.OP_READ);

3.6.3 方案二: 使用阻塞队列
@Slf4j public class MutilThreadServer {public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); Selector boss = Selector.open()) {Thread.currentThread().setName("boss"); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(9999)); ssc.register(boss, SelectionKey.OP_ACCEPT); // boss线程负责连接创建/* 创建工作线程,给工作线程中的selector注册写事件 不能再每一次连接创建就创建一个worker节点,如果这样做和传统线程池没什么区别 */ Worker worker = new Worker("worker-0"); while (boss.select() > 0) { Iterator it = boss.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if (sk.isAcceptable()) { log.debug("connect wait ..."); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connect finish ...{}", sc.getRemoteAddress()); /* 1.此时客户端发送信息无法接受;因为worker线程和主线程不是同步的,worker线程首先启动然后selector.select() *进入阻塞状态,此时事件还没有被注册 *解决方案:如何控制事件注册一定发生在阻塞前 -》 注册时候唤醒一次select */ worker.register(sc); // 将通道作为参数传递 } } }} catch (IOException e) { e.printStackTrace(); } }/* 对于Worker线程中的成员变量,应该是每个worker独享一份;不能使用static,否则表示整个类创建的实例都共享一份 */ static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private boolean flag = false; // 标志当前worker是否是第一次创建 private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); public Worker(String name) { this.name = name; }// 初始化 public void register(SocketChannel sc) throws IOException { log.debug("initial begin..."); if (!flag) { thread = new Thread(this, name); selector = Selector.open(); thread.start(); flag = true; } log.debug("initial end..."); log.debug("register begin..."); /* 4. 但是此时调用register的还是boss线程,没有实际解决线程同步问题 */ //sc.register(selector, SelectionKey.OP_READ); // 5.使用一种阻塞队列的方式,实现线程之间的通信 queue.add(() -> { try { sc.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } }); log.debug("register end..."); selector.wakeup(); // 唤醒一次,避免阻塞,保证事件一定注册上 }// 具体worker进行工作 @Override public void run() { while (true) { try { selector.select(); // 阻塞事件 // 拿到队列中的事件 Runnable task = queue.poll(); if (task != null) { task.run(); }Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if (sk.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) sk.channel(); channel.read(buffer); buffer.flip(); log.debug("print data: {}", new String(buffer.array(), 0, buffer.remaining())); } } } catch (IOException e) { e.printStackTrace(); } } } } }


3.7 NIO总结 记录一部分笔记:
8.IO模型:阻塞IO,非阻塞IO,多路复用,信号驱动,异步IO 请求资源(用户态)和等待资源(内核态)阶段 内核态中的读取操作:分为等待数据阶段,最后再到复制数据阶段(实现数据从磁盘到内核内存中)阻塞IO和多路复用的区别: 1.BIO涉及一次系统调用;多路复用设计两次系统调用(select, read) 2.在请求过多的情况下,多路复用可以通过selector同时监听很多事件,只要获取到需要的数据便可以在相应通道执行操作 但是对于阻塞IO,每个操作都是串行的,只有上一个操作结束才能执行下一项同步:发生在一个线程中,由一个线程发送请求并等待数据 异步:两个线程进行,发送请求线程等待其他线程(执行对应的回调函数)回复数据9.零拷贝问题: 传统数据传输问题:java代码: read -> write 切换内核态:磁盘 -> 内核缓冲区 -> 用户态 -> socket缓冲区 -> 网卡 (这个过程:用户态和内核态的切换 3 次: 用户态 -》(read)内核态 -》 用户态 -》(write) -》 内核态 数据拷贝:4次)NIO优化: 1.使用ByteBuffer.allocateDirect() 分配直接内存,属于OS中的内存,由java和OS共享 2.Linux2.1后提出sendFile, 对应java中使用NIO中的transferTo方法,能直接从内核缓冲区 -》 socket缓冲区 3.Linux2.4后实现优化,不在直接将数据拷贝到socket缓冲区,而是记录一些偏移量,从内核缓冲区直接复制数据到网卡 零拷贝针对的是内核态与用户态之间没有数据的拷贝,不会将数据拷贝到JVM内存

【网络编程|BIO和NIO】

    推荐阅读