阅读 90

Nacos配置中心2-配置文件的动态更新

0. 环境

  • nacos版本:1.4.1

  • Spring Cloud : 2020.0.2

  • Spring Boot :2.4.4

  • Spring Cloud alibaba: 2.2.5.RELEASE

  • Spring Cloud openFeign 2.2.2.RELEASE

测试代码:github.com/hsfxuebao/s…

当远程 Nacos Config Server 中的配置信息发生了变更, Nacos Config Client 是如何感知到 的呢?这里就来解决这个问题.

1. 长轮询模型

Nacos Config Server 中配置数据的变更, Nacos Config Client 是如何知道的呢? Nacos Config Server 采用了长轮询模型实现的变更通知。

一般情况下 Server 端数据的变更若要使 Client 感知到,可以选择两种模型:

  • Push 模型: 当 Server 端的数据发生了变更,其会主动将更新推送给 Client。 Push 模型 适合于 Client 数量不多,且 Server 端数据变化比较频繁的场景。 其实时性较好,但其需 要维护长连接, 占用系统资源

  • Pull 模型: 需要 Client 定时查看 Server 端数据是否更新。其实时性不好,且可能会产生数据更新的丢失

长轮询模型整合了 Push 与 Pull 模型的优势。 Client 仍定时发起 Pull 请求,查看 Server端数据是否更新。

  • 若发生了更新,则 Server 立即将更新数据以响应的形式发送给 Client 端;

  • 若没有发生更新, Server 端不会发送任何信息,但其会临时性的保持住这个连接一段时间。若在此时间段内, Server 端数据发生了变更,这个变更就会触发 Server 向 Client 发送变更结果。这次发送的执行,就是因为长连接的存在。若此期间仍未发生变更,则放弃这个连接。等待着下一次 Client 的 Pull 请求。

长轮询模型,是 Push 与 Pull 模型的整合,既减少了 Push 模型中长连接的被长时间维护 的时间,又降低了 Pull 模型实时性较差的问题

2. 流程解析

核心类 CacheData 是一个维护配置项和其下注册的所有监听器的实例。所有的 CacheData 都保存在 ClientWorker 类中的原子 cacheMap 中,其内部的核心成员有:

image.png

其中,content 是配置内容,MD5 值是用来检测配置是否发生变更的关键,内部还维护着一个若干监听器组成的数组,一旦发生变更则依次回调这些监听器。

大致的业务流程如下:

image.png 下面将根据内容对源码进行分析:

  • 客户端定时发送检测请求(发起长轮询)

  • 客户端将更新同步到应用实例

  • 服务端接收和响应长轮询请求

  • 用户主动发起配置变更

3. client定时发送检测请求

要想使用 nacos 的配置功能,需要引入相应的依赖包:

<dependency>     <groupId>com.alibaba.cloud</groupId>     <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> 复制代码

根据 springboot 自动装配的特性,我们找到对应的 spring.factories 文件,

image.png

点击进入到 NacosConfigBootstrapConfiguration 文件中,注意查看 nacosConfigManager() 方法
image.png

该方法中创建了一个 NacosConfigManager 对象,NacosConfigManager 对象的构造方法中调用了 createConfigService(nacosConfigProperties) 方法,用于创建 ConfigService 对象。

image.png

ConfigService 是一个接口,只有一个实现类 NacosConfigService,所以我们直接找到 NacosConfigService 的构造方法:

public NacosConfigService(Properties properties) throws NacosException {     ValidatorUtils.checkInitParam(properties);     String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);     if (StringUtils.isBlank(encodeTmp)) {         this.encode = Constants.ENCODE;     } else {         this.encode = encodeTmp.trim();     }     initNamespace(properties);     // nacos自研的httpClient     this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));     this.agent.start();     // todo 创建worker     this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); } 复制代码

