本文主要研究一下rocketmq的ConsumerManageProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
public interface NettyRequestProcessor { RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception; boolean rejectRequest(); }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; public ConsumerManageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this.getConsumerListByGroup(ctx, request); case RequestCode.UPDATE_CONSUMER_OFFSET: return this.updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this.queryConsumerOffset(ctx, request); default: break; } return null; } @Override public boolean rejectRequest() { return false; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor { //...... public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); final GetConsumerListByGroupRequestHeader requestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo( requestHeader.getConsumerGroup()); if (consumerGroupInfo != null) { List<String> clientIds = consumerGroupInfo.getAllClientId(); if (!clientIds.isEmpty()) { GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); body.setConsumerIdList(clientIds); response.setBody(body.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } else { log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } } else { log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup()); return response; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor { //...... private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); final UpdateConsumerOffsetRequestHeader requestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor { //...... private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); long offset = this.brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); if (offset >= 0) { responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"); } } return response; } //...... }
ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false