Nacos配置中心之客户端长轮询
11Nacos配置中心之客户端长轮询
客户端长轮询定时任务是在NacosFactory的createConfigService构建ConfigService对象实例的时候启动的
createConfigService
public static ConfigService createConfigService(String serverAddr) throws NacosException { return ConfigFactory.createConfigService(serverAddr); } 复制代码
public class ConfigFactory { /** * Create Config * * @param properties init param * @return ConfigService * @throws NacosException Exception */ public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } /** * Create Config * * @param serverAddr serverList * @return Config * @throws ConfigService Exception */ public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties); } } 复制代码
通过Class.forName加载NacosConfigService类
使用反射来完成NacosConfigService类的实例化
NacosConfigService构造
NacosConfigService构造方法:
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty("encode"); if (StringUtils.isBlank(encodeTmp)) { this.encode = "UTF-8"; } else { this.encode = encodeTmp.trim(); } this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); } 复制代码
初始化HttpAgent,使用了装饰器模式,实际工作的类是ServerHttpAgent,MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加监控统计信息
ClientWorker是客户端的工作类,agent作为参数传入ClientWorker,用agent做一些远程调用
ClientWorker构造
ClientWorker的构造函数:
@SuppressWarnings("PMD.ThreadPoolCreationRule") public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } 复制代码
构建定时调度的线程池,第一个线程池executor只拥有一个核心线程,每隔10s执行一次checkConfigInfo()方法,功能就是每10ms检查一次配置信息
第二个线程池executorService只完成了初始化,后续用于客户端的定时长轮询功能。
checkConfigInfo方法:
public void checkConfigInfo() { // 分任务 int listenerSize = cacheMap.get().size(); // 向上取整为批数 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } } 复制代码
检查配置是否发生变化
cacheMap用来存储监听变更的缓存集合,key是根据dataID/group/tenant拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。
默认情况下每个长轮询LongPollingRunnable任务处理3000个监听配置集,超过3000个启动多个LongPollingRunnable执行。
LongPollingRunnable
LongPollingRunnable是一个线程,我们可以直接找到LongPollingRunnable里面的run方法
class LongPollingRunnable implements Runnable { private int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } } 复制代码
遍历CacheData,检查本地配置,根据taskId对cacheMap进行数据分割,通过checkLocalConfig方法检查本地配置,本地在${user}\naocs\config\目录下缓存一份服务端的配置信息,checkLocalConfig将内存中的数据和本地磁盘数据比较,不一致说明数据发生了变化,需要触发事件通知。
执行checkUpdateDataIds方法在服务端建立长轮询机制,通过长轮询检查数据变更。
遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据dataId,group,tenant去服务端读取对应的配置信息并保存到本地文件中。
继续定时执行当前线程
checkUpdateDataIds
checkUpdateDataIds基于长连接方式监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。
checkUpdateDataIds中调用checkUpdateConfigStr
/** * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); } 复制代码
checkUpdateConfigStr通过agent.httpPost调用/listener接口实现长轮询请求。长轮询请求是实现层面只是设置了一个比较长的超时时间,默认30s。如果服务端的数据发生变更,客户端会收到HttpResult。服务端返回的是存在数据变更的dataId, group, tenant。获得这些信息后,在LongPollingRunnable的run方法中调用getServerConfig方法从Nacos服务器中读取具体的配置内容。
getServerConfig
从Nacos服务器中读取具体的配置内容:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
作者:周杰倫本人
链接:https://juejin.cn/post/7018008943897247752