接着查看 ClientWorker 的构造方法

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,         final Properties properties) {     this.agent = agent;     this.configFilterChainManager = configFilterChainManager;     // Initialize the timeout parameter     // 里面初始化了长轮询的超时时间,默认为 30s     init(properties);     // 创建一个线程池,仅包含一个核心线程, 用于执行后面的定时任务     this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {         @Override         public Thread newThread(Runnable r) {             Thread t = new Thread(r);             t.setName("com.alibaba.nacos.client.Worker." + agent.getName());             t.setDaemon(true);             return t;         }     });     // 创建一个线程池,用于执行后续的长轮询任务,其包含的核心线程数量为当前server的逻辑内核数量     // ---------- Netty源码中的相关内容 --------------     // EventLoopGroup  -> 线程池     // EventLoop -> 线程驱动     this.executorService = Executors             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {                 @Override                 public Thread newThread(Runnable r) {                     Thread t = new Thread(r);                     t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());                     t.setDaemon(true);                     return t;                 }             });     // 执行一个定时任务,初始化一个线程池,延迟 1 毫秒启动,之后每隔 10 毫秒执行一次,调用 checkConfigInfo() 方法     this.executor.scheduleWithFixedDelay(new Runnable() {         @Override         public void run() {             try {                 // todo 发出配置更新检测请求                 checkConfigInfo();             } catch (Throwable e) {                 LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);             }         }     }, 1L, 10L, TimeUnit.MILLISECONDS); } 复制代码

接下来看下 checkConfigInfo() 方法中做了什么:

public void checkConfigInfo() {     // Dispatch taskes.     // cacheMap的key为配置文件的key(配置文件名称+groupId)     // value为CacheData(每个配置文件都会有一个CacheData,用于存放来自于Server的配置文件数据)     int listenerSize = cacheMap.size();     // Round up the longingTaskCount.     // 向上取整     int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());     if (longingTaskCount > currentLongingTaskCount) {         for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {             // The task list is no order.So it maybe has issues when changing.             // todo 执行长轮询任务             // 创建了长轮询对象 LongPollingRunnable ,交由线程池执行             executorService.execute(new LongPollingRunnable(i));         }         currentLongingTaskCount = longingTaskCount;     } } 复制代码

这里有一个非常重要的对象 cacheMap,来看下它长啥样:

/**  * groupKey -> cacheData.  */ private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>(); 复制代码

cacheMap 的主要作用是用来存储监听变更的缓存集合,为了保障线程安全使用了 ConcurrentHashMap 的结构。

  • key 被称为 groupKey ,是由 dataId,group,tenant(租户)拼接而成的字符串;

  • value 为 CacheData 对象,每个 dataId 都会持有一个 CacheData 对象。

接回上面的 LongPollingRunnable 对象,它是 ClientWorker 的一个内部类,实现了 Runnable 接口,对于这种情况我们直接查看其 run() 方法:

class LongPollingRunnable implements Runnable {     private final int taskId;     public LongPollingRunnable(int taskId) {         this.taskId = taskId;     }     @Override     public void run() {         List<CacheData> cacheDatas = new ArrayList<CacheData>();         List<String> inInitializingCacheList = new ArrayList<String>();         try {             // ====================  第一部分 检查本地文件  ====================             // check failover config             // 遍历所有配置文件对应的cacheData             for (CacheData cacheData : cacheMap.values()) {                 if (cacheData.getTaskId() == taskId) {                     cacheDatas.add(cacheData);                     try {                         // todo 将本地配置文件内容更新到当前cacheData                         // 本地配置文件,远程配置文件,快照配置文件                         checkLocalConfig(cacheData);                         if (cacheData.isUseLocalConfigInfo()) {                             // 如果 isUseLocalConfigInfo 返回为 true, 表示缓存和本地配置不一致                             cacheData.checkListenerMd5();                         }                     } catch (Exception e) {                         LOGGER.error("get local config info error", e);                     }                 }             }             // ====================  第二部分 检查服务端文件  ====================             // check server config             // todo 从server端检测这些配置文件是否发生了变更             // 返回结果为所有发生了变更的配置文件的key             List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);             if (!CollectionUtils.isEmpty(changedGroupKeys)) {                 LOGGER.info("get changedGroupKeys:" + changedGroupKeys);             }             // 遍历所有发生了变更的配置文件key             for (String groupKey : changedGroupKeys) {                 String[] key = GroupKey.parseKey(groupKey);                 String dataId = key[0];                 String group = key[1];                 String tenant = null;                 if (key.length == 3) {                     tenant = key[2];                 }                 try {                     // todo 从server获取当前配置文件的最新内容                     String[] ct = getServerConfig(dataId, group, tenant, 3000L);                     // 获取到当前配置文件的cacheData                     CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));                     // 将更新过的内容写入到cacheData                     cache.setContent(ct[0]);                     if (null != ct[1]) {                         cache.setType(ct[1]);                     }                     LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",                             agent.getName(), dataId, group, tenant, cache.getMd5(),                             ContentUtils.truncateContent(ct[0]), ct[1]);                 } catch (NacosException ioe) {                     String message = String                             .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                                     agent.getName(), dataId, group, tenant);                     LOGGER.error(message, ioe);                 }             }             // 收尾工作             for (CacheData cacheData : cacheDatas) {                 if (!cacheData.isInitializing() || inInitializingCacheList                         .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {                     // todo 监听回调                     cacheData.checkListenerMd5();                     cacheData.setInitializing(false);                 }             }             inInitializingCacheList.clear();             // 启动下次的任务             executorService.execute(this);         } catch (Throwable e) {             // If the rotation training task is abnormal, the next execution time of the task will be punished             LOGGER.error("longPolling error : ", e);             executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);         }     } } 复制代码

这一段代码较长,拆分为两段来看:

3.1 校验本地文件

主要有两个重要方法:checkLocalConfig(cacheData)checkListenerMd5() 方法。

checkLocalConfig(cacheData)

这个方法的作用是校验本地文件,分为 3 种情况:

private void checkLocalConfig(CacheData cacheData) {     final String dataId = cacheData.dataId;     final String group = cacheData.group;     final String tenant = cacheData.tenant;     // 获取到本地配置文件     File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);     // 若配置的是不使用本地配置文件,但该配置文件在本地又存在,那就使用它     if (!cacheData.isUseLocalConfigInfo() && path.exists()) {         // 获取本地配置文件内容         String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);         // 计算出该内容的md5         final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);         // 设置 使用本地配置文件属性 为true         cacheData.setUseLocalConfigInfo(true);         // 将本地配置文件的最后修改时间写入到cacheData         cacheData.setLocalConfigInfoVersion(path.lastModified());         // 将本地配置文件内容写入到cacheData         cacheData.setContent(content);         LOGGER.warn(                 "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",                 agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));         return;     }     // If use local config info, then it doesn't notify business listener and notify after getting from server.     // 若配置的是使用本地配置文件,但这个文件又不存在     if (cacheData.isUseLocalConfigInfo() && !path.exists()) {         // 设置 useLocalConfigInfo 为 false 后直接返回         cacheData.setUseLocalConfigInfo(false);         LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),                 dataId, group, tenant);         return;     }     // When it changed.     // 若设置的是使用本地配置文件,且文件也存在,且这个本地配置文件发生了变化     if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path             .lastModified()) {         String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);         final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);         cacheData.setUseLocalConfigInfo(true);         cacheData.setLocalConfigInfoVersion(path.lastModified());         cacheData.setContent(content);         LOGGER.warn(                 "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",                 agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));     } } 复制代码

checkListenerMd5()

void checkListenerMd5() {     for (ManagerListenerWrap wrap : listeners) {         if (!md5.equals(wrap.lastCallMd5)) {             safeNotifyListener(dataId, group, content, type, md5, wrap);         }     } } 复制代码

如果 md5 值不一样,则发送数据变更通知,调用 safeNotifyListener 方法,方法的内容如下:

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,         final String md5, final ManagerListenerWrap listenerWrap) {     final Listener listener = listenerWrap.listener;     // 创建一个 job 对象,用于异步执行     Runnable job = new Runnable() {         @Override         public void run() {             ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();             ClassLoader appClassLoader = listener.getClass().getClassLoader();             try {                 // 如果是 AbstractConfigChangeListener ,创建 ConfigChangeEvent 对象                 if (listener instanceof AbstractSharedListener) {                     AbstractSharedListener adapter = (AbstractSharedListener) listener;                     adapter.fillContext(dataId, group);                     LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);                 }                 // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。                 Thread.currentThread().setContextClassLoader(appClassLoader);                 ConfigResponse cr = new ConfigResponse();                 cr.setDataId(dataId);                 cr.setGroup(group);                 cr.setContent(content);                 configFilterChainManager.doFilter(null, cr);                 String contentTmp = cr.getContent();                 listener.receiveConfigInfo(contentTmp);                 // compare lastContent and content                 if (listener instanceof AbstractConfigChangeListener) {                     Map data = ConfigChangeHandler.getInstance()                             .parseChangeData(listenerWrap.lastContent, content, type);                     ConfigChangeEvent event = new ConfigChangeEvent(data);                     ((AbstractConfigChangeListener) listener).receiveConfigChange(event);                     listenerWrap.lastContent = content;                 }                 listenerWrap.lastCallMd5 = md5;                 LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,                         listener);             } catch (NacosException ex) {                 LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",                         name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());             } catch (Throwable t) {                 LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,                         group, md5, listener, t.getCause());             } finally {                 Thread.currentThread().setContextClassLoader(myClassLoader);             }         }     };     final long startNotify = System.currentTimeMillis();     try {         if (null != listener.getExecutor()) {             // 执行             listener.getExecutor().execute(job);         } else {             job.run();         }     } catch (Throwable t) {         LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,                 group, md5, listener, t.getCause());     }     final long finishNotify = System.currentTimeMillis();     LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",             name, (finishNotify - startNotify), dataId, group, md5, listener); } 复制代码

这个方法中,对 dataId 注册过监听的客户端推送变更后的数据内容。客户端接收通知后通过receiveConfigInfo() 方法接收回调数据,处理自身业务。

3.2 第二部分 - 检查服务端文件

这里面有两个重要方法:checkUpdateDataIds()getServerConfig() 方法:

checkUpdateDataIds()

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {     StringBuilder sb = new StringBuilder();     // 将cacheDatas中的所有配置文件名称全部拼接为String,发送给Server,     // 让Server去检测这些文件是否发生了变更     for (CacheData cacheData : cacheDatas) {         if (!cacheData.isUseLocalConfigInfo()) {             sb.append(cacheData.dataId).append(WORD_SEPARATOR);             sb.append(cacheData.group).append(WORD_SEPARATOR);             if (StringUtils.isBlank(cacheData.tenant)) {                 sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);             } else {                 sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);                 sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);             }             if (cacheData.isInitializing()) {                 // It updates when cacheData occours in cacheMap by first time.                 inInitializingCacheList                         .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));             }         }     }     boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();     // todo 将这个配置文件名称String发送给Server     return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); } 复制代码

该方法中调用了 checkUpdateConfigStr(sb.toString(), isInitializingCacheList) 方法,从服务器中获取 dataId 列表,请求头中加入了长轮询的标识:

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {     Map<String, String> params = new HashMap<String, String>(2);     params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);     Map<String, String> headers = new HashMap<String, String>(2);     // 这里在请求头中塞了一个 "Long-Pulling-Timeout" 标识,这个是服务端长轮询的判断条件,非常重要     headers.put("Long-Pulling-Timeout", "" + timeout);     // told server do not hang me up if new initializing cacheData added in     if (isInitializingCacheList) {         headers.put("Long-Pulling-Timeout-No-Hangup", "true");     }     if (StringUtils.isBlank(probeUpdateString)) {         return Collections.emptyList();     }     try {         // In order to prevent the server from handling the delay of the client's long task,         // increase the client's read timeout to avoid this problem.         long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);         // 提交请求   调用服务端接口:/v1/cs/configs/listener         HttpRestResult<String> result = agent                 .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),                         readTimeoutMs);         if (result.ok()) {             setHealthServer(true);             // todo 解析result,返回所有发生变更的配置文件的key             return parseUpdateDataIdResponse(result.getData());         } else {             setHealthServer(false);             LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),                     result.getCode());         }     }      ...     return Collections.emptyList(); } 复制代码

getServerConfig() 方法

checkUpdateDataIds() 方法执行完成后,得到了有更新的 changedGroupKeys,循环 changedGroupKeys 列表,调用 getServerConfig() 方法,获取服务端的配置:

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)         throws NacosException {     String[] ct = new String[2];     if (StringUtils.isBlank(group)) {         group = Constants.DEFAULT_GROUP;     }     HttpRestResult<String> result = null;     try {         Map<String, String> params = new HashMap<String, String>(3);         if (StringUtils.isBlank(tenant)) {             params.put("dataId", dataId);             params.put("group", group);         } else {             params.put("dataId", dataId);             params.put("group", group);             params.put("tenant", tenant);         }         // 发送请求 调用 /v1/cs/configs,获取配置信息         result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);     } catch (Exception ex) {         String message = String                 .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",                         agent.getName(), dataId, group, tenant);         LOGGER.error(message, ex);         throw new NacosException(NacosException.SERVER_ERROR, ex);     }     ... } 复制代码

至此,客户端发起长轮询的代码已经分析完成。

3.3 方法调用图

image.png

3.4 小节

ClientWorker 通过其下的两个线程池完成配置长轮询的工作,一个是单线程的 executor,每隔 10ms 按照每 3000 个配置项为一批次捞取待轮询的 cacheData 实例,将其包装成为一个 LongPollingTask 提交进入第二个线程池 executorService 处理。

image.png 该长轮询任务内部主要分为四步:

  1. 检查本地配置,忽略本地快照不存在的配置项,检查是否存在需要回调监听器的配置项

  2. 如果本地没有配置项的,从服务端拿,返回配置内容发生变更的键值列表

  3. 每个键值再到服务端获取最新配置,更新本地快照,补全之前缺失的配置

  4. 检查 MD5 标签是否一致,不一致需要回调监听器

如果该轮询任务抛出异常,等待一段时间再开始下一次调用,减轻服务端压力。另外,Nacos 在 HTTP 工具类中也有限流器的代码,通过多种手段降低轮询或者大流量情况下的风险。下文还会讲到,如果在服务端没有发现变更的键值,那么服务端会夯住这个 HTTP 请求一段时间(客户端侧默认传递的超时是 30s),以此进一步减轻客户端的轮询频率和服务端的压力。

4. Client将更新同步到应用实例

要解决的问题有两个:

  • config client 的每个配置文件对应的 cacheData 是什么时候创建的?

  • 如何将更新过的 cacheData 中的数据同步到应用实例的?

总思路:

  • config client 启动时会为每个其所需要的配置文件创建一个本地缓存 CacheData,并为 每个 CacheData 添加一个监听器。 一旦监听到 CacheData 中数据发生了变更,就会引发监听 回调的执行。该回调并未直接从 CacheData 中读取变更数据,而是发布了一个刷新事件 RefreshEvent。该事件能够触发所有被@RefreshScope 注解的类实例被重新创建并初始化,而 初始化时使用的会自动更新的属性(被@Value 注解的属性)值就来自于 CacheData

4.1 入口

@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true) public class NacosConfigAutoConfiguration {    ...    @Bean    public NacosContextRefresher nacosContextRefresher(          NacosConfigManager nacosConfigManager,          NacosRefreshHistory nacosRefreshHistory) {       // Consider that it is not necessary to be compatible with the previous       // configuration       // and use the new configuration if necessary.       return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);    } } 复制代码

com.alibaba.cloud.nacos.refresh.NacosContextRefresher#onApplicationEvent:

public class NacosContextRefresher       implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {    ...    @Override    public void onApplicationEvent(ApplicationReadyEvent event) {       // many Spring context       // 设置ready状态为true       if (this.ready.compareAndSet(false, true)) {          // todo 注册监听          this.registerNacosListenersForApplications();       }    }        private void registerNacosListenersForApplications() {    if (isRefreshEnabled()) {       // 遍历当前应用所需要的所有配置文件       for (NacosPropertySource propertySource : NacosPropertySourceRepository             .getAll()) {          // 若当前遍历的配置文件不会自动刷新,则直接跳过          if (!propertySource.isRefreshable()) {             continue;          }          // 获取当前遍历的配置文件名称          String dataId = propertySource.getDataId();          // todo 添加监听器          registerNacosListener(propertySource.getGroup(), dataId);       }    }         private void registerNacosListener(final String groupKey, final String dataKey) {        // 构建出配置文件key,格式为:  配置文件名,groupId        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);        // 若指定的文件key没有相应的监听器,则创建一个并写入到缓存listenerMap中        // 这个map的key为文件key,value为其对应的监听器        // 若指定的文件key具有相应的监听器,则不进行创建        // 但这个computeIfAbsent()返回结果为指定key对应的监听器        Listener listener = listenerMap.computeIfAbsent(key,              lst -> new AbstractSharedListener() {                 @Override                 public void innerReceive(String dataId, String group,                       String configInfo) {                    // 计数器增一                    refreshCountIncrement();                    // 将本次更新记录到刷新历史缓存中                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);                    // todo feature: support single refresh for listening                    // 发布一个RefreshEvent事件                    applicationContext.publishEvent(                          new RefreshEvent(this, null, "Refresh Nacos config"));                    if (log.isDebugEnabled()) {                       log.debug(String.format(                             "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",                             group, dataId, configInfo));                    }                 }              });        try {           // todo 将监听器注册到configService           configService.addListener(dataKey, groupKey, listener);        }        catch (NacosException e) {           log.warn(String.format(                 "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,                 groupKey), e);        }     } } 复制代码

com.alibaba.nacos.client.config.NacosConfigService#addListener:

@Override public void addListener(String dataId, String group, Listener listener) throws NacosException {     // todo 添加监听器     worker.addTenantListeners(dataId, group, Arrays.asList(listener)); } // com.alibaba.nacos.client.config.impl.ClientWorker#addTenantListeners public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)         throws NacosException {     group = null2defaultGroup(group);     String tenant = agent.getTenant();     // todo 获取到指定文件对应的CacheData     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);     // 将监听器添加到CacheData     for (Listener listener : listeners) {         // todo         cache.addListener(listener);     } } 复制代码

核心方法addCacheDataIfAbsent:

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {     // 构建文件key,格式为:  配置文件名+groupId     String key = GroupKey.getKeyTenant(dataId, group, tenant);     // // 从缓存cacheMap中获取指定文件的cacheData, 若缓存中有该cacheData,则直接返回     CacheData cacheData = cacheMap.get(key);     if (cacheData != null) {         return cacheData;     }     // 创建CacheData     cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);     // multiple listeners on the same dataid+group and race condition     // 将cacheData写入到cacheMap     CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);     if (lastCacheData == null) {         //fix issue # 1317         if (enableRemoteSyncConfig) {             // todo 从远程config server获取最新的配置信息             String[] ct = getServerConfig(dataId, group, tenant, 3000L);             // 将获取的最新数据写入到cacheData             cacheData.setContent(ct[0]);         }         int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();         cacheData.setTaskId(taskId);         lastCacheData = cacheData;     }     // reset so that server not hang this check     lastCacheData.setInitializing(true);     LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);     MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());     return lastCacheData; } 复制代码

核心方法addListener

public void addListener(Listener listener) {     if (null == listener) {         throw new IllegalArgumentException("listener is null");     }     // 将监听器包装为wrap     ManagerListenerWrap wrap =             (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)                     : new ManagerListenerWrap(listener, md5);     // 将这个监听器wrap添加到当前CacheData的所有监听器集合listeners中     if (listeners.addIfAbsent(wrap)) {         LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,                 listeners.size());     } } 复制代码

4.2 方法调用图

image.png

5. Server处理Client配置变更检测请求

config server 接收到 config client 的配置变更检测请求后,首先会解析出请求指定的 要检测的所有目标配置文件,同时也会解析出 client 对处理方式的要求。 server 的处理方式 有四种类型:

  • 短轮询: 没有长连接维护。 server 接收到 client 请求后立即轮询检测所有目标配置文件 是否发生变更, 并将检测结果立即返回 client。不过,这个返回的结果与 nacos client 的 版本有密切关系,版本不同形成的结果不同。

  • 固定时长的长轮询: server 接收到 client 请求后,会直接维护一个指定的固定时长的长 连接,默认 30s。长连接结束前会检测一次是否发生配置变更。不过,在长连接维护期 间是不检测配置变更情况的。

  • 不挂起的非固定时长的长轮询: 与短轮询类似。 server 接收到 client 请求后立即轮询检 测所有目标配置文件是否发生变更,并将检测结果立即返回 client。与短轮询不同的是, 其返回结果与 nacos client 版本无关。

  • 挂起的非固定时长的长轮询: server 接收到 client 请求后,会先检测是否发生了配置变 更。若发生了,则直接返回 client,关闭连接。若未发生,则首先会将这个长轮询实例 写入到一个缓存队列 allSubs,然后维护一个 30s 的长连接(这个时间用户不能指定)。 时间结束,长连接直接关闭。在该长连接维护期间,系统同时监听着配置变更事件,一 旦发生变更就会立即将变更发送给相应的长轮询对应的 client,关闭连接。

配置文件是否发生变更,是如何判断的?

  • 就是把来自于 client 的配置文件的 md5 与 server 端配置文件的 md5 进行对比。若相等,则 未发生变更,否则,发生了变更。

server 创建的客户端长轮询实例,是为谁创建的?是为每个 config client 都会创建一个长轮 询实例?还是为每个配置文件都创建一个长轮询实例?

  • 都不是。是为每次的配置更新检测请求创建一个长轮询实例。

5.1 源码分析

在服务端代码 com.alibaba.nacos.config.server.controller.ConfigController#listener 中,找到客户端请求的 /v1/cs/configs/listener 接口:

@PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response)         throws ServletException, IOException {     request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);     // 从请求中获取要检测的目标配置文件     String probeModify = request.getParameter("Listening-Configs");     if (StringUtils.isBlank(probeModify)) {         throw new IllegalArgumentException("invalid probeModify");     }     // 使用UTF-8对目标配置文件字符串解码     probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);     Map<String, String> clientMd5Map;     try {         // todo 将目标配置文件字符串解析为map         clientMd5Map = MD5Util.getClientMd5Map(probeModify);     } catch (Throwable e) {         throw new IllegalArgumentException("invalid probeModify");     }     // do long-polling     // todo 轮询处理     inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); } 复制代码

该接口中调用了 doPollingConfig() 方法:

/**  * 轮询接口.  */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,         Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {     // Long polling.     // 判断当前请求是否指定为长轮询处理方式     // 若请求头中包含了长轮询timeout,则表示处理方式为长轮询处理方式     if (LongPollingService.isSupportLongPolling(request)) {         // todo 长轮询处理方式         longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);         return HttpServletResponse.SC_OK + "";     }     // Compatible with short polling logic.     // ------------------- 以下是短轮询处理逻辑 ---------------------     // 获取目标配置中所有发生了配置变更的文件key     List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);     // Compatible with short polling result.     // 将变更文件key形成两种不同的结果,以兼容不同版本的client     String oldResult = MD5Util.compareMd5OldResult(changedGroups);     String newResult = MD5Util.compareMd5ResultString(changedGroups);     // 从请求头中获取client版本     String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);     if (version == null) {         version = "2.0.0";     }     int versionNum = Protocol.getVersionNumber(version);     // Befor 2.0.4 version, return value is put into header.     // 根据不同的client版本,构建出不同的响应形式     if (versionNum < START_LONG_POLLING_VERSION_NUM) {         response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);         response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);     } else {         request.setAttribute("content", newResult);     }     Loggers.AUTH.info("new content:" + newResult);     // Disable cache.     response.setHeader("Pragma", "no-cache");     response.setDateHeader("Expires", 0);     response.setHeader("Cache-Control", "no-cache,no-store");     response.setStatus(HttpServletResponse.SC_OK);     return HttpServletResponse.SC_OK + ""; } 复制代码

记住这个 LongPollingService 对象,非常重要,后面还会用到

addLongPollingClient() 这个方法比较重要,详细代码如下:

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,         int probeRequestSize) {     // 从请求中获取长连接维护的时长 就是客户端提交请求的超时时间,默认为 30s     String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);     // 从请求中获取长连接是否不进行挂起(boolean),该属性是针对 非固定时长长轮询 的     String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);     String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);     String tag = req.getHeader("Vipserver-Tag");     // 这个时间是长连接提前执行关闭任务的时间,即长连接真正维护的时长是29.5s     int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);     // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.     // 是长连接真正维护的时长,29.5s     long timeout = Math.max(10000, Long.parseLong(str) - delayTime);     // 处理固定时长的长轮询情况:仅获取了用户指定的长连接的时长     if (isFixedPolling()) {         timeout = Math.max(10000, getFixedPollingInterval());         // Do nothing but set fix polling timeout.     // 处理非固定时长的长轮询情况     } else {         long start = System.currentTimeMillis();         // todo 获取目标配置中所有发生了配置变更的文件key         List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);         // 处理有配置变更发生的情况         if (changedGroups.size() > 0) {             // 生成response,并关闭连接             generateResponse(req, rsp, changedGroups);             LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",                     RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                     changedGroups.size());             return;         // 处理没有配置变更发生,且非固定时长长连接不挂起的情况:直接关闭连接         } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {             LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                     RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                     changedGroups.size());             return;         }     }     // 在哪种情况下代码会走到这里?     // 1)固定时长的长轮询     // 2)挂起的非固定时长的长轮询     // 从请求中获取提交请求的config client的ip     String ip = RequestUtil.getRemoteIp(req);     // Must be called by http thread, or send response.     final AsyncContext asyncContext = req.startAsync();     // AsyncContext.setTimeout() is incorrect, Control by oneself     asyncContext.setTimeout(0L);     // todo 立即执行客户端长轮询任务ClientLongPolling     ConfigExecutor.executeLongPolling(             new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } 复制代码

该方法中,对 md5 进行比较,如果不相同,说明文件内容已经变更,调用 generateResponse() 直接响应客户端。如果配置项没有变更,创建一个 ClientLongPolling 对象,交给定时线程池处理。

ClientLongPolling 对象实现了 Runnable 接口,我们直接看它的 run 方法,内容如下:

class ClientLongPolling implements Runnable {     @Override     public void run() { // 外层run()         // 定义并执行一个异步定时任务,并放入线程池中,延时 29.5s 执行一次         asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {             // 29.5s后会执行这个内层run()             @Override             public void run() {  // 内层run()                 try {                     // retainIps是一个缓存map,其中存放着当前server所hold的                     // 所有长轮询实例对应的client最近访问server的时间                     getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());                     // Delete subsciber's relations.                     // 将当前长轮询实例从allSubs中删除,因为当前长连接马上就要关闭了                     allSubs.remove(ClientLongPolling.this);                     // 处理固定时长长轮询的情况                     if (isFixedPolling()) {                         LogUtil.CLIENT_LOG                                 .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",                                         RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                         "polling", clientMd5Map.size(), probeRequestSize);                         // 获取目标配置中所有发生了配置变更的文件key                         List<String> changedGroups = MD5Util                                 .compareMd5((HttpServletRequest) asyncContext.getRequest(),                                         (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);                         // 将结果返回                         if (changedGroups.size() > 0) {                             sendResponse(changedGroups);                         } else {                             sendResponse(null);                         }                     // 处理挂起的非固定时长长轮询的情况:向client返回一个空的响应,并关闭连接                     } else {                         LogUtil.CLIENT_LOG                                 .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",                                         RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                         "polling", clientMd5Map.size(), probeRequestSize);                         // todo                         sendResponse(null);                     }                 } catch (Throwable t) {                     LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());                 }             }  // end-内层run()         }, timeoutTime, TimeUnit.MILLISECONDS);         // allSubs是一个缓存队列,其中存放着当前config server所hold的所有长轮询实例         // 将当前长轮询实例写入到缓存队列         allSubs.add(this);     } // end-外层run() } 复制代码

run 方法中有两个逻辑,先看第二个,将当前的长轮询对象放入 allSubs 中,allSubs 是一个队列,定义如下:

    /**      * ClientLongPolling subscibers.      */     final Queue<ClientLongPolling> allSubs; 复制代码

再看第一个方法,创建了一个 Runnable 对象,并放入线程池中,延时 29.5s 执行一次。如果这个期间配置没有发生变更,正常返回客户端;如果配置有变更(md5 比较不相同),则调用 sendResponse(changedGroups) 方法响应客户端。

5.2 方法调用图

image.png

5.3 小节

客户端会有一个长轮询任务,拉取服务端的配置变更,那么服务端是如何处理这个长轮询任务的呢?源码逻辑位于 LongPollingService 类,其中有一个 Runnable 任务名为 ClientLongPolling,服务端会将受到的轮询请求包装成一个 ClientLongPolling 任务,该任务持有一个 AsyncContext 响应对象(Servlet 3.0 的新机制),通过定时线程池延后 29.5s 执行。

为什么比客户端 30s 的超时时间提前 500ms 返回是为了最大程度上保证客户端不会因为网络延时造成超时

image.png

这里需要注意的是,在 ClientLongPolling 任务被提交进入线程池待执行的同时,服务端也通过一个队列 allSubs 保存了所有正在被夯住的轮询请求,这是因为在配置项被夯住的期间内,如果用户通过管理平台操作了配置项变更、或者服务端该节点收到了来自其他节点的 dump 刷新通知,那么都应立即取消夯住的任务,及时通知客户端数据发生了变更。

为了达到这个目的,LongPollingService 类继承自 Event 接口,实际上本身是个事件触发器,需要实现 onEvent 方法,其事件类型是 LocalDataChangeEvent

当服务端在请求被夯住的期间接收到某项配置变更时,就会发布一个 LocalDataChangeEvent 类型的事件通知(注意同上文中的 ConfigDataChangeEvent 区别),之后会将这个变更包装成一个 DataChangeTask 异步执行,内容就是从 allSubs 中找出夯住的 ClientLongPolling 请求,写入变更强制其立即返回。

6. 用户主动发起配置变更

也就是server 在长轮询期间对配置变更的感知。总思路:

Nacos Config Server 启动时会创建 LongPollingService 实例,而在创建 LongPollingService实例时,会首先创建一个 allSubs 队列,同时还会注册一个 LocalDataChangeEvent 的订阅者。一旦 Server 中保存的配置发生变更,就会触发订阅者的回调方法的执行。而回调方法则会引发 DataChangeTask 的异步任务执行。

DataChangeTask 任务就是从当前 server 所 hold 的所有长轮询实例集合 allSubs 中查找,这个发生了变更的配置文件是哪个长轮询实例对应的 client 要检测的变更,然后将这个发生变更的配置文件key发送给这个长轮询对应的client,并将这个长轮询实例从allSubs中删除。

6.1 源码分析

用户修改了数据,会调用 /nacos/v1/cs/configs 接口,对应的代码入口为: com.alibaba.nacos.config.server.controller.ConfigController#publishConfig,代码如下:

@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,     // .... 省略其它代码     if (StringUtils.isBlank(betaIps)) {         if (StringUtils.isBlank(tag)) {             persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);             //  调用通知方法 notifyConfigChange             ConfigChangePublisher                     .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));         } else {             persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);             //  调用通知方法 notifyConfigChange             ConfigChangePublisher.notifyConfigChange(                     new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));         }     } else {         // beta publish         persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);         //  调用通知方法 notifyConfigChange         ConfigChangePublisher                 .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));     }     ConfigTraceService             .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),                     ConfigTraceService.PERSISTENCE_EVENT_PUB, content);     return true; } 复制代码

可以看到,notifyConfigChange() 会在三个地方被调用,创建了 ConfigDataChangeEvent 事件,该事件只有一个接收者 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService

@Autowired public AsyncNotifyService(ServerMemberManager memberManager) {      // .... 省略其它代码                  @Override         public void onEvent(Event event) {             // ... 省略其它代码                 if (!httpQueue.isEmpty()) {                     // 创建了一个 AsyncTask 对象                     ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));                 }                 if (!rpcQueue.isEmpty()) {                     ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));                 }                              }         }                  // ... 省略其它代码     }); } 复制代码

onEvent() 方法中,创建了一个 AsyncTask 对象,用于通知配置文件有变更的操作,AsyncTask 对象的 run 方法内容为:

@Override public void run() {     executeAsyncInvoke(); } private void executeAsyncInvoke() {     while (!queue.isEmpty()) {         // ... 省略其它代码                 // 这里会调用 /v1/cs/communication/dataChange 接口,通知配置有变动                 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));             }         }     } } 复制代码

根据调用的 /v1/cs/communication/dataChange 接口,找到对应的代码:com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo

 @GetMapping("/dataChange") public Boolean notifyConfigInfo( // .... 省略部分代码){     // ... 省略其它代码     if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {         // 调用 dump() 方法         dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);     } else {         dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);     }     return true; } 复制代码

dump 方法的内容如下:

public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {     String groupKey = GroupKey2.getKey(dataId, group, tenant);     String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));     // 主要方法在这里,创建了一个 task 任务     dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));     DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); } 复制代码

继续看 addTask() 方法,可以追溯到 com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#addTask 的方法。而在 NacosDelayTaskExecuteEngine 类的构造方法中,可以看到如下代码:

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {     super(logger);     tasks = new ConcurrentHashMap<>(initCapacity);     processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));     // 创建了一个 ProcessRunnable 对象     processingExecutor             .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } 复制代码

查看 ProcessRunnable 对象的 run 方法,会调用 processTasks() 方法。processTasks() 方法中又会调用 getProcessor 获取对应的任务处理器

protected void processTasks() {     // 获取所有的task     Collection<Object> keys = getAllTaskKeys();     // 遍历     for (Object taskKey : keys) {         //获取任务         AbstractDelayTask task = removeTask(taskKey);         if (null == task) {             continue;         }         // 获取processor         NacosTaskProcessor processor = getProcessor(taskKey);         if (null == processor) {             getEngineLog().error("processor not found for task, so discarded. " + task);             continue;         }         try {             // ReAdd task if process failed             // todo 如果处理失败的话,就进行重试             if (!processor.process(task)) {                 retryFailedTask(taskKey, task);             }         } catch (Throwable e) {             getEngineLog().error("Nacos task execute error : " + e.toString(), e);             // 重试             retryFailedTask(taskKey, task);         }     } } 复制代码

这里获取到的是 DumpProcessor 对象,查看 DumpProcessorprocess 方法,com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process

@Override public boolean process(NacosTask task) {     final PersistService persistService = dumpService.getPersistService();     // .... 省略部分代码     // 如果是 beta 版本,这里不考虑     if (isBeta) {        // .... 省略部分代码     }     // 设置一些值     if (StringUtils.isBlank(tag)) {         ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);         build.remove(Objects.isNull(cf));         build.content(Objects.isNull(cf) ? null : cf.getContent());         build.type(Objects.isNull(cf) ? null : cf.getType());     } else {         // 如果 tag 不为空,这里不考虑 ...省略部分代码     }     // 关键代码     return DumpConfigHandler.configDump(build.build()); } 复制代码

configDump() 方法中的代码是:

public static boolean configDump(ConfigDumpEvent event) {         // .... 省略部分代码         if (event.isBeta()) {             // beta 版本的不考虑,  .... 省略部分代码         }         if (StringUtils.isBlank(event.getTag())) {             // .... 省略部分代码             if (!event.isRemove()) {                 // 重要代码,调用 dump() 方法                 result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);                 // 记录日志                 if (result) {                     ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),                             ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,                             content.length());                 }             } else {                 // .... 省略部分代码             }             return result;         } else {            // 存在 tag 的暂时不考虑,  .... 省略部分代码             return result;         }              } 复制代码

好了,兜兜转转这么久,终于到了最重要的一段方法,这段代码主要做两件事情:

  • dump() 方法保存配置文件并更新 md5

  • ConfigTraceService.logDumpEvent() 方法记录日志,这个方法就不展开说了

dump() 方法中,最重要的方法是:updateMd5(groupKey, md5, lastModifiedTs) ,这个方法的内容如下:

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {     CacheItem cache = makeSure(groupKey);     if (cache.md5 == null || !cache.md5.equals(md5)) {         cache.md5 = md5;         cache.lastModifiedTs = lastModifiedTs;         // 创建了一个 LocalDataChangeEvent 事件         NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));     } } 复制代码

这个方法中创建了一个 LocalDataChangeEvent 事件对象并发送,那这个事件是在哪里处理的呢??还记得之前的长轮询对象 LongPollingService 对象么,来看下它的构造方法:

public LongPollingService() {     // 创建allSubs队列。该队列中存放的是当前server所hold的所有长轮询实例     allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();     // 定义并启动一个定时任务,用于进行长轮询统计     ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);     // Register LocalDataChangeEvent to NotifyCenter.     // 将LocalDataChangeEvent注册到NotifyCenter,这样的话,     // 注册到NotifyCenter的订阅者就会被LocalDataChangeEvent事件给触发     NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);     // Register A Subscriber to subscribe LocalDataChangeEvent.     // 向NotifyCenter注册一个LocalDataChangeEvent事件的订阅者     NotifyCenter.registerSubscriber(new Subscriber() {         // 回调方法:一旦发生了数据变更事件,就会触发该方法的执行         @Override         public void onEvent(Event event) {             // 若当前长连接为固定时长的,那么忽略本次变更             if (isFixedPolling()) {                 // Ignore.             // 若当前长连接为挂起的非固定时长的,则进行相关处理~             } else {                 if (event instanceof LocalDataChangeEvent) {                     LocalDataChangeEvent evt = (LocalDataChangeEvent) event;                     // 立即执行DataChangeTask任务                     ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));                 }             }         }         @Override         public Class<? extends Event> subscribeType() {             return LocalDataChangeEvent.class;         }     }); } 复制代码

可以看到 LongPollingService 接受到 LocalDataChangeEvent 对象之后,创建了一个 DataChangeTask 对象,看下它的 run() 方法:

@Override public void run() {     try {         ConfigCacheService.getContentBetaMd5(groupKey);         // 循环 allSubs 队列,取出所有的长连接         for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {             ClientLongPolling clientSub = iter.next();             if (clientSub.clientMd5Map.containsKey(groupKey)) {                 // If published tag is not in the beta list, then it skipped.                 if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {                     continue;                 }                 // If published tag is not in the tag list, then it skipped.                 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {                     continue;                 }                 getRetainIps().put(clientSub.ip, System.currentTimeMillis());                 iter.remove(); // Delete subscribers' relationships.                 LogUtil.CLIENT_LOG                         .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",                                 RequestUtil                                         .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),                                 "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);                 // 响应客户端请求                 clientSub.sendResponse(Arrays.asList(groupKey));             }         }     } catch (Throwable t) {         LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));     } } 复制代码

这一步就是拿出队列中所有的长轮询对象并响应,客户端在接收到响应后会请求 /v1/cs/configs 接口获取最新的配置。至此,所有的流程就全部串起来了。

6.2 方法调用图

image.png

6.3 小节

Nacos 服务端是一个 SpringBoot 实现的服务,注册配置主要代码位于 ConfigController 和 ConfigServletInner 中。服务端一般是多节点部署的集群,因此请求一开始只会打到一台机器,这台机器将配置插入 MySQL 中进行持久化,这部分代码很简单不再赘述。

因为服务端并不是针对每次配置查询都去访问 MySQL 的,而是会依赖 dump 功能在本地文件中将配置缓存起来。因此当单台机器保存完毕配置之后,需要通知其他机器刷新内存和本地磁盘中的文件内容,因此它会发布一个名为 ConfigDataChangeEvent 的事件,这个事件会通过 HTTP 调用通知所有集群节点(包括自身),触发本地文件和内存的刷新。

image.png

结合服务端处理长轮询任务,因此完整的流程如下,如果非接收请求的节点,那么忽略第一步持久化配置后开始:

image.png

参考文章

nacos-1.4.1-source源码注释
spring-cloud-alibaba-2020.0.0-src
springcloud-source-study学习github地址
Spring Cloud Alibaba源码解析
Nacos 配置中心源码分析
图文解析 Nacos 配置中心的实现
nacos 动态配置源码解析


作者:hsfxuebao
链接:https://juejin.cn/post/7170531612700966920


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