本文主要研究一下rocketmq的QueryMessageProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
/**
 * Contract for a handler of remoting requests received over a Netty channel.
 */
public interface NettyRequestProcessor {

    /**
     * Handles one incoming request.
     *
     * @param ctx     the Netty channel handler context the request arrived on
     * @param request the remoting command to process
     * @return the response command produced for the request
     * @throws Exception if processing fails
     */
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    /**
     * NOTE(review): presumably tells the caller whether this processor is currently
     * refusing requests; the callers are not visible here, so confirm before relying on it.
     *
     * @return the processor's reject flag
     */
    boolean rejectRequest();
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
public class QueryMessageProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; public QueryMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.QUERY_MESSAGE: return this.queryMessage(ctx, request); case RequestCode.VIEW_MESSAGE_BY_ID: return this.viewMessageById(ctx, request); default: break; } return null; } @Override public boolean rejectRequest() { return false; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
public class QueryMessageProcessor implements NettyRequestProcessor { //...... public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); final QueryMessageResponseHeader responseHeader = (QueryMessageResponseHeader) response.readCustomHeader(); final QueryMessageRequestHeader requestHeader = (QueryMessageRequestHeader) request .decodeCommandCustomHeader(QueryMessageRequestHeader.class); response.setOpaque(request.getOpaque()); String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); if (isUniqueKey != null && isUniqueKey.equals("true")) { requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); } final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp()); assert queryMessageResult != null; responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp()); if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); try { FileRegion fileRegion = new QueryMessageTransfer(response.encodeHeader(queryMessageResult .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { log.error("transfer query message by page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); queryMessageResult.release(); } return null; } 
response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("can not find message, maybe time range not correct"); return response; } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
public class QueryMessageProcessor implements NettyRequestProcessor { //...... public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ViewMessageRequestHeader requestHeader = (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); response.setOpaque(request.getOpaque()); final SelectMappedBufferResult selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); if (selectMappedBufferResult != null) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); try { FileRegion fileRegion = new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), selectMappedBufferResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { log.error("Transfer one message from page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); selectMappedBufferResult.release(); } return null; } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("can not find message by the offset, " + requestHeader.getOffset()); } return response; } //...... }
QueryMessageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.QUERY_MESSAGE或者RequestCode.VIEW_MESSAGE_BY_ID的request;其中针对RequestCode.QUERY_MESSAGE执行queryMessage方法,针对RequestCode.VIEW_MESSAGE_BY_ID执行viewMessageById方法;其rejectRequest返回false