This article takes a look at the getFlatList method of RocketMQCanalConnector.
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    private static final Logger                 logger               = LoggerFactory.getLogger(RocketMQCanalConnector.class);
    private static final String                 CLOUD_ACCESS_CHANNEL = "cloud";

    private String                              nameServer;
    private String                              topic;
    private String                              groupName;
    private volatile boolean                    connected            = false;
    private DefaultMQPushConsumer               rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private int                                 batchSize            = -1;
    private long                                batchProcessTimeout  = 60 * 1000;
    private boolean                             flatMessage;
    private volatile ConsumerBatchMessage       lastGetBatchMessage  = null;
    private String                              accessKey;
    private String                              secretKey;
    private String                              customizedTraceTopic;
    private boolean                             enableMessageTrace   = false;
    private String                              accessChannel;
    private String                              namespace;

    //......

    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            ack();
        }
        return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }

            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                return batchMessage.getData();
            }
        } catch (InterruptedException ex) {
            logger.warn("Get message timeout", ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        return Lists.newArrayList();
    }

    public void ack() throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        } catch (Throwable e) {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    //......
}
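The lastGetBatchMessage check means that only one batch can be outstanding at a time: a second getFlatListWithoutAck call before ack() is rejected. The following is a minimal, self-contained sketch of that guard. SingleBatchGuardSketch and its methods are hypothetical stand-ins written for illustration (plain strings instead of FlatMessage, IllegalStateException instead of CanalClientException); it is not canal's implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the connector's get/ack bookkeeping; not canal's class.
class SingleBatchGuardSketch {

    private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
    // Plays the role of lastGetBatchMessage: the batch handed out but not yet acked.
    private volatile List<String> lastBatch = null;

    // Test helper: enqueue a batch as the message listener would.
    void offer(List<String> batch) {
        queue.add(batch);
    }

    List<String> getWithoutAck(long timeout, TimeUnit unit) {
        if (lastBatch != null) {
            // Same idea as the "get/ack not support concurrent & async ack" check.
            throw new IllegalStateException("previous batch not acked yet");
        }
        try {
            List<String> batch = queue.poll(timeout, unit);
            if (batch != null) {
                lastBatch = batch;
                return batch;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
        // Timed out with nothing available: return an empty list, not null.
        return Collections.emptyList();
    }

    void ack() {
        // Clearing the reference is what allows the next getWithoutAck call.
        lastBatch = null;
    }

    public static void main(String[] args) {
        SingleBatchGuardSketch c = new SingleBatchGuardSketch();
        c.offer(Arrays.asList("a", "b"));
        System.out.println(c.getWithoutAck(10, TimeUnit.MILLISECONDS)); // prints [a, b]
        try {
            c.getWithoutAck(10, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        c.ack();
        System.out.println(c.getWithoutAck(10, TimeUnit.MILLISECONDS)); // prints []
    }
}
```

Because getFlatList only calls ack() when the returned list is non-empty, and lastGetBatchMessage is only set when a batch was actually polled, an empty poll leaves no batch outstanding and the next call proceeds normally.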
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    public synchronized void subscribe(String filter) throws CanalClientException {
        if (connected) {
            return;
        }
        try {
            if (rocketMQConsumer == null) {
                this.connect();
            }
            rocketMQConsumer.subscribe(this.topic, "*");
            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    boolean isSuccess = process(messageExts);
                    if (isSuccess) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            rocketMQConsumer.start();
        } catch (MQClientException ex) {
            connected = false;
            logger.error("Start RocketMQ consumer error", ex);
        }
        connected = true;
    }

    //......
}
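One detail in subscribe as quoted: connected = true sits after the try/catch block, so it also runs when MQClientException was caught and logged. The sketch below reproduces only that control-flow shape with hypothetical names (ConnectedFlagSketch, a simulated failure flag) and contrasts it with setting the flag inside the try block. It is an illustration of the pattern, not a patch to canal.

```java
// Hypothetical illustration of the flag handling; not canal's class.
class ConnectedFlagSketch {

    // Same shape as the quoted subscribe(): the flag is assigned after try/catch.
    static boolean flagAfterTryCatch(boolean startFails) {
        boolean connected = false;
        try {
            if (startFails) {
                throw new IllegalStateException("simulated consumer start failure");
            }
        } catch (IllegalStateException ex) {
            connected = false;
        }
        connected = true; // runs on both the success path and the failure path
        return connected;
    }

    // Alternative shape: the flag is only set once the start step succeeded.
    static boolean flagInsideTry(boolean startFails) {
        boolean connected = false;
        try {
            if (startFails) {
                throw new IllegalStateException("simulated consumer start failure");
            }
            connected = true;
        } catch (IllegalStateException ex) {
            connected = false;
        }
        return connected;
    }

    public static void main(String[] args) {
        System.out.println(flagAfterTryCatch(true)); // prints true
        System.out.println(flagInsideTry(true));     // prints false
    }
}
```

With the first shape, a later subscribe call returns early at the if (connected) check even though the consumer never started, so a caller relying on the flag would not retry.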
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    private boolean process(List<MessageExt> messageExts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", messageExts);
        }
        List messageList = Lists.newArrayList();
        for (MessageExt messageExt : messageExts) {
            byte[] data = messageExt.getBody();
            if (data != null) {
                try {
                    if (!flatMessage) {
                        Message message = CanalMessageDeserializer.deserializer(data);
                        messageList.add(message);
                    } else {
                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                        messageList.add(flatMessage);
                    }
                } catch (Exception ex) {
                    logger.error("Add message error", ex);
                    throw new CanalClientException(ex);
                }
            } else {
                logger.warn("Received message data is null");
            }
        }
        ConsumerBatchMessage batchMessage;
        if (!flatMessage) {
            batchMessage = new ConsumerBatchMessage<Message>(messageList);
        } else {
            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
        }
        try {
            messageBlockingQueue.put(batchMessage);
        } catch (InterruptedException e) {
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        boolean isCompleted;
        try {
            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        return isCompleted && isSuccess;
    }

    //......
}
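process hands a batch to the client through messageBlockingQueue and then blocks in waitFinish until the client acks, fails, or batchProcessTimeout elapses. The sketch below models that handshake with two threads. The Batch class is a hypothetical stand-in for ConsumerBatchMessage, and backing it with a CountDownLatch is an assumption made for this example, since ConsumerBatchMessage itself is not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the listener/client handshake; not canal's classes.
class BatchHandshakeSketch {

    // Stand-in for ConsumerBatchMessage (assumed to be latch-based for this sketch).
    static final class Batch {
        final List<String> data;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean failed = false;

        Batch(List<String> data) { this.data = data; }

        void ack() { latch.countDown(); }

        void fail() { failed = true; latch.countDown(); }

        // Returns false if the timeout elapsed before ack() or fail() was called.
        boolean waitFinish(long timeoutMillis) throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        boolean isSuccess() { return !failed; }
    }

    // Listener side, same shape as process(): enqueue, then wait for the client.
    static boolean process(BlockingQueue<Batch> queue, List<String> payload, long timeoutMillis) {
        Batch batch = new Batch(payload);
        try {
            queue.put(batch);
            boolean completed = batch.waitFinish(timeoutMillis);
            return completed && batch.isSuccess();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Runs one round with a client thread that acks, fails, or ignores the batch.
    static boolean runRound(String clientAction, long timeoutMillis) {
        BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();
        Thread client = new Thread(() -> {
            try {
                Batch b = queue.poll(1, TimeUnit.SECONDS);
                if (b == null) {
                    return;
                }
                if ("ack".equals(clientAction)) {
                    b.ack();
                } else if ("fail".equals(clientAction)) {
                    b.fail();
                }
                // Any other action: the client never finishes the batch.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        client.setDaemon(true);
        client.start();
        return process(queue, Arrays.asList("row-1", "row-2"), timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(runRound("ack", 1000));   // prints true
        System.out.println(runRound("fail", 1000));  // prints false
        System.out.println(runRound("ignore", 100)); // prints false (wait timed out)
    }
}
```

In the quoted subscribe method, a false result from process is mapped to ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT, so both an explicit fail() and a client that does not ack within batchProcessTimeout lead to the same non-success status.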
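RocketMQCanalConnector's getFlatList method fetches a list of FlatMessage through getFlatListWithoutAck and then calls ack() if the list is non-empty. getFlatListWithoutAck throws a CanalClientException if lastGetBatchMessage is still set; otherwise it polls messageBlockingQueue for a batchMessage and, if the result is not null, stores it in lastGetBatchMessage and returns batchMessage.getData(). If nothing arrives within the timeout, it returns an empty list. ack() calls lastGetBatchMessage.ack(), calls lastGetBatchMessage.fail() if that throws, and resets lastGetBatchMessage to null in the finally block either way.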