Source Code Analysis of Broker Startup in RocketMQ (Part 1)

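RocketMQ uses BrokerStartup as the Broker's startup class. Compared with starting a NameServer, starting a Broker, the core of RocketMQ, is considerably more complex.
[Source Code Analysis of NameServer Startup in RocketMQ]
The main function is the entry point: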

public static void main(String[] args) {
    start(createBrokerController(args));
}

First, the createBrokerController method builds the Broker's controller, BrokerController.
The createBrokerController method:
public static BrokerController createBrokerController(String[] args) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }

    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        nettyServerConfig.setListenPort(10911);
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                properties2SystemEnv(properties);
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }

        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

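This plays much the same role as createNamesrvController in NameServer: it sets up the configuration the Broker needs.
It first sets the Netty socket send and receive buffer sizes, which default to 128 KB (131072 bytes) unless the corresponding system properties are already set.
It then creates several configuration objects:
BrokerConfig, which holds most of the Broker's basic configuration
NettyServerConfig, which holds the settings for the Broker's role as the externally exposed message queue server
NettyClientConfig, which holds the settings for the Broker's role as a NameServer client
As with NameServer, the fields of these objects map to the corresponding entries in the configuration file.
Next it configures TLS on the NettyClientConfig
and makes the NettyServerConfig listen on port 10911 by default.
Right after that it creates a MessageStoreConfig, which holds the Store settings.
MessageStoreConfig sets BrokerRole to ASYNC_MASTER by default.
A Broker has one of three roles, represented by the BrokerRole enum: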
public enum BrokerRole {
    ASYNC_MASTER,
    SYNC_MASTER,
    SLAVE;
}

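That is: asynchronous Master, synchronous Master, and Slave.
The role is checked here. For a Slave, the maximum percentage of memory that messages may occupy is lowered: the default is 40, so a Master allows messages to take up to 40% of memory while a Slave allows only 30%. Note that in the code above this check runs on a freshly created MessageStoreConfig, before the -c configuration file is loaded.
Next, the configuration file given by the -c option is loaded.
After that, namesrvAddr is checked; this simply verifies that the NameServer cluster addresses are well-formed.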
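The address check can be pictured with a small stand-alone sketch. It assumes the list is separated by semicolons and that each entry has the form host:port; the class NamesrvAddrCheck and its methods are hypothetical names for illustration and are not RocketMQ's RemotingUtil.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of RocketMQ): validates a ';'-separated NameServer list. */
final class NamesrvAddrCheck {

    /** Parses "host:port;host:port" and throws a RuntimeException on a malformed entry. */
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String addr : namesrvAddr.split(";")) {
            int idx = addr.lastIndexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("missing host or port: " + addr);
            }
            String host = addr.substring(0, idx).trim();
            // parseInt throws NumberFormatException for a non-numeric port
            int port = Integer.parseInt(addr.substring(idx + 1).trim());
            // createUnresolved skips the DNS lookup and rejects out-of-range ports
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true when every entry parses, mirroring the pass/fail nature of the check. */
    static boolean isValid(String namesrvAddr) {
        try {
            parse(namesrvAddr);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("127.0.0.1:9876;192.168.0.1:9876"));
        System.out.println(isValid("127.0.0.1"));
    }
}
```

In the Broker itself a failed check prints a hint about the expected format and exits, rather than returning a boolean.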
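Below that is a switch block that applies settings according to the Broker's role.
Any Master has its BrokerId set to 0, while a Slave's BrokerId must be greater than 0.
(One Master can have multiple Slaves, but a Slave belongs to exactly one Master. A Master and its Slaves are tied together by sharing the same BrokerName and using different BrokerIds: a BrokerId of 0 denotes the Master and a value greater than 0 denotes a Slave. Multiple Masters can also be deployed.)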
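As a minimal sketch of the rule in that switch block, the following restates it as a pure function. BrokerIdRule and resolveBrokerId are hypothetical names, and throwing an exception stands in for the System.exit(-3) that the real code performs.

```java
/** Hypothetical restatement of the role-to-brokerId rule; not RocketMQ code. */
final class BrokerIdRule {

    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    /** A BrokerId of 0 marks the Master. */
    static final long MASTER_ID = 0L;

    /** Returns the effective brokerId, or throws if a Slave is configured with an id <= 0. */
    static long resolveBrokerId(BrokerRole role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                // Masters ignore the configured value and always use 0
                return MASTER_ID;
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId;
            default:
                return configuredId;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveBrokerId(BrokerRole.SYNC_MASTER, 5));
        System.out.println(resolveBrokerId(BrokerRole.SLAVE, 2));
    }
}
```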
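Continuing down, the Store's HA listen port is set to the NettyServer listen port plus 1.
After that comes the logging configuration, followed by the handling of the -p and -m options, which print the loaded configuration and exit.
A BrokerController is then created: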
public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    this.topicConfigManager = new TopicConfigManager(this);
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.consumerFilterManager = new ConsumerFilterManager(this);
    this.producerManager = new ProducerManager();
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    this.filterServerManager = new FilterServerManager(this);

    this.slaveSynchronize = new SlaveSynchronize(this);

    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

    this.brokerFastFailure = new BrokerFastFailure(this);
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}

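As you can see, many members are instantiated here. I won't go through them one by one, just the important ones.
ConsumerOffsetManager: manages consumers' consumption progress, cached mainly in a map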
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
    new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

The key has the form topic@group. In the inner map, the Integer identifies a specific message queue and the Long is that queue's offset.
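To make the nested map concrete, here is a small sketch of committing and querying an offset. OffsetTableSketch and its method names are hypothetical, and returning -1 for an unknown entry is a convention of this sketch only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of a topic@group -> (queueId -> offset) table. */
final class OffsetTableSketch {

    private static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>(512);

    /** Records the offset reached by a group on one queue of a topic. */
    void commitOffset(String topic, String group, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        // Create the inner map on first use, then store the offset for this queue
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32)).put(queueId, offset);
    }

    /** Returns the stored offset, or -1 if nothing has been recorded (sketch convention). */
    long queryOffset(String topic, String group, int queueId) {
        ConcurrentMap<Integer, Long> queueOffsets = offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (queueOffsets == null) {
            return -1L;
        }
        Long offset = queueOffsets.get(queueId);
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.commitOffset("TopicTest", "GroupA", 0, 128L);
        System.out.println(table.queryOffset("TopicTest", "GroupA", 0));
        System.out.println(table.queryOffset("TopicTest", "GroupB", 0));
    }
}
```

Because the group is part of the key, two groups subscribed to the same Topic keep independent progress on every queue.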
TopicConfigManager: manages Topic and message queue information, cached mainly in a map
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
    new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion();

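The key is the Topic, and the TopicConfig value records, among other settings, how many message queues the Topic has.
PullMessageProcessor, PullRequestHoldService and NotifyMessageArrivingListener together manage Pull message requests; Pull messages will be covered in a later post.
ConsumerManager: manages Consumers, cached mainly in a map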
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

The key is the Consumer's GroupName,
and ConsumerGroupInfo consists of the following:
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
    new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;

It wraps a subscriptionTable, a map from Topic to subscription data,
and a channelInfoTable, which records the Consumers' physical connections.
ConsumeType is an enum describing the two ways of consuming:
public enum ConsumeType {
    CONSUME_ACTIVELY("PULL"),
    CONSUME_PASSIVELY("PUSH");
}

MessageModel is also an enum, describing the two consumption models:
public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING("BROADCASTING"),
    /**
     * clustering
     */
    CLUSTERING("CLUSTERING");
}

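Broadcasting: every Consumer in the same ConsumerGroup receives all messages of the subscribed Topic, so a single message is delivered multiple times and consumed by multiple Consumers.
Clustering: each Consumer in the same ConsumerGroup consumes only part of the subscribed messages; only the messages consumed by all Consumers in the group taken together make up the whole Topic, which is how load balancing is achieved.
Putting this together: a set of Consumers sharing the same GroupName must have the same ConsumeType and MessageModel, and messages of the Topics they subscribe to are handled according to that ConsumeType and MessageModel.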
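The difference between the two models can be sketched by asking which queues a given Consumer reads. The modulo assignment below is a deliberately simplistic stand-in chosen for illustration and is not RocketMQ's actual queue allocation strategy; MessageModelSketch and queuesFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of broadcasting versus clustering consumption. */
final class MessageModelSketch {

    enum MessageModel { BROADCASTING, CLUSTERING }

    /**
     * Returns the queue ids read by the consumer at position consumerIndex (0-based)
     * within a group of consumerCount consumers.
     */
    static List<Integer> queuesFor(MessageModel model, int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        for (int queueId = 0; queueId < queueCount; queueId++) {
            // Broadcasting: every consumer reads every queue.
            // Clustering: queues are split so that each one has exactly one reader in the group.
            if (model == MessageModel.BROADCASTING || queueId % consumerCount == consumerIndex) {
                result.add(queueId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(queuesFor(MessageModel.BROADCASTING, 4, 2, 0));
        System.out.println(queuesFor(MessageModel.CLUSTERING, 4, 2, 0));
        System.out.println(queuesFor(MessageModel.CLUSTERING, 4, 2, 1));
    }
}
```

Under clustering the per-consumer lists are disjoint and their union covers all queues, which is the load-balancing property described above.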
Back to the BrokerController constructor.
ProducerManager: manages Producers, cached mainly in a map
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
    new HashMap<String, HashMap<Channel, ClientChannelInfo>>();

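Compared with ConsumerManager, managing Producers is much simpler: it only needs to record the mapping from group name to physical connections.
Back in createBrokerController: once the BrokerController has been created, its initialize method is called.
BrokerController's initialize method: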
public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    result = result && this.messageStore.load();

    if (result) {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));

        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        this.registerProcessor();

        final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                            ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}

It first loads the files …/store/config/topics.json, …/store/config/consumerOffset.json, …/store/config/subscriptionGroup.json and …/store/config/consumerFilter.json.
It then creates a DefaultMessageStore, the Broker's core storage component.
The DefaultMessageStore constructor:
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
......
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    // Service that allocates mapped files on request
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    // Storage service
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    // Consume queue information
    this.consumeQueueTable = new ConcurrentHashMap<>(32);
    // Service that flushes consume queues
    this.flushConsumeQueueService = new FlushConsumeQueueService();
    // Service that cleans up CommitLog data
    this.cleanCommitLogService = new CleanCommitLogService();
    // Service that cleans up consume queues
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    // Index service
    this.indexService = new IndexService(this);
    // HA service for master-slave replication
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    this.reputMessageService = new ReputMessageService();
    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();

    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}

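As you can see, the DefaultMessageStore constructor creates many services to manage the store.
isEnableDLegerCommitLog determines whether DLedger is used; it defaults to false, i.e. disabled,
so by default CommitLog + HAService is used.
For DLedger, see the post [DLedger: a Raft-based commitlog storage library for RocketMQ].
The later discussion of master-slave replication focuses on CommitLog + HAService.
Back in initialize, DefaultMessageStore's load method is called next: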
public boolean load() {
    boolean result = true;

    try {
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        // load Commit Log
        result = result && this.commitLog.load();

        // load Consume Queue
        result = result && this.loadConsumeQueue();

        if (result) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

            this.indexService.load(lastExitOK);

            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

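This loads the files backing the CommitLog and the ConsumeQueue.
Next the familiar NettyRemotingServer is created; it was covered in an earlier post, so I won't expand on it here.
A copy of the server configuration is cloned from nettyServerConfig and used to create the fastRemotingServer, which listens on the main server's port minus 2.
If you have read my earlier posts, you will recognize the fastRemotingServer port as the VIP channel mentioned there.
See:
[Source Code Analysis of Producer Startup in RocketMQ]
[Source Code Analysis of Producer Message Sending in RocketMQ]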
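The two derived ports seen so far, the HA port (listen port plus 1) and the fastRemotingServer port (listen port minus 2), come down to simple arithmetic on the main listen port. The sketch below only restates that arithmetic; BrokerPorts and its method names are hypothetical.

```java
/** Hypothetical helper restating how the Broker derives its secondary ports. */
final class BrokerPorts {

    /** Default listen port set on NettyServerConfig in createBrokerController. */
    static final int DEFAULT_LISTEN_PORT = 10911;

    /** HA (master-slave replication) port: listen port + 1. */
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    /** fastRemotingServer (VIP channel) port: listen port - 2. */
    static int fastListenPort(int listenPort) {
        return listenPort - 2;
    }

    public static void main(String[] args) {
        System.out.println("listen = " + DEFAULT_LISTEN_PORT);
        System.out.println("ha     = " + haListenPort(DEFAULT_LISTEN_PORT));
        System.out.println("fast   = " + fastListenPort(DEFAULT_LISTEN_PORT));
    }
}
```

With the default listen port this gives 10912 for HA and 10909 for the VIP channel, so a Broker needs three ports reachable when defaults are used.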
Next, a number of thread pools are created for different purposes,
and then the registerProcessor method is called.
The registerProcessor method:
public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);

    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

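Several kinds of Processor are created here and registered through registerProcessor with remotingServer and, in nearly every case, with fastRemotingServer as well (PULL_MESSAGE is registered only on remotingServer).
The remoting server's registerProcessor method: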
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}

In effect this just adds an entry to processorTable so that incoming requests can later be dispatched to the right handler.
processorTable:
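protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
    new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);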
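How such a table is used at request time can be sketched as a lookup by request code with a fallback to the default processor, followed by running the processor on its paired executor. Everything below is a simplified stand-in: ProcessorTableSketch, its Pair class and the use of Function<String, String> as a processor are assumptions of this sketch, not RocketMQ's remoting types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical illustration of dispatching by request code through a processor table. */
final class ProcessorTableSketch {

    /** Stand-in for a (processor, executor) pair. */
    static final class Pair {
        final Function<String, String> processor;
        final ExecutorService executor;

        Pair(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>(64);
    private Pair defaultProcessor;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        defaultProcessor = new Pair(processor, executor);
    }

    /** Looks up the processor for the code, falls back to the default, and runs it on its executor. */
    String handle(int requestCode, String body) {
        Pair matched = processorTable.get(requestCode);
        Pair pair = (matched != null) ? matched : defaultProcessor;
        if (pair == null) {
            throw new IllegalStateException("request code " + requestCode + " not supported");
        }
        Callable<String> task = () -> pair.processor.apply(body);
        try {
            return pair.executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ProcessorTableSketch table = new ProcessorTableSketch();
            table.registerProcessor(1, body -> "send:" + body, pool);   // arbitrary code for the demo
            table.registerDefaultProcessor(body -> "admin:" + body, pool);
            System.out.println(table.handle(1, "hello"));
            System.out.println(table.handle(999, "stats"));
        } finally {
            pool.shutdown();
        }
    }
}
```

Pairing each processor with its own executor is what lets different request types be isolated in separate thread pools, which is why so many pools are created before registerProcessor is called.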

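The SendMessageProcessor here is important and will be covered in detail later.
After registerProcessor completes, up to 8 scheduled tasks are created (the last two only under certain configurations).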

BrokerController.this.getBrokerStats().record();

Periodically records and prints the Broker's status, once every 24 hours starting from the next midnight
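The "first run at the next midnight, then every 24 hours" schedule can be sketched with java.time instead of RocketMQ's UtilAll helper. NextMidnightDelay and its methods are hypothetical names, and using LocalDateTime means daylight-saving shifts are ignored in this sketch.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of scheduling a daily task that first fires at the next midnight. */
final class NextMidnightDelay {

    /** Milliseconds from the given time until 00:00 of the following day. */
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    /** Schedules the task at the next midnight and then once every 24 hours. */
    static ScheduledFuture<?> scheduleDaily(ScheduledExecutorService scheduler, Runnable task) {
        long initialDelay = millisUntilNextMidnight(LocalDateTime.now());
        long period = TimeUnit.DAYS.toMillis(1);
        return scheduler.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("ms until next midnight: " + millisUntilNextMidnight(LocalDateTime.now()));
    }
}
```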

BrokerController.this.consumerOffsetManager.persist();

Periodically writes consumer offsets to the consumerOffset.json file

BrokerController.this.consumerFilterManager.persist();

Periodically writes consumer filter information to the consumerFilter.json file

BrokerController.this.protectBroker();

Periodically disables slow consumers to protect the Broker; this requires the disableConsumeIfConsumerReadSlowly property, which defaults to false

BrokerController.this.printWaterMark();

Periodically prints information about the Send, Pull, Query and Transaction queues

public void run() {
    try {
        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
    } catch (Throwable e) {
        log.error("schedule dispatchBehindBytes error.", e);
    }
}

Periodically prints the number of bytes that have been stored in the commit log but not yet dispatched to the consume queues

if (this.brokerConfig.getNamesrvAddr() != null) {
    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
            } catch (Throwable e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

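If NamesrvAddr is set, updateNameServerAddressList is called once to update the NameServer addresses (updateNameServerAddressList was covered in an earlier post).
If NamesrvAddr is not set and the fetchNamesrvAddrByAddressServer property is enabled (off by default), the NameServer addresses are fetched and refreshed periodically (the fetchNameServerAddr method was also covered in an earlier post).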

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            this.updateMasterHAServerAddrPeriodically = true;
        }
    } else {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printMasterAndSlaveDiff();
                } catch (Throwable e) {
                    log.error("schedule printMasterAndSlaveDiff error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    }
}

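In non-DLedger mode:
For a SLAVE, it checks whether the HA Master address has been configured.
If it has, the Master address is updated through the updateHaMasterAddress method.
updateHaMasterAddress is actually implemented in HAClient: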
public void updateMasterAddress(final String newAddr) {
    String currentAddr = this.masterAddress.get();
    if (currentAddr == null || !currentAddr.equals(newAddr)) {
        this.masterAddress.set(newAddr);
        log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);
    }
}

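Very simple: it is just a compare-and-replace.
If no address is configured, updateMasterHAServerAddrPeriodically is set to true, which comes into play later.
For a MASTER, a task periodically prints how many bytes the slave is lagging behind.
Once the scheduled tasks are set up, a listener for SslContext changes is registered, just as in NameServer.
Then the initialTransaction method loads the instances needed for transactions.
The initialTransaction method: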
private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

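This dynamically loads the implementations of TransactionalMessageService and AbstractTransactionalMessageCheckListener, falling back to the default implementations when none is configured. They are configured at
"META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"
"META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"
A TransactionalMessageCheckService is also created.
initialAcl sets up the ACL permission check: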
private void initialAcl() {
    if (!this.brokerConfig.isAclEnable()) {
        log.info("The broker dose not enable acl");
        return;
    }

    List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker dose not load the AccessValidator");
        return;
    }

    for (AccessValidator accessValidator: accessValidators) {
        final AccessValidator validator = accessValidator;
        this.registerServerRPCHook(new RPCHook() {

            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                //Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }
}

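This requires the aclEnable property, which is off by default.
If it is enabled, the AccessValidator implementations configured in "META-INF/service/org.apache.rocketmq.acl.AccessValidator" are loaded in the same way.
Each one is then wrapped in an RPC hook and registered with remotingServer and fastRemotingServer, so that its validate method is called on every incoming request to perform the ACL permission check.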
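The wrapping step can be sketched with two tiny interfaces: each validator becomes a before-request hook, and an exception thrown by the validator is left uncaught so that it rejects the request. AclHookSketch and its nested AccessValidator and RpcHook interfaces are simplified stand-ins invented for this sketch, with plain strings in place of RocketMQ's request objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of wrapping access validators as before-request hooks. */
final class AclHookSketch {

    /** Stand-in validator: throws to reject a request. */
    interface AccessValidator {
        void validate(String remoteAddr, String request);
    }

    /** Stand-in hook invoked before a request is processed. */
    interface RpcHook {
        void doBeforeRequest(String remoteAddr, String request);
    }

    private final List<RpcHook> hooks = new ArrayList<>();

    /** Wraps every validator in a hook; exceptions are deliberately not caught. */
    void registerValidators(List<AccessValidator> validators) {
        for (AccessValidator validator : validators) {
            hooks.add((remoteAddr, request) -> validator.validate(remoteAddr, request));
        }
    }

    /** Runs all hooks first; the request is handled only if none of them throws. */
    String process(String remoteAddr, String request) {
        for (RpcHook hook : hooks) {
            hook.doBeforeRequest(remoteAddr, request);
        }
        return "OK:" + request;
    }

    public static void main(String[] args) {
        AclHookSketch server = new AclHookSketch();
        List<AccessValidator> validators = new ArrayList<>();
        validators.add((addr, req) -> {
            if (!addr.startsWith("10.")) {
                throw new SecurityException("access denied for " + addr);
            }
        });
        server.registerValidators(validators);
        System.out.println(server.process("10.0.0.5", "SEND_MESSAGE"));
    }
}
```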
The initialRpcHooks method registers any RPC hooks that have been configured:
private void initialRpcHooks() {
    List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
    if (rpcHooks == null || rpcHooks.isEmpty()) {
        return;
    }
    for (RPCHook rpcHook: rpcHooks) {
        this.registerServerRPCHook(rpcHook);
    }
}

It loads the implementation classes configured in "META-INF/service/org.apache.rocketmq.remoting.RPCHook".
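The "load a configured implementation, otherwise fall back to a default" pattern used by initialTransaction, initialAcl and initialRpcHooks can be sketched as follows. This assumes the resource file simply contains a fully qualified class name on its first line; ServiceLoadingSketch and loadOrDefault are hypothetical names, and the sketch does not claim to mirror how RocketMQ's ServiceProvider is implemented.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical illustration of loading an implementation named in a classpath resource. */
final class ServiceLoadingSketch {

    /**
     * Reads a class name from the given classpath resource and instantiates it;
     * returns the fallback when the resource is missing, empty, or cannot be loaded.
     */
    static <T> T loadOrDefault(String resourceName, Class<T> type, Supplier<T> fallback) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ServiceLoadingSketch.class.getClassLoader();
        }
        try (InputStream in = loader.getResourceAsStream(resourceName)) {
            if (in != null) {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String className = reader.readLine();
                if (className != null && !className.trim().isEmpty()) {
                    Class<?> impl = Class.forName(className.trim(), true, loader);
                    return type.cast(impl.getDeclaredConstructor().newInstance());
                }
            }
        } catch (IOException | ReflectiveOperationException e) {
            // Nothing usable was configured: use the default below
        }
        return fallback.get();
    }

    public static void main(String[] args) {
        // The resource name here is made up, so the fallback is expected to be used
        String service = loadOrDefault("META-INF/service/example.NotConfigured", String.class, () -> "default-impl");
        System.out.println(service);
    }
}
```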
At this point the preparation before Broker startup is complete. The start method performs the actual startup, which the next post will continue to analyze.
