深入理解Eureka - Eureka Server节点复制机制
Eureka Server提供了节点复制机制,来保证注册中心节点间服务注册信息的同步。
Eureka Server节点之间服务注册信息的同步发生在以下两种场景下:
- Eureka Server节点启动时,主动拉取其他节点的服务注册信息。
- Eureka Server接收到Register、renew、cancel时,主动推送至其他节点。
Eureka Server节点启动时的服务同步
Eureka Server启动时,EurekaServerBootStrap调用PeerAwareInstanceRegistryImpl.syncUp(),同步注册服务信息。
// Copy registry from neighboring eureka node int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount);
syncUp()进行注册服务信息的同步:
/** * Populates the registry information from a peer eureka node. This * operation fails over to other nodes until the list is exhausted if the * communication fails. */ @Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
首先会从EurekaClient.getApplications获取所有的注册服务信息,然后调用register()进行服务注册。
如果同步失败,则会sleep serverConfig.getRegistrySyncRetryWaitMs()后,再次进行同步。
同步完成后,调用PeerAwareInstanceRegistryImpl.openForTraffic()方法,进行自我保护阀值的计算:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
至此,Eureka Server启动时的节点复制就进行完了。
备注
最大同步重试次数=serverConfig.getRegistrySyncRetries()。
同步失败后每次同步的间隔时间=serverConfig.getRegistrySyncRetryWaitMs()。
Eureka Server接收到Register、renew、cancel时的服务同步
处理Register、renew、cancel同步时比较复杂,包括好几个队列的转换处理以及异常处理。这里算是个简图吧,省略了队列的转换和异常处理。
Eureka Server接收到Register、renew或cancel事件后,执行向其他节点同步:
/** * Replicates all eureka actions to peer eureka nodes except for replication * traffic to this node. * */ private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
/** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */ private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
以Register为例(逻辑是一样的),Eureka Server循环遍历其他节点,然后调用node.register(info):
/** * Sends the registration information of {@link InstanceInfo} receiving by * this node to the peer node represented by this class. * * @param info * the instance information {@link InstanceInfo} of any instance * that is send to this instance. * @throws Exception */ public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
这里执行TaskDispatcher的process,实际底层转到了AcceptorExecutor.process():
acceptorExecutor.process(id, task, expiryTime);
然后acceptorExecutor里会开启内部线程AcceptorRunner进行处理,处理逻辑有点复杂(包括队列的转换和异常处理),最终将处理好的结果放到BlockingQueue<List<TaskHolder>> batchWorkQueue中。
TaskExecutors中会在内部开启WokerRunnable线程组(ThreadGroup),循环poll batchWorkQueue队列中的Task,然后调用InstanceReplicationTask的execute方法,将register事件推送到其他Eureka Server节点。
public void run() { try { while (!isShutdown.get()) { List<TaskHolder<ID, T>> holders = getWork(); metrics.registerExpiryTimes(holders); List<T> tasks = getTasksOf(holders); ProcessingResult result = processor.process(tasks); switch (result) { case Success: break; case Congestion: case TransientError: taskDispatcher.reprocess(holders, result); break; case PermanentError: logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName); } metrics.registerTaskResult(result, tasks.size()); } } catch (InterruptedException e) { // Ignore } catch (Throwable e) { // Safe-guard, so we never exit this loop in an uncontrolled way. logger.warn("Discovery WorkerThread error", e); } }
register、renew、cancel的处理逻辑一样,只不过调用的同步接口不同罢了。
Spring Cloud实战项目Jbone地址
github地址:https://github.com/417511458/jbone
码云地址:https://gitee.com/majunwei2017/jbone