转载

Spring Boot Kafka概览、配置及优雅地实现发布订阅

本文属于翻译,转载注明出处,欢迎关注微信小程序 小白AI博客 微信公众号 小白AI 或者网站 https://xiaobaiai.net

Spring Boot Kafka概览、配置及优雅地实现发布订阅

[TOC]

1 前言

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于 Spring Integration 方式。本文内容基于 Spring Kafka2.3.3文档 及 Spring Boot Kafka相关文档 ,Spring创建了一个名为 Spring kafka 的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka, Spring-Kafka 项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以 spring.kafka.* 作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

实现下面的所涉及到的功能实现,需要有如下环境:

  • Java运行或开发环境(JRE/JDK)
  • Kafka安装成功

更多的配置可以参考 《Kafka,ZK集群开发或部署环境搭建及实验》 这一篇文章。

本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。

2 Spring Kafka功能概览

Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):

Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients
2.3.x 3.2.x 2.3.1
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x
2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0
1.3.x 2.3.x 0.11.0.x, 1.0.x

具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。

Spring Kafka相关的注解有如下几个:

注解类型 描述
EnableKafka 启用由 AbstractListenerContainerFactory 在封面(covers)下创建的Kafka监听器注解端点,用于配置类;
EnableKafkaStreams 启用默认的Kafka流组件
KafkaHandler 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解
KafkaListener 将方法标记为指定主题上Kafka消息监听器的目标的注解
KafkaListeners 聚合多个KafkaListener注解的容器注解
PartitionOffset 用于向KafkaListener添加分区/初始偏移信息
TopicPartition 用于向KafkaListener添加主题/分区信息

如使用 @EnableKafka 可以监听 AbstractListenerContainerFactory 子类目标端点,如 ConcurrentKafkaListenerContainerFactoryAbstractKafkaListenerContainerFactory 的子类。

public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }

@EnableKafka 并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的 @EnableKafka 。如果想要自己实现Kafka配置类,则需要加上 @EnableKafka ,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除 KafkaAutoConfiguration

@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自动创建主题

:bulb: 要在应用启动时就创建主题,可以添加 NewTopic 类型的Bean。如果该主题已经存在,则忽略Bean。

2.2 发送消息

Spring的 KafkaTemplate 是自动配置的,你可以直接在自己的Bean中自动连接它,如下例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

KafkaTemplate 包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回 ListenableFuture ,以此监听异步发送状态,成功还是失败,KafkaTemplate提供如下接口:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

sendDefault API 要求已向模板提供默认主题。部分API接受一个时间戳作为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,如果主题配置为使用 CREATE_TIME ,则记录用户指定的时间戳(如果未指定则生成)。如果将主题配置为使用 LOG_APPEND_TIME ,则忽略用户指定的时间戳,并且代理将添加本地代理时间。 metricspartitionsFor 方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问

要使用模板,可以配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操作:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // KafkaTemplate构造函数中输入生产者工厂配置
    return new KafkaTemplate<Integer, String>(producerFactory());
}

然后,要使用模板,可以调用其方法之一发送消息。

当你使用包含 Message<?> 参数的方法时,主题、分区和键信息在消息头中提供,有如下子项:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

如访问头部信息中某一项信息:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}",  message.getHeaders().get(KafkaHeaders.TOPIC));
}

可选的功能是,可以使用 ProducerListener 配置 KafkaTemplate ,以获得带有发送结果(成功或失败)的异步回调,而不是等待将来完成。以下列表显示了 ProducerListener 接口的定义:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

默认情况下,模板配置有 LoggingProducerListener ,它只记录错误,在发送成功时不执行任何操作。只有当 isInterestedInSuccess 返回true时才调用 onSuccess

为了方便起见,如果你只想实现其中一个方法,那么将提供抽象 ProducerListenerAdapter 。对于 isInterestedInSuccess ,它返回false。下面演示了异步结果回调:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");  
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");  
        }

    });
}

如果希望阻止式发送线程等待结果,可以调用 futureget() 方法。你可能希望在等待之前调用 flush() ,或者为了方便起见,模板有一个带有 autoFlush 参数的构造函数,该构造函数在每次发送时都会导致模板 flush() 。不过,请注意,刷新可能会显著降低性能:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

使用DefaultKafkaProducerFactory:

如上面使用 KafkaTemplate 中所示, ProducerFactory 用于创建生产者。默认情况下,当不使用事务时, DefaultKafkaProducerFactory 会创建一个供所有客户机使用的单例生产者,如 KafkaProducer javadocs中所建议的那样。但是,如果对模板调用flush(),这可能会导致使用同一个生产者的其他线程延迟。从2.3版开始, DefaultKafkaProducerFactory 有一个新属性 producerPerThread 。当设置为 true 时,工厂将为每个线程创建(和缓存)一个单独的生产者,以避免此问题。

producerPerThread 为true时,当不再需要生产者时,用户代码必须在工厂上调用 closeThreadBoundProducer() 。这将实际关闭生产者并将其从 ThreadLocal 中移除。调用reset()或destroy()不会清理这些生产者。

创建 DefaultKafkaProducerFactory 时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给 DefaultKafkaProducerFactory 构造函数(在这种情况下,所有生产者共享相同的实例)。或者,可以提供 Supplier<Serializer> s (从版本2.3开始),用于为每个生产者获取单独的 Serializer 实例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

使用ReplyingKafkaTemplate:

版本 2.1.3 引入了 KafkaTemplate 的一个子类来提供请求/应答语义。这个类名为 ReplyingKafkaTemplate ,并且有一个方法(除了超类中的那些方法之外)。下面的列表显示了方法签名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个 ListenableFuture ,它被结果异步填充(或者超时时出现异常)。结果还有一个 sendFuture 属性,这是调用 KafkaTemplate.send() 的结果。你可以使用此Future确定发送操作的结果。这里就不展开了。

2.3 接收消息

可以通过配置 MessageListenerContainer 并提供消息监听器或使用 @KafkaListener 注解来接收消息。

2.3.1 消息监听器

使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。
public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。

2.3.1.1 消息监听器容器

提供了两个 MessageListenerContainer 的实现:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 从单个线程上的所有主题或分区接收所有消息(即一个分区只能分配到一个消费者,一个消费者可以被分配多个分区)。 ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例,以提供多线程使用,从多线程上去处理主题或分区的所有消息。

从Spring Kafka2.2.7版开始,你可以将 RecordInterceptor 添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始, CompositeRecordInterceptor 可用于调用多个拦截器。

默认情况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你可以设置侦听器容器的 interceptBeforeTx 属性,以便在事务启动之前调用侦听器。没有为批处理侦听器提供侦听器,因为Kafka已经提供了 ConsumerInterceptor

2.3.1.2 使用KafkaMessageListenerContainer

有如下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

每个都获取一个 ConsumerFactory 以及有关主题和分区的信息,以及 ContainerProperties 对象中的其他配置。 ConcurrentMessageListenerContainer (稍后介绍)使用第二个构造函数跨使用者实例分发 TopicPartitionOffsetContainerProperties 具有以下构造函数:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个 TopicPartitionOffset 参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内的当前最后偏移量。提供了 TopicPartitionOffset 的构造函数,该构造函数接受一个附加的布尔参数。如果是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于 group.id 属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。

要将 MessageListener 分配给容器,可以在创建容器时使用 ContainerProps.setMessageListener 方法。下面的示例演示了如何执行此操作:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

注意当创建一个 Defaultkafkafkaconsumerfactory 时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例可以传递给key/value的 DefaultKafkaConsumerFactory 构造函数,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>s (从版本2.3开始),用于为每个使用者获取单独的反序列化程序实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅Javadoc 中 ContainerProperties

从版本Spring Kafka 2.1.1开始,一个名为 logContainerConfig 的新属性就可用了。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。

例如,要将日志级别更改为INFO,可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)

从版本Spring Kafka 2.2开始,添加了名为 missingtopicsfailal 的新容器属性(默认值:true)。如果代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。如果容器配置为侦听主题模式(regex),则不适用。以前,容器线程在 consumer.poll() 方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。要恢复以前的行为,可以将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动创建不存在的topic。

2.3.1.3 使用 ConcurrentMessageListenerContainer

单个构造函数类似于第一个 KafkaListenerContainer 构造函数。下面的列表显示了构造函数的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个并发属性。例如, container.setConcurrency(3) 即表示创建三个 KafkaMessageListenerContainer 实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。

