namesrv 充当服务注册中心的作用,向 producer、consumer 提供 broker 的信息,并将不可用的 broker 及时剔除。有点类似 eureka-server 的作用。与 eureka-server 不大一样的是, namesrv 集群之间信息不共享 ,也无法同步。broker 注册的时候,是同时向 namesrv 集群进行注册。eureka-client 则是向集群中的一台 eureka-server 注册。eureka-server 再进行集群间的消息同步。
namesrv 启动时,主要是加载配置文件,初始化 NettyServer,NamesrvController。并由 NamesrvController 创建 2 个线程池。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
每 10 秒 扫描不可用的 broker。以及每 10 分钟 打印 KV 配置。
在启动后,注册 JVM 钩子函数,在程序停止时,优雅的关闭资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { // controller 为 NamesrvController controller.shutdown(); return null; } }));
// 读写锁 private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 主题队列表,初始化时,topic 默认有 4 个队列 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // broker 信息。包含集群名称、broker 名称、主备 broker 地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // broker 集群信息。存储集群中所有的名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // broker 状态信息,namesrv 每次收到心跳包,会替换该信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // broker FilterService 列表,用于类模式消息过滤 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private String brokerName; private int readQueueNums; private int writeQueueNums; // 读写权限,参考 permName private int perm; private int topicSynFlag;
private String cluster; private String brokerName; private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
// 最后一次更新时间, namesrv 收到 broker 心跳后,会更新该信息。 private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr;
Broker 在启动 10 秒 后,每隔 30 秒 会向注册过的所有 namesrv 发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); // brokerConfig.getRegisterNameServerPeriod() = 30000
// 使用 CountDownLatch 等到向所有 namesrv 注册完毕,才会放行 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { }
请求被 DefaultRequestProcessor 接收,code = REGISTER_BROKER
switch (request.getCode()) { ... ... case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } ... ... }
接着由 RouteInfoManager 执行注册的操作,维护 broker 信息,主要维护以下几个Map 的信息
clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable
... ... // 拿到 写锁 this.lock.writeLock().lockInterruptibly(); // 维护 clusterAddrTable Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 维护 brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } // 维护 topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { // 真正做维护操作的方法 this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 维护 brokerLiveTable BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); ... ... // 释放写锁 this.lock.writeLock().unlock();
Namesrv 路由剔除触发时机:
路由剔除的操作, 同样无非是对 RouterInfoManager 中维护的 broker 信息进行 CURD。不做过多介绍
RocketMQ 路由发现非实时,当 Topic 路由发生变化后,Namesrv 不主动推送给客户端,而是由客户端拉取主题最新的路由
根据主题名称拉取路由信息的 code = RequestCode.GET_ROUTEINTO_BY_TOPIC
try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 从 topicQueueTable 拿到topic 对应的 队列数据 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } // 从 brokerAddrTable 获取 broker 对应的地址 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); }
主要是操作 topicQueueTable, brokerAddrTable, filterServerMap,填充 topicRouteData。
A: 该问题留到 消息发送时说。
A: