本文属于翻译,转载注明出处,欢迎关注微信小程序 小白AI博客
微信公众号 小白AI
或者网站 https://xiaobaiai.net
[TOC]
本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于 Spring Integration
方式。本文内容基于 Spring Kafka2.3.3文档 及 Spring Boot Kafka相关文档 ,Spring创建了一个名为 Spring kafka
的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka, Spring-Kafka 项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以 spring.kafka.*
作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
实现下面的所涉及到的功能实现,需要有如下环境:
更多的配置可以参考 《Kafka,ZK集群开发或部署环境搭建及实验》
这一篇文章。
本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。
Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):
Spring for Apache Kafka | Spring Integration for Apache Kafka Version | kafka-clients |
---|---|---|
2.3.x | 3.2.x | 2.3.1 |
2.2.x | 3.1.x | 2.0.1, 2.1.x, 2.2.x |
2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。
Spring Kafka相关的注解有如下几个:
注解类型 | 描述 |
---|---|
EnableKafka | 启用由 AbstractListenerContainerFactory 在封面(covers)下创建的Kafka监听器注解端点,用于配置类; |
EnableKafkaStreams | 启用默认的Kafka流组件 |
KafkaHandler | 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解 |
KafkaListener | 将方法标记为指定主题上Kafka消息监听器的目标的注解 |
KafkaListeners | 聚合多个KafkaListener注解的容器注解 |
PartitionOffset | 用于向KafkaListener添加分区/初始偏移信息 |
TopicPartition | 用于向KafkaListener添加主题/分区信息 |
如使用 @EnableKafka
可以监听 AbstractListenerContainerFactory
子类目标端点,如 ConcurrentKafkaListenerContainerFactory
是 AbstractKafkaListenerContainerFactory
的子类。
public class ConcurrentKafkaListenerContainerFactory<K,V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
@EnableKafka
并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的 @EnableKafka
。如果想要自己实现Kafka配置类,则需要加上 @EnableKafka
,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除 KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
:bulb: 要在应用启动时就创建主题,可以添加 NewTopic
类型的Bean。如果该主题已经存在,则忽略Bean。
Spring的 KafkaTemplate
是自动配置的,你可以直接在自己的Bean中自动连接它,如下例所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
KafkaTemplate
包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回 ListenableFuture
,以此监听异步发送状态,成功还是失败,KafkaTemplate提供如下接口:
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); Map<MetricName, ? extends Metric> metrics(); List<PartitionInfo> partitionsFor(String topic); <T> T execute(ProducerCallback<K, V, T> callback); // Flush the producer. void flush(); interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); }
sendDefault
API 要求已向模板提供默认主题。部分API接受一个时间戳作为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,如果主题配置为使用 CREATE_TIME
,则记录用户指定的时间戳(如果未指定则生成)。如果将主题配置为使用 LOG_APPEND_TIME
,则忽略用户指定的时间戳,并且代理将添加本地代理时间。 metrics
和 partitionsFor
方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问
要使用模板,可以配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操作:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // See https://kafka.apache.org/documentation/#producerconfigs for more properties return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { // KafkaTemplate构造函数中输入生产者工厂配置 return new KafkaTemplate<Integer, String>(producerFactory()); }
然后,要使用模板,可以调用其方法之一发送消息。
当你使用包含 Message<?>
参数的方法时,主题、分区和键信息在消息头中提供,有如下子项:
KafkaHeaders.TOPIC KafkaHeaders.PARTITION_ID KafkaHeaders.MESSAGE_KEY KafkaHeaders.TIMESTAMP
如访问头部信息中某一项信息:
public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC)); }
可选的功能是,可以使用 ProducerListener
配置 KafkaTemplate
,以获得带有发送结果(成功或失败)的异步回调,而不是等待将来完成。以下列表显示了 ProducerListener
接口的定义:
public interface ProducerListener<K, V> { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); void onError(String topic, Integer partition, K key, V value, Exception exception); boolean isInterestedInSuccess(); }
默认情况下,模板配置有 LoggingProducerListener
,它只记录错误,在发送成功时不执行任何操作。只有当 isInterestedInSuccess
返回true时才调用 onSuccess
。
为了方便起见,如果你只想实现其中一个方法,那么将提供抽象 ProducerListenerAdapter
。对于 isInterestedInSuccess
,它返回false。下面演示了异步结果回调:
public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); }
如果希望阻止式发送线程等待结果,可以调用 future
的 get()
方法。你可能希望在等待之前调用 flush()
,或者为了方便起见,模板有一个带有 autoFlush
参数的构造函数,该构造函数在每次发送时都会导致模板 flush()
。不过,请注意,刷新可能会显著降低性能:
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } }
如上面使用 KafkaTemplate
中所示, ProducerFactory
用于创建生产者。默认情况下,当不使用事务时, DefaultKafkaProducerFactory
会创建一个供所有客户机使用的单例生产者,如 KafkaProducer
javadocs中所建议的那样。但是,如果对模板调用flush(),这可能会导致使用同一个生产者的其他线程延迟。从2.3版开始, DefaultKafkaProducerFactory
有一个新属性 producerPerThread
。当设置为 true
时,工厂将为每个线程创建(和缓存)一个单独的生产者,以避免此问题。
当 producerPerThread
为true时,当不再需要生产者时,用户代码必须在工厂上调用 closeThreadBoundProducer()
。这将实际关闭生产者并将其从 ThreadLocal
中移除。调用reset()或destroy()不会清理这些生产者。
创建 DefaultKafkaProducerFactory
时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给 DefaultKafkaProducerFactory
构造函数(在这种情况下,所有生产者共享相同的实例)。或者,可以提供 Supplier<Serializer> s
(从版本2.3开始),用于为每个生产者获取单独的 Serializer
实例:
@Bean public ProducerFactory<Integer, CustomValue> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); } @Bean public KafkaTemplate<Integer, CustomValue> kafkaTemplate() { return new KafkaTemplate<Integer, CustomValue>(producerFactory()); }
版本 2.1.3
引入了 KafkaTemplate
的一个子类来提供请求/应答语义。这个类名为 ReplyingKafkaTemplate
,并且有一个方法(除了超类中的那些方法之外)。下面的列表显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record); RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout);
结果是一个 ListenableFuture
,它被结果异步填充(或者超时时出现异常)。结果还有一个 sendFuture
属性,这是调用 KafkaTemplate.send()
的结果。你可以使用此Future确定发送操作的结果。这里就不展开了。
可以通过配置 MessageListenerContainer
并提供消息监听器或使用 @KafkaListener
注解来接收消息。
使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例 public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例 public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。 public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。 public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。 public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。 public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。 public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。 public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。
提供了两个 MessageListenerContainer
的实现:
KafkaMessageListenerContainer
从单个线程上的所有主题或分区接收所有消息(即一个分区只能分配到一个消费者,一个消费者可以被分配多个分区)。 ConcurrentMessageListenerContainer
委托给一个或多个 KafkaMessageListenerContainer
实例,以提供多线程使用,从多线程上去处理主题或分区的所有消息。
从Spring Kafka2.2.7版开始,你可以将 RecordInterceptor
添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始, CompositeRecordInterceptor
可用于调用多个拦截器。
默认情况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你可以设置侦听器容器的 interceptBeforeTx
属性,以便在事务启动之前调用侦听器。没有为批处理侦听器提供侦听器,因为Kafka已经提供了 ConsumerInterceptor
。
有如下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions)
每个都获取一个 ConsumerFactory
以及有关主题和分区的信息,以及 ContainerProperties
对象中的其他配置。 ConcurrentMessageListenerContainer
(稍后介绍)使用第二个构造函数跨使用者实例分发 TopicPartitionOffset
。 ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个 TopicPartitionOffset
参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内的当前最后偏移量。提供了 TopicPartitionOffset
的构造函数,该构造函数接受一个附加的布尔参数。如果是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于 group.id
属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。
要将 MessageListener
分配给容器,可以在创建容器时使用 ContainerProps.setMessageListener
方法。下面的示例演示了如何执行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
注意当创建一个 Defaultkafkafkaconsumerfactory
时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例可以传递给key/value的 DefaultKafkaConsumerFactory
构造函数,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>s
(从版本2.3开始),用于为每个使用者获取单独的反序列化程序实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
有关可以设置的各种属性的更多信息,请参阅Javadoc 中 ContainerProperties
。
从版本Spring Kafka 2.1.1开始,一个名为 logContainerConfig
的新属性就可用了。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。
例如,要将日志级别更改为INFO,可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)
。
从版本Spring Kafka 2.2开始,添加了名为 missingtopicsfailal
的新容器属性(默认值:true)。如果代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。如果容器配置为侦听主题模式(regex),则不适用。以前,容器线程在 consumer.poll()
方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。要恢复以前的行为,可以将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动创建不存在的topic。
单个构造函数类似于第一个 KafkaListenerContainer
构造函数。下面的列表显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
它还有一个并发属性。例如, container.setConcurrency(3)
即表示创建三个 KafkaMessageListenerContainer
实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。
concurrency=15
,那么你只看到五个活动的消费者,每个消费者从每个主题中分配一个分区,其他十个消费者处于空闲状态。这是因为默认的Kafka
PartitionAssignor
是
RangeAssignor
(参见其Javadoc)。对于这种情况,你可能需要考虑改用
RoundRobinAssignor
,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。若要更改
PartitionAssignor
,你可以在提供给
DefaultKafkaConsumerFactory
的属性中设置
partition.assignment.strategy
消费者配置参数(
ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。
使用Spring Boot时,可以按如下方式分配设置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=/ org.apache.kafka.clients.consumer.RoundRobinAssignor
对于第二个构造函数, ConcurrentMessageListenerContainer
将 TopicPartition
实例分布在委托 KafkaMessageListenerContainer
实例上。
例如,如果提供了六个 TopicPartition
实例,并发性为3;每个容器得到两个分区。对于五个 TopicPartition
实例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于 TopicPartitions
的数量,则会向下调整并发性,以便每个容器获得一个分区。调整分区的方式可以使用命令行工具 kafka-topics.sh
查询和调整主题上的分区数。还可以添加一个 NewTopic
Bean,如果NewTopic设定的数目大于当前数目,spring boot的自动配置的 KafkaAdmin
将向上调整分区。
client.id属性(如果已设置)将附加 -n
,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供唯一名称所必需的。
从版本Spring Kafka 1.3开始, MessageListenerContainer
提供了对底层 KafkaConsumer
的度量的访问。对于 ConcurrentMessageListenerContainer
, metrics()
方法返回所有目标 KafkaMessageListenerContainer
实例的度量(metrics)。根据为底层 KafkaConsumer
提供的 client-id
度量被分组到 Map<MetricName, ?extends Metric>
。
从2.3版开始, ContainerProperties
提供了一个 idleBetweenPolls
选项,允许侦听器容器中的主循环在 KafkaConsumer.poll()
调用之间睡眠。从提供的选项中选择实际睡眠间隔作为最小值,并且选择 max.poll.interval.ms
消费者配置和当前记录批处理时间之间的差异。
提供了几个提交偏移量的选项。如果 enable.auto.commit
使用者属性为 true
,则Kafka将根据其配置自动提交偏移量。如果为 false
,则容器支持多个 AckMode
设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将 enable.auto.commit
设置为 false
,除非在配置中显式设置。以前,如果未设置属性,则使用Kafka默认值(true)。消费者 poll()
方法返回一个或多个 ConsumerRecords
。为每个记录调用 MessageListener
。以下列表描述了容器对每个 AckMode
采取的操作:
poll()
返回的所有记录后提交偏移量。 poll()
返回的所有记录后提交偏移量,只要超过上次提交后的 ackTime
poll()
返回的所有记录后提交偏移量,只要上次提交后收到 ackCount
记录。 TIME
和 COUNT
,但如果两个条件都为true,则执行提交。 acknowledge()
和 Acknowledgment
。之后,应用与BATCH相同的语义。 Acknowledgement.acknowledge()
方法时立即提交偏移量。 MANUAL和MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListener
或 BatchAcknowledgingMessageListener
。请参见消息侦听器。
根据 syncCommits
容器属性,使用消费者上的 commitSync()
或 commitAsync()
方法。默认情况下, syncCommits
为true;另请参阅 setSyncCommitTimeout
。请参阅 setCommitCallback
以获取异步提交的结果;默认回调是 LoggingCommitCallback
,它记录错误(以及调试级别的成功)。
因为侦听器容器有自己的提交偏移的机制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,否则它将无条件地将其设置为false。
Acknowledgment
有以下方法:
public interface Acknowledgment { void acknowledge(); }
此方法使侦听器可以控制何时提交偏移。
从版本2.3开始,确认接口有两个附加方法 nack(long sleep)
和 nack(int index, long sleep)
。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发 IllegalStateException
。
nack()只能在调用侦听器的消费者线程上调用。
使用批处理侦听器时,可以在发生故障的批内指定索引。调用 nack()
时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次 poll()
时重新传递这些偏移量。这是对 SeekToCurrentBatchErrorHandler
的改进, SeekToCurrentBatchErrorHandler
只能查找整个批次以便重新交付。
注意:通过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于 consumer max.poll.interval.ms
属性非常重要。
侦听器容器实现了 SmartLifecycle
(通过 SmartLifecycle
在Spring加载和初始化所有bean后,接着执行一些任务或者启动需要的异步服务),默认情况下 autoStartup
为 true
。容器在后期启动( Integer.MAX-VALUE - 100
)。实现 SmartLifecycle
以处理来自侦听器的数据的其他组件应该在较早的阶段启动。 -100
为以后的阶段留出了空间,使组件能够在容器之后自动启动。比如我们通过 @Bean
将监听器容器交给Spring管理,这个时候通过 SmartLifecycle
自动执行了初始化的任务,但是当我们手动通过new监听器容器实例,则后初始化则不会执行,比如 KafkaMessageListenerContainer
实例需要手动执行 start()
。
autoStartup
在手动执行start中设置true与false没有作用,可以参见 @KafkaListener
声明周期管理这一小节。
@KafkaListener
注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个 MessagingMessageListenerAdapter
中,该适配器配置有各种功能,如转换器,用于转换数据(如有必要)以匹配方法参数。通过使用属性占位符( ${…}
),或者可以使用SpEL( #{…}
)配置注释上的大多数属性。有关更多信息,请参阅Javadoc。
@KafkaListener
:
id
:listener唯一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。 containerFactory
:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是 ConcurrentKafkaListenerContainerFactory
,配置Bean名称 topics
:需要监听的Topic,可监听多个,可以是表达式或者占位符关键字或者直接是主题名称,如多个主题监听: {"topic1" , "topic2"}
topicPattern
: 此侦听器的主题模式。条目可以是“主题模式”、“属性占位符键”或“表达式”。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。 topicPartitions
:用于使用手动主题/分区分配时 errorHandler
:监听异常处理器,配置Bean名称,默认为空 groupId
:消费组ID idIsGroup
:id是否为GroupId clientIdPrefix
:消费者Id前缀 beanRef
:真实监听容器的Bean名称,需要在 Bean名称前加 "__" @KafkaListener
注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }
此机制生效需要 @Configuration
类之一上的 @EnableKafka
注解和用于配置基础 ConcurrentMessageListenerContainer
的侦听器容器工厂。默认情况下,需要名为 kafkaListenerContainerFactory
的bean。以下示例演示如何使用 ConcurrentMessageListenerContain
:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
注意,要设置容器属性,必须在工厂上使用 getContainerProperties()
方法。它用作注入容器的实际属性的模板。
从版本2.1.1开始,现在可以为注解创建的消费者设置 client.id
属性。 clientdprefix
的后缀是 -n
,其中n是一个整数,表示使用并发时的容器号。
从2.2版开始,现在可以通过使用批注本身的属性来重写容器工厂的并发性和自动启动属性。属性可以是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操作:
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... }
你还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操作:
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
你可以在 partitions
或 partitionOffsets
属性中指定每个分区,但不能同时指定两者。
使用手动 AckMode
时,还可以向侦听器提供 Acknowledgment
。下面的示例还演示了如何使用不同的容器工厂:
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
最后,可以从消息头获得有关消息的元数据。你可以使用以下头名称来检索消息头内容:
KafkaHeaders.OFFSET KafkaHeaders.RECEIVED_MESSAGE_KEY KafkaHeaders.RECEIVED_TOPIC KafkaHeaders.RECEIVED_PARTITION_ID KafkaHeaders.RECEIVED_TIMESTAMP KafkaHeaders.TIMESTAMP_TYPE
示例:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... }
从版本1.1开始,可以配置 @KafkaListener
方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,可以设置 batchListener
属性。下面的示例演示了如何执行此操作:
@Bean public KafkaListenerContainerFactory<?, ?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; }
以下示例显示如何接收有效载荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) { ... }
主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... }
或者,您可以接收消息列表 Message<?>
对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和 /
或 Consumer<?, ?>
参数)。下面的示例演示如何执行此操作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) { ... } @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) { ... } @KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) { ... }
在这种情况下,不会对有效载荷执行转换。如果 BatchMessagingMessageConverter
配置了 RecordMessageConverter
,则还可以向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。
你还可以收到一个 ConsumerRecord<?, ?>
对象,但它必须是唯一的参数(当使用手动提交或 Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) { ... } @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) { ... }
从版本2.2开始,侦听器可以接收 poll()
方法返回的完整的 ConsumerRecords<?, ?>
对象,允许侦听器访问其他方法,例如 partitions()
(返回列表中的 TopicPartition
实例)和 records
(TopicPartition)(获取选择性记录)。同样,这必须是唯一的参数(当使用手动提交或 Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") public void pollResults(ConsumerRecords<?, ?> records) { ... }
从2.2版开始,现在更容易添加验证程序来验证 @KafkaListener
`@Payload 参数。以前,你必须配置一个自定义的
DefaultMessageHandlerMethodFactory`并将其添加到注册器中。现在,你可以将验证器添加到注册器本身。以下代码说明了如何执行此操作:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); } }
当你在Spring Boot使用 validation starter
,会自动配置 LocalValidatorFactoryBean
,如下例所示:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
以下示例演示如何验证:
public static class ValidatedClass { @Max(10) private int bar; public int getBar() { return this.bar; } public void setBar(int bar) { this.bar = bar; } }
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
ContainerProperties
有一个名为 consumerRebalanceListener
的属性,该属性接受Kafka客户端的 consumerRebalanceListene
r接口的实现。如果未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录重新平衡事件。该框架还添加了一个子接口 ConsumerRawareRebalanceListener
。以下列表显示了 ConsumerRawareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); }
从2.0版开始,如果还使用 @SendTo
注解注释 @KafkaListener
,并且方法调用返回结果,则结果将转发到 @SendTo
指定的主题。如:
@KafkaListener(topics = "annotated21") @SendTo("!{request.value()}") // runtime SpEL public String replyingListener(String in) { ... } @KafkaListener(topics = "${some.property:annotated22}") @SendTo("#{myBean.replyTopic}") // config time SpEL public Collection<String> replyingBatchListener(List<String> in) { ... } @KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") @SendTo("annotated23reply") // static reply topic definition public String replyingListenerWithErrorHandler(String in) { ... } ... @KafkaListener(topics = "annotated25") @SendTo("annotated25reply1") public class MultiListenerSendTo { @KafkaHandler public String foo(String in) { ... } @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
为 @KafkaListener
注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用 KafkaListenerEndpointRegistry
类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何 autoStartup
设置为 true
的容器。所有容器工厂创建的所有容器必须处于同一 phase
。有关详细信息,请参 阅侦听器容器自动启动 。你可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。可以在批注上设置 autoStartup
,这将覆盖容器工厂中配置的默认设置( setAutoStartup(true)
)。你可以从应用程序上下文中获取对bean的引用,例如自动连接,以管理其注册的容器。以下示例说明了如何执行此操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false") public void listen(...) { ... }
@Autowired private KafkaListenerEndpointRegistry registry; ... this.registry.getListenerContainer("myContainer").start(); ...
注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的 getListenerContainers()
方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法 getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。
Spring for Apache Kafka
提供了一个工厂bean来创建 StreamsBuilder
对象并管理其流的生命周期。只要kafka流在classpath上并且kafka流通过 @EnableKafkaStreams
注解开启,Spring Boot就会自动配置所需的 KafkaStreamsConfiguration
bean。
启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可以使用 spring.kafka.streams.application-id
配置,如果未设置,则默认为 spring.application.name
。后者可以全局设置,也可以专门为流覆写。
使用专用属性可以使用其他几个属性;可以使用 spring.Kafka.streams.properties
命名空间设置其他任意Kafka属性。有关详细信息, Additional Kafka Properties 。
默认情况下,由它创建的 StreamBuilder
对象管理的流将自动启动。可以使用 spring.kafka.streams.auto-startup
属性自定义此行为。
要使用工厂bean,只需将 StreamsBuilder
连接到 @bean
,如下例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public static class KafkaStreamsExampleConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } }
默认情况下,由它创建的 StreamBuilder
对象管理的流将自动启动。可以使用 spring.kafka.streams.auto-startup
属性自定义此行为。
自动配置支持的属性显示在 公用应用程序属性 中。注意,在大多数情况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅 Apache Kafka
文档。
前面提到的几个属性应用于所有组件(生产者、消费者、管理员和流),但如果希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为 HIGH
、 MEDIUM
或 LOW
的属性。Spring Boot自动配置支持所有高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。
只有Kafka支持的属性的一个子集可以通过 KafkaProperties
类直接使用,如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth spring.kafka.streams.properties.prop.five=fifth
上面的参数设置示例将公共 prop.one
Kafka属性设置为 first
(适用于生产者、消费者和管理员), prop.two
admin属性设置为 second
, prop.three
consumer属性设置为 third
, prop.four
producer属性设置为 fourth
, prop.five
streams属性设置为 fifth
。
你还可以配置Spring Kafka JsonDeserializer
,如下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
类似地,可以禁用 JsonSerializer
在头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。
Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的 @EmbeddedKafka
注解测试类。有关更多信息,请参阅 Spring For Apache Kafka 参考手册。
要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,需要将嵌入式代理地址(由 EmbeddedKafkaBroker
填充)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:
spring.kafka.bootstrap-servers
: static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
@EmbeddedKafka
注解上配置属性名: @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Spring Integration也有Kafka的适配器,因此我们可以很方便的采用Spring Integration去实现发布订阅,当然你也可以不使用Spring Integration。
Spring Integration是什么,具体有什么作用,可以参考另一篇文章《Spring Integration最详解》。
这里对所有配置做个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,比如SSL认证可以全局配置,但是也可以在每个子模块,如消费者、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其他就不展开了,用到的时候再去查找和补充。
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性 spring.kafka.bootstrap-servers # 在发出请求时传递给服务器的ID。用于服务器端日志记录 spring.kafka.client-id,默认无 # 用于配置客户端的其他属性,生产者和消费者共有的属性 spring.kafka.properties.* # 消息发送的默认主题,默认无 spring.kafka.template.default-topic
Spring Boot中,Kafka 生产者
相关配置(所有配置前缀为 spring.kafka.producer.
):
# 生产者要求Leader在考虑请求完成之前收到的确认数 spring.kafka.producer.acks # 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理) spring.kafka.producer.batch-size spring.kafka.producer.bootstrap-servers # 生产者可用于缓冲等待发送到服务器的记录的总内存大小。 spring.kafka.producer.buffer-memory # 在发出请求时传递给服务器的ID。用于服务器端日志记录。 spring.kafka.producer.client-id # 生产者生成的所有数据的压缩类型 spring.kafka.producer.compression-type # 键的序列化程序类 spring.kafka.producer.key-serializer spring.kafka.producer.properties.* # 大于零时,启用失败发送的重试次数 spring.kafka.producer.retries spring.kafka.producer.ssl.key-password spring.kafka.producer.ssl.key-store-location spring.kafka.producer.ssl.key-store-password spring.kafka.producer.ssl.key-store-type spring.kafka.producer.ssl.protocol spring.kafka.producer.ssl.trust-store-location spring.kafka.producer.ssl.trust-store-password spring.kafka.producer.ssl.trust-store-type # 非空时,启用对生产者的事务支持 spring.kafka.producer.transaction-id-prefix spring.kafka.producer.value-serializer
Spring Boot中,Kafka 消费者相关配置(所有配置前缀为 spring.kafka.consumer.
):
# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms) spring.kafka.consumer.auto-commit-interval # 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置 # earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 spring.kafka.consumer.auto-offset-reset # 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性 spring.kafka.consumer.bootstrap-servers # 在发出请求时传递给服务器的ID,用于服务器端日志记录 spring.kafka.consumer.client-id # 消费者的偏移量是否在后台定期提交 spring.kafka.consumer.enable-auto-commit # 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量 spring.kafka.consumer.fetch-max-wait # 服务器应为获取请求返回的最小数据量。 spring.kafka.consumer.fetch-min-size # 标识此消费者所属的默认消费者组的唯一字符串 spring.kafka.consumer.group-id # 消费者协调员的预期心跳间隔时间。 spring.kafka.consumer.heartbeat-interval # 用于读取以事务方式写入的消息的隔离级别。 spring.kafka.consumer.isolation-level # 密钥的反序列化程序类 spring.kafka.consumer.key-deserializer # 在对poll()的单个调用中返回的最大记录数。 spring.kafka.consumer.max-poll-records # 用于配置客户端的其他特定于消费者的属性。 spring.kafka.consumer.properties.* # 密钥存储文件中私钥的密码。 spring.kafka.consumer.ssl.key-password # 密钥存储文件的位置。 spring.kafka.consumer.ssl.key-store-location # 密钥存储文件的存储密码。 spring.kafka.consumer.ssl.key-store-password # 密钥存储的类型,如JKS spring.kafka.consumer.ssl.key-store-type # 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1 spring.kafka.consumer.ssl.protocol # 信任存储文件的位置。 spring.kafka.consumer.ssl.trust-store-location # 信任存储文件的存储密码。 spring.kafka.consumer.ssl.trust-store-password # 信任存储区的类型。 spring.kafka.consumer.ssl.trust-store-type # 值的反序列化程序类。 spring.kafka.consumer.value-deserializer
Spring Boot中,Kafka Listener相关配置(所有配置前缀为 spring.kafka.listener.
):
# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数 spring.kafka.listener.ack-count= spring.kafka.listener.ack-mode spring.kafka.listener.ack-time spring.kafka.listener.client-id spring.kafka.listener.concurrency spring.kafka.listener.idle-event-interval spring.kafka.listener.log-container-config # 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动, # 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic spring.kafka.listener.missing-topics-fatal=true # 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位 spring.kafka.listener.monitor-interval spring.kafka.listener.no-poll-threshold spring.kafka.listener.poll-timeout spring.kafka.listener.type
spring.kafka.admin.client-id # 如果启动时代理不可用,是否快速失败 spring.kafka.admin.fail-fast=false spring.kafka.admin.properties.* spring.kafka.admin.ssl.key-password spring.kafka.admin.ssl.key-store-location spring.kafka.admin.ssl.key-store-password spring.kafka.admin.ssl.key-store-type spring.kafka.admin.ssl.protocol spring.kafka.admin.ssl.trust-store-location spring.kafka.admin.ssl.trust-store-password spring.kafka.admin.ssl.trust-store-type
spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=false spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule spring.kafka.jaas.options.*
spring.kafka.ssl.key-password spring.kafka.ssl.key-store-location spring.kafka.ssl.key-store-password spring.kafka.ssl.key-store-type spring.kafka.ssl.protocol spring.kafka.ssl.trust-store-location spring.kafka.ssl.trust-store-password spring.kafka.ssl.trust-store-type
spring.kafka.streams.application-id spring.kafka.streams.auto-startup spring.kafka.streams.bootstrap-servers spring.kafka.streams.cache-max-size-buffering spring.kafka.streams.client-id spring.kafka.streams.properties.* spring.kafka.streams.replication-factor spring.kafka.streams.ssl.key-password spring.kafka.streams.ssl.key-store-location spring.kafka.streams.ssl.key-store-password spring.kafka.streams.ssl.key-store-type spring.kafka.streams.ssl.protocol spring.kafka.streams.ssl.trust-store-location spring.kafka.streams.ssl.trust-store-password spring.kafka.streams.ssl.trust-store-type spring.kafka.streams.state-dir
同一消费组下所有消费者协同消费订阅主题的所有分区
消费者offset管理机制
分区和消费者个数如何设置
具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。
实现下面的示例需要的环境:
Spring-kafka-test
embedded Kafka Server
Spring Boot开发环境(2.2.1)
我们知道Kafka是 Scala+Zookeeper
构建的,可以从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也非常简单。
添加依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点, Run as JUnit Test
。:
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
@EmbeddedKafka
中可以设置相关参数:
注意:EmbeddedKafka这样默认是没有创建主题的。会提示 Topic(s) [test] is/are not present and missingTopicsFatal is true
错误。@EmbeddedKafka默认情况是创建一个代理,该代理具有一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。
下面实现一个简单发布订阅功能,通过前端WEB调用一个API,然后在该API控制器中得到请求后生产者开始发送消息,消费者后台监听消息,如果收到消费者消息,则打印出来。
添加Kafka依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置Kafka,这里消费者和生产者在同一应用中,我们只需要配置Kafka Brokers的服务地址+端口:
server: port: 9000 spring: kafka: bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094 listener: # 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题 # 且默认创建的主题是单副本单分区的 missing-topics-fatal: false consumer: # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息) auto-offset-reset: earliest
@Service public class Producer { private static final Logger LOGGER = LogManager.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { LOGGER.info(String.format("===Producing message: {}", message)); this.kafkaTemplate.send(TOPIC, message); } }
@Service public class Consumer { private static final Logger LOGGER = LogManager.getLogger(Consumer.class); @KafkaListener(topics = "test", groupId = "group_test") public void consume(String message) throws IOException { LOGGER.info(String.format("#### -> Consumed message -> %s", message)); } }
@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @GetMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }
添加Spring Boot Application:
@SpringBootApplication public class TestKafkaApplication { public static void main(String[] args) { SpringApplication.run(TestKafkaApplication.class, args); } }
启动Kafka Brokers后,需要手动创建主题(如果想自动创建,则需要借助KafkaAdmin,或者是Kafka Broker设置了 allow.auto.create.topics=true
且应用设置了 listener.missing-topics-fatal=false
):
# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇) # 创建test主题 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
打开浏览器测试:
http://localhost:9000/kafka/publish?message=hello
则应用控制台会打印 hello
。整个发布订阅的实现只使用了跟Kafka相关的 @KafkaListener
注解接收消息和 KafkaTemplate
模板发送消息,很是简单。
上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法在程序中操作这些配置的,因此这一小节就是利用我们之前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。
实现内容有:
@KafkaListener
源码不会直接贴,只给出主体部分。
配置文件:
@Configuration @ConfigurationProperties(prefix = "m2kc") @PropertySource("classpath:kafka.properties") @Validated public class M2KCKafkaConfig { @Value("${m2kc.kafka.bootstrap.servers}") private String kafkaBootStrapServers; @Value("${m2kc.kafka.key.serializer.class}") private String kafkaKeySerializerClass; ...... ...... }
生产者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaProducer { private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class); private String mTopic = "test"; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaTemplate<String, String> mKafkaTemplate; @Autowired public KafkaProducer(M2KCKafkaConfig kafkaConfig) { mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; mKafkaTemplate = getKafkaTemplate(); } public KafkaTemplate<String, String> getKafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaProducerFactory<String, String>(properties); } public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); } }
消费者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaConsumer implements InitializingBean { private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class); private String mTopic; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; @Autowired public KafkaConsumer(M2KCKafkaConfig kafkaConfig) { LOGGER.info("===KafkaConsumer construct"); mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; } @PostConstruct public void start(){ LOGGER.info("===KafkaConsumer start"); } @Override public void afterPropertiesSet() throws Exception { LOGGER.info("===afterPropertiesSet is called"); createContainer(); } private void createContainer() { mKafkaMessageListenerContainer = createKafkaMessageListenerContainer(); mKafkaMessageListenerContainer.setAutoStartup(false);; mKafkaMessageListenerContainer.start(); LOGGER.info("===", mKafkaMessageListenerContainer); } private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() { KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(), createContainerProperties()); LOGGER.info("===createKafkaMessageListenerContainer"); return container; } private ContainerProperties createContainerProperties() { ContainerProperties containerProps = new ContainerProperties(mTopic); containerProps.setMessageListener(createMessageListener()); return containerProps; } private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaConsumerFactory<String, String>(properties); } private MessageListener<String, String> createMessageListener() { return new MessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> data) { // TODO Auto-generated method stub LOGGER.info("===Consuming msg: {}", data.value()); } }; } }
继承 InitializingBean
只是为了初始化,也可以去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用 @Scope
,所以需要手动获取实例,通过context去调用getBean()。另外配置文件没有写全,这里需要注意。
Spring Integration也有对Kafka支持的适配器,采用Spring Integration,我们也能够快速的实现发布订阅功能,且实现群组多消费者批量消费功能:
我们可以先看看整体的Kafka消息传递通道:
具体的Demo可以参考 Github中的一个sample :
本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其他包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握总体,结合实际,差不多基本内容都有所涉及了。
Spring Expression Language(简称SpEL),在Spring中,不同于属性占位符 ${...}
,而 SpEL
表达式则要放到 #{...}
中(除代码块中用Expression外)。如配置文件中有topics参数 spring.kafka.topics
,则可以将配置文件中参数传入注解 @KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
。
SpEL
表达式常用示例:
// 字面量 #{3.1415926} // 浮点数 #{9.87E4} // 科学计数法表示98700 #{'Hello'} // String 类型 #{false} // Boolean 类型 // 引用Bean、属性和方法 #{sgtPeppers} // 使用这个bean #{sgtPeppers.artist} // 引用bean中的属性 #{sgtPeppers.selectArtist()} // 引用bean中的方法 #{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操作 #{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase() // 访问类作用域的方法和常量的话,使用T()这个关键的运算符 #{T(java.lang.Math)} #{T(java.lang.Math).PI} // 引用PI的值 #{T(java.lang.Math).random()} // 获取0-1的随机数 #{T(System).currentTimeMillis()} // 获取时间到当前的毫秒数 // 替代属性占位符获取配置文件属性值 @Value("#{表达式}" private String variable;