在Kafka的场景下毒丸是:针对Kafka主题生产推入的记录,无论尝试多少次,消费者使用都会失败。
因此,毒丸可以有不同的形式:
主题的消费者应配置正确的反序列化器,以能够反序列化生产者的序列化Java对象的字节。只要生产者和消费使用者都使用相同的兼容串行器和解串器,一切就可以正常工作。
当生产者序列化程序和消费者反序列化程序不兼容时,您将陷入毒丸场景。在键和值反序列化器中都可能发生这种不兼容。
在现实生活中的项目中,我在以下情况下遇到了毒药:
好奇如何在您当地的开发环境中造成毒药?但是更重要的是,您自己学习如何通过应用此博客文章中介绍的配置来保护您的消费者应用程序。
您可以在 GitHub上 找到示例项目。
选择适合您的项目的序列化器。如果需要,您甚至可以实现自己的自定义序列化程序。
卡夫卡集群负责:
Kafka集群不负责:
Kafka甚至都不知道数据的结构。Kafka主题中的记录存储为字节数组。Kafka旨在分发字节。这就是Kafka快速且可扩展的原因之一。
消费者负责:
使用者可以开始使用Kafka主题的记录之前,必须在应用程序中配置相应的键和值反序列化器。这是使用Spring Boot和Spring Kafka的键和值序列化程序的Kafka使用者配置示例:
spring: kafka: consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
在用户应用程序中无法处理毒药的影响很大。让我们来看看发生了什么:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1241) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1002) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value <b>for</b> partition stock-quotes-avro-1 at offset 69. If needed, please seek past the record to <b>continue</b> consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
坏结果:
如果您没有及时注意到,您的消费者应用程序可以快速将数GB的日志文件写入磁盘。您还可以将日志自动发送到日志聚合工具,例如ELK堆栈(Elasticsearch,Logstash和Kibana)。
如果您没有适当的监视,则有时可能会“吃掉”所有服务器磁盘空间。在最坏的情况下,您可能还会在同一台计算机上运行其他服务,由于磁盘已满,它们将开始报告运行状况不佳!
如何在有毒药情况下生存?
有两种方法可以使毒丸药幸存下来:
使用Spring Kafka解决问题 ErrorHandlingDeserializer:
当反序列化器无法反序列化消息时,Spring将无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,ErrorHandlingDeserializer已经引入了。该处理器将委托给实际的反序列化(键或值)。如果委托未能反序列化记录内容,则在包含原因和原始字节的标头中ErrorHandlingDeserializer返回一个<b>null</b>值和一个DeserializationException。当您使用记录级时MessageListener,如果中ConsumerRecord包含DeserializationException键或值的标头,ErrorHandler则会使用failed调用容器的ConsumerRecord。记录不会传递给侦听器。
背后的想法ErrorHandlingDeserializer很简单,但是当我第一次配置它时,我花了一些时间来解决问题。
对于我们的键和值反序列化器,请配置ErrorHandlingDeserializerSpring Kafka提供的。
该ErrorHandlingDeserializer会委托给真正的解串器(key和value)。我们必须“告诉” ErrorHandlingDeserializer:
例如:
application.yml中配置ErrorHandlingDeserializer :
spring: kafka: bootstrap-servers: localhost:9092 consumer: # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: # Delegate deserializers spring.deserializer.key.delegate.<b>class</b>: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.<b>class</b>: io.confluent.kafka.serializers.KafkaAvroDeserializer
现在,当键或值委托无法对毒药进行反序列化时,ErrorHandlingDeserializer返回空值并DeserializationException在包含原因和原始字节的标头中添加一个。
如果ConsumerRecord包含DeserializationException键或值的标头,ErrorHandler则使用failed调用容器的标头ConsumerRecord,并且不会将记录传递给侦听器(用注释的类或方法@KafkaListener)。
默认情况下,容器的错误处理程序是SeekToCurrentErrorHandler。通过配置LoggingErrorHandler,我们可以记录毒丸的内容。
这是配置的示例LoggingErrorHandler:
@Configuration @EnableKafka <b>public</b> <b>class</b> KafkaConfiguration { <font><i>/** * Boot will autowire this into the container factory. */</i></font><font> @Bean <b>public</b> LoggingErrorHandler errorHandler() { <b>return</b> <b>new</b> LoggingErrorHandler(); } } </font>
长话短说,ErrorHandlingDeserializer确保处理并记录了毒药。消费者抵消向前移动,以便消费者可以继续消费下一个记录。
万岁-您在毒丸场景中幸免于难!
发布死信主题
在许多情况下,记录反序列化异常足够好,但是以后使检查毒药变得更加困难。从Spring Kafka 2.3开始,您可以配置ErrorHandlingDeserializer与a组合使用DeadLetterPublishingRecoverer,SeekToCurrentErrorHandler以将毒药的值发布到死信主题。
@Configuration @EnableKafka <b>public</b> <b>class</b> KafkaConfiguration { <font><i>/** * Boot will autowire this into the container factory. */</i></font><font> @Bean <b>public</b> SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { <b>return</b> <b>new</b> SeekToCurrentErrorHandler(deadLetterPublishingRecoverer); } </font><font><i>/** * Configure the {@link DeadLetterPublishingRecoverer} to publish poison pill bytes to a dead letter topic: * "stock-quotes-avro.DLT". */</i></font><font> @Bean <b>public</b> DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) { <b>return</b> <b>new</b> DeadLetterPublishingRecoverer(bytesTemplate); } } </font>
这使您可以灵活地使用毒药和检查数据。Spring Kafka将把死信记录发送到一个名为的主题<originalTopicName>.DLT(后缀为的原始主题的名称.DLT),并发送到与原始记录相同的分区。
请注意,您的使用者应用程序也将成为生产者,因此您需要在配置(application.yml)中配置键和值序列化程序:
spring: kafka: producer: # Important! # In <b>case</b> you publish to a 'dead letter topic' you consumer application becomes # a producer as well! So you need to specify the producer properties! key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer