A look at the eureka instance's lastDirtyTimestamp

Preface
This article looks into the lastDirtyTimestamp of an eureka instance.
Server side

  • lastDirtyTimestamp
    last timestamp when this instance information was updated.
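That is, the timestamp at which the instance was last modified on the client side.
Among the instance endpoints, every modifying operation except the metadata update and cancelLease has to carry lastDirtyTimestamp, for example: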
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        //......
    }

    @PUT
    @Path("status")
    public Response statusUpdate(
            @QueryParam("value") String newStatus,
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        //......
    }

    @DELETE
    @Path("status")
    public Response deleteStatusUpdate(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("value") String newStatusValue,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        //......
    }

Apart from renewLease, the other two endpoints only pass lastDirtyTimestamp along.
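To make the request shape concrete, here is a minimal sketch of how a renew (heartbeat) call could carry these parameters. The query parameter names (status, lastDirtyTimestamp, overriddenstatus) come from the resource method above; the apps/{appName}/{id} path layout and the RenewUrlSketch helper are assumptions made for illustration, and the sketch assumes all values are already URL-safe.

```java
/** Hypothetical helper, not part of eureka: builds the path and query of a renew request. */
class RenewUrlSketch {

    // Assumption: the instance resource lives under apps/{appName}/{id}.
    static String renewPath(String appName, String instanceId, String status,
                            long lastDirtyTimestamp, String overriddenStatus) {
        StringBuilder sb = new StringBuilder("apps/")
                .append(appName).append('/').append(instanceId)
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        // overriddenstatus is optional, so it is only appended when present.
        if (overriddenStatus != null) {
            sb.append("&overriddenstatus=").append(overriddenStatus);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(renewPath("ORDER-SERVICE", "host-1:8080", "UP",
                System.currentTimeMillis(), null));
    }
}
```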
renewLease
    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }

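If the renew fails, the server returns 404 and the client then runs its register logic.
If the renew succeeds and lastDirtyTimestamp is not null, the server checks shouldSyncWhenTimestampDiffers (true by default) and, if it is enabled, calls validateDirtyTimestamp.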
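The client-side reaction to a 404 described above can be sketched as follows. This is only an illustration of the "renew, and register again on 404" pattern; RegistryClient and RenewSketch are hypothetical names, not eureka classes, and treating any 2xx as a successful register is an assumption.

```java
/** Hypothetical sketch of a client renewing its lease and re-registering on 404. */
class RenewSketch {

    /** Hypothetical transport abstraction; each call returns an HTTP status code. */
    interface RegistryClient {
        int sendHeartbeat();
        int register();
    }

    /** Returns true if the server holds a live lease for us after this call. */
    static boolean renew(RegistryClient client) {
        int status = client.sendHeartbeat();
        if (status == 404) {
            // The server has no entry for us, or considers its copy stale:
            // send the full registration again.
            int registerStatus = client.register();
            return registerStatus >= 200 && registerStatus < 300;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        RegistryClient alwaysUnknown = new RegistryClient() {
            public int sendHeartbeat() { return 404; }
            public int register() { return 204; }
        };
        System.out.println("lease alive: " + renew(alwaysUnknown));
    }
}
```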
validateDirtyTimestamp
    private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
        if (appInfo != null) {
            if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
                Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};

                if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.NOT_FOUND).build();
                } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                    // In the case of replication, send the current instance info in the registry for the
                    // replicating node to sync itself with this one.
                    if (isReplication) {
                        logger.debug(
                                "Time to sync, since the last dirty timestamp differs -"
                                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                                args);
                        return Response.status(Status.CONFLICT).entity(appInfo).build();
                    } else {
                        return Response.ok().build();
                    }
                }
            }

        }
        return Response.ok().build();
    }

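If the lastDirtyTimestamp parameter is greater than the lastDirtyTimestamp of the instance held locally by the server, the response is 404.
If the server's local value is greater than the parameter, the response is 200 for a non-replication request and 409 Conflict for a replication request, so that the caller syncs its own data. If the two values are equal, or the instance is not in the registry, the response is 200.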
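The decision rules above can be condensed into a small pure function. This is a sketch for illustration, not the eureka implementation: DirtyTimestampSketch is a hypothetical name, and a null stored value is used here to stand for "instance not found in the registry".

```java
/** Hypothetical condensed form of the timestamp comparison described above. */
class DirtyTimestampSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    /**
     * @param incoming      lastDirtyTimestamp sent with the renew request (may be null)
     * @param stored        lastDirtyTimestamp in the local registry, null if no entry
     * @param isReplication true if the request comes from a peer node
     */
    static int validate(Long incoming, Long stored, boolean isReplication) {
        if (incoming == null || stored == null || incoming.equals(stored)) {
            return OK;                      // nothing to compare, or already in sync
        }
        if (incoming > stored) {
            return NOT_FOUND;               // caller is newer: ask it to register again
        }
        return isReplication ? CONFLICT : OK; // we are newer: only a peer is told to sync
    }

    public static void main(String[] args) {
        System.out.println(validate(2L, 1L, false));
        System.out.println(validate(1L, 2L, true));
    }
}
```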
PeerEurekaNode
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java
    /**
     * Send the heartbeat information of an instance to the node represented by
     * this class. If the instance does not exist the node, the instance
     * registration information is sent again to the peer node.
     *
     * @param appName
     *            the application name of the instance.
     * @param id
     *            the unique identifier of the instance.
     * @param info
     *            the instance info {@link InstanceInfo} of the instance.
     * @param overriddenStatus
     *            the overridden status information if any of the instance.
     * @throws Throwable
     */
    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
        if (primeConnection) {
            // We do not care about the result for priming request.
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse execute() throws Throwable {
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    logger.warn("{}: missing entry.", getTaskName());
                    if (info != null) {
                        logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                        register(info);
                    }
                } else if (config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }

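In handleFailure, a 404 is handled by registering the instance again. Anything else, such as a 409, first checks whether shouldSyncWhenTimestampDiffers is enabled (it is true by default) and then overwrites the local copy with the latest info returned by the peer.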
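The two branches can be sketched with an in-memory map standing in for the local registry. Everything here (HeartbeatFailureSketch, Info, localRegistry, pushedToPeer) is a hypothetical stand-in chosen for illustration; the real code delegates to register(info) and syncInstancesIfTimestampDiffers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of how a replicating node reacts to a failed heartbeat. */
class HeartbeatFailureSketch {

    /** Minimal stand-in for InstanceInfo. */
    static final class Info {
        final String id;
        final long lastDirtyTimestamp;

        Info(String id, long lastDirtyTimestamp) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    final Map<String, Info> localRegistry = new HashMap<>();
    final List<String> pushedToPeer = new ArrayList<>();

    void handleFailure(int statusCode, Info mine, Info fromPeer, boolean syncWhenTimestampDiffers) {
        if (statusCode == 404) {
            // Peer does not have (or rejects) the entry: send our registration to it.
            if (mine != null) {
                pushedToPeer.add(mine.id);
            }
        } else if (syncWhenTimestampDiffers && fromPeer != null) {
            // Peer holds a newer copy (e.g. 409): take the peer's copy locally.
            localRegistry.put(fromPeer.id, fromPeer);
        }
    }

    public static void main(String[] args) {
        HeartbeatFailureSketch node = new HeartbeatFailureSketch();
        Info mine = new Info("i-1", 10L);
        node.localRegistry.put(mine.id, mine);
        node.handleFailure(409, mine, new Info("i-1", 20L), true);
        System.out.println(node.localRegistry.get("i-1").lastDirtyTimestamp);
    }
}
```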
syncInstancesIfTimestampDiffers
    /**
     * Synchronize {@link InstanceInfo} information if the timestamp between
     * this node and the peer eureka nodes vary.
     */
    private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
        try {
            if (infoFromPeer != null) {
                logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,"
                        + "Id : {} My Timestamp : {}, Peer's timestamp: {}", id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp());

                if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                    logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
                    registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
                }
                registry.register(infoFromPeer, true);
            }
        } catch (Throwable e) {
            logger.warn("Exception when trying to set information from peer :", e);
        }
    }

This is how the peer node, on receiving a 409 during replication, overwrites its local data with the InstanceInfo returned in the response.
Client side
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/InstanceInfo.java
    /**
     * Sets the dirty flag so that the instance information can be carried to
     * the discovery server on the next heartbeat.
     */
    public synchronized void setIsDirty() {
        isInstanceInfoDirty = true;
        lastDirtyTimestamp = System.currentTimeMillis();
    }

This is the basic method that changes lastDirtyTimestamp. The other methods that call it are:
    @Deprecated
    public void setSID(String sid) {
        this.sid = sid;
        setIsDirty();
    }

    /**
     * Set the status for this instance.
     *
     * @param status status for this instance.
     * @return the prev status if a different status from the current was set, null otherwise
     */
    public synchronized InstanceStatus setStatus(InstanceStatus status) {
        if (this.status != status) {
            InstanceStatus prev = this.status;
            this.status = status;
            setIsDirty();
            return prev;
        }
        return null;
    }

    /**
     * @param isDirty true if dirty, false otherwise.
     * @deprecated use {@link #setIsDirty()} and {@link #unsetIsDirty(long)} to set and unset
     *
     * Sets the dirty flag so that the instance information can be carried to
     * the discovery server on the next heartbeat.
     */
    @Deprecated
    public synchronized void setIsDirty(boolean isDirty) {
        if (isDirty) {
            setIsDirty();
        } else {
            isInstanceInfoDirty = false;
            // else don't update lastDirtyTimestamp as we are setting isDirty to false
        }
    }

    /**
     * Set the dirty flag, and also return the timestamp of the isDirty event
     *
     * @return the timestamp when the isDirty flag is set
     */
    public synchronized long setIsDirtyWithTime() {
        setIsDirty();
        return lastDirtyTimestamp;
    }

    /**
     * Register application specific metadata to be sent to the discovery
     * server.
     *
     * @param runtimeMetadata
     *            Map containing key/value pairs.
     */
    synchronized void registerRuntimeMetadata(
            Map runtimeMetadata) {
        metadata.putAll(runtimeMetadata);
        setIsDirty();
    }

isInstanceInfoDirty
    @JsonIgnore
    public boolean isDirty() {
        return isInstanceInfoDirty;
    }

    /**
     * @return the lastDirtyTimestamp if is dirty, null otherwise.
     */
    public synchronized Long isDirtyWithTime() {
        if (isInstanceInfoDirty) {
            return lastDirtyTimestamp;
        } else {
            return null;
        }
    }

    /**
     * Unset the dirty flag iff the unsetDirtyTimestamp matches the lastDirtyTimestamp. No-op if
     * lastDirtyTimestamp > unsetDirtyTimestamp
     *
     * @param unsetDirtyTimestamp the expected lastDirtyTimestamp to unset.
     */
    public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            isInstanceInfoDirty = false;
        } else {
        }
    }

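unsetIsDirty sets isInstanceInfoDirty to false only if lastDirtyTimestamp <= unsetDirtyTimestamp. If the instance was modified again after the caller read the timestamp, the flag stays set, so the newer change is not lost.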
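A small sketch shows why the comparison matters. DirtyFlagSketch is a hypothetical class that mirrors only the flag and timestamp handling; the clock value is passed in explicitly (an assumption made to keep the example deterministic) instead of calling System.currentTimeMillis().

```java
/** Hypothetical stand-in for the dirty flag handling in InstanceInfo. */
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the info dirty at the given time. */
    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the last dirty timestamp if dirty, null otherwise. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if no newer change happened after the snapshot. */
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    synchronized boolean isDirty() {
        return dirty;
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.setIsDirty(100L);
        Long snapshot = info.isDirtyWithTime(); // replicator reads the timestamp
        info.setIsDirty(200L);                  // a status change arrives meanwhile
        info.unsetIsDirty(snapshot);            // stale snapshot: flag must stay set
        System.out.println("still dirty: " + info.isDirty());
    }
}
```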
InstanceInfoReplicator
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

It first calls refreshInstanceInfo and then, if isInstanceInfoDirty is set, performs the register operation.
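A single pass of this loop can be sketched without the scheduler. ReplicatorPassSketch and its registerCalls counter are hypothetical; the counter stands in for discoveryClient.register(), and the refresh step and rescheduling are left out on purpose.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of one replicator pass: register only when dirty. */
class ReplicatorPassSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;
    final AtomicInteger registerCalls = new AtomicInteger();

    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    private synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    private synchronized void unsetIsDirty(long snapshot) {
        if (lastDirtyTimestamp <= snapshot) {
            dirty = false;
        }
    }

    /** One pass: a clean instance is a no-op, a dirty one is registered once. */
    void runOnce() {
        Long dirtyTimestamp = isDirtyWithTime();
        if (dirtyTimestamp != null) {
            registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
            unsetIsDirty(dirtyTimestamp);
        }
    }

    public static void main(String[] args) {
        ReplicatorPassSketch replicator = new ReplicatorPassSketch();
        replicator.runOnce();            // clean: nothing happens
        replicator.setIsDirty(5L);
        replicator.runOnce();            // dirty: registers
        replicator.runOnce();            // clean again: nothing happens
        System.out.println("register calls: " + replicator.registerCalls.get());
    }
}
```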
refreshInstanceInfo
    /**
     * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
     * isDirty flag on the instanceInfo is set to true
     */
    void refreshInstanceInfo() {
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

refreshDataCenterInfoIfRequired
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/ApplicationInfoManager.java
    /**
     * Refetches the hostname to check if it has changed. If it has, the entire
     * DataCenterInfo is refetched and passed on to the eureka
     * server on next heartbeat.
     *
     * see {@link InstanceInfo#getHostName()} for explanation on why the hostname is used as the default address
     */
    public void refreshDataCenterInfoIfRequired() {
        String existingAddress = instanceInfo.getHostName();

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
            instanceInfo.setIsDirty();
        }
    }

    public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }

These methods re-read the configuration and call instanceInfo.setIsDirty() if anything has changed.
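The "compare with the configuration, replace and mark dirty" pattern can be sketched as follows. LeaseRefreshSketch and Lease are hypothetical names used only for illustration, and the 30 and 90 second values in main are example numbers.

```java
/** Hypothetical sketch of refreshing lease settings and marking the instance dirty. */
class LeaseRefreshSketch {

    /** Minimal stand-in for LeaseInfo. */
    static final class Lease {
        final int renewalIntervalSecs;
        final int durationSecs;

        Lease(int renewalIntervalSecs, int durationSecs) {
            this.renewalIntervalSecs = renewalIntervalSecs;
            this.durationSecs = durationSecs;
        }
    }

    Lease current;
    boolean dirty;

    LeaseRefreshSketch(Lease initial) {
        this.current = initial;
    }

    /** Returns true if the configured values differed and the lease was replaced. */
    boolean refreshIfRequired(int configuredRenewalSecs, int configuredDurationSecs) {
        if (current.renewalIntervalSecs != configuredRenewalSecs
                || current.durationSecs != configuredDurationSecs) {
            current = new Lease(configuredRenewalSecs, configuredDurationSecs);
            dirty = true; // stands in for instanceInfo.setIsDirty()
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        LeaseRefreshSketch info = new LeaseRefreshSketch(new Lease(30, 90));
        System.out.println(info.refreshIfRequired(30, 90)); // unchanged configuration
        System.out.println(info.refreshIfRequired(10, 30)); // changed configuration
    }
}
```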
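Summary
On the server side, renewLease checks the lastDirtyTimestamp parameter. If it is greater than the lastDirtyTimestamp of the instance held locally by the server, the response is 404. If the server's local value is greater than the parameter, the response is 200 for a non-replication request and 409 Conflict for a replication request, so that the caller syncs its own data.
On the client side there are two fields, lastDirtyTimestamp and isInstanceInfoDirty. Updating the instance status calls setIsDirty, which updates lastDirtyTimestamp and sets isInstanceInfoDirty to true. The client also runs the InstanceInfoReplicator scheduled task, which periodically re-reads the configuration and calls setIsDirty if anything changed. It then calls instanceInfo.isDirtyWithTime(); if the instance is dirty, it registers again and clears isInstanceInfoDirty through unsetIsDirty, which leaves lastDirtyTimestamp itself unchanged.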
