本文主要研究一下 RocketMQ 的 ClientManageProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
/**
 * Contract for a handler of remoting requests: it is given a request command
 * together with a channel handler context and yields a command as the result.
 */
public interface NettyRequestProcessor {
    /**
     * Handles a single request.
     *
     * @param ctx     the channel handler context passed along with the request
     * @param request the request command to handle
     * @return the command produced for the request
     * @throws Exception if the implementation fails while handling the request
     */
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;

    /**
     * NOTE(review): presumably reports whether this processor currently refuses
     * incoming requests; the call site is not visible here, so confirm.
     *
     * @return a boolean flag whose exact meaning is defined by the caller
     */
    boolean rejectRequest();
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public class ClientManageProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; public ClientManageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.HEART_BEAT: return this.heartBeat(ctx, request); case RequestCode.UNREGISTER_CLIENT: return this.unregisterClient(ctx, request); case RequestCode.CHECK_CLIENT_CONFIG: return this.checkClientConfig(ctx, request); default: break; } return null; } @Override public boolean rejectRequest() { return false; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public class ClientManageProcessor implements NettyRequestProcessor { //...... public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() ); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); if (changed) { log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } } for (ProducerData data : heartbeatData.getProducerDataSet()) { this.brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public class ClientManageProcessor implements NettyRequestProcessor { //...... public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class); final UnregisterClientRequestHeader requestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), requestHeader.getClientID(), request.getLanguage(), request.getVersion()); { final String group = requestHeader.getProducerGroup(); if (group != null) { this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo); } } { final String group = requestHeader.getConsumerGroup(); if (group != null) { SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); } this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable); } } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public class ClientManageProcessor implements NettyRequestProcessor { //...... public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(), CheckClientRequestBody.class); if (requestBody != null && requestBody.getSubscriptionData() != null) { SubscriptionData subscriptionData = requestBody.getSubscriptionData(); if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType()); return response; } try { FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString()); } catch (Exception e) { log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}", requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark(e.getMessage()); return response; } } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //...... }
ClientManageProcessor 实现了 NettyRequestProcessor 接口，其 processRequest 方法只处理 code 为 RequestCode.HEART_BEAT、RequestCode.UNREGISTER_CLIENT 或 RequestCode.CHECK_CLIENT_CONFIG 的 request，其余 code 直接返回 null；其中针对 RequestCode.HEART_BEAT 执行 heartBeat 方法，针对 RequestCode.UNREGISTER_CLIENT 执行 unregisterClient 方法，针对 RequestCode.CHECK_CLIENT_CONFIG 执行 checkClientConfig 方法；其 rejectRequest 方法返回 false。