RocketMQ|RocketMQ remoting模块分析

分析后总结(学到了什么)

  1. 从类图可以看到设计很清晰。RemotingClient、RemotingServer接口封装,公共接口提取到RemotingService定义,公共实现部分提取到抽象类NettyRemotingAbstract,方便供子类复用。
  2. 使用Netty实现RPC通信很简单。主要是基于通信协议配合handler实现,基于Netty提供的hander可以满足大部分需求。(todo,可以深入学习Netty用该示例方便理解。包括ByteBuf的学习)
  3. 消息协议设计,encode、decode及其对应的handler实现。
    (相关文章:https://mp.weixin.qq.com/s/YxnMyNTTPy2lENG06W5tiQ)
    RocketMQ|RocketMQ remoting模块分析
    文章图片
    image.png
1.RocketMQ Remoting模块 https://github.com/apache/rocketmq
RocketMQ Remoting用于各个模块(client,broker和namesrv)的通信。
主要接口封装
1.1 Server
public interface RemotingServer extends RemotingService { //注册每种消息类型的处理器及对应的线程池 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); //注册默认消息类型的处理器及对应的线程池 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); Pair getProcessorPair(final int requestCode); //同步 RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; //异步 void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //one way void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; }

1.2 Client
public interface RemotingClient extends RemotingService { //服务端地址列表 void updateNameServerAddressList(final List addrs); List getNameServerAddressList(); RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void setCallbackExecutor(final ExecutorService callbackExecutor); ExecutorService getCallbackExecutor(); boolean isChannelWritable(final String addr); }

1.3其它主要类
public interface RemotingService { void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook); }public interface RPCHook { void doBeforeRequest(final String remoteAddr, final RemotingCommand request); void doAfterResponse(final String remoteAddr, final RemotingCommand request, final RemotingCommand response); }public interface NettyRequestProcessor { RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception; boolean rejectRequest(); }public class RemotingCommand { private int code; //如发消息请求,拉消息请求 private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body; ... }

2. NettyRemotingAbstract 完成基础功能 2.1 主要成员
/** * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint. */ protected final Semaphore semaphoreOneway; /** * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint. */ protected final Semaphore semaphoreAsync; /** * This map caches all on-going requests. */ protected final ConcurrentMap responseTable = new ConcurrentHashMap(256); /** * This container holds all processors per request code, aka, for each incoming request, we may look up the * responding processor in this map to handle the request. */ protected final HashMap> processorTable = new HashMap>(64); /** * Executor to feed netty events to user defined {@link ChannelEventListener}. */ protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); /** * The default request processor to use in case there is no exact match in {@link #processorTable} per request code. */ protected Pair defaultRequestProcessor; /** * SSL context via which to create {@link SslHandler}. */ protected volatile SslContext sslContext; /** * custom rpc hooks */ protected List rpcHooks = new ArrayList();

2.2 处理请求
/** * Process incoming request command issued by remote peer. * * @param ctx channel handler context. * @param cmd request command. */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else {} } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; }try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); }if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }

2.3 处理响应
/** * Process response from remote peer to the previous issued requests. * * @param ctx channel handler context. * @param cmd response command instance. */ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }

2.4 同步、异步和oneway方法的实现 2.5 channel管理 3. NettyRemotingServer (NettyRemotingClient基本类似) 3.1 主要成员
private final ServerBootstrap serverBootstrap; //Netty Server启动类 private final EventLoopGroup eventLoopGroupSelector; //io线程池 private final EventLoopGroup eventLoopGroupBoss; //链接处理线程池 private final NettyServerConfig nettyServerConfig; private final ExecutorService publicExecutor; //work线程池 private final ChannelEventListener channelEventListener; private final Timer timer = new Timer("ServerHouseKeepingService", true); private DefaultEventExecutorGroup defaultEventExecutorGroup; //private int port = 0; private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; private static final String TLS_HANDLER_NAME = "sslHandler"; private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // sharable handlers private HandshakeHandler handshakeHandler; //tls handshake 先忽略 private NettyEncoder encoder; private NettyConnectManageHandler connectionManageHandler; //空闲链接管理,和IdleStateHandler配合使用 private NettyServerHandler serverHandler;

3.3 netty启动流程主要看NettyServerHandler
@ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler {@Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { //An inquiry request from a remote peer component case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; //A response to a previous request issued by this very participant. case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }

处理逻辑见上面NettyRemotingAbstract
4. 最后看下消息协议的设计 消息长度(int,4个字节)+ 序列化类型(1个字节)&头部长度(3个字节)+ 消息头数据 + 消息体数据
encodeHandler
@ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } }

encode
public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); }public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; headerData = https://www.it610.com/article/this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); result[1] = (byte) ((source >> 16) & 0xFF); result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }

decodeHandler
public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); }@Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; }ByteBuffer byteBuffer = frame.nioBuffer(); return RemotingCommand.decode(byteBuffer); } catch (Exception e) { log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); RemotingUtil.closeChannel(ctx.channel()); } finally { if (null != frame) { frame.release(); } }return null; } }

【RocketMQ|RocketMQ remoting模块分析】decode
public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = https://www.it610.com/article/new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength> 0) { bodyData = https://www.it610.com/article/new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }public static int getHeaderLength(int length) { return length & 0xFFFFFF; }

    推荐阅读