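Following on from the previous post (in theory, this article should have been written first). RocketMQ does not use ZooKeeper as its registry. To reduce external dependencies, it ships its own implementation, NameSrv, which stores the metadata of brokers and topics. After startup a broker sends heartbeats to NameSrv, and NameSrv periodically checks broker availability and removes brokers that are no longer reachable. The check is driven by a scheduled task on top of Netty long-lived connections. For producers and consumers, NameSrv provides the broker lookup service.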
```java
protected NamesrvController nameSrvController = null;
protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
protected NamesrvConfig namesrvConfig = new NamesrvConfig();

@Before
public void startup() throws Exception {
    nettyServerConfig.setListenPort(9876);
    nameSrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    boolean initResult = nameSrvController.initialize();
    assertThat(initResult).isTrue();
    nameSrvController.start();
}

@After
public void shutdown() throws Exception {
    if (nameSrvController != null) {
        nameSrvController.shutdown();
    }
    // maybe need to clean the file store. But we do not suggest deleting anything.
}
```

From this test we can see three main steps:
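1. Construct the NamesrvController
2. Initialize the NamesrvController
3. Start the NamesrvController

NamesrvController is the key class: it is responsible for initialization and for starting the background tasks.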
Next, let's analyze the NamesrvController constructor:

```java
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    // NameSrv configuration
    this.namesrvConfig = namesrvConfig;
    // Netty server configuration
    this.nettyServerConfig = nettyServerConfig;
    // KVConfigManager is bound to this NamesrvController
    this.kvConfigManager = new KVConfigManager(this);
    // Manages the routing information
    this.routeInfoManager = new RouteInfoManager();
    // Listens for broker channel events (close, exception, idle)
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    // The NameSrv configuration is persisted to a file on disk
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
```
RouteInfoManager holds the broker information for the whole cluster, together with the topic and queue configuration. Here is its internal structure:

```java
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Read-write lock that guards concurrent reads and writes of the tables below
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topic -> queue data; records how many read/write queues the topic has on each broker
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// All brokers registered with the name server, grouped by brokerName;
// brokers that share a brokerName form one master/slave group
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// cluster -> broker names; clusterName tells whether brokers belong to the same cluster.
// A producer picks only one of these brokers for a given send
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Latest heartbeat and data version per broker; the name server records
// the last time each brokerAddr was active
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker -> its filter servers
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

public RouteInfoManager() {
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}
```
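To make the relationship between these tables more concrete, here is a minimal sketch of how a topic can be resolved to broker addresses through a topic table and a broker table. It is not the RocketMQ code: the class `RouteTablesSketch`, its methods and the sample addresses are hypothetical, and the tables are reduced to plain maps. It also assumes, as RocketMQ does, that broker id 0 denotes the master.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of two routing tables; not the RocketMQ classes.
class RouteTablesSketch {
    // topic -> names of the broker groups that host queues for it
    private final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> address); id 0 is assumed to be the master
    private final Map<String, Map<Long, String>> brokerNameToAddrs = new HashMap<>();

    void addBroker(String brokerName, long brokerId, String addr) {
        brokerNameToAddrs.computeIfAbsent(brokerName, k -> new TreeMap<>()).put(brokerId, addr);
    }

    void addTopic(String topic, String brokerName) {
        List<String> names = topicToBrokerNames.computeIfAbsent(topic, k -> new ArrayList<>());
        if (!names.contains(brokerName)) {
            names.add(brokerName);
        }
    }

    // Resolve a topic to every broker address that can serve it (master first).
    List<String> lookup(String topic) {
        List<String> result = new ArrayList<>();
        List<String> names = topicToBrokerNames.get(topic);
        if (names == null) {
            return result; // unknown topic: empty route
        }
        for (String name : names) {
            Map<Long, String> addrs = brokerNameToAddrs.get(name);
            if (addrs != null) {
                result.addAll(addrs.values());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RouteTablesSketch tables = new RouteTablesSketch();
        tables.addBroker("broker-a", 0L, "10.0.0.1:10911"); // master
        tables.addBroker("broker-a", 1L, "10.0.0.2:10911"); // slave
        tables.addTopic("orders", "broker-a");
        System.out.println(tables.lookup("orders"));
    }
}
```

The two-step lookup (topic to broker name, then broker name to addresses) is the reason the tables are keyed differently: a topic only needs to know which broker groups host it, while the master/slave addresses are kept once per group.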
Initialization is done in NamesrvController.initialize():

```java
public boolean initialize() {
    // Deserialize the file pointed to by NamesrvConfig#kvConfigPath into KVConfigManager
    this.kvConfigManager.load();
    // Create the Netty server used for network communication
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Thread pool used by DefaultRequestProcessor, the default processor for Netty traffic.
    // Important: this comes up again later
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register DefaultRequestProcessor as the default processor
    this.registerProcessor();
    // Every 10 seconds, scan for inactive brokers and remove them
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Every 10 minutes, print the KV configuration
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
```
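The periodic scan for inactive brokers can be illustrated with a small sketch. This is not the body of scanNotActiveBroker: the class `BrokerLivenessSketch` and its methods are hypothetical, and it only keeps a last-heartbeat timestamp per broker address. The 2-minute threshold mirrors the BROKER_CHANNEL_EXPIRED_TIME constant shown in RouteInfoManager; the assumption here is that a broker is dropped once its last heartbeat is older than that threshold. Time is passed in explicitly so the behavior is easy to verify.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of heartbeat bookkeeping with expiry; not the RocketMQ implementation.
class BrokerLivenessSketch {
    // Same value as BROKER_CHANNEL_EXPIRED_TIME in RouteInfoManager: 2 minutes
    static final long EXPIRED_MILLIS = 1000L * 60 * 2;

    // brokerAddr -> timestamp of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a broker registers or sends a heartbeat.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    // Remove every broker whose last heartbeat is older than the threshold
    // and return the addresses that were removed.
    List<String> scanNotActive(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MILLIS < nowMillis) {
                it.remove();
                removed.add(entry.getKey());
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        BrokerLivenessSketch live = new BrokerLivenessSketch();
        live.onHeartbeat("10.0.0.1:10911", 0L);
        live.onHeartbeat("10.0.0.2:10911", 100_000L);
        // At t = 121 s only the first broker has been silent for more than 2 minutes
        System.out.println(live.scanNotActive(121_000L));
    }
}
```

In a running server a method like `scanNotActive` would be scheduled with `scheduleAtFixedRate`, as initialize() does for scanNotActiveBroker with a 5-second initial delay and a 10-second period.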
NamesrvController then starts the Netty server so that it can accept client requests.
```java
public void start() throws Exception {
    this.remotingServer.start();
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}
```
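DefaultRequestProcessor is where requests are handled: every request that reaches NameSrv goes through it. As we saw in the initialization above, it is registered with the Netty remoting server as the default processor and runs on remotingExecutor.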
```java
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
```
Broker registration (REGISTER_BROKER) is handled by registerBroker:

```java
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    // Reject the request if the body checksum does not match
    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }

    // Decode the broker's topic configuration, or fall back to an empty one
    TopicConfigSerializeWrapper topicConfigWrapper;
    if (request.getBody() != null) {
        topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
    } else {
        topicConfigWrapper = new TopicConfigSerializeWrapper();
        topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
        topicConfigWrapper.getDataVersion().setTimestamp(0);
    }

    // Update the routing tables held by RouteInfoManager
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    // Return the ordered-topic configuration in the response body
    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
```
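The response header above carries a master address and an HA server address taken from the registration result. The sketch below shows one way such a result could be produced. It is a simplified illustration under stated assumptions, not RouteInfoManager.registerBroker itself: the class `RegisterBrokerSketch`, its `Result` type and its two maps are hypothetical, topic configuration and locking are left out, and it assumes that broker id 0 denotes the master and that only a slave receives its master's addresses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified registration flow; not the RocketMQ implementation.
class RegisterBrokerSketch {
    static final long MASTER_ID = 0L; // assumption: id 0 marks the master of a broker group

    // Mirrors the two fields copied into the response header
    static final class Result {
        String masterAddr;
        String haServerAddr;
    }

    // brokerName -> (brokerId -> broker address)
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // broker address -> HA server address announced by that broker
    private final Map<String, String> haAddrByBrokerAddr = new HashMap<>();

    Result register(String brokerName, long brokerId, String brokerAddr, String haServerAddr) {
        // Record the broker under its group, keyed by id
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
        haAddrByBrokerAddr.put(brokerAddr, haServerAddr);

        Result result = new Result();
        if (brokerId != MASTER_ID) {
            // A slave is told where its master is, if the master has already registered
            String masterAddr = brokerAddrTable.get(brokerName).get(MASTER_ID);
            if (masterAddr != null) {
                result.masterAddr = masterAddr;
                result.haServerAddr = haAddrByBrokerAddr.get(masterAddr);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RegisterBrokerSketch registry = new RegisterBrokerSketch();
        registry.register("broker-a", 0L, "10.0.0.1:10911", "10.0.0.1:10912");
        Result slave = registry.register("broker-a", 1L, "10.0.0.2:10911", "10.0.0.2:10912");
        System.out.println(slave.masterAddr + " " + slave.haServerAddr);
    }
}
```

Grouping by brokerName is what lets a slave discover its master here: both register under the same name, and the id decides the role.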
Route lookups by topic (GET_ROUTEINTO_BY_TOPIC) are handled by getRouteInfoByTopic:

```java
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Look up the route data for the requested topic
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Attach the ordered-topic configuration when ordered messages are enabled
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
```
Next, let's look at how the broker information for a topic is collected in pickupTopicRouteData:
```java
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
```
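To summarize: when NettyRemotingServer starts, a default processor, DefaultRequestProcessor, is registered with it. This processor dispatches on the request code and, depending on the request, reads or updates the data held by RouteInfoManager.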
This article reflects only my personal understanding. Any feedback is welcome, thanks.