RocketMQ学习七-Netty的应用

一,RocketMQ中Remoting通信机制
RocketMQ消息队列集群主要包括NameServe、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

  1. Broker启动后将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息;
  2. 消息生产者Producer在发送消息时需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息(如果没有会从NameServer上重新拉取);
  3. Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者收消息并落盘存储。
  4. 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
二,RocketMQ中Remoting通信模块API
RocketMQ学习七-Netty的应用
文章图片

  1. RemotingService:为顶层接口。主要方法有:
void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook);

  1. RemotingServer/RemotingClient:远程服务器/客户端基础接口,两者中的方法基本类似:
    /** * requestCode 命令编码 * processor RocketMQ 请求业务处理器,例如消息发送的处理器为 SendMessageProcessor,PullMessageProcessor 为消息拉取的业务处理器。 * executor 线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行 */ void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); //根据请求编码获取对应的请求业务处理器与线程池 Pair getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    RomotingClient主要是Producer发送消息与Consumer拉取消息时用到;RomotingServer主要是Broker进行回调,获取Consumer状态等的时候用到。
    这里重点需要关注下registerProcessor注册命令处理器这个方法。RocketMQ 会按照业务逻辑进行拆分,例如消息发送、消息拉取等每一个网络操作会定义一个请求编码(requestCode),然后每一个类型对应一个业务处理器 NettyRequestProcessor,并可以按照不同的 requestCode 定义不同的线程池,实现不同请求的线程池隔离。
  2. NettyRemotingAbstract: Netty 远程服务抽象实现类,定义网络远程调用、请求,响应等处理逻辑。重要的属性有:
Semaphore semaphoreOneway:控制 oneway 发送方式的并发度的信号量,默认为 65535 个许可。Semaphore semaphoreAsync:控制异步发送方式的并发度的信号量,默认为 65535 个许可。ConcurrentMap responseTable:当前正在等 待对端返回的请求处理表,其中 opaque 表示请求的编号,全局唯一,通常采用原子递增,客户端向 对端发送网络请求时,通常会采取单一长连接,故发送请求后会向调用端立即返回 ResponseFuture,同时会将请求放入到该映射表中,当响应处理完成后(响应会包含请求 code),然后从该映射表中获取对应的 ResponseFutre,然后通知调用端的返回结果,这里是 Future 模式在网络编程中的经典运用。HashMap> processorTable:注册的请求处理命令。RocketMQ 的设计中采用了不同 请求命令支持不同的线程池,即实现业务线程池的隔离。

  1. NettyRemotingClient:基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
    重要属性如下:
NettyClientConfig nettyClientConfig:与网络相关的配置项。Bootstrap bootstrap:Netty 客户端启动帮助类。EventLoopGroup eventLoopGroupWorker:Netty 客户端 Work 线程组,俗称 IO 线程。ConcurrentMap channelTables:当前客户端已创建的连接(网络通道、Netty Cannel),每一个地址一条长连接。ExecutorService publicExecutor:默认任务线程池。ExecutorService callbackExecutor:回掉类请求执行线程池。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组,即 Netty ChannelHandler 在这些线程中执行。

  1. NettyRemotingServer:基于 Netty 网络编程服务端。
其核心属性如下所示:
ServerBootstrap serverBootstrap:Netty Server 端启动帮助类。EventLoopGroup eventLoopGroupSelector:Netty Server Work 线程组,即主从多 Reactor 中的从 Reactor,主要负责读写事件的处理。EventLoopGroup eventLoopGroupBoss:Netty Boss 线程组,即主从 Reactor 线程模型中的主 Reactor,主要负责 OP_ACCEPT 事件(创建连接)。NettyServerConfig nettyServerConfig:Netty 服务端配置。Timer timer = new Timer("ServerHouseKeepingService", true):定时扫描器,对 NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的请求移除。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组。int port:服务端绑定端口。NettyEncoder encoder:RocketMQ 通信协议(编码器)。NettyDecoder decoder:RocketMQ 通信协议(解码器)。NettyConnectManageHandler connectionManageHandler:Netty 连接管路器 Handler,主要实现对连接的状态跟踪。NettyServerHandler serverHandler:NettyServer 端核心业务处理器。

这里再根据类之间的调用关系再阐述一下消息的发送与消费,再看一张图:
RocketMQ学习七-Netty的应用
文章图片