当监听多个主题时,默认的分区分布可能不是你期望的那样。例如,如果你有三个主题,每个主题有五个分区,并且希望使用 concurrency=15 ,那么你只看到五个活动的消费者,每个消费者从每个主题中分配一个分区,其他十个消费者处于空闲状态。这是因为默认的Kafka PartitionAssignorRangeAssignor (参见其Javadoc)。对于这种情况,你可能需要考虑改用 RoundRobinAssignor ,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。若要更改 PartitionAssignor ,你可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者配置参数( ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

)。

使用Spring Boot时,可以按如下方式分配设置策略:

spring.kafka.consumer.properties.partition.assignment.strategy=/
org.apache.kafka.clients.consumer.RoundRobinAssignor

对于第二个构造函数, ConcurrentMessageListenerContainerTopicPartition 实例分布在委托 KafkaMessageListenerContainer 实例上。

例如,如果提供了六个 TopicPartition 实例,并发性为3;每个容器得到两个分区。对于五个 TopicPartition 实例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于 TopicPartitions 的数量,则会向下调整并发性,以便每个容器获得一个分区。调整分区的方式可以使用命令行工具 kafka-topics.sh 查询和调整主题上的分区数。还可以添加一个 NewTopic Bean,如果NewTopic设定的数目大于当前数目,spring boot的自动配置的 KafkaAdmin 将向上调整分区。

client.id属性(如果已设置)将附加 -n ,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供唯一名称所必需的。

从版本Spring Kafka 1.3开始, MessageListenerContainer 提供了对底层 KafkaConsumer 的度量的访问。对于 ConcurrentMessageListenerContainermetrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的度量(metrics)。根据为底层 KafkaConsumer 提供的 client-id 度量被分组到 Map<MetricName, ?extends Metric>

从2.3版开始, ContainerProperties 提供了一个 idleBetweenPolls 选项,允许侦听器容器中的主循环在 KafkaConsumer.poll() 调用之间睡眠。从提供的选项中选择实际睡眠间隔作为最小值,并且选择 max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差异。

2.3.1.4 提交偏移量

提供了几个提交偏移量的选项。如果 enable.auto.commit 使用者属性为 true ,则Kafka将根据其配置自动提交偏移量。如果为 false ,则容器支持多个 AckMode 设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将 enable.auto.commit 设置为 false ,除非在配置中显式设置。以前,如果未设置属性,则使用Kafka默认值(true)。消费者 poll() 方法返回一个或多个 ConsumerRecords 。为每个记录调用 MessageListener 。以下列表描述了容器对每个 AckMode 采取的操作:

  • RECORD: 当侦听器在处理记录后返回时提交偏移量。
  • BATCH: 处理完 poll() 返回的所有记录后提交偏移量。
  • TIME: 在处理完 poll() 返回的所有记录后提交偏移量,只要超过上次提交后的 ackTime
  • COUNT: 在处理完 poll() 返回的所有记录后提交偏移量,只要上次提交后收到 ackCount 记录。
  • COUNT_TIME: 类似于 TIMECOUNT ,但如果两个条件都为true,则执行提交。
  • MANUAL: 消息侦听器负责 acknowledge()Acknowledgment 。之后,应用与BATCH相同的语义。
  • MANUAL_IMMEDIATE: 侦听器调用 Acknowledgement.acknowledge() 方法时立即提交偏移量。

MANUAL和MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener 。请参见消息侦听器。

根据 syncCommits 容器属性,使用消费者上的 commitSync()commitAsync() 方法。默认情况下, syncCommits 为true;另请参阅 setSyncCommitTimeout 。请参阅 setCommitCallback 以获取异步提交的结果;默认回调是 LoggingCommitCallback ,它记录错误(以及调试级别的成功)。

因为侦听器容器有自己的提交偏移的机制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,否则它将无条件地将其设置为false。

Acknowledgment 有以下方法:

public interface Acknowledgment {
    void acknowledge();
}

此方法使侦听器可以控制何时提交偏移。

从版本2.3开始,确认接口有两个附加方法 nack(long sleep)nack(int index, long sleep) 。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发 IllegalStateException

nack()只能在调用侦听器的消费者线程上调用。

使用批处理侦听器时,可以在发生故障的批内指定索引。调用 nack() 时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次 poll() 时重新传递这些偏移量。这是对 SeekToCurrentBatchErrorHandler 的改进, SeekToCurrentBatchErrorHandler 只能查找整个批次以便重新交付。

