本文主要研究一下chronos的addMessage
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java
public class MqPullService implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class); private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig(); private static final Batcher BATCHER = Batcher.getInstance(); private volatile boolean shouldStop = false; private CountDownLatch cdl; private final List<Long> succOffsets = new ArrayList<>(); private final List<Long> failOffsets = new ArrayList<>(); private SimpleCarreraConsumer carreraConsumer; private String mqPullServiceName; private final int INTERNAL_PAIR_COUNT = 5000; private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT); //...... private void addMessage(final InternalKey internalKey, final InternalValue internalValue, final int action) { if (internalKey.getType() == MsgTypes.DELAY.getValue()) { MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY); if (BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) { MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.DB); return; } MetricService.putMsgSizePercent(internalValue.getTopic(), internalValue.toJsonString().getBytes(Charsets.UTF_8).length); MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.SEND); putToBlockingQueue(internalKey, internalValue); } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue() || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { MetricMsgType msgType = MetricMsgType.LOOP_DELAY; if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { msgType = MetricMsgType.LOOP_EXPONENT_DELAY; } MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType); while (true) { if (KeyUtils.isInvalidMsg(internalKey)) { return; } // 循环消息只写入rocksdb一次, seek到的时候再进行添加 if 
(BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) { MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.DB); return; } MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.SEND); putToBlockingQueue(new InternalKey(internalKey), internalValue); internalKey.nextUniqDelayMsgId(); } } else { MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.UNKNOWN); LOGGER.error("should not go here, invalid message type:{}, internalKey:{}", internalKey.getType(), internalKey.genUniqDelayMsgId()); } } private void putToBlockingQueue(InternalKey internalKey, InternalValue internalValue) { try { blockingQueue.put(new InternalPair(internalKey, internalValue)); } catch (InterruptedException e) { LOGGER.error("error while put to blockingQueue, dMsgId:{}", internalKey.genUniqDelayMsgId()); } } //...... }
DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java
/**
 * Key of a chronos message. Only the constants and fields are shown in this excerpt;
 * constructors and methods are elided.
 */
public class InternalKey {
    // Presumably the delimiter used when the key is rendered as a string — confirm against the elided gen* methods.
    private static final String SEPARATOR = "-";
    // 36 is the length of a canonical UUID string; presumably relates to the uuid field below — confirm.
    private static final int LEN_UUID = 36;
    // Number of seconds in one day.
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;

    // Passed to KeyUtils.afterSeekTimestamp by Batcher to decide whether the message is stored.
    private long timestamp;
    // Message type code; compared against MsgTypes.*.getValue() by MqPullService and Batcher.
    private int type;
    // NOTE(review): meaning of expire/times/timed/interval is not visible in this excerpt;
    // presumably expiry time and loop-delivery bookkeeping — confirm.
    private long expire;
    private long times;
    private long timed;
    private long interval;
    // NOTE(review): usage not visible in this excerpt.
    private int innerTopicSeq;
    private String uuid;
    // Set by Batcher when a value is split: total number of segments.
    private int segmentNum;
    // Set by Batcher when a value is split: Constants.SEGMENT_INDEX_BASE + position of the segment.
    private int segmentIndex;

    //......
}
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/InternalValue.java
/**
 * Payload of a chronos message. Only the fields are shown in this excerpt; accessors and
 * {@code toJsonString()} (called by MqPullService) are elided.
 *
 * <p>Every field carries a {@code @JSONField} annotation with a one-letter name; presumably
 * this shortens the keys of the serialized JSON — confirm against the JSON library in use.
 */
public class InternalValue {
    // Topic of the message; read through getTopic() by MqPullService for metrics and storage.
    @JSONField(name="a")
    private String topic;

    // Message body.
    @JSONField(name="b")
    private String body;

    // NOTE(review): unit (seconds vs. milliseconds) is not visible in this excerpt.
    @JSONField(name="c")
    private long createTime;

    @JSONField(name="d")
    private String tags;

    @JSONField(name="e")
    private Map<String, String> properties;

    //......
}
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java
public class Batcher { private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class); private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum(); private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen(); private WriteBatch wb = new WriteBatch(); private volatile int itemNum = 0; private static volatile Batcher instance = null; public static volatile ReentrantLock lock = new ReentrantLock(); //...... public boolean checkAndPutToDefaultCF(final InternalKey internalKey, final String strVal, final String topic, final int action) { lock.lock(); try { if (KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) { byte[] bytes = strVal.getBytes(Charsets.UTF_8); MetricService.putMsgSizePercent(topic, bytes.length); if (bytes.length <= MSG_BYTE_BASE_LEN) { putToDefaultCF(internalKey.genUniqDelayMsgId(), bytes, topic, internalKey, action); } else { // 如果字节数据超过一定长度, 则进行字节数组切分, 以便降低io.util List<byte[]> list = ByteUtils.divideArray(bytes, MSG_BYTE_BASE_LEN); final int segmentNum = list.size(); for (int segmentIndex = 0; segmentIndex < segmentNum; segmentIndex++) { internalKey.setSegmentNum(segmentNum); internalKey.setSegmentIndex(Constants.SEGMENT_INDEX_BASE + segmentIndex); putToDefaultCF(internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex), topic, internalKey, action); LOGGER.info("segment split, dMsgId:{}, len:{}, value.totalLen:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex).length, bytes.length); } } return true; } return false; } finally { lock.unlock(); } } public void putToDefaultCF(final String key, final byte[] value, final String topic, final InternalKey internalKey, final int action) { put(CFManager.CFH_DEFAULT, key.getBytes(Charsets.UTF_8), value, topic, internalKey, action); } private void put(final ColumnFamilyHandle cfh, final byte[] key, final byte[] value, final String topic, final 
InternalKey internalKey, final int action) { lock.lock(); try { int len = 0; if (value != null) { len = value.length; } wb.put(cfh, key, value); LOGGER.info("put to cf, dMsgId:{}, len:{}", new String(key), len); itemNum++; checkFrequency(); if (action == Actions.ADD.getValue()) { if (internalKey.getType() == MsgTypes.DELAY.getValue()) { MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.DELAY); } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) { MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_DELAY); } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_EXPONENT_DELAY); } } else { if (internalKey.getType() == MsgTypes.DELAY.getValue()) { MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY, MetricMsgToOrFrom.DB); MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY); } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) { MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY, MetricMsgToOrFrom.DB); MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY); } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) { MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY, MetricMsgToOrFrom.DB); MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY); } } } finally { lock.unlock(); } } private void checkFrequency() { if (itemNum >= PULL_BATCH_ITEM_NUM) { flush(); } } public void flush() { lock.lock(); try { if (itemNum > 0) { // make sure write succ while (!RDB.writeSync(wb)) { LOGGER.error("error while flush to db!"); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } } wb.clear(); itemNum = 0; } } finally { lock.unlock(); } 
} //...... }
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java
public class RDB { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class); static RocksDB DB; public static void init(final String dbPath) { try { final long start = System.currentTimeMillis(); boolean result = FileIOUtils.createOrExistsDir(new File(dbPath)); assert(result != false); result = FileIOUtils.createOrExistsDir(new File(DB_PATH_BACKUP)); assert(result != false); result = FileIOUtils.createOrExistsDir(new File(DB_PATH_RESTORE)); assert(result != false); DB = RocksDB.open(OptionsConfig.DB_OPTIONS, dbPath, CF_DESCRIPTORS, CF_HANDLES); assert (DB != null); initCFManger(CF_HANDLES); final long cost = System.currentTimeMillis() - start; LOGGER.info("succ open rocksdb, path:{}, cost:{}ms", dbPath, cost); } catch (RocksDBException e) { LOGGER.error("error while open rocksdb, path:{}, err:{}", dbPath, e.getMessage(), e); } } //...... public static boolean writeSync(final WriteBatch writeBatch) { return write(OptionsConfig.WRITE_OPTIONS_SYNC, writeBatch); } private static boolean write(final WriteOptions writeOptions, final WriteBatch writeBatch) { try { DB.write(writeOptions, writeBatch); LOGGER.debug("succ write writeBatch, size:{}", writeBatch.count()); } catch (RocksDBException e) { // TODO: 2017/11/8 上报写入失败 LOGGER.error("error while write batch, err:{}", e.getMessage(), e); return false; } return true; } //...... }
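MqPullService实现了Runnable接口,其addMessage方法会调用BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action):返回true表示消息已经加入WriteBatch,返回false(即KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())为false)时,addMessage会通过putToBlockingQueue把消息放入blockingQueue。Batcher的checkAndPutToDefaultCF主要是执行putToDefaultCF(消息的UTF-8字节数超过MSG_BYTE_BASE_LEN时,会先用ByteUtils.divideArray切分成多个segment再逐个写入),而putToDefaultCF主要是执行put方法,该方法会执行wb.put(cfh, key, value),将数据写入到rocksdb的WriteBatch;之后执行checkFrequency,在itemNum达到PULL_BATCH_ITEM_NUM时进行flush;flush方法主要是执行RDB.writeSync(wb)(写入失败时每隔200ms重试,直到成功)以及wb.clear(),并将itemNum重置为0。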