本文主要研究一下carrera的BrokerMonitor
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java
public class BrokerMonitor extends BaseConfigMonitor { private final static Logger LOGGER = LoggerFactory.getLogger(BrokerMonitor.class); private ExecutorService executor = ExecutorUtils.newFixedThreadPool(100, "BrokerMonitor", 200); private BrokerMonitorItem monitorItem = null; @Override protected void initMonitor(String broker, BrokerConfig brokerConfig) throws Exception { doMonitor(broker, brokerConfig); } public BrokerMonitor(MonitorConfig monitorConfig) { super("Broker", monitorConfig); } private void doMonitor(String broker, BrokerConfig config) throws InterruptedException { if (monitorItem != null) { // stop first. LOGGER.info("Stop old monitor broker: {}", broker); monitorItem.stop(); } BrokerMonitorItem item = new BrokerMonitorItem(broker, config); try { item.start(); } catch (Exception e) { LOGGER.error("broker monitor start exception, broker=" + broker, e); } } @Override public void shutdown() { ExecutorUtils.shutdown(executor); monitorItem.stop(); super.shutdown(); } //...... }
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java
class BrokerMonitorItem { private String broker; private BrokerConfig config; private volatile boolean isRunning = false; private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); public BrokerMonitorItem(String broker, BrokerConfig config) { this.broker = broker; this.config = config; } public void start() { isRunning = true; scheduledExecutor.submit(() -> { while (isRunning) { monitorNamesvr(); monitorBroker(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } LOGGER.info("broker<{}> [Active]", broker); } }); } public void stop() { isRunning = false; ExecutorUtils.shutdown(scheduledExecutor); } private void monitorBroker() { if (MapUtils.isEmpty(config.getBrokers()) || StringUtils.isBlank(config.getBrokerClusterAddrs())) { return; } String nameSvr = config.getBrokerClusterAddrs().split(";")[0]; // use first namesvr. for (Map.Entry<String, Set<String>> entry : config.getBrokers().entrySet()) { String master = entry.getKey(); Set<String> slaves = entry.getValue(); executor.execute(() -> { int j = 0; for (; j < 2; ++j) { try { long masterOffset = Utils.checkReceive(broker, nameSvr, master); if (masterOffset <= 0) { continue; } Utils.checkSend(broker, nameSvr, master); if (CollectionUtils.isNotEmpty(slaves)) { for (String slave : slaves) { long slaveOffset = Utils.checkReceive(broker, nameSvr, slave); LOGGER.info("ReplicaDelayCheck broker={}, address={}->{}, masterOffset={}, slaveOffset={}, delayNum={}", broker, master.split(":")[0], slave.split(":")[0], masterOffset, slaveOffset, (masterOffset - slaveOffset)); if (slaveOffset <= 0) { continue; } if (masterOffset - slaveOffset > 60) { LOGGER.error(String.format("[AlarmReplicaDelayErr] broker=%s, address=%s->%s, delayNum=%s", broker, master.split(":")[0], slave.split(":")[0], (masterOffset - slaveOffset))); } } } break; } catch (Throwable e) { LOGGER.error("broker check broker exception, broker=" + broker, e); } try { Thread.sleep(1000); } 
catch (InterruptedException e) { } } if (j == 2) { LOGGER.error(String.format("[AlarmCheckBrokerErr] broker=%s, namesvr=%s", broker, nameSvr)); } }); } } private void monitorNamesvr() { if (StringUtils.isBlank(config.getBrokerClusterAddrs())) { LOGGER.info("broker:{}, brokerClusterAddrs is empty", config.getBrokerCluster()); return; } for (String nameSvr : config.getBrokerClusterAddrs().split(";")) { executor.execute(() -> { int j = 0; for (; j < 2; ++j) { try { Utils.checkNameSvr(nameSvr, broker); LOGGER.info(String.format("[NameSvrCheck] broker=%s, namesvr=%s", broker, nameSvr)); break; } catch (Throwable e) { LOGGER.error("broker checkNameSvr exception, broker=" + broker + ", namesvr=" + nameSvr, e); } try { Thread.sleep(1000); } catch (InterruptedException e) { LOGGER.error("broker checkNameSvr Thread.sleep exception, broker=" + broker, e); } } if (j == 2) { LOGGER.error(String.format("[AlarmNameSvrErr] broker=%s, namesvr=%s", broker, nameSvr)); } }); } } }
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/Utils.java
public class Utils { private static final Logger logger = LoggerFactory.getLogger(Utils.class); private static final Map<String, DefaultMQAdminExt> mqAdminExtMap = new ConcurrentHashMap<>(); private static final Map<String, DefaultMQPullConsumer> nameSvrCheckMap = new ConcurrentHashMap<>(); private static final Map<String, DefaultMQPullConsumer> brokerReceiveCheckMap = new ConcurrentHashMap<>(); private static final Map<String, DefaultMQProducer> brokerSendCheckMap = new ConcurrentHashMap<>(); //...... public static void checkNameSvr(String nameSvr, String cluster) throws MQClientException, InterruptedException { getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC"); } public static long checkReceive(String cluster, String nameSvr, String address) throws MQClientException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, InterruptedException, RemotingException, MQBrokerException { DefaultMQPullConsumer consumer = getReceiveCheckConsumer(nameSvr, cluster, address); Field f1 = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory"); f1.setAccessible(true); MQClientInstance instance = (MQClientInstance) f1.get(consumer.getDefaultMQPullConsumerImpl()); Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable"); f.setAccessible(true); Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService"); f2.setAccessible(true); ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance); service.shutdown(); service.awaitTermination(1000, TimeUnit.SECONDS); ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(instance); HashMap<Long, String> addresses = new HashMap<>(); addresses.put(0L, address); map.put("rmqmonitor_" + address, addresses); MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0); boolean pullOk = false; long 
maxOffset = -1; for (int i = 0; i < 2; ++i) { try { maxOffset = consumer.getDefaultMQPullConsumerImpl().maxOffset(queue); PullResult result = consumer.pull(queue, "*", maxOffset > 100 ? maxOffset - 10 : 0, 1); if (result.getPullStatus() == PullStatus.FOUND) { pullOk = true; break; } else if(result.getPullStatus() == PullStatus.NO_NEW_MSG) { checkSend(cluster, nameSvr, address); continue; } logger.warn("pull result failed, PullResult={}, cluster={}, namesvr={}, address={}", result, cluster, nameSvr, address); } catch (Throwable e) { logger.error("pull exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e); } Thread.sleep(1000); } if (!pullOk) { logger.error(String.format("[AlarmPullErr] cluster=%s, broker=%s", cluster, address)); } else { logger.info("AlarmPullCheck cluster={}, broker={}", cluster, address); } return maxOffset; } public static void checkSend(String cluster, String nameSvr, String address) throws MQClientException, NoSuchFieldException, SecurityException, InterruptedException, IllegalArgumentException, IllegalAccessException, UnsupportedEncodingException, MQBrokerException, RemotingException { if (!isBrokerTopicWritable(cluster, nameSvr, address)) { return; } DefaultMQProducer producer = getSendCheckProducer(nameSvr, cluster, address); MQClientInstance instance = producer.getDefaultMQProducerImpl().getmQClientFactory(); Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable"); f.setAccessible(true); Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService"); f2.setAccessible(true); ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance); service.shutdown(); service.awaitTermination(1000, TimeUnit.SECONDS); ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f .get(instance); HashMap<Long, String> addresses = new HashMap<>(); addresses.put(0L, address); map.put("rmqmonitor_" + address, addresses); MessageQueue queue = new 
MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0); boolean sendOk = false; SendResult sendResult = null; for (int i = 0; i < 2; i++) { try { Message msg = new Message("SELF_TEST_TOPIC", // topic "TagA", // tag ("Hello RocketMQ " + i).getBytes()// body ); sendResult = producer.send(msg, queue); if (sendResult.getSendStatus() == SendStatus.SEND_OK || sendResult.getSendStatus() == SLAVE_NOT_AVAILABLE) { sendOk = true; break; } logger.warn("send result failed, SendResult={}, cluster={}, namesvr={}, address={}", sendResult, cluster, nameSvr, address); } catch (Exception e) { logger.error("send exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e); } Thread.sleep(1000); } // 报警 if (!sendOk) { logger.error(String.format("[AlarmSendErr] cluster=%s, broker=%s, result=%s", cluster, address, sendResult == null ? "null" : sendResult.toString())); } else { logger.info("AlarmSendCheck cluster={}, broker={}, result={}", cluster, address, sendResult.toString()); } } //...... }
BrokerMonitor 继承了 BaseConfigMonitor:其 initMonitor 方法调用 doMonitor;其 shutdown 方法会关闭 executor,执行 monitorItem.stop(),最后调用 super.shutdown()。doMonitor 方法先判断 monitorItem 是否为 null,不为 null 的话先执行 monitorItem.stop(),之后创建 BrokerMonitorItem 并执行其 start 方法。