注意:通过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于 consumer max.poll.interval.ms 属性非常重要。

2.3.1.5 侦听器容器自动启动和手动启动

侦听器容器实现了 SmartLifecycle (通过 SmartLifecycle 在Spring加载和初始化所有bean后,接着执行一些任务或者启动需要的异步服务),默认情况下 autoStartuptrue 。容器在后期启动( Integer.MAX-VALUE - 100 )。实现 SmartLifecycle 以处理来自侦听器的数据的其他组件应该在较早的阶段启动。 -100 为以后的阶段留出了空间,使组件能够在容器之后自动启动。比如我们通过 @Bean 将监听器容器交给Spring管理,这个时候通过 SmartLifecycle 自动执行了初始化的任务,但是当我们手动通过new监听器容器实例,则后初始化则不会执行,比如 KafkaMessageListenerContainer 实例需要手动执行 start()

autoStartup 在手动执行start中设置true与false没有作用,可以参见 @KafkaListener 声明周期管理这一小节。

2.3.2 @KafkaListener注解

2.3.2.1 Record Listeners

@KafkaListener 注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个 MessagingMessageListenerAdapter 中,该适配器配置有各种功能,如转换器,用于转换数据(如有必要)以匹配方法参数。通过使用属性占位符( ${…} ),或者可以使用SpEL( #{…​} )配置注释上的大多数属性。有关更多信息,请参阅Javadoc。

@KafkaListener :

  • id :listener唯一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。
  • containerFactory :上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是 ConcurrentKafkaListenerContainerFactory ,配置Bean名称
  • topics :需要监听的Topic,可监听多个,可以是表达式或者占位符关键字或者直接是主题名称,如多个主题监听: {"topic1" , "topic2"}
  • topicPattern : 此侦听器的主题模式。条目可以是“主题模式”、“属性占位符键”或“表达式”。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。
  • topicPartitions :用于使用手动主题/分区分配时
  • errorHandler :监听异常处理器,配置Bean名称,默认为空
  • groupId :消费组ID
  • idIsGroup :id是否为GroupId
  • clientIdPrefix :消费者Id前缀
  • beanRef :真实监听容器的Bean名称,需要在 Bean名称前加 "__"

@KafkaListener 注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

此机制生效需要 @Configuration 类之一上的 @EnableKafka 注解和用于配置基础 ConcurrentMessageListenerContainer 的侦听器容器工厂。默认情况下,需要名为 kafkaListenerContainerFactory 的bean。以下示例演示如何使用 ConcurrentMessageListenerContain :

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

注意,要设置容器属性,必须在工厂上使用 getContainerProperties() 方法。它用作注入容器的实际属性的模板。

从版本2.1.1开始,现在可以为注解创建的消费者设置 client.id 属性。 clientdprefix 的后缀是 -n ,其中n是一个整数,表示使用并发时的容器号。

从2.2版开始,现在可以通过使用批注本身的属性来重写容器工厂的并发性和自动启动属性。属性可以是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操作:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

你还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操作:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

你可以在 partitionspartitionOffsets 属性中指定每个分区,但不能同时指定两者。

使用手动 AckMode 时,还可以向侦听器提供 Acknowledgment 。下面的示例还演示了如何使用不同的容器工厂:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

最后,可以从消息头获得有关消息的元数据。你可以使用以下头名称来检索消息头内容:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

示例:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

2.3.2.2 批处理侦听器

从版本1.1开始,可以配置 @KafkaListener 方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,可以设置 batchListener 属性。下面的示例演示了如何执行此操作:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

以下示例显示如何接收有效载荷列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您可以接收消息列表 Message<?> 对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和 /Consumer<?, ?> 参数)。下面的示例演示如何执行此操作:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种情况下,不会对有效载荷执行转换。如果 BatchMessagingMessageConverter 配置了 RecordMessageConverter ,则还可以向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。

