转载

Apache Kafak如何处理消息反序列化失败等毒丸现象?

在Kafka的场景下毒丸是:针对Kafka主题生产推入的记录,无论尝试多少次,消费者使用都会失败。

因此,毒丸可以有不同的形式:

  • 记录已损坏(我自己从未使用Kafka遇到过此问题)
  • 反序列化失败

主题的消费者应配置正确的反序列化器,以能够反序列化生产者的序列化Java对象的字节。只要生产者和消费使用者都使用相同的兼容串行器和解串器,一切就可以正常工作。

当生产者序列化程序和消费者反序列化程序不兼容时,您将陷入毒丸场景。在键和值反序列化器中都可能发生这种不兼容。

在现实生活中的项目中,我在以下情况下遇到了毒药:

  • 生产者更改了键或值序列化器,并继续将数据生产到同一Kafka主题。这给该主题的所有消费者带来了反序列化问题。
  • 使用者配置了错误的key或值反序列化器。
  • 不同的生产者使用不同的键或值序列化程序,开始生产有关Kafka主题的记录。

好奇如何在您当地的开发环境中造成毒药?但是更重要的是,您自己学习如何通过应用此博客文章中介绍的配置来保护您的消费者应用程序。

您可以在 GitHub上 找到示例项目。

选择适合您的项目的序列化器。如果需要,您甚至可以实现自己的自定义序列化程序。

卡夫卡集群负责:

  • 以容错方式将记录存储在主题中
  • 在多个Kafka经纪人之间分配记录
  • 在各个Kafka经纪人之间复制记录(一个或多个副本)

Kafka集群不负责:

  • 类型检查
  • 模式验证
  • 使用SQL资料库时习惯的其他限制

Kafka甚至都不知道数据的结构。Kafka主题中的记录存储为字节数组。Kafka旨在分发字节。这就是Kafka快速且可扩展的原因之一。

消费者负责:

  • 轮询Kafka主题
  • 在微批次中使用该主题的记录
  • 将字节反序列化为键和值

使用者可以开始使用Kafka主题的记录之前,必须在应用程序中配置相应的键和值反序列化器。这是使用Spring Boot和Spring Kafka的键和值序列化程序的Kafka使用者配置示例:

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

