本文主要研究一下 RocketMQ 的 sendBatchMessage
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
public class SendMessageRequestHeader implements CommandCustomHeader { @CFNotNull private String producerGroup; @CFNotNull private String topic; @CFNotNull private String defaultTopic; @CFNotNull private Integer defaultTopicQueueNums; @CFNotNull private Integer queueId; @CFNotNull private Integer sysFlag; @CFNotNull private Long bornTimestamp; @CFNotNull private Integer flag; @CFNullable private String properties; @CFNullable private Integer reconsumeTimes; @CFNullable private boolean unitMode = false; @CFNullable private boolean batch = false; private Integer maxReconsumeTimes; //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List<ConsumeMessageHook> consumeMessageHookList; public SendMessageProcessor(final BrokerController brokerController) { super(brokerController); } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response; } } //...... }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List<ConsumeMessageHook> consumeMessageHookList; public SendMessageProcessor(final BrokerController brokerController) { super(brokerController); } //...... private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("Receive SendMessage request command {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("message topic length too long " + 
requestHeader.getTopic().length()); return response; } if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); return response; } MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } messageExtBatch.setSysFlag(sysFlag); messageExtBatch.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); messageExtBatch.setBody(request.getBody()); messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); messageExtBatch.setBornHost(ctx.channel().remoteAddress()); messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); } //...... }
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
/**
 * A {@link MessageExt} whose body holds a batch of messages, plus a slot for
 * the buffer produced once that batch has been encoded.
 */
public class MessageExtBatch extends MessageExt {

    private static final long serialVersionUID = -2353110995348498537L;

    /** Encoded form of the batch; assigned through {@link #setEncodedBuff}. */
    private ByteBuffer encodedBuff;

    /**
     * Wraps the whole message body in a {@link ByteBuffer} (no copy is made;
     * the buffer is backed by the body array).
     *
     * @return a buffer over the full body, positioned at 0
     */
    public ByteBuffer wrap() {
        final byte[] body = getBody();
        assert body != null;
        return ByteBuffer.wrap(body, 0, body.length);
    }

    /**
     * @return the encoded buffer, or {@code null} if none has been set
     */
    public ByteBuffer getEncodedBuff() {
        return this.encodedBuff;
    }

    /**
     * @param encodedBuff the encoded batch buffer to attach to this message
     */
    public void setEncodedBuff(ByteBuffer encodedBuff) {
        this.encodedBuff = encodedBuff;
    }
}
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
public static class MessageExtBatchEncoder { // Store the message content private final ByteBuffer msgBatchMemory; // The maximum length of the message private final int maxMessageSize; MessageExtBatchEncoder(final int size) { this.msgBatchMemory = ByteBuffer.allocateDirect(size); this.maxMessageSize = size; } public ByteBuffer encode(final MessageExtBatch messageExtBatch) { msgBatchMemory.clear(); //not thread-safe int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); int sysFlag = messageExtBatch.getSysFlag(); int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE messagesByteBuff.getInt(); // 2 MAGICCODE messagesByteBuff.getInt(); // 3 BODYCRC messagesByteBuff.getInt(); // 4 FLAG int flag = messagesByteBuff.getInt(); // 5 BODY int bodyLen = messagesByteBuff.getInt(); int bodyPos = messagesByteBuff.position(); int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); messagesByteBuff.position(bodyPos + bodyLen); // 6 properties short propertiesLen = messagesByteBuff.getShort(); int propertiesPos = messagesByteBuff.position(); messagesByteBuff.position(propertiesPos + propertiesLen); final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen); // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + ", maxMessageSize: " + this.maxMessageSize); throw new RuntimeException("message size exceeded"); } totalMsgLen += msgLen; // Determines 
whether there is sufficient free space if (totalMsgLen > maxMessageSize) { throw new RuntimeException("message size exceeded"); } // 1 TOTALSIZE this.msgBatchMemory.putInt(msgLen); // 2 MAGICCODE this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgBatchMemory.putInt(bodyCrc); // 4 QUEUEID this.msgBatchMemory.putInt(messageExtBatch.getQueueId()); // 5 FLAG this.msgBatchMemory.putInt(flag); // 6 QUEUEOFFSET this.msgBatchMemory.putLong(0); // 7 PHYSICALOFFSET this.msgBatchMemory.putLong(0); // 8 SYSFLAG this.msgBatchMemory.putInt(messageExtBatch.getSysFlag()); // 9 BORNTIMESTAMP this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes()); // 14 Prepared Transaction Offset, batch does not support transaction this.msgBatchMemory.putLong(0); // 15 BODY this.msgBatchMemory.putInt(bodyLen); if (bodyLen > 0) this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); // 16 TOPIC this.msgBatchMemory.put((byte) topicLength); this.msgBatchMemory.put(topicData); // 17 PROPERTIES this.msgBatchMemory.putShort(propertiesLen); if (propertiesLen > 0) this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); } msgBatchMemory.flip(); return msgBatchMemory; } private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } }
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
public class CommitLog {
    //......

