Spring|Spring cloud Eureka(源码)

普通的spring boot注册到服务中心,变成服务提供者,做了2件事
1.加注解@EnableDiscoveryClient
2.指定注册中心地址
先看@EnableDiscoveryClient注解

/** * Annotation to enable a DiscoveryClient implementation. * @author Spencer Gibb */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient {}

注释说: 这个注解用来开启DiscoveryClient的实现
DiscoveryClient spring里面接口就搜到一个,其 实现类 有个EurekaDiscoveryClient
说明,DiscoveryClient是springcloud定义发现服务的接口,通过这个接口屏蔽了具体实现,默认用Eureka实现,也可以用其他实现替换,上层代码就不用改了
EurekaDiscoveryClient 依赖了netflix包下的EurekaClient接口,这个接口的实现类
com.netflix.discovery.DiscoveryClient才是实际干活的
得 服务url 它的类注释里面有
/** * The class that is instrumental for interactions with Eureka Server. .... * Eureka Client needs a configured list ofEureka Server * {@link java.net.URL}s to talk to. * * */

说明它用到了我们配过的eureka. client.serviceUrl
找到有个方法getServiceUrlsFromConfig 但是已经废弃,被代替的是链到EndpointUtils
果然有个同名方法
/** * Get the list of all eureka service urls from properties file for the eureka client to talk to. * * @param clientConfig the clientConfig to use * @param instanceZone The zone in which the client resides * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise * @return The list of all eureka service urls for the eureka client to talk to */ public static List getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { List orderedUrls = new ArrayList(); //找到配置的region,没有配置就默认 String region = getRegion(clientConfig); //找到这个region下的所有zone String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { availZones = new String[1]; //没有配的话,还是用那个默认的zone //就是我们经常配的eureka. client.serviceUrl.defaultZone availZones[0] = DEFAULT_ZONE; } logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones)); //根据传入的参数(当前服务所在zone,是不是同zone优先,所有zone), //按某算法决定先用哪个zone(先放入结果集) int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); //clientConfig.getEurekaServerServiceUrls 如果这个zone没有配url就返回eureka. client.serviceUrl.defaultZone的url List serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); while (currentOffset != myZoneOffset) { serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } if (currentOffset == (availZones.length - 1)) { currentOffset = 0; } else { currentOffset++; } }if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } return orderedUrls; }

可以看出,决定serviceUrl的优先级是,先看有没有配region(没有就用默认值),再看这个region下的多个zone(一个都没有就用我配的defaultZone),按某算法(可能是本zone优先)决定先加入结果集的zone,任何顺序一一加入
服务注册 com.netflix.discovery.DiscoveryClient的类注释里面还有
Eureka Client is responsible for
a)Registeringthe instance with Eureka Server
注册服务到注册中心也在这个类.可以看到几个构造器的重载最终会调用到initScheduledTasks();
里面读取了我们设的eureka.client.register-with-eureka要不要注册到服务中心
if (clientConfig.shouldRegisterWithEureka()) { /......... // InstanceInfo replicator 一个定时任务 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize/...... //执行这个定时任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); }

这个定时任务里面执行的是
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //找到触发 注册的地方了!!! discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

discoveryClient.register(); 是以下这个,方法注释写 注册是REST请求方式进行的
/** * Register with the eureka service by making the appropriate REST call. */ boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse httpResponse; try { //注册时 客户端传过来的 服务元数据 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }

服务续 续必定是已经注册了 在注册的if里面 有个心跳定时任务
/... // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()//定时执行这个的run ), renewalIntervalInSecs, TimeUnit.SECONDS); /...... // InstanceInfo replicator}

/** * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable {public void run() { if (renew()) {//都在 renew lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }

/** * Renew with the eureka service by making the appropriate REST call */ boolean renew() { EurekaHttpResponse httpResponse; try { //直接REST请求方式续约 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); //不正常 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); return register(); } //正常 return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }

服务获取 定时任务initScheduledTasks里面 还有服务获取
private void initScheduledTasks() { //eureka.client.fetch-registry配置 默认true if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timer //去配置里面拿 默认30s int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); }

REST请求的定义 所有交换都是通过REST请求
【Spring|Spring cloud Eureka(源码)】com.netflix.eureka.resources包的ApplicationResource为例
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { //.....一堆校验registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }

@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //父类中的注册实现,把info存在一个2层map里面,第一层key是服务名.第二层key是元数据的id super.register(info, leaseDuration, isReplication); //向其他注册中心复制 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }

    推荐阅读