本文主要研究一下nacos config的doPollingConfig
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java
@Controller @RequestMapping(Constants.CONFIG_CONTROLLER_PATH) public class ConfigController { private static final Logger log = LoggerFactory.getLogger(ConfigController.class); private static final String NAMESPACE_PUBLIC_KEY = "public"; public static final String EXPORT_CONFIG_FILE_NAME = "nacos_config_export_"; public static final String EXPORT_CONFIG_FILE_NAME_EXT = ".zip"; public static final String EXPORT_CONFIG_FILE_NAME_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; private final transient ConfigServletInner inner; private final transient PersistService persistService; private final transient ConfigSubService configSubService; //...... @RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); } //...... }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigServletInner.java
@Service public class ConfigServletInner { @Autowired private LongPollingService longPollingService; @Autowired private PersistService persistService; private static final int TRY_GET_LOCK_TIMES = 9; private static final int START_LONGPOLLING_VERSION_NUM = 204; /** * 轮询接口 */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 长轮询 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // else 兼容短轮询逻辑 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 兼容短轮询result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); /** * 2.0.4版本以前, 返回值放入header中 */ if (versionNum < START_LONGPOLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } // 禁用缓存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; } //...... }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
@Service public class LongPollingService extends AbstractEventListener { private static final int FIXED_POLLING_INTERVAL_MS = 10000; private static final int SAMPLE_PERIOD = 100; private static final int SAMPLE_TIMES = 3; private static final String TRUE_STR = "true"; private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>(); private static boolean isFixedPolling() { return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false); } //...... static public boolean isSupportLongPolling(HttpServletRequest req) { return null != req.getHeader(LONG_POLLING_HEADER); } public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", 
clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP线程调用,否则离开后容器会立即发送响应 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } //...... }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
@Service public class LongPollingService extends AbstractEventListener { //...... class ClientLongPolling implements Runnable { @Override public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 删除订阅关系 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); } void sendResponse(List<String> changedGroups) { /** * 取消超时任务 */ if (null != asyncTimeoutFuture) { asyncTimeoutFuture.cancel(false); } generateResponse(changedGroups); } void generateResponse(List<String> changedGroups) { if (null == changedGroups) { /** * 告诉容器发送HTTP响应 */ asyncContext.complete(); return; } HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse(); try { String respString = MD5Util.compareMd5ResultString(changedGroups); // 禁用缓存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(respString); 
asyncContext.complete(); } catch (Exception se) { pullLog.error(se.toString(), se); asyncContext.complete(); } } ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize, long timeoutTime, String appName, String tag) { this.asyncContext = ac; this.clientMd5Map = clientMd5Map; this.probeRequestSize = probeRequestSize; this.createTime = System.currentTimeMillis(); this.ip = ip; this.timeoutTime = timeoutTime; this.appName = appName; this.tag = tag; } // ================= final AsyncContext asyncContext; final Map<String, String> clientMd5Map; final long createTime; final String ip; final String appName; final String tag; final int probeRequestSize; final long timeoutTime; Future<?> asyncTimeoutFuture; } //...... }
ConfigController的listener方法会从request中读取Listening-Configs参数,然后decode,计算clientMd5Map,最后执行inner.doPollingConfig;ConfigServletInner的doPollingConfig方法首先判断request是否支持long polling,支持的话则执行longPollingService.addLongPollingClient;不支持的话则走短轮询兼容逻辑:客户端版本低于2.0.4时将结果写入response header,否则将newResult放入request的content属性中,然后返回