Spring Cloud Eureka: 客户端运行原理及源码阅读

前言

Eureka可分为客户端和服务端两个部分,本篇主要介绍的客户端的运行机制和概念.

 

Eureka客户端主要实现的功能分别是服务注册,服务发现,服务续约

服务注册:Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。

服务续约:在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,默认每30秒发送一次

获取服务:服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,并且缓存在Eureka Client本地,默认缓存30秒。

 

源码阅读

Eureka客户端的实现主要依靠注解@EnableDiscoveryClient开始,所以我们先看看注解中有什么信息

我们在注解类中可以看到一段注释信息,告诉我们这个注解的作用主要是为了启动DiscoveryClient实现.这个提示还是比较明显哦,我们进入DiscoveryClient看看


类上面的注释,说明了这个类是用来帮助和Eureka Server互相协作的,可以进行服务注册,服务续约,服务下线,获取服务列表.

没错,这个就是Eureka 客户端的核心实现类,在DiscoveryClient类中可以看到有很多方法,包括register()、renew()、shutdown()、unregister()等。
既然Eureka Client需要一开始先初始化DiscoveryClient实例,那就看下DiscoveryClient的构造方法。在其中的一个构造方法中可以看到他调用了initScheduledTasks();

我们跟进这个方法看个究竟

/**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

该方法通过多个if判断实现,第一个if判断if (clientConfig.shouldFetchRegistry())是判断是否需要拉去eureka服务端的注册列表,如果需要则会开始定时调度线程,每隔30秒拉去一次.

第二个if判断 if (clientConfig.shouldRegisterWithEureka())是判断是否需要将自己注册到eureka服务端中,如果为true则开启一个服务续约的定时调度线程,每隔30秒续约一次

并且new了一个InstanceInfoReplicator类,该类很重要,他实现了Runable接口,在它的run方法中会发起服务注册.

 public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

 discoveryClient.register();调用真正的注册方法,对当前的节点进行服务注册

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

 

httpResponse = eurekaTransport.registrationClient.register(instanceInfo);通过这段代码最后返回响应信息,完成整个客户端的服务注册

 

 


版权声明:本文为weixin_42214548原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。