转载

Spring Cloud Stream如何深度支持Apache Kafka?

Spring Cloud Stream提供了一种编程模型,可以立即连接到Apache Kafka。应用程序需要在其类路径中包含Kafka绑定器并添加一个名为的注释@EnableBinding,该注释将Kafka主题绑定到其输入或输出(或两者)。

Spring Cloud Stream提供了三个方便的接口来绑定@EnableBinding:( Source单输出),Sink(单输入)和Processor(单输入和输出)。它可以扩展到具有多个输入和输出的自定义接口。

以下代码片段显示了Spring Cloud Stream的基本编程模型:

@SpringBootApplication
@EnableBinding(Processor.<b>class</b>)
<b>public</b> <b>class</b> UppercaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  <b>public</b> String process(String s) {
     <b>return</b> s.toUpperCase();
  }
}

请注意该方法已注释@StreamListener,Spring Cloud Stream提供该方法以接收来自Kafka主题的消息。同样的方法也用注释SendTo,这是一个方便的注释,用于将消息发送到输出目的地。这是一个Spring Cloud Stream Processor应用程序,它使用来自输入的消息并向输出生成消息。

前面的代码中没有提到Kafka主题Topic。此时可能出现的一个自然问题是,“此应用程序如何与Kafka通信?”答案是:使用 Spring Boot 支持的众多配置选项之一配置入站和出站主题。在这种情况下,我们使用名为YAML的配置文件application.yml,默认情况下会搜索该文件。以下是输入和输出目标的配置:

spring.cloud.stream.bindings:
  input:
    destination: topic1
  output:
    destination: topic2

Spring Cloud Stream将输入input映射到输出topic1和输出topic2。这是一组非常小的配置,但有更多选项可用于进一步自定义应用程序。默认情况下,主题是使用单个分区创建的,但可以由应用程序覆盖。有关更多信息,请参阅这些 文档 。

最重要的是,开发人员可以专注于编写核心业务逻辑,并让Spring Cloud Stream和Spring Boot处理基础架构问题(例如连接到Kafka,配置和调整应用程序等)。

以下示例显示了另一个简单的应用程序(消费者):

@SpringBootApplication
@EnableBinding(Sink.<b>class</b>)
<b>public</b> <b>class</b> LoggingConsumerApplication {

  @StreamListener(Sink.INPUT)
  <b>public</b> <b>void</b> handle(Person person) {
     System.out.println(<font>"Received: "</font><font> + person);
  }

  <b>public</b> <b>static</b> <b>class</b> Person {
     <b>private</b> String name;
     <b>public</b> String getName() {
        <b>return</b> name;
     }
     <b>public</b> <b>void</b> setName(String name) {
        <b>this</b>.name = name;
     }
     <b>public</b> String toString() {
        <b>return</b> <b>this</b>.name;
     }
  }
}
</font>

请注意,@EnableBinding带有一个Sink.class,表示这是一个消费者。与之前的应用程序的一个主要区别在于,注释的方法@StreamListener是将称为Person的POJO作为其参数而不是字符串。将来自Kafka主题的消息转换为此POJO!Spring Cloud Stream提供 自动内容类型转换 。默认情况下,它application/JSON用作内容类型,但也支持其他内容类型。您可以使用该属性提供内容类型spring.cloud.stream.bindings.input.contentType,然后将其设置为适当的内容类型,例如application/Avro。

Spring Cloud Stream根据此配置选择适当的消息转换器。如果应用程序想要使用Kafka提供的本机序列化和反序列化而不是使用Spring Cloud Stream提供的消息转换器,则可以设置以下属性。

对于序列化:

spring.cloud.stream.bindings.output.useNativeEncoding=<b>true</b>

对于反序列化:

spring.cloud.stream.bindings.input.useNativeDecoding=<b>true</b>

自动配置主题

Apache Kafka绑定器提供了一个配置器provisioner ,用于在启动时配置主题。如果在代理上启用了主题创建,则Spring Cloud Stream应用程序可以作为应用程序启动的一部分来创建和配置Kafka主题。

例如,可以向分配器提供分区和其他主题级配置。这些自定义可以在 binder 完成,适用于应用程序中使用的所有主题,或者适用于单个生产者和消费者级别。这在应用程序的开发和测试期间尤其方便。有关如何为多个分区配置主题的各种 示例 。

支持消费组和分区

消费者组和分区等众所周知的属性可通过Spring Cloud Stream进行配置。可以通过属性设置消费者组:

spring.cloud.stream.bindings.input.group=group-name

如前所述,在内部,该小组将被翻译成Kafka的消费者群体。在编写生产者应用程序时,Spring Cloud Stream提供了将数据发送到特定分区的选项。在内部,框架再次将这些职责委托给Kafka。

如果禁用消费者组的自动重新平衡,则可以限制特定应用程序实例使用来自某组分区的消息,这是一个需要覆盖的简单配置属性。有关详细信息,请参阅这些 配置选项

绑定可视化和控制

使用Spring Boot的执行器机制,我们现在可以 控制 Spring Cloud Stream中的 各个绑定 。

在运行时,可以使用 执行器端点 停止,暂停,恢复应用程序,这是Spring Boot的机制,用于在将应用程序推送到生产环境时监视和管理应用程序。此功能使用户可以对应用程序处理Kafka数据的方式进行更多控制。如果应用程序暂停绑定,则处理来自该特定主题的记录将被暂停,直到恢复为止。

Spring Cloud Stream还与 Micrometer 集成,可实现 更丰富的指标 ,排放混乱率并提供其他与监控相关的功能。这些可以与许多其他 监控系统 进一步集成。Kafka活页夹提供了扩展的 指标功能 ,可以提供有关主题的消费者滞后的其他见解。

Spring Boot 通过特殊的 健康端点 提供应用程序运行状况检查。Kafka绑定器提供了一个特殊的健康指示器实现,它考虑了与代理的连接,并检查所有分区是否健康。如果在没有领导者的情况下找到任何分区,或者无法连接代理,则运行状况检查会相应地报告状态。

下面见Spring Cloud Stream对Kafka Streams的深度支持,可点击#Kafka标签进入查看

原文  https://www.jdon.com/51897
正文到此结束
Loading...