本文主要研究一下MqPullService的cancelMessage
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java
public class MqPullService implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class); private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig(); private static final Batcher BATCHER = Batcher.getInstance(); private volatile boolean shouldStop = false; private CountDownLatch cdl; private final List<Long> succOffsets = new ArrayList<>(); private final List<Long> failOffsets = new ArrayList<>(); private SimpleCarreraConsumer carreraConsumer; private String mqPullServiceName; private final int INTERNAL_PAIR_COUNT = 5000; private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT); //...... private void cancelMessage(final InternalKey internalKey, final String topic, final int action) { InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey(); if (internalKey.getType() == MsgTypes.DELAY.getValue()) { MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY); BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action); } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) { MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY); BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action); } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY); BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action); } else { MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN); LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(), internalKey.genUniqDelayMsgId()); } } //...... }
DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java
public class InternalKey { private static final String SEPARATOR = "-"; private static final int LEN_UUID = 36; private static final long ONE_DAY_SECONDS = 24 * 60 * 60; private long timestamp; private int type; private long expire; private long times; private long timed; private long interval; private int innerTopicSeq; private String uuid; private int segmentNum; private int segmentIndex; //...... public InternalKey cloneTombstoneInternalKey() { InternalKey tombstoneInternalKey = new InternalKey(this); tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue()); return tombstoneInternalKey; } //...... }
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java
public class CancelWrap { private String uniqDelayMsgId; private String topic; public CancelWrap() { } public CancelWrap(String uniqDelayMsgId, String topic) { this.uniqDelayMsgId = uniqDelayMsgId; this.topic = topic; } public String getUniqDelayMsgId() { return uniqDelayMsgId; } public void setUniqDelayMsgId(String uniqDelayMsgId) { this.uniqDelayMsgId = uniqDelayMsgId; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String toJsonString() { return JsonUtils.toJsonString(this); } @Override public String toString() { return "CancelWrap{" + "uniqDelayMsgId='" + uniqDelayMsgId + '/'' + ", topic='" + topic + '/'' + '}'; } }
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java
public class Batcher { private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class); private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum(); private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen(); private WriteBatch wb = new WriteBatch(); private volatile int itemNum = 0; private static volatile Batcher instance = null; public static volatile ReentrantLock lock = new ReentrantLock(); //...... public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) { lock.lock(); try { // 指数循环 // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5 // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5 // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5 // 普通循环 // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5 while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) { internalKey = internalKey.nextUniqDelayMsgId(); } tombstoneInternalKey.setTimestamp(internalKey.getTimestamp()); tombstoneInternalKey.setTimes(internalKey.getTimed() + 2); tombstoneInternalKey.setTimed(internalKey.getTimed()); if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) { putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action); } } finally { lock.unlock(); } } //...... }
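cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey；之后对于MsgTypes.DELAY类型的消息执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action)；对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY类型的消息执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)；其他类型则只记录UNKNOWN指标并打印error日志。putLoopTombstoneKey在加锁后不断调用internalKey.nextUniqDelayMsgId()，直到KeyUtils.afterSeekTimestamp返回true，然后用该internalKey的timestamp和timed更新tombstoneInternalKey（times被设置为timed + 2），最后在KeyUtils.isInvalidMsg返回false时调用putToDefaultCF写入CancelWrap。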