Fescar|Fescar example解析 - TM发送逻辑

开篇 ?这篇文章的目的主要是理清楚Fescar的TM发送部分的逻辑,从时序图和源码两个层面进行分析。
?文章中间会解答两个自己阅读代码中遇到的困惑(估计大部分人看代码的时候也会遇到这个困惑),包括TmRpcClient的初始化过程和配置加载过程。
?文章的最后会附上GlobalAction相关Request的类关系图,便于理解依赖关系。
?


Fescar TM发送流程 Fescar|Fescar example解析 - TM发送逻辑
文章图片
TM Sender.jpg 【Fescar|Fescar example解析 - TM发送逻辑】说明:

  • 1.DefaultGlobalTransaction执行begin/commit/rollback等调用DefaultTransactionManager。
  • 2.DefaultTransactionManager内部调用syncCall()方法,进而调用TmRpcClient的sendMsgWithResponse()方法。
  • 3.TmRpcClient调用父类AbstractRpcRemoting的sendAsyncRequest()方法构建发送队列。
  • 4.AbstractRpcRemotingClient的MergedSendRunnable线程消费发送队列构建MergedWarpMessage调用sendRequest发送。
  • 5.sendRequest()方法内部调用writeAndFlush完成消息发送。



Fescar|Fescar example解析 - TM发送逻辑
文章图片
TmRcpClient
说明:
  • TmRpcClient的类依赖关系图如上。
  • TmRpcClient继承自AbstractRpcRemotingClient类。


Fescar TM发送源码分析
public class DefaultTransactionManager implements TransactionManager {private static class SingletonHolder { private static final TransactionManager INSTANCE = new DefaultTransactionManager(); }/** * Get transaction manager. * * @return the transaction manager */ public static TransactionManager get() { return SingletonHolder.INSTANCE; }private DefaultTransactionManager() {}@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); return response.getXid(); }@Override public GlobalStatus commit(String xid) throws TransactionException { long txId = XID.getTransactionId(xid); GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setTransactionId(txId); GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit); return response.getGlobalStatus(); }@Override public GlobalStatus rollback(String xid) throws TransactionException { long txId = XID.getTransactionId(xid); GlobalRollbackRequest globalRollback = new GlobalRollbackRequest(); globalRollback.setTransactionId(txId); GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback); return response.getGlobalStatus(); }@Override public GlobalStatus getStatus(String xid) throws TransactionException { long txId = XID.getTransactionId(xid); GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest(); queryGlobalStatus.setTransactionId(txId); GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus); return response.getGlobalStatus(); }private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request); } catch (TimeoutException toe) { throw new TransactionException(TransactionExceptionCode.IO, toe); } } }

说明:
  • DefaultTransactionManager的beigin/commit/rollback方法内部最终调用syncCall()方法。
  • syncCall方法内部执行TmRpcClient.getInstance().sendMsgWithResponse(request)调用TmRpcClient方法。


public final class TmRpcClient extends AbstractRpcRemotingClient { @Override public Object sendMsgWithResponse(Object msg) throws TimeoutException { return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout()); }@Override public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout) throws TimeoutException { return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout); } }

说明:
  • TmRpcClient内部执行发送sendMsgWithResponse调用sendAsyncRequestWithResponse。
  • sendAsyncRequestWithResponse的实现在父类AbstractRpcRemoting当中。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws TimeoutException { if (timeout <= 0) { throw new FrameworkException("timeout should more than 0ms"); } return sendAsyncRequest(address, channel, msg, timeout); }private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout) throws TimeoutException { if (channel == null) { LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel."); return null; }// 构建RpcMessage对象 final RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setId(RpcMessage.getNextMessageId()); rpcMessage.setAsync(false); rpcMessage.setHeartbeat(false); rpcMessage.setRequest(true); rpcMessage.setBody(msg); // 通过MessageFuture包装实现超时 final MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeout); futures.put(rpcMessage.getId(), messageFuture); // 测试代码走的是这个分支 if (address != null) { // 根据address进行hash放置到不同的Map当中 ConcurrentHashMap> map = basketMap; BlockingQueue basket = map.get(address); if (basket == null) { map.putIfAbsent(address, new LinkedBlockingQueue()); basket = map.get(address); } basket.offer(rpcMessage); if (LOGGER.isDebugEnabled()) { LOGGER.debug("offer message: " + rpcMessage.getBody()); }// 发送其实是另外一个线程单独执行发送操作的 if (!isSending) { synchronized (mergeLock) { mergeLock.notifyAll(); } } } else { ChannelFuture future; channelWriteableCheck(channel, msg); future = channel.writeAndFlush(rpcMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { MessageFuture messageFuture = futures.remove(rpcMessage.getId()); if (messageFuture != null) { messageFuture.setResultMessage(future.cause()); } destroyChannel(future.channel()); } } }); }// 通过Future实现限时超时机制 if (timeout > 0) { try { return messageFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (Exception exx) { LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg); if (exx instanceof TimeoutException) { throw (TimeoutException)exx; } else { throw new RuntimeException(exx); } } } else { return null; } } }

说明:
  • 构建RpcMessage对象,包装Request。
  • 构建MessageFuture对象,包装RpcMessage,实现超时等待功能。
  • 通过basket进行分桶操作,真正执行发送的代码在AbstractRpcRemotingClient类的MergedSendRunnable。
  • Request的发送类似生成消费者模型,上述代码只是生产者部分。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting implements RemotingService, RegisterMsgListener, ClientMessageSender {public class MergedSendRunnable implements Runnable {@Override public void run() { while (true) { synchronized (mergeLock) { try { mergeLock.wait(MAX_MERGE_SEND_MILLS); } catch (InterruptedException e) {} } isSending = true; for (String address : basketMap.keySet()) { BlockingQueue basket = basketMap.get(address); if (basket.isEmpty()) { continue; }MergedWarpMessage mergeMessage = new MergedWarpMessage(); while (!basket.isEmpty()) { RpcMessage msg = basket.poll(); mergeMessage.msgs.add((AbstractMessage)msg.getBody()); mergeMessage.msgIds.add(msg.getId()); } if (mergeMessage.msgIds.size() > 1) { printMergeMessageLog(mergeMessage); } Channel sendChannel = connect(address); try { sendRequest(sendChannel, mergeMessage); } catch (FrameworkException e) { if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && address != null) { destroyChannel(address, sendChannel); } LOGGER.error("", "client merge call failed", e); } } isSending = false; } } }

说明:
  • MergedSendRunnable 负责消费待发送消息体并组装成MergedWarpMessage对象。
  • sendRequest()方法内部将MergedWarpMessage再次包装成RpcMessage进行发送。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {protected void sendRequest(Channel channel, Object msg) { RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setAsync(true); rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage); rpcMessage.setRequest(true); rpcMessage.setBody(msg); rpcMessage.setId(RpcMessage.getNextMessageId()); if (msg instanceof MergeMessage) { mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg); } channelWriteableCheck(channel, msg); if (LOGGER.isDebugEnabled()) { LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen()); } channel.writeAndFlush(rpcMessage); } }

