Netty学习之组件

一、入门案例 1. 服务器端代码

public class HelloServer { public static void main(String[] args) { // 1、启动器,负责装配netty组件,启动服务器 new ServerBootstrap() // 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector .group(new NioEventLoopGroup()) // 3、选择服务器的 ServerSocketChannel 实现 .channel(NioServerSocketChannel.class) // 4、child 负责处理读写,该方法决定了 child 执行哪些操作 // ChannelInitializer 处理器(仅执行一次) // 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器 .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { // 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String nioSocketChannel.pipeline().addLast(new StringDecoder()); // 6、SocketChannel的业务处理,使用上一个处理器的处理结果 nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } // 7、ServerSocketChannel绑定8080端口 }).bind(8080); } }

2. 客户端代码
public class HelloClient { public static void main(String[] args) throws InterruptedException { new Bootstrap() .group(new NioEventLoopGroup()) // 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现 .channel(NioSocketChannel.class) // ChannelInitializer 处理器(仅执行一次) // 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器 .handler(new ChannelInitializer() { @Override protected void initChannel(Channel channel) throws Exception { // 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出 channel.pipeline().addLast(new StringEncoder()); } }) // 指定要连接的服务器和端口 .connect(new InetSocketAddress("localhost", 8080)) // Netty 中很多方法都是异步的,如 connect // 这时需要使用 sync 方法等待 connect 建立连接完毕 .sync() // 获取 channel 对象,它即为通道抽象,可以进行数据读写操作 .channel() // 写入消息并清空缓冲区 .writeAndFlush("hello world"); } }

3.运行流程

组件介绍
  • channel 可以理解为数据的通道
  • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • handler 可以理解为数据的处理工序,工序有多道,合在一起组成pipeline,handler 分 Inbound 和 Outbound 两类
  • eventLoop 可以理解为处理数据的工人,eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责
二、组件 1.EventLoop
事件循环对象 EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
它的继承关系如下
  • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
事件循环组 EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
以一个简单的实现为例:
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程 DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());

输出
io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6 io.netty.channel.DefaultEventLoop@60f82f98

也可以使用 for 循环
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); for (EventExecutor eventLoop : group) { System.out.println(eventLoop); }

输出
io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6

处理普通与定时任务
public class TestEventLoop { public static void main(String[] args) { // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程 EventLoopGroup group = new NioEventLoopGroup(2); // 通过next方法可以获得下一个 EventLoop System.out.println(group.next()); System.out.println(group.next()); // 通过EventLoop执行普通任务 group.next().execute(()->{ System.out.println(Thread.currentThread().getName() + " hello"); }); // 通过EventLoop执行定时任务 group.next().scheduleAtFixedRate(()->{ System.out.println(Thread.currentThread().getName() + " hello2"); }, 0, 1, TimeUnit.SECONDS); // 优雅地关闭 group.shutdownGracefully(); } }

输出结果如下
io.netty.channel.nio.NioEventLoop@7bb11784 io.netty.channel.nio.NioEventLoop@33a10788 nioEventLoopGroup-2-1 hello nioEventLoopGroup-2-2 hello2 nioEventLoopGroup-2-2 hello2 nioEventLoopGroup-2-2 hello2

关闭 EventLoopGroup
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
处理IO任务
服务器代码
public class MyServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8)); } }); } }) .bind(8080); } }

客户端代码
public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); System.out.println(channel); // 此处打断点调试,调用 channel.writeAndFlush(...); System.in.read(); } }

