Netty网络编程——Netty应用与核心模块组件

1.用Netty实现群聊系统
2.用Netty实现心跳检测
3.用Netty编程实现客户端与服务器端的长链接
4.Netty核心模块组件分析
我们先通过代码示例,感受一下Netty的使用,然后再剖析这些工具类每一个的作用。
1.用Netty实现群聊系统
我们用Netty实现一个群聊系统,实现客户端和服务器端之间的简单通讯。
功能点如下:
1)服务器端:可以检测用户上线,离线,并实现消息转发
2)客户端:通过channel可以把消息发送给所有其它用户,同时也能接收其它用户发送的信息。
server端:
Netty网络编程——Netty应用与核心模块组件
文章图片

client端:
Netty网络编程——Netty应用与核心模块组件
文章图片

Netty网络编程——Netty应用与核心模块组件
文章图片

server:

package com.example.demo.netty.nettyDemo.chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; /** * @author sulingfeng * @title: GroupChatServer * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:12 */ @Slf4j public class GroupChatServer {private int port; public GroupChatServer(int port) { this.port = port; }public void run() throws Exception{ //两个工作线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioSocketChannel作为通道的实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接数量上限 .childOption(ChannelOption.SO_KEEPALIVE, true)//保持活动连接状态 .childHandler(new ChannelInitializer() {@Override protected void initChannel(SocketChannel ch) throws Exception {//获取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); log.info("netty 服务器启动!"); ChannelFuture channelFuture = bootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}public static void main(String[] args) throws Exception{ //启动 GroupChatServer groupChatServer = new GroupChatServer(10087); groupChatServer.run(); } }

serverHandler:
package com.example.demo.netty.nettyDemo.chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.Date; /** * @author sulingfeng * @title: GroupChatServerHandler * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:21 */ @Slf4j public class GroupChatServerHandler extends SimpleChannelInboundHandler {private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天"+"\n"); channelGroup.add(channel); }@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开了\n"); }@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(ctx.channel().remoteAddress()+"上线了~"); }@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info(ctx.channel().remoteAddress()+"离线了~"); }@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); //遍历channelGroup,根据不同的情况,回送不同的信息 channelGroup.forEach(ch->{ if(channel!=ch){ ch.writeAndFlush("[客户]:["+channel.remoteAddress()+"]发送了信息:"+msg+"\n"); }else{ ch.writeAndFlush("[自己]发送了信息:"+msg+"\n"); } }); }@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}

client:
package com.example.demo.netty.nettyDemo.chat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import java.util.Scanner; /** * @author sulingfeng * @title: GroupChatClient * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:33 */ @Slf4j public class GroupChatClient {//属性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; }public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() {@Override protected void initChannel(SocketChannel ch) throws Exception {//得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相关handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定义的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); Channel channel = channelFuture.channel(); //客户端需要输入信息,创建一个输入循环 Scanner scanner = new Scanner(System.in); while (!scanner.hasNext("#")) { String msg = scanner.nextLine(); //通过channel 发送到服务器端 channel.writeAndFlush(msg + "\r\n"); }}finally { group.shutdownGracefully(); }}public static void main(String[] args) throws Exception { new GroupChatClient("127.0.0.1", 10087).run(); }}

【Netty网络编程——Netty应用与核心模块组件】clientHandler:
package com.example.demo.netty.nettyDemo.chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; /** * @author sulingfeng * @title: GroupChatClientHandler * @projectName netty-learn * @description: TODO * @date 2022/7/12 17:36 */ @Slf4j public class GroupChatClientHandlerextends SimpleChannelInboundHandler {@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { log.info(s.trim()); } }

2.用Netty实现心跳检测
我们编写一个服务器:
当服务器超过5秒没有读时,就输出读空闲。
当服务器超过6秒没有写时,就提示写空闲。
当服务器超过7秒没有读写时,就提示读写空闲。
Netty网络编程——Netty应用与核心模块组件
文章图片

server:
package com.example.demo.netty.nettyDemo.chat.heartbeat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * @author sulingfeng * @title: BeatServer * @projectName netty-learn * @description: TODO * @date 2022/7/26 13:35 */ public class BeatServer {public static void main(String[] args) throws Exception { //两个工作线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /** * IdleStateHandler 是netty提供的处理空闲状态的处理器 * long readerIdleTime 表示长时间没有读,就会发送心跳检测包是否连接 * long writerIdleTime 表示长时间没有写,就会发送心跳检测包是否连接 * long allIdleTime表示长时间没有读写,就会发送心跳检测包是否连接 */ pipeline.addLast(new IdleStateHandler(5,6,7, TimeUnit.SECONDS)); pipeline.addLast(new MyHeartHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(10087).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}}

serverHandler:
package com.example.demo.netty.nettyDemo.chat.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; /** * @author sulingfeng * @title: MyHeartHandler * @projectName netty-learn * @description: TODO * @date 2022/7/26 13:57 */ public class MyHeartHandler extends ChannelInboundHandlerAdapter {@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; String eventType = null; switch (event.state()){ case READER_IDLE: eventType="读空闲"; break; case WRITER_IDLE: eventType="写空闲"; break; case ALL_IDLE: eventType="读写空闲"; break; }System.out.println(ctx.channel().remoteAddress()+"--超时时间--"+eventType); System.out.println("服务器做相应处理"); }} }

3.用Netty编程实现客户端与服务器端的长链接
实现一个基于webSocket的长链接全双工交互,从页面发起调用,客户端就可以和服务端互发信息了。
server:
package com.example.demo.netty.nettyDemo.socket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; /** * @author sulingfeng * @title: MyServer * @projectName netty-learn * @description: TODO * @date 2022/7/13 13:57 */ @Slf4j public class MyServer {public static void main(String[] args) throws Exception {//创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLooptry{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() {@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //由于Http协议,使用http的编码和解码器 pipeline.addLast(new HttpServerCodec()); //以块的方式写,添加ChunkedWriteHandler处理器 pipeline.addLast(new ChunkedWriteHandler()); //http数据如果很大,就会将数据分段发送,发出多次http请求 //参数表示byte数组最大长度 pipeline.addLast(new HttpObjectAggregator(8192)); //浏览器请求的对应接口 : localhost:7000/hello pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); //自定义的handler ,处理业务逻辑 pipeline.addLast(new MyTextWebSocketFrameHandler()); } }); //启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}}

handler:
package com.example.demo.netty.nettyDemo.socket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; /** * @author sulingfeng * @title: MyTextWebSocketFrameHandler * @projectName netty-learn * @description: TODO * @date 2022/7/13 14:02 */ public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler {@Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服务器收到消息 " + msg.text()); //回复消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text())); }@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText()); }@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生 " + cause.getMessage()); ctx.close(); //关闭连接 }}

html:
Title - 锐客网

4.Netty核心模块组件分析
我们通过案例大致学了一下Netty的使用,但是我们还是只会api的调用,对于其中的每一个组件,以及每一个组件的原理,我们都知之甚少,接下来我们对每个组件都进行剖析一番:
4.1)Bootstrap、ServerBootstrap
这是一个引导类,一个Netty程序都是从Bootstrap开始的,主要作用是配置以及启动整个程序,串联所有netty的组件。Bootstrap是客户端的引导类,ServerBootstrap是服务器端的引导类。
4.2)Future,ChannelFuture
Netty所有的IO操作都是异步的,不能立刻得知消息是否被处理。只能过一会儿等它执行完成,当操作成功,应用程序自然就会监听到了。
4.3)channel
channel主要用于执行网络IO操作。
4.3.1)通过channel可以获得当前通道的状态。
4.3.2)通过channel能进行IO操作。
4.3.3)通过channel能获得缓冲区的大小
4.4)selector
Netty基于selector的I/O多路复用,通过selector一个线程可以监听多个链接的事件。
当一个selector中注册channel后,selector内部可以一直查询这些channel是否有就绪的I/O事件,这样程序就可以简单地使用一个线程高效管理多个channel。
4.5)channelHandler及其实现类
它是用来处理I/O时间,并将其转发到ChannelPipeline(业务处理链路)的下一个处理程序。
我们一般都定义一个Handler来增加我们自定义的业务处理类。
4.6)Pipeline 和 ChannelPipeline
这是我们主要的业务逻辑处理类,我们通常通过继承类重写方法来加入我们想要的业务逻辑。
4.7)ChannelHandlerContext
保存 Channel 相关的所有上下文信息,可以获得当前的channel等信息。
4.8)ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数
Netty网络编程——Netty应用与核心模块组件
文章图片

4.9)EventLoopGroup 和其实现类 NioEventLoopGroup
是一组工作线程,Netty为了更好地利用多核cpu资源,一般会有多个EventLoop同时工作,每个EentLoop维护着一个seelctor实例。
4.10)Unpooled 类
Netty 提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类。

    推荐阅读