本文主要研究一下chronos的DeleteBgWorker
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/workers/DeleteBgWorker.java
public class DeleteBgWorker { private static final Logger LOGGER = LoggerFactory.getLogger(DeleteBgWorker.class); private static final DeleteConfig DELETE_CONFIG = ConfigManager.getConfig().getDeleteConfig(); private static final int SAVE_HOURS_OF_DATA = DELETE_CONFIG.getSaveHours(); private static final long INITIAL_DELAY_MINUTES = 1; // 1 分钟 private static final long PERIOD_MINUTES = 10; // 10 分钟 private static volatile DeleteBgWorker instance = null; private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 2017/10/13 00:00:00 */ private static final long MIN_TIMESTAMP = 1507824000; private static final ScheduledExecutorService SCHEDULE = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("delete-bg-worker-schedule-%d").daemon(true).build()); private DeleteBgWorker() { } public void start() { SCHEDULE.scheduleAtFixedRate(() -> { byte[] beginKey = String.valueOf(MIN_TIMESTAMP).getBytes(Charsets.UTF_8); final long seekTimestampInSecond = MetaService.getSeekTimestamp(); byte[] endKey = String.valueOf(seekTimestampInSecond - SAVE_HOURS_OF_DATA * 60 * 60).getBytes(Charsets.UTF_8); deleteRange(beginKey, endKey); }, INITIAL_DELAY_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES); LOGGER.info("DeleteBgWorker has started, initialDelayInMinutes:{}", INITIAL_DELAY_MINUTES); } private void deleteRange(final byte[] beginKey, final byte[] endKey) { LOGGER.info("deleteRange start, beginKey:{}, endKey:{}", new String(beginKey), new String(endKey)); final long start = System.currentTimeMillis(); RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey); LOGGER.info("deleteRange end, beginKey:{}({}), endKey:{}({}), cost:{}ms", new String(beginKey), formatter.format(Long.parseLong(new String(beginKey)) * 1000), new String(endKey), formatter.format(Long.parseLong(new String(endKey)) * 1000), System.currentTimeMillis() - start); } public void stop() { SCHEDULE.shutdownNow(); while (!SCHEDULE.isShutdown()) { 
LOGGER.info("DeleteBgWorker is shutting down!"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { LOGGER.info("DeleteBgWorker was forced to shutdown, err:{}", e.getMessage(), e); } } LOGGER.info("DeleteBgWorker was shutdown!"); } public static DeleteBgWorker getInstance() { if (instance == null) { synchronized (DeleteBgWorker.class) { if (instance == null) { instance = new DeleteBgWorker(); } } } return instance; } }
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java
public class RDB { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class); static RocksDB DB; //...... public static boolean deleteFilesInRange(final ColumnFamilyHandle cfh, final byte[] beginKey, final byte[] endKey) { try { DB.deleteRange(cfh, beginKey, endKey); LOGGER.debug("succ delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}", cfh.toString(), new String(beginKey), new String(endKey)); } catch (RocksDBException e) { LOGGER.error("error while delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}, err:{}", cfh.toString(), new String(beginKey), new String(endKey), e.getMessage(), e); return false; } return true; } //...... }
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MetaService.java
public class MetaService { private static final Logger LOGGER = LoggerFactory.getLogger(MetaService.class); private static volatile long seekTimestamp = -1; private static volatile long zkSeekTimestamp = -1; private static volatile Map<String, Long> zkQidOffsets = new ConcurrentHashMap<>(); private static final DbConfig dbConfig = ConfigManager.getConfig().getDbConfig(); private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("offset-seektimestamp-schedule-%d").daemon(true).build()); public static void load() { final long start = System.currentTimeMillis(); if (seekTimestamp == -1) { seekTimestamp = loadSeekTimestampFromFile(); } final long cost = System.currentTimeMillis() - start; LOGGER.info("succ load seekTimestamp, seekTimestamp:{}, cost:{}ms", seekTimestamp, cost); SCHEDULER.scheduleWithFixedDelay(() -> { // 如果是master则拉取并上报zk offset和seekOffset if (MasterElection.isMaster()) { MqConsumeStatService.getInstance().uploadOffsetsToZk(); uploadSeekTimestampToZk(); } }, 5, 1, TimeUnit.SECONDS); } private static long loadSeekTimestampFromFile() { String seekTimestampStr = FileIOUtils.readFile2String(dbConfig.getSeekTimestampPath()); if (StringUtils.isBlank(seekTimestampStr)) { final long initSeekTimestamp = TsUtils.genTS(); boolean result = FileIOUtils.writeFileFromString(dbConfig.getSeekTimestampPath(), String.valueOf(initSeekTimestamp)); if (result) { LOGGER.info("init seekTimestamp and succ save, current seekTimestamp:{}", initSeekTimestamp); } else { LOGGER.error("init seekTimestamp and fail to save, current seekTimestamp:{}", initSeekTimestamp); } return initSeekTimestamp; } LOGGER.info("succ load seekTimestamp from file, seekTimestamp:{}", Long.parseLong(seekTimestampStr)); return Long.parseLong(seekTimestampStr); } public static long getSeekTimestamp() { return seekTimestamp; } /** * 此处的lock不能去掉 * 判断消息超时时 */ public static void nextSeekTimestamp() { Batcher.lock.lock(); try { 
seekTimestamp++; boolean result = FileIOUtils.writeFileFromString(dbConfig.getSeekTimestampPath(), String.valueOf(seekTimestamp)); if (result) { LOGGER.info("incr seekTimestamp and succ save, next seekTimestamp:{}", seekTimestamp); } else { LOGGER.error("incr seekTimestamp and fail to save, next seekTimestamp:{}", seekTimestamp); } } finally { Batcher.lock.unlock(); } } public static void uploadSeekTimestampToZk() { String seekTimestampStr = String.valueOf(MetaService.getSeekTimestamp()); ZkUtils.createOrUpdateValue(Constants.SEEK_TIMESTAMP_ZK_PATH, seekTimestampStr); LOGGER.debug("upload seekTimestamp to zk, seekTimestamp:{}", seekTimestampStr); } public static Map<String, Long> getZkQidOffsets() { return zkQidOffsets; } public static void setZkQidOffsets(Map<String, Long> zkQidOffsets) { MetaService.zkQidOffsets = zkQidOffsets; } public static long getZkSeekTimestamp() { return zkSeekTimestamp; } public static void setZkSeekTimestamp(long zkSeekTimestamp) { MetaService.zkSeekTimestamp = zkSeekTimestamp; } }
DeleteBgWorker提供了静态方法getInstance来获取或创建单例,该类提供了start、stop两个方法;start方法会往SCHEDULE注册一个调度任务,首次延迟INITIAL_DELAY_MINUTES(1分钟)后每隔PERIOD_MINUTES(10分钟)执行一次,该任务以MIN_TIMESTAMP作为beginKey,并从MetaService获取seekTimestamp、减去SAVE_HOURS_OF_DATA对应的秒数来计算endKey,然后执行deleteRange方法;deleteRange主要是执行RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey),后者内部调用RocksDB的deleteRange;stop方法则是关闭SCHEDULE。