分工
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件
public class MyServer { public static void main(String[] args) { new ServerBootstrap() // 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))... } }

一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件

增加自定义EventLoopGroup
当有的任务需要较长的时间处理时,可以使用非NioEventLoop,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理
public class MyServer { public static void main(String[] args) { // 增加自定义的非NioEventLoopGroup EventLoopGroup group = new DefaultEventLoopGroup(); new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理 socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8)); // 调用下一个handler ctx.fireChannelRead(msg); } }) // 该handler绑定自定义的Group .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8)); } }); } }) .bind(8080); } }

启动四个客户端发送数据
nioEventLoopGroup-4-1 hello1 defaultEventLoopGroup-2-1 hello1 nioEventLoopGroup-4-2 hello2 defaultEventLoopGroup-2-2 hello2 nioEventLoopGroup-4-1 hello3 defaultEventLoopGroup-2-3 hello3 nioEventLoopGroup-4-2 hello4 defaultEventLoopGroup-2-4 hello4

可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理

handler 执行中如何换人?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 获得下一个handler的EventLoop, excutor 即为 EventLoop EventExecutor executor = next.executor(); // 如果下一个EventLoop 和当前的EventLoop在同一个线程 if (executor.inEventLoop()) { // 使用当前的 EventLoop 来处理任务 next.invokeChannelRead(m); } else { // 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行 executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }

  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
Channel
Channel 的常用方法
  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)
ChannelFuture
public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程 // NIO线程:NioEventLoop 中的线程 .connect(new InetSocketAddress("localhost", 8080)); // 该方法用于等待连接真正建立 channelFuture.sync(); // 获取客户端-服务器之间的Channel对象 Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world"); System.in.read(); } }

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端
所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)
public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程 // NIO线程:NioEventLoop 中的线程 .connect(new InetSocketAddress("localhost", 8080)); // 当connect方法执行完毕后,也就是连接真正建立后 // 会在NIO线程中调用operationComplete方法 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world"); } }); System.in.read(); } }

通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作
处理关闭
public class ReadClient { public static void main(String[] args) throws InterruptedException { // 创建EventLoopGroup,使用完毕后关闭 NioEventLoopGroup group = new NioEventLoopGroup(); ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); channelFuture.sync(); Channel channel = channelFuture.channel(); Scanner scanner = new Scanner(System.in); // 创建一个线程用于输入并向服务器发送 new Thread(()->{ while (true) { String msg = scanner.next(); if ("q".equals(msg)) { // 关闭操作是异步的,在NIO线程中执行 channel.close(); break; } channel.writeAndFlush(msg); } }, "inputThread").start(); // 获得closeFuture对象 ChannelFuture closeFuture = channel.closeFuture(); System.out.println("waiting close..."); // 同步等待NIO线程执行完close操作 closeFuture.sync(); // 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的 System.out.println("关闭之后执行一些额外操作..."); // 关闭EventLoopGroup group.shutdownGracefully(); } }

关闭channel
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
    // 获得closeFuture对象 ChannelFuture closeFuture = channel.closeFuture(); // 同步等待NIO线程执行完close操作 closeFuture.sync();

  • 调用closeFuture.addListener方法,添加close的后续操作
    closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { // 等待channel关闭后才执行的操作 System.out.println("关闭之后执行一些额外操作..."); // 关闭EventLoopGroup group.shutdownGracefully(); } });

Future和Promise netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
【Netty学习之组件】
Netty Future
public class NettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); // 获得 EventLoop 对象 EventLoop eventLoop = group.next(); Future future = eventLoop.submit(new Callable() { @Override public Integer call() throws Exception { return 50; } }); // 主线程中获取结果 System.out.println(Thread.currentThread().getName() + " 获取结果"); System.out.println("getNow " + future.getNow()); System.out.println("get " + future.get()); // NIO线程中异步获取结果 future.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { System.out.println(Thread.currentThread().getName() + " 获取结果"); System.out.println("getNow " + future.getNow()); } }); } }

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
Netty Promise
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
public class NettyPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建EventLoop NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); // 创建Promise对象,用于存放结果 DefaultPromise promise = new DefaultPromise<>(eventLoop); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 自定义线程向Promise中存放结果 promise.setSuccess(50); }).start(); // 主线程从Promise中获取结果 System.out.println(Thread.currentThread().getName() + " " + promise.get()); } }

    推荐阅读