转载

聊聊KafkaCanalConnector的getFlatList

本文主要研究一下KafkaCanalConnector的getFlatList

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    protected KafkaConsumer<String, Message> kafkaConsumer;
    protected KafkaConsumer<String, String>  kafkaConsumer2;                            // 用于扁平message的数据消费
    protected String                         topic;
    protected Integer                        partition;
    protected Properties                     properties;
    protected volatile boolean               connected      = false;
    protected volatile boolean               running        = false;
    protected boolean                        flatMessage;

    private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();

    //......

    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));

        currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
        }

        if (!records.isEmpty()) {
            List<FlatMessage> flatMessages = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                String flatMessageJson = record.value();
                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                flatMessages.add(flatMessage);
            }

            return flatMessages;
        }
        return Lists.newArrayList();
    }

    //......
}
  • KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

ack

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void ack() {
        waitClientRunning();
        if (!running) {
            return;
        }

        if (kafkaConsumer != null) {
            kafkaConsumer.commitSync();
        }
        if (kafkaConsumer2 != null) {
            kafkaConsumer2.commitSync();
        }
    }

    //......
}
  • ack方法主要是执行kafkaConsumer.commitSync()及kafkaConsumer2.commitSync()

rollback

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void rollback() {
        waitClientRunning();
        if (!running) {
            return;
        }
        // 回滚所有分区
        if (kafkaConsumer != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
        if (kafkaConsumer2 != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
    }

    //......
}
  • rollback方法则根据currentOffsets的值通过kafkaConsumer的seek方法进行回退

小结

KafkaCanalConnector的getFlatList方法先通过getFlatListWithoutAck获取messages,然后执行ack;getFlatListWithoutAck通过kafkaConsumer2.poll方法获取records,然后更新记录的topicPartition到currentOffsets,之后将record转换为flatMessage

doc

  • KafkaCanalConnector
原文  https://segmentfault.com/a/1190000022321299
正文到此结束
Loading...