阅读 313

Nacos配置中心之服务端长轮询处理机制

Nacos配置中心之服务端长轮询处理机制

客户端发起http请求进行长轮询,调用接口/listner 服务端在nacos-config模块中的ConfigController类中:

ConfigController的listener方法:

/**  * 比较MD5  */ @PostMapping("/listener") public void listener(HttpServletRequest request, HttpServletResponse response)     throws ServletException, IOException {     request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);     String probeModify = request.getParameter("Listening-Configs");     if (StringUtils.isBlank(probeModify)) {         throw new IllegalArgumentException("invalid probeModify");     }     probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);     Map<String, String> clientMd5Map;     try {         clientMd5Map = MD5Util.getClientMd5Map(probeModify);     } catch (Throwable e) {         throw new IllegalArgumentException("invalid probeModify");     }     // do long-polling     inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); } 复制代码

  1. 获取客户端需要监听的可能发送变化的配置,计算MD5值

  2. inner.doPollingConfig执行长轮询请求

doPollingConfig

/**  * 轮询接口  */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,                               Map<String, String> clientMd5Map, int probeRequestSize)     throws IOException {     // 长轮询     if (LongPollingService.isSupportLongPolling(request)) {         longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);         return HttpServletResponse.SC_OK + "";     }     // else 兼容短轮询逻辑     List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);     // 兼容短轮询result     String oldResult = MD5Util.compareMd5OldResult(changedGroups);     String newResult = MD5Util.compareMd5ResultString(changedGroups);     String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);     if (version == null) {         version = "2.0.0";     }     int versionNum = Protocol.getVersionNumber(version);     /**      * 2.0.4版本以前, 返回值放入header中      */     if (versionNum < START_LONGPOLLING_VERSION_NUM) {         response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);         response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);     } else {         request.setAttribute("content", newResult);     }     // 禁用缓存     response.setHeader("Pragma", "no-cache");     response.setDateHeader("Expires", 0);     response.setHeader("Cache-Control", "no-cache,no-store");     response.setStatus(HttpServletResponse.SC_OK);     return HttpServletResponse.SC_OK + ""; } 复制代码

  1. 判断当前请求是否为长轮询,如果是,调用addLongPollingClient

addLongPollingClient

主要把客户端的长轮询请求封装成ClientLongPolling交给scheduler执行

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,                                  int probeRequestSize) {     String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);     String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);     String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);     String tag = req.getHeader("Vipserver-Tag");     int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);     /**      * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance      */     long timeout = Math.max(10000, Long.parseLong(str) - delayTime);     if (isFixedPolling()) {         timeout = Math.max(10000, getFixedPollingInterval());         // do nothing but set fix polling timeout     } else {         long start = System.currentTimeMillis();         List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);         if (changedGroups.size() > 0) {             generateResponse(req, rsp, changedGroups);             LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                 System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",                 clientMd5Map.size(), probeRequestSize, changedGroups.size());             return;         } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {             LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                 RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                 changedGroups.size());             return;         }     }     String ip = RequestUtil.getRemoteIp(req);     // 一定要由HTTP线程调用,否则离开后容器会立即发送响应     final AsyncContext asyncContext = req.startAsync();     // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制     asyncContext.setTimeout(0L);     scheduler.execute(         new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } 复制代码

  1. 获取客户端请求的超时时间,减去500ms后赋值给timeout变量。

  2. 判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行

  3. 和服务端的数据进行MD5对比,如果发送变化则直接返回

  4. scheduler.execute 执行ClientLongPolling线程

ClientLongPolling

ClientLongPolling是一个线程,run方法如下:

@Override public void run() {     asyncTimeoutFuture = scheduler.schedule(new Runnable() {         @Override         public void run() {             try {                 getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());                 /**                  * 删除订阅关系                  */                 allSubs.remove(ClientLongPolling.this);                 if (isFixedPolling()) {                     LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                         (System.currentTimeMillis() - createTime),                         "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                         "polling",                         clientMd5Map.size(), probeRequestSize);                     List<String> changedGroups = MD5Util.compareMd5(                         (HttpServletRequest)asyncContext.getRequest(),                         (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);                     if (changedGroups.size() > 0) {                         sendResponse(changedGroups);                     } else {                         sendResponse(null);                     }                 } else {                     LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                         (System.currentTimeMillis() - createTime),                         "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                         "polling",                         clientMd5Map.size(), probeRequestSize);                     sendResponse(null);                 }             } catch (Throwable t) {                 LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());             }         }     }, timeoutTime, TimeUnit.MILLISECONDS);     allSubs.add(this); } 复制代码

  1. 通过scheduler.schedule启动一个定时任务,并延时时间为29.5s

  2. 将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系。

  3. 定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。

  4. 通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端

所谓长轮询就是服务端收到请求之后,不立即返回,而是在延29.5s才把请求结果返回给客户端,这使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态


作者:周杰倫本人
链接:https://juejin.cn/post/7018009310903042056


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