目录
- 客户端配置缓存更新
- 长轮训任务启动入口
- ClientWorker
- checkConfigInfo
- LongPollingRunnable.run
- checkLocalConfig
- checkListenerMd5
- 检查服务端配置
- checkUpdateDataIds
- checkUpdateConfigStr
- 客户端缓存配置长轮训机制总结
- 服务端配置更新的推送
- doPollingConfig
- addLongPollingClient
- ClientLongPolling
- allSubs
- LongPollingService
- DataChangeTask
- 原理总结
Nacos 作为配置中心,当应用程序去访问Nacos动态获取配置源之后,会缓存到本地内存以及磁盘中。
由于Nacos作为动态配置中心,意味着后续配置变更之后需要让所有相关的客户端感知,并更新本地内存!
那么这个功能是在哪里实现的呢? 以及它是采用什么样的方式来实现配置的更新的呢? 我们一起来探索一下源码的实现!
客户端配置缓存更新
当客户端拿到配置后,需要动态刷新,从而保证数据和服务器端是一致的,这个过程是如何实现的呢?在这一小节中我们来做一个详细分析。
Nacos采用长轮训机制来实现数据变更的同步,原理如下!
文章图片
整体工作流程如下:
- 客户端发起长轮训请求
- 服务端收到请求以后,先比较服务端缓存中的数据是否相同,如果不通,则直接返回
- 如果相同,则通过schedule延迟29.5s之后再执行比较
- 为了保证当服务端在29.5s之内发生数据变化能够及时通知给客户端,服务端采用事件订阅的方式来监听服务端本地数据变化的事件,一旦收到事件,则触发DataChangeTask的通知,并且遍历allStubs队列中的ClientLongPolling,把结果写回到客户端,就完成了一次数据的推送
- 如果 DataChangeTask 任务完成了数据的 “推送” 之后,ClientLongPolling 中的调度任务又开始执行了怎么办呢?
很简单,只要在进行 “推送” 操作之前,先将原来等待执行的调度任务取消掉就可以了,这样就防止了推送操作写完响应数据之后,调度任务又去写响应数据,这时肯定会报错的。所以,在ClientLongPolling方法中,最开始的一个步骤就是删除订阅事件
长轮训任务启动入口
在NacosConfigService的构造方法中,当这个类被实例化以后,有做一些事情
- 初始化一个HttpAgent,这里又用到了装饰起模式,实际工作的类是ServerHttpAgent, MetricsHttpAgent内部也是调用了ServerHttpAgent的方法,增加了监控统计的信息
- ClientWorker, 客户端的一个工作类,agent作为参数传入到clientworker,可以基本猜测到里面会用到agent做一些远程相关的事情
public NacosConfigService(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {this.encode = Constants.ENCODE;
} else {this.encode = encodeTmp.trim();
}initNamespace(properties);
//this.configFilterChainManager = new ConfigFilterChainManager(properties);
//初始化网络通信组件this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
//初始化ClientWorkerthis.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
ClientWorker
在上述初始化代码中,我们重点需要关注ClientWorker这个类,它的构造方法如下
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,final Properties properties) {this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
//初始化配置过滤管理器// Initialize the timeout parameterinit(properties);
//初始化配置//初始化一个定时调度的线程池,重写了threadfactory方法this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}});
//初始化一个定时调度的线程池,从里面的name名字来看,似乎和长轮训有关系。而这个长轮训应该是和nacos服务端的长轮训this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}});
//设置定时任务的执行频率,并且调用checkConfigInfo这个方法,猜测是定时去检测配置是否发生了变化//首次执行延迟时间为1毫秒、延迟时间为10毫秒this.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();
} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}}}, 1L, 10L, TimeUnit.MILLISECONDS);
}
可以看到 ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:
第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。
第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。
checkConfigInfo
ClientWorker构造初始化中,启动了一个定时任务去执行
checkConfigInfo()
方法,这个方法主要是定时检查本地配置和服务器上的配置的变更情况,这个方法定义如下.
public void checkConfigInfo() {// Dispatch tasks.int listenerSize = cacheMap.size();
//// Round up the longingTaskCount.// 向上取整为批数,监听的配置数量除以3000,得到一个整数,代表长轮训任务的数量int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
//currentLongingTaskCount表示当前的长轮训任务数量,如果小于计算的结果,则可以继续创建if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount;
i < longingTaskCount;
i++) {// The task list is no order.So it maybe has issues when changing.executorService.execute(new LongPollingRunnable(i));
}currentLongingTaskCount = longingTaskCount;
}}
这个方法主要的目的是用来检查服务端的配置信息是否发生了变化。如果有变化,则触发listener通知
cacheMap: AtomicReference