3.|3. Dubbo服务提供端请求处理流程
时序图
文章图片
服务提供端请求处理时序图.jpeg 从时序图上不难看出,服务提供端对请求的处理先通过处理器责任链一层一层处理,然后找到需要调用的服务实现类的代理Invoker进行调用,再将响应发送到调用方。
源码解析
- org.apache.dubbo.remoting.transport.AbstractPeer#received
时序图步骤1
NettyServer继承了AbstractPeer,AbstractPeer继承了ChannelHandler,所以NettyServer也是请求处理器.
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
- org.apache.dubbo.remoting.transport.MultiMessageHandler#received
时序图步骤2
MultiMessageHandler复合消息处理器,处理复合消息;
MultiMessage(复合消息) 使用了迭代器模式;
public void received(Channel channel, Object message) throws RemotingException {
//如果是复合消息,需要拆分消息
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
//遍历,将MultiMessage中的message取出,一个一个处理
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
- org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
时序图步骤3
心跳处理器,处理心跳请求,返回心跳响应
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
//是否是心跳请求
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
//构建心跳响应并返回响应
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
//直接返回,不需要调用其他处理器
return;
}
//如果是心跳响应
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
//直接返回,不需要调用其他处理器
return;
}
//如果非心跳消息,交给处理链中后续处理器处理
handler.received(channel, message);
}
- org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
时序图步骤4
AllDispatcher:将所有消息(请求,响应,心跳,连接事件,断开事件)都派发到业务线程池.
public void received(Channel channel, Object message) throws RemotingException {
//获取业务线程池时序图步骤5
ExecutorService cexecutor = getExecutorService();
try {
//将消息组装成status:receive的ChannelEventRunnable派发到业务线程池时序图步骤6
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
//如果线程池已满,触发拒绝调用异常,返回异常响应
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
- org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
时序图步骤7
public void run() {//请求
if (state == ChannelState.RECEIVED) {
try {
//交给后续处理器
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
//连接事件
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
//断开事件
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
//响应
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
//异常
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}}
- org.apache.dubbo.remoting.transport.DecodeHandler#received
时序图步骤8
public void received(Channel channel, Object message) throws RemotingException {
//如果消息实现了Decodeable,调用message 自身实现的decode函数
if (message instanceof Decodeable) {
decode(message);
}
//如果是请求,对org.apache.dubbo.remoting.exchange.Request#mData 解码
if (message instanceof Request) {
decode(((Request) message).getData());
}
//如果是响应,对org.apache.dubbo.remoting.exchange.Response#mResult解码
if (message instanceof Response) {
decode(((Response) message).getResult());
}
//讲解马后的消息交由后续处理器处理
handler.received(channel, message);
}
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
}
- org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
时序图步骤9
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//请求消息
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {//事件处理
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {//如果是需要返回响应的请求
handleRequest(exchangeChannel, request);
} else {//如果不需要返回响应,忽略处理器返回值
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {//响应消息
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {//如果是客户端请求且消息是String 格式的,dubbo不支持string格式的客户端调用,记录异常日志,不进行后续处理
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
//通过telnet调试的消息
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
- org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
时序图步骤10
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
//请求是否解码失败,如果解码失败,取出异常信息组合响应
if (req.isBroken()) {
Object data = https://www.it610.com/article/req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// 调用org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler进行消息处理,获取处理器返回值,这里是真正的服务实现类的调用了
CompletableFuture
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler#reply
时序图步骤11
public CompletableFuture
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
时序图步骤12
Invoker> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
//获取服务端口
int port = channel.getLocalAddress().getPort();
//获取path
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
//组装serviceKey ,grout/path:version:port 唯一的确定一个服务
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
//取出缓存在exporterMap的已发布服务
DubboExporter> exporter = (DubboExporter>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}return exporter.getInvoker();
}
- org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
时序图步骤13,14
否则返回同步返回结果RpcResult
public Result invoke(Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
try {
//调用子类实现,服务发布时通过调用org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker构建了实现类代理对象
Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
//如果返回值是CompletableFuture类型
if (RpcUtils.isReturnTypeFuture(invocation)) {
//返回AsyncRpcResult,当实现类返回值:obj处理完后,会回调AsyncRpcResult#resultFuture,这里就不具体说了,后续会在Dubbo异步处理的章节详细分析
return new AsyncRpcResult((CompletableFuture
总结 至此,Dubbo提供端请求处理流程就分析完了。总结一下:
- NettyServer 收到客户端连接(CONNECTED),这里因为篇幅原因没有分析连接事件,但主体流程与请求流程是相似的.
- 完成TCP 三次握手
- 客户端发送 请求(RECEIVED)
- 经由NettyServer 中构建的处理器责任链处理,复合消息拆分处理->心跳事件处理->根据线程模型分发请求->消息解码->..
- 获取服务代理类
- 调用服务代理类处理请求
- 如果请求需要返回值,回写响应
推荐阅读
- 社保代缴公司服务费包含哪些
- dubbo基本认识
- 私有化轻量级持续集成部署方案--03-部署web服务(下)
- 探索免费开源服务器tomcat的魅力
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)
- Dubbo引用服务
- java静态代理模式
- 第六章|第六章 Sleuth--链路追踪
- 云原生微服务技术趋势解读
- mac|mac 链接linux服务器 如何在Mac上连接服务器