在用户应用程序中无法处理毒药的影响很大。让我们来看看发生了什么:

  • 消费者应用程序正在使用Kafka主题。
  • 在某个时间点,应用程序无法对记录进行反序列化(遇到毒丸)。
  • 消费者不能处理毒丸。
  • 因为使用者偏移量没有向前移动,所以阻止了主题分区的使用。
  • 消费者将一次又一次地(非常迅速地)尝试反序列化记录,但是永远不会成功。因此,您的消费者应用程序将陷入一个无穷循环,尝试对失败的记录进行反序列化。
  • 对于每次失败,都会在您的日志文件中写入一行...糟糕!
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1241) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1002) ~[spring-kafka-2.5.0.RELEASE.jar!/:2.5.0.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value <b>for</b> partition stock-quotes-avro-1 at offset 69. If needed, please seek past the record to <b>continue</b> consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

坏结果:

如果您没有及时注意到,您的消费者应用程序可以快速将数GB的日志文件写入磁盘。您还可以将日志自动发送到日志聚合工具,例如ELK堆栈(Elasticsearch,Logstash和Kibana)。

如果您没有适当的监视,则有时可能会“吃掉”所有服务器磁盘空间。在最坏的情况下,您可能还会在同一台计算机上运行其他服务,由于磁盘已满,它们将开始报告运行状况不佳!

如何在有毒药情况下生存?

有两种方法可以使毒丸药幸存下来:

  • 等待直到Kafka主题的保留期过去。如果您的Kafka主题配置了保留策略,则可以等待直到该时间过去,以确保毒药消失了。但是,在相同保留期内,使用毒丸后,您还将丢失针对Kafka主题生成的所有记录。在现实生活中,这是不行的!
  • 更改消费者组。您可以更改使用者组并从日志的开头开始使用(开始使用写入该主题的下一条记录)。在这种情况下,您将不会消耗毒药和主题中最后写入的记录之间的记录。所以这也是不行的!
  • 手动/以编程方式更新偏移量。您可以执行此操作,但这并不简单。您必须知道毒药的偏移量,并开始消耗毒药之后的下一条记录。如果还有其他毒药该怎么办?
  • Spring-Kafka来救援!配置ErrorHandlingDeserializer。这是要走的路。继续阅读以了解如何配置您的使用应用程序。

使用Spring Kafka解决问题 ErrorHandlingDeserializer:

当反序列化器无法反序列化消息时,Spring将无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,ErrorHandlingDeserializer已经引入了。该处理器将委托给实际的反序列化(键或值)。如果委托未能反序列化记录内容,则在包含原因和原始字节的标头中ErrorHandlingDeserializer返回一个<b>null</b>值和一个DeserializationException。当您使用记录级时MessageListener,如果中ConsumerRecord包含DeserializationException键或值的标头,ErrorHandler则会使用failed调用容器的ConsumerRecord。记录不会传递给侦听器。

背后的想法ErrorHandlingDeserializer很简单,但是当我第一次配置它时,我花了一些时间来解决问题。

对于我们的键和值反序列化器,请配置ErrorHandlingDeserializerSpring Kafka提供的。

该ErrorHandlingDeserializer会委托给真正的解串器(key和value)。我们必须“告诉” ErrorHandlingDeserializer:

  • The key deserializer class (spring.deserializer.key.delegate.class)
  • The value deserializer class (spring.deserializer.value.delegate.class)

例如:

  • spring.deserializer.key.delegate.class is the StringDeserializer
  • spring.deserializer.value.delegate.class is the KafkaAvroDeserializer

application.yml中配置ErrorHandlingDeserializer :

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    properties:
      # Delegate deserializers
      spring.deserializer.key.delegate.<b>class</b>: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.<b>class</b>: io.confluent.kafka.serializers.KafkaAvroDeserializer

现在,当键或值委托无法对毒药进行反序列化时,ErrorHandlingDeserializer返回空值并DeserializationException在包含原因和原始字节的标头中添加一个。

如果ConsumerRecord包含DeserializationException键或值的标头,ErrorHandler则使用failed调用容器的标头ConsumerRecord,并且不会将记录传递给侦听器(用注释的类或方法@KafkaListener)。

默认情况下,容器的错误处理程序是SeekToCurrentErrorHandler。通过配置LoggingErrorHandler,我们可以记录毒丸的内容。

这是配置的示例LoggingErrorHandler:

@Configuration
@EnableKafka
<b>public</b> <b>class</b> KafkaConfiguration {

  <font><i>/**
   * Boot will autowire this into the container factory.
   */</i></font><font>
  @Bean
  <b>public</b> LoggingErrorHandler errorHandler() {
    <b>return</b> <b>new</b> LoggingErrorHandler();
  }
}
</font>

长话短说,ErrorHandlingDeserializer确保处理并记录了毒药。消费者抵消向前移动,以便消费者可以继续消费下一个记录。

万岁-您在毒丸场景中幸免于难!

发布死信主题

在许多情况下,记录反序列化异常足够好,但是以后使检查毒药变得更加困难。从Spring Kafka 2.3开始,您可以配置ErrorHandlingDeserializer与a组合使用DeadLetterPublishingRecoverer,SeekToCurrentErrorHandler以将毒药的值发布到死信主题。

@Configuration
@EnableKafka
<b>public</b> <b>class</b> KafkaConfiguration {

  <font><i>/**
   * Boot will autowire this into the container factory.
   */</i></font><font>
  @Bean
  <b>public</b> SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    <b>return</b> <b>new</b> SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
  }

  </font><font><i>/**
   * Configure the {@link DeadLetterPublishingRecoverer} to publish poison pill bytes to a dead letter topic:
   * "stock-quotes-avro.DLT".
   */</i></font><font>
  @Bean
  <b>public</b> DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
    <b>return</b> <b>new</b> DeadLetterPublishingRecoverer(bytesTemplate);
  }
}
</font>

这使您可以灵活地使用毒药和检查数据。Spring Kafka将把死信记录发送到一个名为的主题<originalTopicName>.DLT(后缀为的原始主题的名称.DLT),并发送到与原始记录相同的分区。

请注意,您的使用者应用程序也将成为生产者,因此您需要在配置(application.yml)中配置键和值序列化程序:

spring:
  kafka:
    producer:
      # Important!
      # In <b>case</b> you publish to a 'dead letter topic' you consumer application becomes
      # a producer as well! So you need to specify the producer properties!
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
原文  https://www.jdon.com/54521
正文到此结束
Loading...