深入理解Eureka覆盖状态(九)

应用场景
在实际开发使用过程当中,在Eureka Admin控制台上,我们想强制下线某个服务 ,就需要用到覆盖状态的
概念,其实说白了,就是在给实例存储另外一个状态,当续约,注册的时候,以这个覆盖状态为准 。
覆盖状态
设置覆盖状态 程序入口: com.netflix.eureka.resources.InstanceResource

@PUT @Path("status") public Response statusUpdate( @QueryParam("value") String newStatus, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try { // 判断实例是否存在,不存在则更新失败 if (registry.getInstanceByAppAndId(app.getName(), id) == null) { logger.warn("Instance not found: {}/{}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // 更新状态 boolean isSuccess = registry.statusUpdate(app.getName(), id, InstanceStatus.valueOf(newStatus), lastDirtyTimestamp, "true".equals(isReplication)); if (isSuccess) { logger.info("Status updated: " + app.getName() + " - " + id + " - " + newStatus); return Response.ok().build(); } else { logger.warn("Unable to update status: " + app.getName() + " - " + id + " - " + newStatus); return Response.serverError().build(); } } catch (Throwable e) { logger.error("Error updating instance {} for status {}", id, newStatus); return Response.serverError().build(); } }

1.判断实例在服务端是否存在,不存在则返回更新失败
2.调用更新状态的接口
public boolean statusUpdate(final String appName, final String id, final InstanceStatus newStatus, String lastDirtyTimestamp, final boolean isReplication) { // 调用父类的状态更新 if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { // Eureka Server 集群同步 replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication); return true; } return false; }public boolean statusUpdate(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) { try { //使用读锁 read.lock(); // 更新状态的次数 状态统计 STATUS_UPDATE.increment(isReplication); // 从本地数据里面获取实例信息, Map> gMap = registry.get(appName); Lease lease = null; if (gMap != null) { lease = gMap.get(id); } // 实例不存在,则直接返回,更细你失败 if (lease == null) { return false; } else { // 执行一下lease的renew方法,里面主要是更新了这个instance的最后更新时间。 lease.renew(); // 获取instance实例信息 InstanceInfo info = lease.getHolder(); // Lease is always created with its instance info object. // This log statement is provided as a safeguard, in case this invariant is violated. if (info == null) { logger.error("Found Lease without a holder for instance id {}", id); } // 当instance信息不为空时 if ((info != null) && !(info.getStatus().equals(newStatus))) { // Mark service as UP if needed // 如果新状态是UP的状态,那么启动一下serviceUp() , 主要是更新服务的注册时间。 if (InstanceStatus.UP.equals(newStatus)) { lease.serviceUp(); } // 将instance Id 和这个状态的映射信息放入覆盖缓存MAP里面去 overriddenInstanceStatusMap.put(id, newStatus); // Set it for transfer of overridden status to replica on // replica start up // 设置覆盖状态到实例信息里面去 info.setOverriddenStatus(newStatus); long replicaDirtyTimestamp = 0; if (lastDirtyTimestamp != null) { replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp); } // // 如果replicaDirtyTimestamp 的时间大于instance的getLastDirtyTimestamp() ,则更新一下 // 此处主要作用是为了设置instance的status 为新状态 if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) { info.setLastDirtyTimestamp(replicaDirtyTimestamp); info.setStatusWithoutDirty(newStatus); } else { info.setStatus(newStatus); } // 将instance的变化放入变化队列里面去,客户端增量获取注册信息时,会用到。 info.setActionType(ActionType.MODIFIED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); info.setLastUpdatedTimestamp(); // 缓存过期 invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress()); } return true; } } finally { read.unlock(); } }

步骤如下:
1.获取实例信息,判断instance信息是否为空
2.将instanceId和覆盖状态,放入overriddenInstanceStatusMap 这个缓存MAP中进行了存储 , 过期时间默认为 1小时
3.更新instance的状态
4.添加instance Change 的记录进入队列
5.清除缓存。
删除覆盖状态 程序入口: com.netflix.eureka.resources.InstanceResource
@DELETE @Path("status") public Response deleteStatusUpdate( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("value") String newStatusValue, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try { // 判断应用是否为空 if (registry.getInstanceByAppAndId(app.getName(), id) == null) { logger.warn("Instance not found: {}/{}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // 删除覆盖状态 InstanceStatus newStatus = newStatusValue =https://www.it610.com/article/= null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue); boolean isSuccess = registry.deleteStatusOverride(app.getName(), id, newStatus, lastDirtyTimestamp,"true".equals(isReplication)); if (isSuccess) { logger.info("Status override removed: " + app.getName() + " - " + id); return Response.ok().build(); } else { logger.warn("Unable to remove status override: " + app.getName() + " - " + id); return Response.serverError().build(); } } catch (Throwable e) { logger.error("Error removing instance's {} status override", id); return Response.serverError().build(); } }

步骤说明:
1.判断应用信息是否为空
2.执行删除覆盖状态的接口
@Override public boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) { // 删除覆盖状态 if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { // Eureka Server 集群同步 replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication); return true; } return false; }@Override public boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) { try { // 上读锁 read.lock(); // 覆盖状态删除次数统计 STATUS_OVERRIDE_DELETE.increment(isReplication); // 从本地获取appName对应的实例信息 Map> gMap = registry.get(appName); Lease lease = null; if (gMap != null) { lease = gMap.get(id); } if (lease == null) { return false; } else { // lease.renew(); InstanceInfo info = lease.getHolder(); // Lease is always created with its instance info object. // This log statement is provided as a safeguard, in case this invariant is violated. if (info == null) { logger.error("Found Lease without a holder for instance id {}", id); } // 将这个instanceID 对应的覆盖状态信息从MAP中移除 InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id); if (currentOverride != null && info != null) { // 设置instanceInfo的覆盖状态为UNKONW, 这是初始状态 info.setOverriddenStatus(InstanceStatus.UNKNOWN); // 设置instance的status为最新的状态 info.setStatus(newStatus); long replicaDirtyTimestamp = 0; if (lastDirtyTimestamp != null) { replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp); } // If the replication's dirty timestamp is more than the existing one, just update // it to the replica's. if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) { info.setLastDirtyTimestamp(replicaDirtyTimestamp); } // 将instance的变化放入变化队列里面去,客户端增量获取注册信息时,会用到。 info.setActionType(ActionType.MODIFIED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // 更新instance的最后更新时间 info.setLastUpdatedTimestamp(); // 缓存过期 invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress()); } return true; } } finally { read.unlock(); } }

步骤说明:
1.从本地的CurrentHashMap中获取appName对应的应用信息,然后通过instanceId获取该应用对应的机器
2.移除overriddenInstanceStatusMap中该instanceId对应的覆盖状态
3.设置instanceInfo的覆盖状态为UNKNOW,这个是他的初始状态。
  1. 设置instance的status为新状态
5.添加instance Change 的记录进入队列
6.清除缓存。
实际应用
客户端注册
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try {// .....省略N多代码 // 判断instance的的覆盖状态是否等于UNKONW (默认状态下就是等于UNKONW) if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { // 如果不等于,则说明被修改过,放入overriddenInstanceStatusMap logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } // overriddenInstanceStatusMap 里面是否存在这个instanceId的覆盖状态 InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); // 如果存在,则设置进去 if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); }// 获取instance的状态,并设置进去 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // .....省略N多代码 } finally { read.unlock(); } }

续约
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map> gMap = registry.get(appName); Lease leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // 获取应用实例的最终状态 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); // 如果应用实例的最终状态为UNKONW,则无法续约。 if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } // 应用实例的状态和最终的状态不一致,则以最终状态为准 if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { Object[] args = { instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId() }; instanceInfo.setStatus(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true; } }

步骤说明:
1.获取应用实例的最终状态
2.如果最终状态为UNKONW,则无法续约,返回false ,存在UNKONW的可能性,在deleteStatusOverride()的时候
存在UNKONW的可能性
3.应用实例的状态和最终状态不一致,以最终状态为准。
下线
protected boolean internalCancel(String appName, String id, boolean isReplication) { try { // ..... 省略N多代码 // 将覆盖状态移除 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); // ..... 省略N多代码 } finally { read.unlock(); } }

步骤说明:
1.移除覆盖状态
过期 【深入理解Eureka覆盖状态(九)】和下线一致

    推荐阅读