Nacos配置中心之事件订阅
10 Nacos配置中心之事件订阅
NacosContextRefresher实现ApplicationListener接口进行事件监听,在上下文准备完毕时候触发这个事件
NacosContextRefresher的onApplicationEvent方法:
@Override public void onApplicationEvent(ApplicationReadyEvent event) { // many Spring context if (this.ready.compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } } 复制代码
registerNacosListenersForApplications进行Nacos事件监听的注册。
registerNacosListenersForApplications方法:
private void registerNacosListenersForApplications() { if (refreshProperties.isEnabled()) { for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository .getAll()) { if (!nacosPropertySource.isRefreshable()) { continue; } String dataId = nacosPropertySource.getDataId(); registerNacosListener(nacosPropertySource.getGroup(), dataId); } } } 复制代码
registerNacosListener方法:
private void registerNacosListener(final String group, final String dataId) { Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() { @Override public void receiveConfigInfo(String configInfo) { refreshCountIncrement(); String md5 = ""; if (!StringUtils.isEmpty(configInfo)) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))) .toString(16); } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e); } } refreshHistory.add(dataId, md5); applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug("Refresh Nacos config group " + group + ",dataId" + dataId); } } @Override public Executor getExecutor() { return null; } }); try { configService.addListener(dataId, group, listener); } catch (NacosException e) { e.printStackTrace(); } } 复制代码
registerNacosListener中applicationContext.publishEvent方法发布RefreshEvent事件,事件的监听实现在RefreshEventListener类中
RefreshEventListener类:
/** * On application start up, NacosContextRefresher add nacos listeners to all application * level dataIds, when there is a change in the data, listeners will refresh * configurations. * * @author juven.xuxb * @author pbting */ public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware { private final static Logger log = LoggerFactory .getLogger(NacosContextRefresher.class); private static final AtomicLong REFRESH_COUNT = new AtomicLong(0); private final NacosRefreshProperties refreshProperties; private final NacosRefreshHistory refreshHistory; private final ConfigService configService; private ApplicationContext applicationContext; private AtomicBoolean ready = new AtomicBoolean(false); private Map<String, Listener> listenerMap = new ConcurrentHashMap<>(16); public NacosContextRefresher(NacosRefreshProperties refreshProperties, NacosRefreshHistory refreshHistory, ConfigService configService) { this.refreshProperties = refreshProperties; this.refreshHistory = refreshHistory; this.configService = configService; } @Override public void onApplicationEvent(ApplicationReadyEvent event) { // many Spring context if (this.ready.compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } private void registerNacosListenersForApplications() { if (refreshProperties.isEnabled()) { for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository .getAll()) { if (!nacosPropertySource.isRefreshable()) { continue; } String dataId = nacosPropertySource.getDataId(); registerNacosListener(nacosPropertySource.getGroup(), dataId); } } } private void registerNacosListener(final String group, final String dataId) { Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() { @Override public void receiveConfigInfo(String configInfo) { refreshCountIncrement(); String md5 = ""; if (!StringUtils.isEmpty(configInfo)) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))) .toString(16); } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e); } } refreshHistory.add(dataId, md5); applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug("Refresh Nacos config group " + group + ",dataId" + dataId); } } @Override public Executor getExecutor() { return null; } }); try { configService.addListener(dataId, group, listener); } catch (NacosException e) { e.printStackTrace(); } } public static long getRefreshCount() { return REFRESH_COUNT.get(); } public static void refreshCountIncrement() { REFRESH_COUNT.incrementAndGet(); } } 复制代码
handler方法中调用refresh.refresh()方法完成配置的更新和应用。
作者:周杰倫本人
链接:https://juejin.cn/post/7018008744801861646