你还可以收到一个 ConsumerRecord<?, ?> 对象,但它必须是唯一的参数(当使用手动提交或 Consumer<?, ?> 参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从版本2.2开始,侦听器可以接收 poll() 方法返回的完整的 ConsumerRecords<?, ?> 对象,允许侦听器访问其他方法,例如 partitions() (返回列表中的 TopicPartition 实例)和 records (TopicPartition)(获取选择性记录)。同样,这必须是唯一的参数(当使用手动提交或 Consumer<?, ?> 参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 @KafkaListener@Payload验证

从2.2版开始,现在更容易添加验证程序来验证 @KafkaListener `@Payload 参数。以前,你必须配置一个自定义的 DefaultMessageHandlerMethodFactory`并将其添加到注册器中。现在,你可以将验证器添加到注册器本身。以下代码说明了如何执行此操作:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}

当你在Spring Boot使用 validation starter ,会自动配置 LocalValidatorFactoryBean ,如下例所示:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

以下示例演示如何验证:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 重新平衡监听者

ContainerProperties 有一个名为 consumerRebalanceListener 的属性,该属性接受Kafka客户端的 consumerRebalanceListene r接口的实现。如果未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录重新平衡事件。该框架还添加了一个子接口 ConsumerRawareRebalanceListener 。以下列表显示了 ConsumerRawareRebalanceListener 接口定义:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 转发监听者消息

从2.0版开始,如果还使用 @SendTo 注解注释 @KafkaListener ,并且方法调用返回结果,则结果将转发到 @SendTo 指定的主题。如:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

2.3.6 @KafkaListener生命周期管理

@KafkaListener 注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用 KafkaListenerEndpointRegistry 类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何 autoStartup 设置为 true 的容器。所有容器工厂创建的所有容器必须处于同一 phase 。有关详细信息,请参 阅侦听器容器自动启动 。你可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。可以在批注上设置 autoStartup ,这将覆盖容器工厂中配置的默认设置( setAutoStartup(true) )。你可以从应用程序上下文中获取对bean的引用,例如自动连接,以管理其注册的容器。以下示例说明了如何执行此操作:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的 getListenerContainers() 方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法 getAllListenerContainers() ,它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。

2.4 流处理

Spring for Apache Kafka 提供了一个工厂bean来创建 StreamsBuilder 对象并管理其流的生命周期。只要kafka流在classpath上并且kafka流通过 @EnableKafkaStreams 注解开启,Spring Boot就会自动配置所需的 KafkaStreamsConfiguration bean。

启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可以使用 spring.kafka.streams.application-id 配置,如果未设置,则默认为 spring.application.name 。后者可以全局设置,也可以专门为流覆写。

使用专用属性可以使用其他几个属性;可以使用 spring.Kafka.streams.properties 命名空间设置其他任意Kafka属性。有关详细信息, Additional Kafka Properties 。

默认情况下,由它创建的 StreamBuilder 对象管理的流将自动启动。可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

要使用工厂bean,只需将 StreamsBuilder 连接到 @bean ,如下例所示:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

默认情况下,由它创建的 StreamBuilder 对象管理的流将自动启动。可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

2.5 附加配置

自动配置支持的属性显示在 公用应用程序属性 中。注意,在大多数情况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅 Apache Kafka 文档。

前面提到的几个属性应用于所有组件(生产者、消费者、管理员和流),但如果希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为 HIGHMEDIUMLOW 的属性。Spring Boot自动配置支持所有高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。

只有Kafka支持的属性的一个子集可以通过 KafkaProperties 类直接使用,如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

上面的参数设置示例将公共 prop.one Kafka属性设置为 first (适用于生产者、消费者和管理员), prop.two admin属性设置为 secondprop.three consumer属性设置为 thirdprop.four producer属性设置为 fourthprop.five streams属性设置为 fifth

你还可以配置Spring Kafka JsonDeserializer ,如下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

类似地,可以禁用 JsonSerializer 在头中发送类型信息的默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

2.6 使用Embdded Kafka做测试

Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的 @EmbeddedKafka 注解测试类。有关更多信息,请参阅 Spring For Apache Kafka 参考手册。

要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,需要将嵌入式代理地址(由 EmbeddedKafkaBroker 填充)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:

  • 提供系统属性以将嵌入的代理地址映射到测试类中的 spring.kafka.bootstrap-servers :
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • @EmbeddedKafka 注解上配置属性名:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Spring Integration支持

Spring Integration也有Kafka的适配器,因此我们可以很方便的采用Spring Integration去实现发布订阅,当然你也可以不使用Spring Integration。

Spring Integration是什么,具体有什么作用,可以参考另一篇文章《Spring Integration最详解》。

3 Spring Kafka配置参数

这里对所有配置做个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,比如SSL认证可以全局配置,但是也可以在每个子模块,如消费者、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其他就不展开了,用到的时候再去查找和补充。

3.1 全局配置

# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其他属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic

3.2 生产者

Spring Boot中,Kafka 生产者 相关配置(所有配置前缀为 spring.kafka.producer. ):

# 生产者要求Leader在考虑请求完成之前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的所有数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 消费者

Spring Boot中,Kafka 消费者相关配置(所有配置前缀为 spring.kafka.consumer. ):

# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台定期提交
spring.kafka.consumer.enable-auto-commit
# 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的唯一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其他特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer

3.4 监听器

Spring Boot中,Kafka Listener相关配置(所有配置前缀为 spring.kafka.listener. ):

# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 管理

spring.kafka.admin.client-id
# 如果启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 授权服务(JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL认证

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Stream流处理

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Kafka订阅发布基本特性回顾

  • 同一消费组下所有消费者协同消费订阅主题的所有分区

    • 同消费组,多消费者订阅单主题单分区,则分区只会分配给其中一个消费者,除非这个消费者挂掉,才会分配给其他一个消费者消费消息,意思就是其他消费者在旁边看着吃东西
    • 同消费组,N个消费者订阅单主题N个分区,则默认每个消费者都会被分配一个分区
    • 同消费组,N个消费者订阅单主题M个分区,当M > N时,则会有消费者多分配多于一个分区的情况;当M < N时,则会有空闲消费者,类似第一条
    • 所有上面所说的消费者实例可以是线程方式或者是进程方式存在,所说的分区分配机制叫做重平衡(rebalance)
    • 当消费者内成员个数发生变化会触发重平衡;订阅的主题个数发生变化会触发重平衡;订阅的主题分区个数发生变化会触发重平衡;
    • 总之就是一个分区只能分配到一个消费者,一个消费者可以被分配多个分区
  • 消费者offset管理机制

    • 每个主题分区中的消息都有一个唯一偏移值,具有先后顺序,与消费者具有对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并定期的将记录同步到服务端(Broker),这里的同步机制是可以设置的
    • 消息是被持久化的,当组内所有消费者重新订阅主题时,可以设置是否从头开始消费消息或者是从最后记录的偏移值位置开始消费
  • 分区和消费者个数如何设置

    • 我们知道主题分区是分布在不同的Broker上的,每个分区对应一个消费者,从而具有消息处理具有很高的吞吐量
    • 分区是调优Kafka并行度的最小单元,多线程消费者连接多分区消费消息,在实现上,通过socket连接,因此也会占用文件句柄个数
    • 创建分区都是会占用一定内存的,并不是分区越多越好,当然现在kafka社区在优化这一部分,让分区数达到更大,性能也不会有所影响

具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。

5 发布订阅示例

实现下面的示例需要的环境:

  • Kafka + Zookeeper单点服务器或集群已配置好(如果环境搭建不熟悉,可以去翻看前面写的关于Kafka的环境搭建和测试那一篇),或者是使用 Spring-kafka-test embedded Kafka Server
  • Spring Boot开发环境(2.2.1)

    • JDK(1.8或以上)
    • STS(4.4.RELEASE)
    • MARVEN构建方式

5.1 使用Embedded Kafka Server

我们知道Kafka是 Scala+Zookeeper 构建的,可以从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也非常简单。

添加依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点, Run as JUnit Test 。:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

@EmbeddedKafka 中可以设置相关参数:

  • value: 设置创建代理的个数
  • count: 同value
  • ports: 代理端口号列表
  • brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"

注意:EmbeddedKafka这样默认是没有创建主题的。会提示 Topic(s) [test] is/are not present and missingTopicsFatal is true 错误。@EmbeddedKafka默认情况是创建一个代理,该代理具有一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。

5.2 简单的发布订阅实现(无自定义配置)

下面实现一个简单发布订阅功能,通过前端WEB调用一个API,然后在该API控制器中得到请求后生产者开始发送消息,消费者后台监听消息,如果收到消费者消息,则打印出来。

5.2.1 添加依赖及配置Kafka

添加Kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>            
</dependency>

配置Kafka,这里消费者和生产者在同一应用中,我们只需要配置Kafka Brokers的服务地址+端口:

server: 
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
        # 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题
        # 且默认创建的主题是单副本单分区的
        missing-topics-fatal: false
    consumer:
        # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
        auto-offset-reset: earliest

5.2.2 添加生产者

@Service
public class Producer {

    private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info(String.format("===Producing message: {}", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

5.2.3 添加消费者

@Service
public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) throws IOException {
        LOGGER.info(String.format("#### -> Consumed message -> %s", message));
    }
   
}

5.2.4 添加WEB控制器

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

5.2.5 测试

添加Spring Boot Application:

@SpringBootApplication
public class TestKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestKafkaApplication.class, args);
    }

}

