在PeerAwareInstanceRegistryImpl这个注册实现类中会看到replicateToPeers这个方法:
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
...
return true;
}
return false;
}@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
了解eureka的朋友可能都知道replicateToPeers这个方法是用于eureka server之间同步eureka client信息的方法。其中的代码是这样的:
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
其中有两个关键代码:peerEurekaNodes.getPeerEurekaNodes() 和 peerEurekaNodes.isThisMyUrl(node.getServiceUrl()))
peerEurekaNodes.isThisMyUrl() 这个方法是判断node是否自己(当前节点),如果是自己,就跳过,否则发送复制信息给这个eureka server node。
peerEurekaNodes.getPeerEurekaNodes() 是eureka server node的列表,eureka server会维护集群中跟自己有直接关系的eureka server信息,其实就是将eureka.client.serviceUrl.{zone}这个url列表中生成,默认是eureka.client.serviceUrl.defaultZone,接下来看下维护这个peerEurekaNodes的逻辑。
peerEurekaNodes管理
其实逻辑很简单,代码的执行序列是这样的:
spring DefaultEureka PeerEurekaNodes ScheduledExe initialize 在EurekaServerAu toConfiguration 中发布了DefaultEure kaServerContext 和PeerEurekaNode s start DefaultEurekaServerContext .initialize()方法标注了@PostCon struct,Spring会在加载完该类后执行这个方 法。 scheduleWithFixedDelay PeerEurekaNo des创建一个定时任务, 定时执行updateP eerEurekaNod es()方法,更新pee rEurekaNodes updatePeerEurekaNodes spring DefaultEureka PeerEurekaNodes ScheduledExe 这里将PeerEurekaNodes的部分代码贴出来,我认为设计得挺巧妙的
public void start() {
...
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
...
}
} protected List> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
} protected void updatePeerEurekaNodes(List> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}Set> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}// Remove peers no long available
List newNodeList = new ArrayList<>(peerEurekaNodes);
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
1、在创建执行updatePeerEurekaNodes的定时任务之前,先执行一遍updatePeerEurekaNodes()方法,更新peer eureka nodes列表,之前要先执行这个方法,是因为定时任务执行的周期默认是10分钟,如果等定时调度来执行的话,太慢了。
2、resolvePeerUrls()获取peer node url时,调用了一个内部判断方法isThisMyUrl(),判断peer url是否是当前节点的url。如果是,就剔除掉。
【spring cloud eureka server 集群同步之peernodes管理】3、updatePeerEurekaNodes()方法中,通过以前的peerurls减去当前的peerurls判断是否要更新peerEurekaNodes列表,以减少PeerEurekaNode的创建工作。
推荐阅读
- #|SpringCloud-Feign[微服务日志处理]
- spring-cloud|springcloud之高可用eureka集群搭建
- 实现微服务预热调用之后再开始服务(下)
- 实现微服务预热调用之后再开始服务(上)
- SpringCloud升级之路2020.0.x版-45. 实现公共日志记录