阅读 121

Nacos配置中心之客户端长轮询

11Nacos配置中心之客户端长轮询

客户端长轮询定时任务是在NacosFactory的createConfigService构建ConfigService对象实例的时候启动的

createConfigService

public static ConfigService createConfigService(String serverAddr) throws NacosException {     return ConfigFactory.createConfigService(serverAddr); } 复制代码

public class ConfigFactory {     /**      * Create Config      *      * @param properties init param      * @return ConfigService      * @throws NacosException Exception      */     public static ConfigService createConfigService(Properties properties) throws NacosException {         try {             Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");             Constructor constructor = driverImplClass.getConstructor(Properties.class);             ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);             return vendorImpl;         } catch (Throwable e) {             throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);         }     }     /**      * Create Config      *      * @param serverAddr serverList      * @return Config      * @throws ConfigService Exception      */     public static ConfigService createConfigService(String serverAddr) throws NacosException {         Properties properties = new Properties();         properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);         return createConfigService(properties);     } } 复制代码

  1. 通过Class.forName加载NacosConfigService类

  2. 使用反射来完成NacosConfigService类的实例化

NacosConfigService构造

NacosConfigService构造方法:

public NacosConfigService(Properties properties) throws NacosException {     String encodeTmp = properties.getProperty("encode");     if (StringUtils.isBlank(encodeTmp)) {         this.encode = "UTF-8";     } else {         this.encode = encodeTmp.trim();     }     this.initNamespace(properties);     this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));     this.agent.start();     this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); } 复制代码

  1. 初始化HttpAgent,使用了装饰器模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加监控统计信息

  2. ClientWorker是客户端的工作类,agent作为参数传入ClientWorker,用agent做一些远程调用

ClientWorker构造

ClientWorker的构造函数:

@SuppressWarnings("PMD.ThreadPoolCreationRule")     public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {         this.agent = agent;         this.configFilterChainManager = configFilterChainManager;         // Initialize the timeout parameter         init(properties);         executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread t = new Thread(r);                 t.setName("com.alibaba.nacos.client.Worker." + agent.getName());                 t.setDaemon(true);                 return t;             }         });         executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread t = new Thread(r);                 t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());                 t.setDaemon(true);                 return t;             }         });         executor.scheduleWithFixedDelay(new Runnable() {             @Override             public void run() {                 try {                     checkConfigInfo();                 } catch (Throwable e) {                     LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);                 }             }         }, 1L, 10L, TimeUnit.MILLISECONDS);     } 复制代码

  1. 构建定时调度的线程池,第一个线程池executor只拥有一个核心线程,每隔10s执行一次checkConfigInfo()方法,功能就是每10ms检查一次配置信息

  2. 第二个线程池executorService只完成了初始化,后续用于客户端的定时长轮询功能。

checkConfigInfo方法:

public void checkConfigInfo() {     // 分任务     int listenerSize = cacheMap.get().size();     // 向上取整为批数     int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());     if (longingTaskCount > currentLongingTaskCount) {         for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {             // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题             executorService.execute(new LongPollingRunnable(i));         }         currentLongingTaskCount = longingTaskCount;     } } 复制代码

检查配置是否发生变化

cacheMap用来存储监听变更的缓存集合,key是根据dataID/group/tenant拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。

默认情况下每个长轮询LongPollingRunnable任务处理3000个监听配置集,超过3000个启动多个LongPollingRunnable执行。

LongPollingRunnable

LongPollingRunnable是一个线程,我们可以直接找到LongPollingRunnable里面的run方法

class LongPollingRunnable implements Runnable {     private int taskId;     public LongPollingRunnable(int taskId) {         this.taskId = taskId;     }     @Override     public void run() {         List<CacheData> cacheDatas = new ArrayList<CacheData>();         List<String> inInitializingCacheList = new ArrayList<String>();         try {             // check failover config             for (CacheData cacheData : cacheMap.get().values()) {                 if (cacheData.getTaskId() == taskId) {                     cacheDatas.add(cacheData);                     try {                         checkLocalConfig(cacheData);                         if (cacheData.isUseLocalConfigInfo()) {                             cacheData.checkListenerMd5();                         }                     } catch (Exception e) {                         LOGGER.error("get local config info error", e);                     }                 }             }             // check server config             List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);             for (String groupKey : changedGroupKeys) {                 String[] key = GroupKey.parseKey(groupKey);                 String dataId = key[0];                 String group = key[1];                 String tenant = null;                 if (key.length == 3) {                     tenant = key[2];                 }                 try {                     String content = getServerConfig(dataId, group, tenant, 3000L);                     CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));                     cache.setContent(content);                     LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",                         agent.getName(), dataId, group, tenant, cache.getMd5(),                         ContentUtils.truncateContent(content));                 } catch (NacosException ioe) {                     String message = String.format(                         "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                         agent.getName(), dataId, group, tenant);                     LOGGER.error(message, ioe);                 }             }             for (CacheData cacheData : cacheDatas) {                 if (!cacheData.isInitializing() || inInitializingCacheList                     .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {                     cacheData.checkListenerMd5();                     cacheData.setInitializing(false);                 }             }             inInitializingCacheList.clear();             executorService.execute(this);         } catch (Throwable e) {             // If the rotation training task is abnormal, the next execution time of the task will be punished             LOGGER.error("longPolling error : ", e);             executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);         }     } } 复制代码

  1. 遍历CacheData,检查本地配置,根据taskId对cacheMap进行数据分割,通过checkLocalConfig方法检查本地配置,本地在${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。

  2. 执行checkUpdateDataIds方法在服务端建立长轮询机制,通过长轮询检查数据变更。

  3. 遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据dataId,group,tenant去服务端读取对应的配置信息并保存到本地文件中。

  4. 继续定时执行当前线程

checkUpdateDataIds

checkUpdateDataIds基于长连接方式监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。

checkUpdateDataIds中调用checkUpdateConfigStr

/**  * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。  */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {     List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);     List<String> headers = new ArrayList<String>(2);     headers.add("Long-Pulling-Timeout");     headers.add("" + timeout);     // told server do not hang me up if new initializing cacheData added in     if (isInitializingCacheList) {         headers.add("Long-Pulling-Timeout-No-Hangup");         headers.add("true");     }     if (StringUtils.isBlank(probeUpdateString)) {         return Collections.emptyList();     }     try {         HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,             agent.getEncode(), timeout);         if (HttpURLConnection.HTTP_OK == result.code) {             setHealthServer(true);             return parseUpdateDataIdResponse(result.content);         } else {             setHealthServer(false);             LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);         }     } catch (IOException e) {         setHealthServer(false);         LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);         throw e;     }     return Collections.emptyList(); } 复制代码

checkUpdateConfigStr通过agent.httpPost调用/listener接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认30s。如果服务端的数据发生变更,客户端会收到HttpResult。服务端返回的是存在数据变更的dataId, group, tenant。获得这些信息后,在LongPollingRunnable的run方法中调用getServerConfig方法从Nacos服务器中读取具体的配置内容。

getServerConfig

从Nacos服务器中读取具体的配置内容:

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)     throws NacosException {     if (StringUtils.isBlank(group)) {         group = Constants.DEFAULT_GROUP;     }     HttpResult result = null;     try {         List<String> params = null;         if (StringUtils.isBlank(tenant)) {             params = Arrays.asList("dataId", dataId, "group", group);         } else {             params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);         }         result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);     } catch (IOException e) {         String message = String.format(             "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),             dataId, group, tenant);         LOGGER.error(message, e);         throw new NacosException(NacosException.SERVER_ERROR, e);     }     switch (result.code) {         case HttpURLConnection.HTTP_OK:             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);             return result.content;         case HttpURLConnection.HTTP_NOT_FOUND:             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);             return null;         case HttpURLConnection.HTTP_CONFLICT: {             LOGGER.error(                 "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "                     + "tenant={}", agent.getName(), dataId, group, tenant);             throw new NacosException(NacosException.CONFLICT,                 "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);         }         case HttpURLConnection.HTTP_FORBIDDEN: {             LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,                 group, tenant);             throw new NacosException(result.code, result.content);         }         default: {             LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,                 group, tenant, result.code);             throw new NacosException(result.code,                 "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);         }     } }


作者:周杰倫本人
链接:https://juejin.cn/post/7018008943897247752

文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