启动Kafka Brokers后,需要手动创建主题(如果想自动创建,则需要借助KafkaAdmin,或者是Kafka Broker设置了 allow.auto.create.topics=true 且应用设置了 listener.missing-topics-fatal=false ):

# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 创建test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

打开浏览器测试:

http://localhost:9000/kafka/publish?message=hello

则应用控制台会打印 hello 。整个发布订阅的实现只使用了跟Kafka相关的 @KafkaListener 注解接收消息和 KafkaTemplate 模板发送消息,很是简单。

5.3 基于自定义配置发布订阅实现

上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法在程序中操作这些配置的,因此这一小节就是利用我们之前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。

实现内容有:

@KafkaListener

源码不会直接贴,只给出主体部分。

配置文件:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    ......
    ......
}

生产者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("===Producing message failed");  
            }

        });
    }
}

消费者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);;
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                // TODO Auto-generated method stub
                LOGGER.info("===Consuming msg: {}", data.value());
            }

        };
    }
}

继承 InitializingBean 只是为了初始化,也可以去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用 @Scope ,所以需要手动获取实例,通过context去调用getBean()。另外配置文件没有写全,这里需要注意。

5.3 基于Spring Integration发布订阅实现

Spring Integration也有对Kafka支持的适配器,采用Spring Integration,我们也能够快速的实现发布订阅功能,且实现群组多消费者批量消费功能:

  • 实现Kafka自定义配置类
  • 采用Spring Integration
  • 发布订阅
  • 群组多消费者批量消费
  • 采用DSL特定领域语法去编写
  • 生产者发布成功与失败异常处理

