JAVA网络编程基础
IO模型
IO请求的两个阶段(Linux)
- IO调用阶段:用户进程向内核发起系统调用
- IO执行阶段:此时用户进行等待IO请求处理完成返回,此阶段分为两步
- 等待数据就绪,并写入内核缓冲区
- 数据从内核缓冲区 到 用户态缓冲区
- 内核态:运行操作系统程序,操作硬件
- 用户态:运行用户程序
1.同步阻塞IO(BIO) 内核只能同时处理一个请求,分两个阶段(即上述的IO执行阶段):
- 系统调用
- 数据从内核缓冲区读取到用户缓冲区
2.同步非阻塞IO(NIO) 进程的请求不会一直等待而是有专门的线程来轮询这些IO进程是否存有数据,但是轮询过程中会存在着系统调用导致的上下问切换,如果请求过多会存在严重的系统性能消耗
3.IO多路复用 多路是指多个数据通道,复用指的是一个或多个固定的线程来处理每一
Socket
连接, select
poll
epoll
都是IO多路复用的实现,线程一次可以select
多个数据通道的数据状态,解决了NIO
性能消耗过重的问题-文件描述符fd 文件描述符(File descriptor)形式上是一个非负整数,是一个索引值,指向内核为每一个进程所维护的该进程所打开文件的记录表.
- select 这个函数会监视3类文件描述符,分别是
writefds
,readfds
,exceptfds
调用select函数时会阻塞,直到select有以上3中描述符文件就绪或者超时,一旦某个描述符就绪了,会通知程序进行相关的读写操作,由于select poll epoll都是同步IO,所以它们都需要在事件就绪后自己负责读写.也就是select会阻塞监听相关事件,直到处理完读写操作或者超时后才会解除阻塞.select单个进程能够监听的文件数量是有限的,linux一般默认是1024int select(int n,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
- poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd {
int fd;
/* file descriptor */
short events;
/* requested events to watch */
short revents;
/* returned events witnessed */
};
poll使用一个
pollfd
的结构体来传导需要监听的事件和要发生的事件,另外poll监听的文件描述符个数是没有限制的- epoll ? 不需要轮询,时间复杂度为O(1)
? epoll_create 创建一个白板 存放
fd_events
? epoll_ctl 用于向内核注册新的描述符或者是改变某个文件描述符的状态。已注册的描述符在内核中会被维护在一棵红黑树上
? epoll_wait 通过回调函数内核会将 I/O 准备好的描述符加入到一个链表中管理,进程调用
epoll_wait()
便可以得到事件完成的描述符? 两种触发模式:
? LT:水平触发
? 当
epoll_wait()
检测到描述符事件到达时,将此事件通知进程,进程可以不立即处理该事件,下次调用 epoll_wait()
会再次通知进程。是默认的一种模式,并且同时支持 Blocking
和 No-Blocking
。? ET:边缘触发
? 和 LT 模式不同的是,通知之后进程必须立即处理事件。
? 下次再调用
epoll_wait()
时不会再得到事件到达的通知。很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。只支持 No-Blocking
,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。4.信号驱动模型 信号驱动模型并不常用,是一种半异步IO.当数据准备就绪后,内核会发送一个SIGIO消息给应用进程,进程然后开始读写消息.
5.异步IO 系统调用会被立即返回结果,然后读取写消息由异步完成.
BIO BIO - Block-IO 阻塞同步的通讯方式
BIO的问题:
阻塞\同步,BIO很依赖于网络,网速不好阻塞时间会很长; 每次请求都由程序执行并返回,这是同步的缺陷
BIO的工作流程:
- 服务端启动
- 阻塞等待客户端连接
- 客户端连接
- 监听客户端内容
- 客户端断开
- 回到第一步
public class BioServer {
public static void main(String[] args) {
try {
// 服务端绑定端口
ServerSocket server= new ServerSocket(9000);
while (true) {
// 创建一个Socket接收连接 - 当没有时阻塞
Socket socket = server.accept();
// 获取输入流
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String message;
while (null != (message = reader.readLine())) {
System.out.println(message);
}
inputStream.close();
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
BioClient
public class BioClient {
public static void main(String[] args) {
try {
// 创建socket
Socket socket= new Socket("localhost",9000);
// 获取Socket输出流
OutputStreamoutputStream = socket.getOutputStream();
// 输出流
outputStream.write("hello socket".getBytes());
// 关闭
outputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
多线程解决BIO阻塞问题
- 解决了的问题:多个线程处理当一个客户端迟迟不退出时,其他线程依然可以处理其它客户端发送过来的请求.避免了一个请求阻塞导致其他客户端请求一直等待的问题
- 仍然存在问题:加入服务端给定固定线程数是10,有10个客户端创建了连接 但是没有一个人发送消息 那么10个线程将全部阻塞,或者有些客户端迟迟没有操作会造成不必要的资源占用.
public class BioServer {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
ServerSocket serverSocket;
try {
serverSocket= new ServerSocket(9000);
while (true){
//new Thread(new BioHandler(serverSocket.accept()){}).start();
executorService.execute(new BioHandler(serverSocket.accept()));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class BioHandler implements Runnable {
private Socketsocket;
public BioHandler(Socket socket) {
this.socket = socket;
}public void run() {
try {
InputStream input= socket.getInputStream();
BufferedReader reader= new BufferedReader(new InputStreamReader(input));
String m;
while (null != (m = reader.readLine())){
System.out.println(m);
}
input.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO
java 1.4版本引入,给予缓冲区面 \ 向通道的io操作
bio | nio |
---|---|
面向流 | 面向缓冲区(buffer) |
阻塞io | 非阻塞io |
同步 | 同步 |
无 | Selector(选择器) |
缓冲区介绍
缓冲区是一个特定数据类型的容器,有java.nio包定义,所有的缓冲区都是Buffer抽象类的子类
Buffer主要用于和NIO通道进行通信,数据从通道读入到缓冲区,再从缓冲区读取到通道
Buffer就像是一个数据可以保存多个类型相同的数据
子类
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
基本属性 1.容量(capacity):表示缓冲区的最大容量 一旦创建不能修改
2.限制(limit):第一个不可读的索引,即位于limit后面的数据不可读
3.位置(position):下一个要读取或写入数据的索引
4.flip:将此时的position设为limit,position置为0 ,一般是从inputChannel将数据读入到buffer 然后将buffer flip后 为了从buffer中读取数据outputChannel
5.标记(mark)和恢复(reset):标记是一个索引,通过Buffer.mark()指定一个特定的位置,使用reset方法可以恢复到这个位置
public class BufferSample {
public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("capacity:" + buffer.capacity());
System.out.println("limit:" + buffer.limit(10));
System.out.println("position:" +buffer.position());
/**
* 结果:
* capacity:1024
* limit:java.nio.HeapByteBuffer[pos=0 lim=10 cap=1024]
* position:0
*/System.out.println("==============================");
String str = "hello";
buffer.put(str.getBytes());
System.out.println("position:" +buffer.position());
/**
* 结果:
* position:5
*/System.out.println("==============================");
System.out.println("pos 和 limit之间元素的个数:" + buffer.remaining());
buffer.mark();
buffer.put("oo".getBytes());
System.out.println("reset前position:" +buffer.position());
buffer.reset();
System.out.println("reset后position:" +buffer.position());
/**
* 结果:
* pos 和 limit之间元素的个数:5
* reset前position:7
* reset后position:5
*/System.out.println("==============================");
buffer.rewind();
System.out.println("position:" + buffer.position());
/**
* 结果:
* position:0
*/System.out.println("==============================");
byte[] dst = new byte[3];
buffer.get(dst);
System.out.println(new String(dst));
System.out.println("position:" + buffer.position());
/**
* 结果:
* hel
* position:3
*/System.out.println("==============================");
//将此时的position转为limit,并将position置为0 - 一般flip以后就是开始读取缓冲区类
buffer.flip();
System.out.println("capacity:" + buffer.capacity());
System.out.println("limit:" + buffer.limit());
System.out.println("position:" +buffer.position());
byte[] b = new byte[buffer.limit()];
buffer.get(b,0,2);
System.out.println(new String(b,0,2));
/**
* 结果:
* capacity:1024
* limit:3
* position:0
* he
*/
}
}
直接/非直接缓冲区
- 直接缓冲区:程序直接操作物理映射文件
- 非直接缓冲区:jvm - 操作系统 - 物理内存
Channel:类似于流,但是Channel不能直接访问数据,只能与缓冲区进行交互
通道主体实现类 1.
FileChannel
:用于读取 写入 映射和操作文件的通道2.
DataGramChannel
:通过UDP读取网络中的数据通道3.
SocketChannel
:通过Tcp读写通道的数据4.
ServerSocketChannel
:可以监听新进入的Tcp连接,对每一个新连接创建一个SocketChannel提供getChannel()方法的类 1.
FileInputStream
2.
FileOutputStream
3.
RandomAccessFile
4.
Socket
5.
ServerSocket
6.
DataGramSocket
通道直接传输 1.
transferFrom()
2.
transferTo()
public class ChannelSimple {
/**
* 利用通道完成文件复制(非直接缓冲区)
*/
public static void FileNoDirectBufferTest(){
try {
//创建输入输出流
FileInputStream inputStream = new FileInputStream("../test.txt");
FileOutputStream outputStream = new FileOutputStream("../test2.txt");
//根据流获取通道
FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//从通道读取数据到缓冲区
while (-1 != inputChannel.read(byteBuffer)){
//limit - position,position - 0
byteBuffer.flip();
//将缓冲区中的数据写出
outputChannel.write(byteBuffer);
byteBuffer.clear();
}
outputChannel.close();
inputChannel.close();
outputStream.close();
inputStream.close();
} catch (IOExceptione) {
e.printStackTrace();
}
}/**
* 利用直接缓冲区完成文件复制(内存映射文件)
* @throws IOException
*/
public static void FileMpDirectBufferTest() throws IOException{
//创建通道
FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);
FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);
//内存映射文件
MappedByteBuffer inputBuffer = inputChannel.map(FileChannel.MapMode.READ_ONLY,0,inputChannel.size());
MappedByteBuffer outputBuffer =outputChannel.map(FileChannel.MapMode.READ_WRITE,0,inputChannel.size());
//直接对缓冲区进行数据读写操作
byte [] dst = new byte[inputBuffer.limit()];
inputBuffer.get(dst);
outputBuffer.put(dst);
outputChannel.close();
inputChannel.close();
}/**
* 利用直接缓冲区复制
* @throws IOException
*/
public static void FileDirectBufferTest() throws IOException {
//创建通道
FileChannel inputChannel = FileChannel.open(Paths.get("../test.txt"), StandardOpenOption.READ);
FileChannel outputChannel = FileChannel.open(Paths.get("../test2.txt"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ);
//inputChannel.transferTo(0,inputChannel.size(),outputChannel);
//等同 上面的注释
outputChannel.transferFrom(inputChannel,0,inputChannel.size());
outputChannel.close();
inputChannel.close();
}
}
分散读取和聚集写入
- 分散读取(Scatter):将一个
Channel
中的数据分散储存到多个Buffer
中 - 聚集写入(Gather):将多个
Buffer
中的数据写入同一个Channel
中
public class ScatterAndGather {
public static void main(String[] args) {
try {
//创建输入输出流
FileInputStream inputStream = new FileInputStream("../test.txt");
FileOutputStream outputStream = new FileOutputStream("../test2.txt");
//根据流获取通道
FileChannel inputChannel = inputStream.getChannel();
FileChannel outputChannel = outputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate((int)inputChannel.size()/2);
ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)inputChannel.size()/2);
ByteBuffer[] byteBuffers = new ByteBuffer[]{byteBuffer1,byteBuffer2};
//从通道读取数据到缓冲区 - 分散写入
while (-1 != inputChannel.read(byteBuffers)){
for (ByteBuffer buffer:byteBuffers){
//limit - position,position - 0
buffer.flip();
}
//聚集写出
for (ByteBuffer buffer:byteBuffers) {
//将缓冲区中的数据写出
outputChannel.write(buffer);
buffer.clear();
}
}
outputChannel.close();
inputChannel.close();
outputStream.close();
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
选择器(Selector)
Selector
一般被称为选择器,也被称为多路复用器.用于检查一个或多个通道是否处于可读\可写,如此可以实现一个线程管理多个Channel
使用Selector带来的好处 使用更少的线程来处理
Channel
,可以防止上下文切换带来的性能消耗可以多路复用的
Channel
可以被选择(多路复用)的
Channel
都继承自SelectableChannel
SelectableChannel
||
AbstractSelectableChannel
||||||
DataGramChannelSocketChannelServerSocketChannel
所以
FileChannel
不适应与Selector
,即不能切换为非阻塞模式Selector使用基本步骤 1.创建
Selector: Selector selector = Selector.open();
2.设置为非阻塞为:
`channel.configureBlocking(false);
`
3.注册
Channel
到Selector
:/**
* 参数-1:要注册到的多路复用器
* 参数-2:是一个"interest集合",即要监听事件的集合(有以下四种)
* OP_CONNECT 连接
* OP_ACEEPT接收
* OP_READ读
* OP_WRITE写
*/
SelectionKey key = channel.register(selector,SelectionKey.OP_READ);
如果要监听多种事件如下:
SelectionKey key = channel.register(selector,SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
4.然后就 连接就绪 | 接收就绪 | 读就绪 | 写就绪
Selector主要方法
方法 | 描述 |
---|---|
Set keys() |
返回所有SelectionKey 集合,代表 注册在这个Selector上 的Channel |
Set selectedKeys() |
返回已选择了的(即有io操作的)SelectionKey |
int select() |
监控所有注册了的Channel ,如果有需要 io的操作时会将对应的selectKey 加入到 selectedKeys 集合中,返回的则是被选择 (有io操作的)Channel 数量,这个操作时阻 塞的即只有被选择的Channel 数量>=1才 返回 |
int select(timeout) |
有超时时长,一直没有io操作的Channel 出现, 到达timeout出现的时间后将自动返回 |
int selectNow() |
无阻塞 立即返回 |
Selector wakeUp() |
使正在select() 立即返回 |
void close() |
关闭 |
SelectionKey
表示Channel
和Selector
之间的关系,Channel
向Selector
注册就会产生一个SelectionKey
方法 | 描述 |
---|---|
int interestOps() |
感兴趣事件的集合 boolean isInterested = interestSet & SelectionKey.OP_CONNECT ... |
int readyOps() |
获取通道准备好就绪的操作 |
SelectableChannel channel() |
获取注册通道 |
Selector selector() |
获取选择器 |
boolean isConnectable() |
检测Channel 中是否有连接事件就绪 |
boolean isAcceptable() |
检测Channel 中是否有接收事件就绪 |
boolean isReadaable() |
检测Channel 中是否有读事件就绪 |
boolean isWriteable() |
检测Channel 中是否有写事件就绪 |
Object attach() |
将一个对象附着到SelectionKey 上, 主要是一些用于标识的信息 |
Object attachment() |
获取注册信息 也可以在Channel 注册的时候附着信息 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
|
void cancel() |
请求取消此键的通道到其选择器的注册 |
public class NioServer {
public static void main(String[] args) throws IOException {
Integer flag = 0;
//创建服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(9021));
//创建选择器
Selector selector = Selector.open();
//注册 接收
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//有一个事件时就操作
while (selector.select() > 0) {
//获取事件集合
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//如果是接收就绪
if (selectionKey.isAcceptable()) {
//获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
//切换成非阻塞
socketChannel.configureBlocking(false);
//注册在多路复用器上 读
socketChannel.register(selector, SelectionKey.OP_READ);
//读事件
} else if (selectionKey.isReadable()) {
//获取客户端连接
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//设置缓存
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = 0;
while (-1 != (len = socketChannel.read(byteBuffer))) {
flag = 0;
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(),0,len));
byteBuffer.clear();
}
flag++;
//判断此时是否有io事件,陷入空轮询 - 连续空轮询100次
//请求取消此键的通道在其选择器的注册,也就是 selector.select();
的数量 -1
if(flag == 100){
selectionKey.cancel();
socketChannel.close();
}
}
}
iterator.remove();
}
}
}
NioClient
package com.yuan.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NioClient {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",9021));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("hello".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- Python基础|Python基础 - 练习1
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用