本文主要研究一下KafkaCanalConnector的getFlatList
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector { protected KafkaConsumer<String, Message> kafkaConsumer; protected KafkaConsumer<String, String> kafkaConsumer2; // 用于扁平message的数据消费 protected String topic; protected Integer partition; protected Properties properties; protected volatile boolean connected = false; protected volatile boolean running = false; protected boolean flatMessage; private Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>(); //...... public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException { waitClientRunning(); if (!running) { return Lists.newArrayList(); } List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit); if (messages != null && !messages.isEmpty()) { this.ack(); } return messages; } public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException { waitClientRunning(); if (!running) { return Lists.newArrayList(); } ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout)); currentOffsets.clear(); for (TopicPartition topicPartition : records.partitions()) { currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition)); } if (!records.isEmpty()) { List<FlatMessage> flatMessages = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { String flatMessageJson = record.value(); FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class); flatMessages.add(flatMessage); } return flatMessages; } return Lists.newArrayList(); } //...... }
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector { //...... public void ack() { waitClientRunning(); if (!running) { return; } if (kafkaConsumer != null) { kafkaConsumer.commitSync(); } if (kafkaConsumer2 != null) { kafkaConsumer2.commitSync(); } } //...... }
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
public class KafkaCanalConnector implements CanalMQConnector { //...... public void rollback() { waitClientRunning(); if (!running) { return; } // 回滚所有分区 if (kafkaConsumer != null) { for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) { kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1); } } if (kafkaConsumer2 != null) { for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) { kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1); } } } //...... }
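KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,若messages不为空则执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法拉取records,先清空currentOffsets,再把本次返回的每个分区的当前position(partition -> position)记录到currentOffsets,之后用JSON.parseObject将每条record的value转换为FlatMessage;ack方法对不为null的kafkaConsumer及kafkaConsumer2执行commitSync;rollback方法则遍历currentOffsets,把kafkaConsumer及kafkaConsumer2在各分区seek到所记录的position减1的位置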