说明:
  • RpcMessage再次包装MergeMessage进行发送。


TmRpcClient初始化
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) { setOrder(ORDER_NUM); setProxyTargetClass(true); this.applicationId = applicationId; this.txServiceGroup = txServiceGroup; this.mode = mode; this.failureHandlerHook = failureHandlerHook; }private void initClient() {TMClient.init(applicationId, txServiceGroup); if ((AT_MODE & mode) > 0) { RMClientAT.init(applicationId, txServiceGroup); } }public void afterPropertiesSet() { initClient(); } }

说明:
  • GlobalTransactionScanner的构造函数执行后执行afterPropertiesSet并执行initClient()操作。
  • initClient()内部执行TMClient.init(applicationId, txServiceGroup)进行TMClient的初始化。


public class TMClient { public static void init(String applicationId, String transactionServiceGroup) { TmRpcClient tmRpcClient = TmRpcClient.getInstance( applicationId, transactionServiceGroup); tmRpcClient.init(); } }public final class TmRpcClient extends AbstractRpcRemotingClient { public void init() { if (initialized.compareAndSet(false, true)) { init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS); } }public void init(long healthCheckDelay, long healthCheckPeriod) { // 注意initVars()方法 initVars(); ExecutorService mergeSendExecutorService = new ThreadPoolExecutor( MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), MAX_MERGE_SEND_THREAD)); mergeSendExecutorService.submit(new MergedSendRunnable()); timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { reconnect(); } catch (Exception ignore) { LOGGER.error(ignore.getMessage()); } } }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS); }private void initVars() { enableDegrade = CONFIG.getBoolean( ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX); super.init(); } }

说明:
  • 核心在于关注initVars()方法。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting implements RemotingService, RegisterMsgListener, ClientMessageSender {public void init() { NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this); // 核心构建发送的对象的连接池 nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory); nettyClientKeyPool.setConfig(getNettyPoolConfig()); serviceManager = new ServiceManagerStaticConfigImpl(); super.init(); } }public abstract class AbstractRpcRemoting extends ChannelDuplexHandler { public void init() { timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { List timeoutMessageFutures = new ArrayList(futures.size()); for (MessageFuture future : futures.values()) { if (future.isTimeout()) { timeoutMessageFutures.add(future); } }for (MessageFuture messageFuture : timeoutMessageFutures) { futures.remove(messageFuture.getRequestMessage().getId()); messageFuture.setResultMessage(null); if (LOGGER.isDebugEnabled()) { LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody()); } } nowMills = System.currentTimeMillis(); } }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS); } }

说明:
  • AbstractRpcRemotingClient的init()方法核心构建nettyClientKeyPool工厂。
  • nettyClientKeyPool用于获取连接TC的对象的工厂池。


配置加载分析 Fescar|Fescar example解析 - TM发送逻辑
文章图片
Config
public class FileConfiguration implements Configuration {private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class); private static final Config CONFIG = ConfigFactory.load(); }package com.typesafe.config; public final class ConfigFactory { private ConfigFactory() { }public static Config load() { return load(ConfigParseOptions.defaults()); } }

说明:
  • 配置加载使用了JAVA 配置管理库 typesafe.config
  • 默认加载classpath下的application.conf,application.json和application.properties文件。通过ConfigFactory.load()加载。

Request的类关系图 Fescar|Fescar example解析 - TM发送逻辑
文章图片
GlobalActionRequest.png

Fescar源码分析连载 Fescar 源码解析系列

    推荐阅读