Java | [Message Middleware] Interviewer: Tell me about NameServer route registration and removal


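Preface
Hi everyone, I'm Xiao Guo. In the previous article we set up RocketMQ in a Docker environment. Next we study NameServer from five angles: architecture design, startup flow, route registration, fault removal, and route discovery.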

What is NameServer?
[Figure: RocketMQ architecture diagram]

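  • As the RocketMQ architecture diagram shows, multiple NameServer nodes form a cluster to improve RocketMQ's availability.
  • NameServer nodes are stateless and do not communicate with each other, so each Broker has to register with every NameServer in turn.
    NameServer mainly does two things:
  1. It stores and manages Topics, and provides message clients with routing information by Topic.
  2. It stores and manages Brokers: it keeps Broker registration information and maintains a long-lived connection to each Broker.
  • NameServer is an almost stateless node that can be deployed as a cluster, with no information synchronized between nodes.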
NameServer architecture design
[Figure: NameServer architecture diagram]

Startup flow, read from the source code
Question to think about:
  1. If you were designing the startup flow, what would the simplest way to start look like?
Entry point: org.apache.rocketmq.namesrv.NamesrvStartup#main0
STEP 1: Parse the configuration file, populate NamesrvConfig and NettyServerConfig, and return a NamesrvController instance
When starting NameServer we usually run ./mqnamesrv -c configFile -p. What do -c and -p actually do?
  1. -c specifies a configuration file, from which the configuration is read.

    // -c specifies the path of the configuration file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
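
The idea behind the -c handling above (load a properties file, then copy matching keys onto a config object) can be sketched in isolation. This is only an illustration, not RocketMQ's implementation: DemoConfig, ConfigLoaderSketch, and applyProperties are hypothetical names, and the helper handles only String and int setters.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config class standing in for NamesrvConfig / NettyServerConfig.
class DemoConfig {
    private String kvConfigPath = "/tmp/kvConfig.json";
    private int listenPort = 8888;

    public String getKvConfigPath() { return kvConfigPath; }
    public void setKvConfigPath(String kvConfigPath) { this.kvConfigPath = kvConfigPath; }
    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
}

class ConfigLoaderSketch {
    // Simplified stand-in for the idea behind MixAll.properties2Object (assumption):
    // for each public setXxx(String|int) method, look up property "xxx" and apply it.
    static void applyProperties(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // property not present: keep the default value
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    // Parses properties-file content and returns a populated config object.
    static DemoConfig load(String fileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        DemoConfig config = new DemoConfig();
        applyProperties(props, config);
        return config;
    }

    public static void main(String[] args) {
        DemoConfig config = load("listenPort=9876\n");
        System.out.println(config.getListenPort() + " " + config.getKvConfigPath());
    }
}
```

Keys missing from the file leave the defaults untouched, which is why a small config file is enough to override a few settings.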

  2. -p prints the currently loaded configuration and then exits.

    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

  3. Main NamesrvConfig settings

    // RocketMQ home directory
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
        System.getenv(MixAll.ROCKETMQ_HOME_ENV));

    // Path where the KV configuration is persisted
    private String kvConfigPath = System.getProperty("user.home") + File.separator
        + "namesrv" + File.separator + "kvConfig.json";

    // Configuration file path, set with the -c option at startup
    private String configStorePath = System.getProperty("user.home") + File.separator
        + "namesrv" + File.separator + "namesrv.properties";

    private String productEnvName = "center";
    private boolean clusterTest = false;

    // Whether ordered messages are supported
    private boolean orderMessageEnable = false;

STEP 2: Create the NamesrvController instance from the configuration and initialize it
Entry point: NamesrvStartup#start
  1. Load the KV configuration

    public void load() {
        String content = null;
        try {
            // Read the content from the path where the KV configuration is persisted
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            // Deserialize the JSON content
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }

  2. Scheduled task: scan Brokers every 10s and remove inactive ones

    // Scheduled task: scan Brokers every 10s and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

A Broker that has not been heard from for more than 120s is removed from NameServer. Removing it outright keeps the design simple.

    // No heartbeat for more than 120s: remove
    if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
        RemotingUtil.closeChannel(next.getValue().getChannel());
        it.remove();
        log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
        this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
    }

  3. Scheduled task: print the KV configuration every 10 minutes

    // Print the KV configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

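STEP 3: Register a JVM shutdown hook for the NameServer shutdown flow, then start listening for Brokers
Shutdown hooks are mostly used for wrap-up work: before the JVM process exits, they perform cleanup such as closing resources or synchronizing state.

    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));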
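
To try the shutdown-hook pattern outside RocketMQ, the following minimal sketch registers a plain Thread with Runtime.addShutdownHook. The class name ShutdownHookSketch and the onceOnlyHook helper are hypothetical; the run-at-most-once guard is an illustrative design choice and is not copied from ShutdownHookThread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownHookSketch {
    // Wraps a cleanup action in a Thread so that the action runs at most once,
    // even if the hook body is invoked repeatedly (hypothetical helper).
    static Thread onceOnlyHook(Runnable cleanup) {
        AtomicBoolean done = new AtomicBoolean(false);
        return new Thread(() -> {
            if (done.compareAndSet(false, true)) {
                cleanup.run();
            }
        }, "demo-shutdown-hook");
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        // The JVM starts this thread when the process begins to exit.
        Runtime.getRuntime().addShutdownHook(onceOnlyHook(closed::incrementAndGet));
        System.out.println("working; the cleanup action runs when the JVM exits");
    }
}
```

A hook should stay short and must not rely on other hooks having run, because the JVM starts all registered hooks in an unspecified order.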

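Finally, NettyRemotingServer, the Netty-based RPC server, is started with the nettyServerConfig settings.
With the source analysis above we can now answer the question: we only need to set up NamesrvConfig and NettyServerConfig, and NameServer can then be started on top of Netty.
Route registration
As mentioned above, NameServer's management of routing information mainly covers route registration, fault removal, and storage of routing information.
Questions:
  1. How is routing information registered with NameServer?
  2. How is a Broker failure detected and handled?
STEP 1: Use a thread pool to send a heartbeat to each NameServer in turn
Entry point: BrokerController#start#registerBrokerAll#doRegisterBrokerAll
  1. Start a scheduled task on a thread pool that sends a heartbeat to every NameServer in the cluster, every 30s by default (the code below clamps the configured period to between 10s and 60s).
    The pool starts a thread only when a scheduled task is due.

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);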
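
The period expression in the scheduling call is worth a quick worked example. The sketch below reuses the same Math.max / Math.min clamp; the class and method names are hypothetical.

```java
class HeartbeatPeriodSketch {
    // Same clamping expression as in the scheduling call above:
    // the configured period is limited to the range [10s, 60s].
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        for (long configured : new long[] {1_000L, 30_000L, 120_000L}) {
            System.out.println(configured + " ms configured, "
                + effectivePeriodMillis(configured) + " ms effective");
        }
    }
}
```

So a configured period of 1s is raised to 10s, 30s is used as-is, and 120s is lowered to 60s, which stays below the 120s expiry threshold that NameServer applies.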

  2. Iterate over the NameServer list, build the RegisterBrokerRequestHeader, and send it
Get all NameServer addresses

    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

Build the request header

    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);

Iterate over all NameServers

    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }
    try {
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
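
The loop above is a fan-out: one task per NameServer, a CountDownLatch to wait for all of them, and a timeout so that a slow NameServer cannot block the Broker. A self-contained sketch of that pattern follows; FanOutRegisterSketch and the send function are hypothetical stand-ins for the real remote call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegisterSketch {
    // Sends one "registration" per address in parallel and waits up to timeoutMillis.
    // `send` is a hypothetical stand-in for the remote registerBroker call.
    static List<String> registerAll(List<String> addrs, Function<String, String> send, long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(addrs.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, addrs.size()));
        try {
            for (String addr : addrs) {
                pool.execute(() -> {
                    try {
                        String result = send.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // One failing name server must not prevent the others from registering.
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in finally is the important detail: if a failed call skipped countDown(), every registration round would wait for the full timeout.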

STEP 2: Heartbeat handling
Entry point: RouteInfoManager#registerBroker
  1. Check whether the Broker's cluster exists and create it if not, then add brokerName to the cluster's Broker set

    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
    if (null == brokerNames) {
        brokerNames = new HashSet<String>();
        this.clusterAddrTable.put(clusterName, brokerNames);
    }
    brokerNames.add(brokerName);

  2. Maintain BrokerData: if none exists yet, initialize it with the cluster name and broker name

    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    if (null == brokerData) {
        registerFirst = true;
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }

  3. If the Broker is the master (brokerId equals MASTER_ID) and either its Topic configuration has changed or this is its first registration, update the metadata

    // The Broker is the master
    if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
        // Topic configuration changed, or first registration
        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
            || registerFirst) {
            ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
            if (tcTable != null) {
                // Update the metadata
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                }
            }
        }
    }

  4. Update BrokerLiveInfo

    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
        new BrokerLiveInfo(
            System.currentTimeMillis(),
            topicConfigWrapper.getDataVersion(),
            channel,
            haServerAddr));
    if (null == prevBrokerLiveInfo) {
        log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
    }

  5. Register the Broker's filter server address list

    if (filterServerList != null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddr);
        } else {
            this.filterServerTable.put(brokerAddr, filterServerList);
        }
    }

  6. If the Broker is not the master, look up the master and put its address and HA server address into the result

    if (MixAll.MASTER_ID != brokerId) {
        // Look up the master in the broker address map
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr != null) {
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
            if (brokerLiveInfo != null) {
                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                result.setMasterAddr(masterAddr);
            }
        }
    }

The whole Broker registration runs under a write lock to prevent concurrent modification of the routing information.
STEP 3: At any moment NameServer processes only one Broker heartbeat; multiple heartbeats are handled serially
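
The write lock is what makes heartbeats serial while still letting route lookups proceed in parallel with each other. A minimal sketch of that arrangement with java.util.concurrent.locks.ReentrantReadWriteLock, assuming a hypothetical RouteTableLockSketch class that keeps only a simplified clusterAddrTable:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RouteTableLockSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // clusterName -> broker names (simplified; plain HashMap guarded by the lock)
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    // Registration mutates the table, so it takes the write lock:
    // only one registration runs at a time.
    void register(String clusterName, String brokerName) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<String>()).add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Lookups take the read lock: many readers may run concurrently,
    // but never while a registration holds the write lock.
    Set<String> brokersOf(String clusterName) {
        lock.readLock().lock();
        try {
            Set<String> names = clusterAddrTable.get(clusterName);
            return names == null ? new HashSet<String>() : new HashSet<String>(names);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Returning a copy from brokersOf keeps callers from touching the guarded set after the read lock has been released.
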
Fault removal
NameServer scans Brokers every 10s. If the last update time in a Broker's BrokerLiveInfo is more than 120s old, the Broker is considered dead.
NameServer then walks the Broker-related collections and removes the dead Broker's information.
Entry point: RouteInfoManager#scanNotActiveBroker
Remove inactive Brokers

    // Scheduled task: scan Brokers every 10s and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Scan for inactive Brokers
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // No heartbeat for more than 120s: remove
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
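
The expiry check can be exercised without a running NameServer by passing the current time in as a parameter. The sketch below is a simplified illustration: LivenessScanSketch is a hypothetical class that keeps only a timestamp per broker address instead of a full BrokerLiveInfo.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LivenessScanSketch {
    // Mirrors the 120s threshold described above.
    static final long EXPIRED_MILLIS = 120_000L;

    // brokerAddr -> timestamp of the last heartbeat (simplified table)
    final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Removes every broker whose last heartbeat is older than the threshold
    // and returns the removed addresses.
    List<String> scan(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MILLIS < nowMillis) {
                String addr = entry.getKey();
                it.remove(); // remove through the iterator, as the original code does
                removed.add(addr);
            }
        }
        return removed;
    }
}
```

With a 10s scan interval and a 120s threshold, a crashed Broker can stay in the route tables for roughly two minutes or a little more if its connection is not detected as closed, which is the price of the simple design.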

Remove the brokerAddr

    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }

Remove the brokerName

    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        String clusterName = entry.getKey();
        Set<String> brokerNames = entry.getValue();
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);
            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                it.remove();
            }
            break;
        }
    }

Remove the Topic

    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        String topic = entry.getKey();
        List<QueueData> queueDataList = entry.getValue();
        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                    topic, queueData);
            }
        }
        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                topic);
        }
    }

Route discovery
Route discovery is not real-time: when a Topic changes, NameServer does not push the change to clients, so clients have to pull the latest route themselves.
Entry point: DefaultRequestProcessor#getRouteInfoByTopic
  1. Fill TopicRouteData with data from the route tables topicQueueTable, brokerAddrTable, and filterServerTable

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

  2. If ordered messages are enabled, read the ordered-message configuration from the KV config and add it to the route data

    // Are ordered messages enabled?
    if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
        // Read the ordered-message configuration from the KV config and add it to the route data
        String orderTopicConf = this.namesrvController.getKvConfigManager()
            .getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
        topicRouteData.setOrderTopicConf(orderTopicConf);
    }

  3. If TopicRouteData is null, respond that no route was found for the topic; otherwise return it
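
The lookup in the steps above boils down to joining two tables: which brokers hold the topic, and where those brokers live. The sketch below illustrates that join under heavy simplification; RouteLookupSketch and its pickup method are hypothetical, QueueData is reduced to a broker name, and the filter server table is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RouteLookupSketch {
    // topic -> names of the brokers that hold queues for it (simplified: no QueueData)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    // Returns brokerName -> addresses for the topic, or null when the topic is unknown,
    // in which case the caller would answer "no route found for this topic".
    Map<String, Map<Long, String>> pickup(String topic) {
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames == null || brokerNames.isEmpty()) {
            return null;
        }
        Map<String, Map<Long, String>> route = new HashMap<>();
        for (String brokerName : brokerNames) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                route.put(brokerName, new HashMap<>(addrs)); // copy, so callers cannot mutate the table
            }
        }
        return route;
    }
}
```

Because the route is assembled only when a client asks for it, a client keeps using its previous copy until its next pull, which is why discovery is not real-time.
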
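Summary
Some takeaways from reading the NameServer source:
  1. NameServer builds the NamesrvConfig and NettyServerConfig configuration and starts via NettyRemotingServer.
  2. A Broker registers with NameServer and then sends it a heartbeat every 30s; NameServer maintains the Broker information.
  3. NameServer scans Brokers every 10s to check which are alive. When a Broker's last update time is more than 120s old, the Broker is judged unavailable and its information is removed.
  4. Logging configuration and error information at key points is a practice worth learning from.
  5. JVM shutdown hooks are a practical option for closing resources and other cleanup work.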



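Author: 叫我小郭_
Link: https://juejin.cn/post/7101577870874509349
Source: 稀土掘金 (Juejin)
Copyright belongs to the author. For commercial reprints, contact the author for authorization; for non-commercial reprints, credit the source.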