Spring Boot Kafka概览、配置及优雅地实现发布订阅

我们可以先看看整体的Kafka消息传递通道:

  • 出站通道中KafkaProducerMessageHandler用于将消息发送到主题
  • KafkaMessageDrivenChannelAdapter用于设置入站通道和消息处理

具体的Demo可以参考 Github中的一个sample :

  • https://github.com/spring-pro...

6 总结

本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其他包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握总体,结合实际,差不多基本内容都有所涉及了。

7 知识扩展

Spring Expression Language(简称SpEL),在Spring中,不同于属性占位符 ${...} ,而 SpEL 表达式则要放到 #{...} 中(除代码块中用Expression外)。如配置文件中有topics参数 spring.kafka.topics ,则可以将配置文件中参数传入注解 @KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

SpEL 表达式常用示例:

// 字面量
#{3.1415926}    // 浮点数
#{9.87E4}       // 科学计数法表示98700
#{'Hello'}      // String 类型
#{false}        // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers}                                   // 使用这个bean
#{sgtPeppers.artist}                            // 引用bean中的属性
#{sgtPeppers.selectArtist()}                    // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()}      // 方法返回值的操作
#{sgtPeppers.selectArtist()?.toUpperCase()}     // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类作用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}   
#{T(java.lang.Math).PI}             // 引用PI的值
#{T(java.lang.Math).random()}       // 获取0-1的随机数
#{T(System).currentTimeMillis()}    // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}" 
private String variable;

8 参考资料

  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://blog.csdn.net/lishuan...
  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://www.javatt.com/p/16904
  • https://github.com/cwenao/spr...
  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://docs.spring.io/spring...
  • https://www.intertech.com/Blo...
  • https://joshlong.com/jl/blogP...
  • https://examples.javacodegeek...
  • https://www.orchome.com/553
  • https://docs.spring.io/spring...
  • https://programming.vip/docs/... (事务型消息)
  • https://docs.confluent.io/cur...
  • https://github.com/spring-pro...
  • https://www.jianshu.com/p/27f...
  • https://www.jianshu.com/p/cec...
  • https://memorynotfound.com/sp...
原文  https://segmentfault.com/a/1190000021405060
正文到此结束
Loading...