本文主要研究一下carrera的RocketMQProduceOffsetFetcher
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java
/**
 * Bundles a {@link DefaultMQAdminExt} and a {@link DefaultMQPullConsumer} that point at the
 * same name server, and exposes the few calls needed to read consume/produce statistics and
 * to pull a message at a given offset.
 *
 * <p>Lifecycle: construct, {@link #start()}, use, {@link #shutdown()}.
 */
public class RocketMQProduceOffsetFetcher {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);

    private final DefaultMQAdminExt defaultMQAdminExt;
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private final String namesrvAddr;

    /**
     * Creates both clients (not yet started) against the given name server.
     *
     * @param namesrvAddr name server address handed to both clients
     */
    public RocketMQProduceOffsetFetcher(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;

        this.defaultMQAdminExt = new DefaultMQAdminExt();
        this.defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
        this.defaultMQAdminExt.setInstanceName("admin-" + System.currentTimeMillis());

        this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
        this.defaultMQPullConsumer.setInstanceName("admin-" + System.currentTimeMillis());
        this.defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
    }

    /**
     * @return the name server address this fetcher was created with
     */
    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    /**
     * Starts the admin client, then the pull consumer.
     *
     * <p>If anything after the admin client's start fails, both clients are shut down before
     * the original failure propagates, so a failed start does not leave a running client behind.
     *
     * @throws MQClientException if either client fails to start
     */
    public void start() throws MQClientException {
        defaultMQAdminExt.start();

        boolean fullyStarted = false;
        try {
            defaultMQPullConsumer.start();
            // Set after start(): presumably the pull API wrapper only exists once the
            // consumer has been started -- confirm against DefaultMQPullConsumerImpl.
            defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
            fullyStarted = true;
        } finally {
            if (!fullyStarted) {
                releaseAfterFailedStart();
            }
        }
    }

    /**
     * Shuts down both clients. The pull consumer is shut down even when shutting down the
     * admin client throws.
     */
    public void shutdown() {
        try {
            defaultMQAdminExt.shutdown();
        } finally {
            defaultMQPullConsumer.shutdown();
        }
    }

    /**
     * Fetches consume statistics of a consumer group for a topic via the admin client.
     *
     * @param group consumer group name
     * @param topic topic name
     * @return the statistics returned by {@code examineConsumeStats}
     * @throws Exception whatever the admin client throws
     */
    public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
        return defaultMQAdminExt.examineConsumeStats(group, topic);
    }

    /**
     * Fetches topic statistics via the admin client.
     *
     * @param topic topic name
     * @return the statistics returned by {@code examineTopicStats}
     */
    public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        return defaultMQAdminExt.examineTopicStats(topic);
    }

    /**
     * Pulls from the given queue starting at {@code offset}, passing {@code "*"} and {@code 1}
     * as the remaining pull arguments (presumably the subscription expression and the maximum
     * number of messages -- confirm against the pull consumer API).
     *
     * @param mq     queue to pull from
     * @param offset offset to start pulling at
     * @return the pull result reported by the consumer
     * @throws Exception whatever the pull consumer throws
     */
    public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
        return defaultMQPullConsumer.pull(mq, "*", offset, 1);
    }

    /** Best-effort cleanup used by start(); cleanup errors are logged so they cannot mask the start failure. */
    private void releaseAfterFailedStart() {
        try {
            shutdown();
        } catch (RuntimeException cleanupFailure) {
            LOGGER.warn("failed to release RocketMQ clients after unsuccessful start, namesrvAddr=" + namesrvAddr, cleanupFailure);
        }
    }
}
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { private final DefaultMQAdminExtImpl defaultMQAdminExtImpl; private String adminExtGroup = "admin_ext_group"; private String createTopicKey = MixAll.DEFAULT_TOPIC; private long timeoutMillis = 5000; //...... @Override public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic); } @Override public TopicStatsTable examineTopicStats( String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicStats(topic); } //...... }
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { private final Logger log = ClientLogger.getLog(); private final DefaultMQAdminExt defaultMQAdminExt; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mqClientInstance; private RPCHook rpcHook; private long timeoutMillis = 20000; private Random random = new Random(); //...... @Override public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic; TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic); ConsumeStats result = new ConsumeStats(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { String addr = bd.selectBrokerAddr(); if (addr != null) { ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3); result.getOffsetTable().putAll(consumeStats.getOffsetTable()); double value = result.getConsumeTps() + consumeStats.getConsumeTps(); result.setConsumeTps(value); } } if (result.getOffsetTable().isEmpty()) { throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message"); } return result; } @Override public TopicStatsTable examineTopicStats( String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); TopicStatsTable topicStatsTable = new TopicStatsTable(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { String addr = bd.selectBrokerAddr(); if (addr != null) { TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis); topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable()); } } if 
(topicStatsTable.getOffsetTable().isEmpty()) { throw new MQClientException("Not found the topic stats info", null); } return topicStatsTable; } //...... }