RocketMQ|RocketMQ remoting模块分析
分析后总结(学到了什么)
- 从类图可以看到设计很清晰。RemotingClient、RemotingServer接口封装,公共接口提取到RemotingService定义,公共实现部分提取到抽象类NettyRemotingAbstract,方便供子类复用。
- 使用Netty实现RPC通信很简单。主要是基于通信协议配合handler实现,基于Netty提供的hander可以满足大部分需求。(todo,可以深入学习Netty用该示例方便理解。包括ByteBuf的学习)
- 消息协议设计,encode、decode及其对应的handler实现。
(相关文章:https://mp.weixin.qq.com/s/YxnMyNTTPy2lENG06W5tiQ)
文章图片
image.png
RocketMQ Remoting用于各个模块(client,broker和namesrv)的通信。
主要接口封装
1.1 Server
public interface RemotingServer extends RemotingService {
//注册每种消息类型的处理器及对应的线程池
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//注册默认消息类型的处理器及对应的线程池
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
int localListenPort();
Pair getProcessorPair(final int requestCode);
//同步
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
//异步
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
//one way
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
1.2 Client
public interface RemotingClient extends RemotingService {
//服务端地址列表
void updateNameServerAddressList(final List addrs);
List getNameServerAddressList();
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
void setCallbackExecutor(final ExecutorService callbackExecutor);
ExecutorService getCallbackExecutor();
boolean isChannelWritable(final String addr);
}
1.3其它主要类
public interface RemotingService {
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
}public interface RPCHook {
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
final RemotingCommand response);
}public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception;
boolean rejectRequest();
}public class RemotingCommand {
private int code;
//如发消息请求,拉消息请求
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
...
}
2. NettyRemotingAbstract 完成基础功能 2.1 主要成员
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
*/
protected final ConcurrentMap responseTable =
new ConcurrentHashMap(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap> processorTable =
new HashMap>(64);
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*/
protected Pair defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
*/
protected List rpcHooks = new ArrayList();
2.2 处理请求
/**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair matched = this.processorTable.get(cmd.getCode());
final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
2.3 处理响应
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
2.4 同步、异步和oneway方法的实现 2.5 channel管理 3. NettyRemotingServer (NettyRemotingClient基本类似) 3.1 主要成员
private final ServerBootstrap serverBootstrap;
//Netty Server启动类
private final EventLoopGroup eventLoopGroupSelector;
//io线程池
private final EventLoopGroup eventLoopGroupBoss;
//链接处理线程池
private final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
//work线程池
private final ChannelEventListener channelEventListener;
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
//private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
private HandshakeHandler handshakeHandler;
//tls handshake 先忽略
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
//空闲链接管理,和IdleStateHandler配合使用
private NettyServerHandler serverHandler;
3.3 netty启动流程主要看NettyServerHandler
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler {@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
//An inquiry request from a remote peer component
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
//A response to a previous request issued by this very participant.
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
处理逻辑见上面NettyRemotingAbstract
4. 最后看下消息协议的设计 消息长度(int,4个字节)+ 序列化类型(1个字节)&头部长度(3个字节)+ 消息头数据 + 消息体数据
encodeHandler
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
encode
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = https://www.it610.com/article/this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
decodeHandler
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}return null;
}
}
【RocketMQ|RocketMQ remoting模块分析】decode
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = https://www.it610.com/article/new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength> 0) {
bodyData = https://www.it610.com/article/new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
推荐阅读
- SpringBoot调用公共模块的自定义注解失效的解决
- Node.js中readline模块实现终端输入
- python自定义封装带颜色的logging模块
- 解决SpringBoot引用别的模块无法注入的问题
- Python(pathlib模块)
- Apache多路复用模块(MPMs)介绍
- 依赖注入模块
- python3|python3 模块 包
- 轻量模块注意力机制ECA-Net(注意力模块+一维卷积)
- 模块和包的引入总结