    /**
     * Stores a batch of messages (excerpt: only the preparation steps are
     * shown, the remainder of the method is elided).
     *
     * <p>Visible steps: stamp the store time, reject the batch with
     * {@code MESSAGE_ILLEGAL} when its sys flag carries a transaction type or
     * when it has a delay level, set the V6 flags when the born/store host is
     * an IPv6 address, then encode the batch and attach the encoded buffer
     * to it.
     *
     * @param messageExtBatch the batch to store
     * @return the put result; {@code MESSAGE_ILLEGAL} with a null append
     *         result for transactional or delayed batches
     */
    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
        // Declared here, consumed by the elided remainder of the method.
        AppendMessageResult result;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        // Batches must be plain (non-transactional) messages.
        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        // Batches with a delay level are rejected as well.
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        // Flag IPv6 hosts on the batch; the encoder sizes its host fields from these sys-flag bits.
        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setBornHostV6Flag();
        }
        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setStoreHostAddressV6Flag();
        }

        // Also consumed by the elided remainder.
        long eclipsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //fine-grained lock instead of the coarse-grained
        // Encoding happens here with the encoder taken from batchEncoderThreadLocal.
        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
        //......
    }

    //......
}
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
public class CommitLog {
    //......

    class DefaultAppendMessageCallback implements AppendMessageCallback {

        /**
         * Appends an already-encoded batch (excerpt: only the set-up part is
         * shown, the remainder of the method is elided).
         *
         * <p>Visible steps: mark the target buffer, compute the physical
         * write offset, look up (or initialise to 0) the queue offset for
         * "topic-queueId", reset the counters and the message-id builder, and
         * fetch the encoded buffer that was attached to the batch.
         *
         * @param fileFromOffset  added to the buffer position to obtain the physical offset
         * @param byteBuffer      buffer the batch is appended to
         * @param maxBlank        not used in the visible part of the method
         * @param messageExtBatch the batch; its encoded buffer must already be set
         * @return the append result (produced by the elided remainder)
         */
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBatch messageExtBatch) {
            // Remember the current position of the target buffer.
            byteBuffer.mark();
            //physical offset
            long wroteOffset = fileFromOffset + byteBuffer.position();
            // Record ConsumeQueue information
            // keyBuilder is a shared builder: reset it, then build "<topic>-<queueId>".
            keyBuilder.setLength(0);
            keyBuilder.append(messageExtBatch.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(messageExtBatch.getQueueId());
            String key = keyBuilder.toString();
            // First message for this topic/queue: start its queue offset at 0.
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }
            long beginQueueOffset = queueOffset;
            int totalMsgLen = 0;
            int msgNum = 0;
            msgIdBuilder.setLength(0);
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Buffer previously attached to the batch via setEncodedBuff.
            ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();

            // Store host is 4-byte address + 4-byte port, or 16 + 4 when the V6 flag is set.
            int sysFlag = messageExtBatch.getSysFlag();
            int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
            //......
        }
    }

    //......
}
SendMessageRequestHeader定义了batch属性，用于标识是否是MessageBatch；processRequest方法在requestHeader.isBatch()为true时会执行sendBatchMessage，否则执行sendMessage；sendBatchMessage方法会先执行msgCheck，之后构造messageExtBatch，然后执行brokerController.getMessageStore().putMessages(messageExtBatch)，最后通过handlePutMessageResult方法处理PutMessageResult