阅读 399

Nacos配置中心之事件订阅

10 Nacos配置中心之事件订阅

NacosContextRefresher实现ApplicationListener接口进行事件监听,在上下文准备完毕时候触发这个事件

NacosContextRefresher的onApplicationEvent方法:

@Override public void onApplicationEvent(ApplicationReadyEvent event) {    // many Spring context    if (this.ready.compareAndSet(false, true)) {       this.registerNacosListenersForApplications();    } } 复制代码

registerNacosListenersForApplications进行Nacos事件监听的注册。

registerNacosListenersForApplications方法:

private void registerNacosListenersForApplications() {    if (refreshProperties.isEnabled()) {       for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository             .getAll()) {          if (!nacosPropertySource.isRefreshable()) {             continue;          }          String dataId = nacosPropertySource.getDataId();          registerNacosListener(nacosPropertySource.getGroup(), dataId);       }    } } 复制代码

registerNacosListener方法:

private void registerNacosListener(final String group, final String dataId) {    Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {       @Override       public void receiveConfigInfo(String configInfo) {          refreshCountIncrement();          String md5 = "";          if (!StringUtils.isEmpty(configInfo)) {             try {                MessageDigest md = MessageDigest.getInstance("MD5");                md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))                      .toString(16);             }             catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {                log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);             }          }          refreshHistory.add(dataId, md5);          applicationContext.publishEvent(                new RefreshEvent(this, null, "Refresh Nacos config"));          if (log.isDebugEnabled()) {             log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);          }       }       @Override       public Executor getExecutor() {          return null;       }    });    try {       configService.addListener(dataId, group, listener);    }    catch (NacosException e) {       e.printStackTrace();    } } 复制代码

registerNacosListener中applicationContext.publishEvent方法发布RefreshEvent事件,事件的监听实现在RefreshEventListener类中

RefreshEventListener类:

/**  * On application start up, NacosContextRefresher add nacos listeners to all application  * level dataIds, when there is a change in the data, listeners will refresh  * configurations.  *  * @author juven.xuxb  * @author pbting  */ public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware { private final static Logger log = LoggerFactory .getLogger(NacosContextRefresher.class); private static final AtomicLong REFRESH_COUNT = new AtomicLong(0); private final NacosRefreshProperties refreshProperties; private final NacosRefreshHistory refreshHistory; private final ConfigService configService; private ApplicationContext applicationContext; private AtomicBoolean ready = new AtomicBoolean(false); private Map<String, Listener> listenerMap = new ConcurrentHashMap<>(16); public NacosContextRefresher(NacosRefreshProperties refreshProperties, NacosRefreshHistory refreshHistory, ConfigService configService) { this.refreshProperties = refreshProperties; this.refreshHistory = refreshHistory; this.configService = configService; } @Override public void onApplicationEvent(ApplicationReadyEvent event) { // many Spring context if (this.ready.compareAndSet(false, true)) { this.registerNacosListenersForApplications(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } private void registerNacosListenersForApplications() { if (refreshProperties.isEnabled()) { for (NacosPropertySource nacosPropertySource : NacosPropertySourceRepository .getAll()) { if (!nacosPropertySource.isRefreshable()) { continue; } String dataId = nacosPropertySource.getDataId(); registerNacosListener(nacosPropertySource.getGroup(), dataId); } } } private void registerNacosListener(final String group, final String dataId) { Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() { @Override public void receiveConfigInfo(String configInfo) { refreshCountIncrement(); String md5 = ""; if (!StringUtils.isEmpty(configInfo)) { try { MessageDigest md = MessageDigest.getInstance("MD5"); md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))) .toString(16); } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e); } } refreshHistory.add(dataId, md5); applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); if (log.isDebugEnabled()) { log.debug("Refresh Nacos config group " + group + ",dataId" + dataId); } } @Override public Executor getExecutor() { return null; } }); try { configService.addListener(dataId, group, listener); } catch (NacosException e) { e.printStackTrace(); } } public static long getRefreshCount() { return REFRESH_COUNT.get(); } public static void refreshCountIncrement() { REFRESH_COUNT.incrementAndGet(); } } 复制代码

handler方法中调用refresh.refresh()方法完成配置的更新和应用。


作者:周杰倫本人
链接:https://juejin.cn/post/7018008744801861646

文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