微服务优雅下线与Nacos健康检查机制详解
先抛出一个问题
Nacos的健康检查和心跳机制到底是如何实现的呢?在项目实践中是否又可以参考Nacos的健康检查机制,运用于其他地方呢?
这篇文章,就带大家来揭开Nacos健康检查机制的面纱。
Nacos的健康检查
Nacos中临时实例基于心跳上报方式维持活性,基本的健康检查流程基本如下:Nacos客户端会维护一个定时任务,每隔5秒发送一次心跳请求,以确保自己处于活跃状态。Nacos服务端在15秒内如果没收到客户端的心跳请求,会将该实例设置为不健康,在30秒内没收到心跳,会将这个临时实例摘除。
原理很简单,关于代码层的实现,下面来就逐步来进行解析。
客户端的心跳
实例基于心跳上报的形式来维持活性,当然就离不开心跳功能的实现了。这里以客户端心跳实现为基准来进行分析。
Spring Cloud提供了一个标准接口ServiceRegistry,Nacos对应的实现类为NacosServiceRegistry。Spring Cloud项目启动时会实例化NacosServiceRegistry,并调用它的register方法来进行实例的注册。
@Override public void register(Registration registration) { // ... NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); }catch (Exception e) { // ... } }
在该方法中有两处需要注意,第一处是构建Instance的
getNacosInstanceFromRegistration方法,该方法内会设置Instance的元数据(metadata),通过源元数据可以配置服务器端健康检查的参数。比如,在Spring Cloud中配置的如下参数,都可以通过元数据项在服务注册时传递给Nacos的服务端。
spring: application: name: user-service-provider cloud: nacos: discovery: server-addr: 127.0.0.1:8848 heart-beat-interval: 5000 heart-beat-timeout: 15000 ip-delete-timeout: 30000
其中的heart-beat-interval、heart-beat-timeout、ip-delete-timeout这些健康检查的参数,都是基于元数据上报上去的。
register方法的第二处就是调用NamingService#registerInstance来进行实例的注册。NamingService是由Nacos的客户端提供,也就是说Nacos客户端的心跳本身是由Nacos生态提供的。
在registerInstance方法中最终会调用到下面的方法:
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
其中BeatInfo#addBeatInfo便是进行心跳处理的入口。当然,前提条件是当前的实例需要是临时(瞬时)实例。
对应的方法实现如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
在倒数第二行可以看到,客户端是通过定时任务来处理心跳的,具体的心跳请求由BeatTask完成。定时任务的执行频次,封装在BeatInfo,回退往上看,会发现BeatInfo的Period来源于Instance#
getInstanceHeartBeatInterval()。该方法具体实现如下:
public long getInstanceHeartBeatInterval() { return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL); }
可以看出定时任务的执行间隔就是配置的metadata中的数据
preserved.heart.beat.interval,与上面提到配置heart-beat-interval本质是一回事,默认是5秒。
BeatTask类具体实现如下:
class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } }
在run方法中通过NamingProxy#sendBeat完成了心跳请求的发送,而在run方法的最后,再次开启了一个定时任务,这样周期性的进行心跳请求。
NamingProxy#sendBeat方法实现如下:
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
实际上,就是调用了Nacos服务端提供的"
/nacos/v1/ns/instance/beat"服务。
在客户端的常量类Constants中定义了心跳相关的默认参数:
static { DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); }
这样就呼应了最开始说的Nacos健康检查机制的几个时间维度。
服务端接收心跳
分析客户端的过程中已经可以看出请求的是
/nacos/v1/ns/instance/beat这个服务。Nacos服务端是在Naming项目中的InstanceController中实现的。
@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { // ... Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); if (instance == null) { // ... instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } Service service = serviceManager.getService(namespaceId, serviceName); // ... service.processClientBeat(clientBeat); // ... return result; }
服务端在接收到请求时,主要做了两件事:第一,如果发送心跳的实例不存在,则将其进行注册;第二,调用其Service的processClientBeat方法进行心跳处理。
processClientBeat方法实现如下:
public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); }
ClientBeatProcessor同样是一个实现了Runnable的Task,通过HealthCheckReactor定义的scheduleNow方法进行立即执行。
scheduleNow方法实现:
public static ScheduledFuture<?> scheduleNow(Runnable task) { return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS); }
再来看看ClientBeatProcessor中对具体任务的实现:
@Override public void run() { Service service = this.service; // logging String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { // logging instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); // logging getPushService().serviceChanged(service); } } } } }
在run方法中先检查了发送心跳的实例和IP是否一致,如果一致则更新最后一次心跳时间。同时,如果该实例之前未被标记且处于不健康状态,则将其改为健康状态,并将变动通过PushService提供事件机制进行发布。事件是由Spring的ApplicationContext进行发布,事件为ServiceChangeEvent。
通过上述心跳操作,Nacos服务端的实例的健康状态和最后心跳时间已经被刷新。那么,如果没有收到心跳时,服务器端又是如何判断呢?
服务端心跳检查
客户端发起心跳,服务器端来检查客户端的心跳是否正常,或者说对应的实例中的心跳更新时间是否正常。
服务器端心跳的触发是在服务实例注册时触发的,同样在InstanceController中,register注册实现如下:
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { // ... final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
ServiceManager#registerInstance实现代码如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // ... }
心跳相关实现在第一次创建空的Service中实现,最终会调到如下方法:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
在putServiceAndInit方法中对Service进行初始化:
private void putServiceAndInit(Service service) throws NacosException { putService(service); service = getService(service.getNamespaceId(), service.getName()); service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
service.init()方法实现:
public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
HealthCheckReactor#scheduleCheck方法实现:
public static void scheduleCheck(ClientBeatCheckTask task) { futureMap.computeIfAbsent(task.taskKey(), k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS)); }
延迟5秒执行,每5秒检查一次。
在init方法的第一行便可以看到执行健康检查的Task,具体Task是由ClientBeatCheckTask来实现,对应的run方法核心代码如下:
@Override public void run() { // ... List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); // logging... getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance deleteIp(instance); } } }
在第一个for循环中,先判断当前时间与上次心跳时间的间隔是否大于超时时间。如果实例已经超时,且未被标记,且健康状态为健康,则将健康状态设置为不健康,同时发布状态变化的事件。
在第二个for循环中,如果实例已经被标记则跳出循环。如果未标记,同时当前时间与上次心跳时间的间隔大于删除IP时间,则将对应的实例删除。
小结
通过本文的源码分析,我们从Spring Cloud开始,追踪到Nacos Client中的心跳时间,再追踪到Nacos服务端接收心跳的实现和检查实例是否健康的实现。想必通过整个源码的梳理,你已经对整个Nacos心跳的实现有所了解。
下面我们来看看正常关闭服务时如何让微服务优雅下线。
为什么说是优雅下线?我们知道在分布式应用中为了满足CAP原则中的A(可用性),像Nacos、Eureka等注册中心的客户端都会进行实例列表的缓存。当正常关闭应用时,虽然可以主动调用注册中心进行注销,但这些客户端缓存的实例列表还是要等一段时间才会失效。
上述情况就有可能导致服务请求到已经被关闭的实例上,虽然通过重试机制可以解决掉这个问题,但这种解决方案会出现重试,在一定程度上会导致用户侧请求变慢。这时就需要进行优雅的下线操作了。
下面我们先从通常关闭进程的几种方式聊起。
方式一:基于kill命令
Spring Cloud本身对关闭服务是有支持的,当通过kill命令关闭进程时会主动调用Shutdown hook来进行当前实例的注销。使用方式:
kill Java进程ID
这种方式是借助Spring Cloud的Shutdown hook机制(本质是Spring Boot提供,Spring Cloud服务发现功能进行具体注销实现),在关闭服务之前会对Nacos、Eureka等服务进行注销。但这个注销只是告诉了注册中心,客户端的缓存可能需要等几秒(Nacos默认为5秒)之后才能感知到。
这种Shutdown hook机制不仅适用于kill命令,还适用于程序正常退出、使用System.exit()、终端使用Ctrl + C等。但不适用于kill -9 这样强制关闭或服务器宕机等场景。
这种方案虽然比直接挂掉要等15秒缩短了时间,相对好一些,但本质上并没有解决客户端缓存的问题,不建议使用。
方式二:基于/shutdown端点
在Spring Boot中,提供了/shutdown端点,基于此也可以实现优雅停机,但本质上与第一种方式相同,都是基于Shutdown hook来实现的。在处理完基于Shutdown hook的逻辑之后,也会进行服务的关闭,但同样面临客户端缓存的问题,因此,也不推荐使用。
这种方式首先需要在项目中引入对应的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
然后在项目中配置开启/shutdown端点:
management: endpoint: shutdown: enabled: true endpoints: web: exposure: include: shutdown
然后停服时请求对应的端点,这里采用curl命令示例:
curl -X http://实例服务地址/actuator/shutdown
方式三:基于/pause端点
Spring Boot同样提供了/pause端点(Spring Boot Actuator提供),通过/pause端点,可以将/health为UP状态的实例修改为Down状态。
基本操作就是在配置文件中进行pause端点的开启:
management: endpoint: # 启用pause端点 pause: enabled: true # pause端点在某些版本下依赖restart端点 restart: enabled: true endpoints: web: exposure: include: pause,restart
然后发送curl命令,即可进行服务的终止。注意这里需要采用POST请求。
关于/pause端点的使用,不同的版本差异很大。笔者在使用Spring Boot 2.4.2.RELEASE版本时发现根本无法生效,查了Spring Boot和Spring Cloud项目的Issues发现,这个问题从2.3.1.RELEASE就存在。目前看应该是在最新版本中Web Server的管理改为SmartLifecycle的原因,而Spring Cloud对此貌似放弃了支持(有待考察),最新的版本调用/pause端点无任何反应。
鉴于上述版本变动过大的原因,不建议使用/pause端点进行微服务的下线操作,但使用/pause端点的整个思路还是值得借鉴的。
基本思路就是:当调用/pause端点之后,微服务的状态会从UP变为DOWN,而服务本身还是可以正常提供服务。当微服务被标记为DOWN状态之后,会从注册中心摘除,等待一段时间(比如5秒),当Nacos客户端缓存的实例列表更新了,再进行停服处理。
这个思路的核心就是:先将微服务的流量切换掉,然后再关闭或重新发布。这就解决了正常发布时客户端缓存实例列表的问题。
基于上述思路,其实自己也可以实现相应的功能,比如提供一个Controller,先调用该Controller中的方法将当前实例从Nacos中注销,然后等待5秒,再通过脚本或其他方式将服务关闭掉。
方式四:基于/service-registry端点
方式三中提到的方案如果Spring Cloud能够直接支持,那就更好了。这不,Spring Cloud提供了/service-registry端点。但从名字就可以知道专门针对服务注册实现的一个端点。
在配置文件中开启/service-registry端点:
management: endpoints: web: exposure: include: service-registry base-path: /actuator endpoint: serviceregistry: enabled: true
访问
http://localhost:8081/actuator 端点可以查看到开启了如下端点:
{ "_links": { "self": { "href": "http://localhost:8081/actuator", "templated": false }, "serviceregistry": { "href": "http://localhost:8081/actuator/serviceregistry", "templated": false } } }
通过curl命令来进行服务状态的修改:
curl -X "POST" "http://localhost:8081/actuator/serviceregistry?status=DOWN" -H "Content-Type: application/vnd.spring-boot.actuator.v2+json;charset=UTF-8"
执行上述命令之前,查看Nacos对应实例状态为:
可以看到实例详情中的按钮为“下线”也就是说目前处于UP状态。当执行完上述curl命令之后,实例详情中的按钮为“上线”,说明实例已经下线了。
上述命令就相当于我们在Nacos管理后台手动地操作了实例的上下线。
当然,上述情况是基于Spring Cloud和Nacos的模式实现的,本质上Spring Cloud是定义了一个规范,比如所有的注册中心都需要实现ServiceRegistry接口,同时基于ServiceRegistry这个抽象还定义了通用的Endpoint:
@Endpoint(id = "serviceregistry") public class ServiceRegistryEndpoint { private final ServiceRegistry serviceRegistry; private Registration registration; public ServiceRegistryEndpoint(ServiceRegistry<?> serviceRegistry) { this.serviceRegistry = serviceRegistry; } public void setRegistration(Registration registration) { this.registration = registration; } @WriteOperation public ResponseEntity<?> setStatus(String status) { Assert.notNull(status, "status may not by null"); if (this.registration == null) { return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found"); } this.serviceRegistry.setStatus(this.registration, status); return ResponseEntity.ok().build(); } @ReadOperation public ResponseEntity getStatus() { if (this.registration == null) { return ResponseEntity.status(HttpStatus.NOT_FOUND).body("no registration found"); } return ResponseEntity.ok().body(this.serviceRegistry.getStatus(this.registration)); } }
我们上面调用的Endpoint便是通过上面代码实现的。所以不仅Nacos,只要基于Spring Cloud集成的注册中心,本质上都是支持这种方式的服务下线的。
小结
很多项目都逐步在进行微服务化改造,但一旦因为微服务系统,将面临着更复杂的情况。本篇文章重点基于Nacos在Spring Cloud体系中优雅下线来为大家剖析了一个微服务实战中常见的问题及解决方案。