1) . NettyRemotingClient 会在需要连接到指定地址先通过 Netty 相关 API 创建 Channel,并进行缓存,下一次请求如果还是发送到该地址时可重复利用。
2) . 然后调用 NettyRemotingClient 的 invokeAsync 等方法进行网络发送,在发送时在 Netty 中会进行一个非常重要的步骤:对请求编码,主要是将需要发送的请求,例如 RemotingCommand,将该对象按照特定的格式(协议)转换成二进制流。
3) . NettyRemotingServer 端接收到二进制后,网络读请求就绪,进行读请求事件处理流程。首先需要从二进制流中识别一个完整的请求包,这就是所谓的解码,即将二进制流转换为请求对象,解码成 RemotingCommand,然后读事件会传播到 NettyServerHandler,最终执行 NettyRemotingAbstract 的 processRequestCommand,主要是根据 requestCode 获取指定的命令执行线程池与 NettyRequestProcessor,并执行对应的逻辑,然后通过网络将执行结果返回给客户端。
4) . 客户端收到服务端的响应后,读事件触发,执行解码(NettyDecoder),然后读事件会传播到 NettyClientHandler,并处理响应结果。
三,RocketMQ中Remoting通信模块具体实现
  1. 客户端的创建。在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创建核心代码被封装在 start 方法中。
public void start() { //创建默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); //创建Netty客户端,group指定Work线程组,读写事件都会在这个线程组里执行(也就是IO线程);channel指定通道类型,这里使用NIO通道 Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)//是否禁用 Nagle,如果设置为 true 表示立即发送,如果设置为 false,如果一个数据包比较小,会尝试等待更多的包在一起发送 .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())//连接超时时间,超时未连接成功抛出异常 .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())//与下面一起,套接字发送缓存区与套接字接收缓存区大小,64kb .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer() {//通过handle构建事件处理链条 @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } /**addLast里要是没有传入EventExecutorGroup,那事件的执行默认在 Work 线程组*/ pipeline.addLast(//Netty 的核心扩展点,应用程序的业务逻辑可以通过该事件处理器进行扩展 defaultEventExecutorGroup, new NettyEncoder(),//RocketMQ 请求编码器,即协议编码器 new NettyDecoder(),//RocketMQ 请求解码器,即协议解码器 new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),//空闲检测 new NettyConnectManageHandler(),//连接管理器 new NettyClientHandler()); //Netty 客户端业务处理器,进行业务逻辑的处理 } }); //删除过期请求的定时任务 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }

具体步骤:
1),创建 DefaultEventExecutorGroup,默认事件执行线程组
2),调用Bootstrap的group方法指定一个Work线程组,默认情况下读写事件在该线程组中执行,也就是IO线程;同时通过channel方法指定通道类型,这里采用NIO
3),通过Bootstrap的option方法指定网络参考
4),最后,通过 Bootstrap 的 hanle 方法构建事件处理链条
  1. 建立连接
    上面第1步只是创建了客户端,并没有建立连接。在发送消息的时候才会去建立连接,相关代码如下:
if (createNewConnection) { ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); this.channelTables.put(addr, cw); }

连接建立后会进行缓存方便后续复用。
  1. 消息的发送 这里以同步发送为例
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); //有响应后进行回调,这就是异步异步 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); }responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); //计数器递减,进行唤醒操作 log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); //调用计数器await等待操作

1),首先封装一个ResponseFuture,然后与请求号一同存入CurrentHashMap里。当客户端收到服务端的响应后,需要根据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端
2),接着通过Netty调用Channel的writeAndFlush方法进行请求的发送,内部会使用编码器 NettyEncoder 将 RemotingCommand request 编码
3),当响应返回时会回调客户端,并唤醒客户端
  1. 服务端的创建 这里的服务端指的是Broker
1),创建Boss与Worker两个线程组.Boss线程组是主从Reactor里的主Reactor,用来监听连接,Worker线程组是主从Reactor里的从Reactor用来处理读写事件。
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } });

2),创建默认事件执行线程组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } });

3),使用 Netty ServerBootstrap 服务端启动类构建服务端
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)//指定boss与worker两个线程组 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)//指定通道类型 .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//绑定到服务端指定的IP/PORT .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } });

4),调用 ServerBootstrap 的 bind 方法绑定到指定端口
ChannelFuture sync = this.serverBootstrap.bind().sync();

四,通信协议
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定。协议内容主要可以分为以下4部分:
RocketMQ学习七-Netty的应用
文章图片

(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容;
【RocketMQ学习七-Netty的应用】待补充内容:
netty的二次编码
线程隔离

    推荐阅读