转载

关于使用Spring Boot的Kafka教程 - DZone大数据

关于Kafka背后的体系结构及其pub-sub模型的教程,以及我们如何使用流行的Java框架Spring Boot。

Apache Kafka是一个分布式流媒体平台,具有发布和订阅记录流,以容错方式存储记录以及处理该记录流等功能。

它用于构建实时流数据管道,可以执行功能,例如将记录流从一个应用程序可靠地传递到另一个应用程序,以及处理记录并将其传输到目标应用程序。

TOPIC

Kafka作为一个或多个服务器中的集群运行,集群存储/检索名为Topics的Feed /类别中的记录。主题中的每条记录都存储有键,值和时间戳。

主题可以包含零个,一个或多个使用者,他们将订阅写入该主题的数据。在Kafka术语中,主题Topic始终是多用户multi-subscriber feed的一部分。

分区

Kafka群集为每个主题使用分区日志。

分区维护数据的插入顺序,一旦将记录发布到主题,它将保留在那里,具体取决于保留期(可配置)。记录始终附加在分区的末尾。它维护一个名为“offsets”的标志,用于唯一标识分区中的每条记录。

偏移量由消费者应用程序控制。使用偏移量,消费者可能会回溯到较旧的偏移量并在需要时重新处理记录。

生产者

记录流(即数据)由生产者发布到主题。他们还可以在将数据发布到主题时分配分区。生产者可以循环方式发送数据,或者可以基于根据记录的优先级将记录发送到某些分区来实现优先级系统。

消费者

消费者消费使用主题的记录。它们是基于消费者组的概念,其中一些消费者被分配在群组中。发布到主题的记录是分配一个消费者组,再传递给其中消费者的一个实例。Kafka内部使用消费者组内消费记录的机制。消费者的每个实例将获得特定分区日志,使得在消费者组内,记录可以由每个消费者并行处理。

Spring Boot Kafka

Spring为Kafka提供了很好的支持,并提供了与原生Kafka Java客户端一起使用的抽象层。

我们可以添加以下依赖项来开始使用Spring Boot和Kafka。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>

要下载并安装Kafka,请参阅官方指南 https://kafka.apache.org/quickstart 。

下载Kafka后,您可以发出命令来启动ZooKeeper,Kafka使用它来存储元数据。

zookeeper-server-start.bat ./config/zookeeper.properties

接下来,我们需要通过发出以下命令在本地启动Kafka集群。

kafka-server-start.bat ./config/server.properties 

现在,默认情况下,Kafka服务器启动在localhost:9092

编写一个简单的REST控制器并公开一个端点,  /publish如下所示。它用于将消息发布到主题。

<b>package</b> com.rahul.kafkaspringboot.controllers;
<b>import</b> com.rahul.kafkaspringboot.services.Producer;
<b>import</b> org.springframework.beans.factory.annotation.Autowired;
<b>import</b> org.springframework.web.bind.annotation.PostMapping;
<b>import</b> org.springframework.web.bind.annotation.RequestMapping;
<b>import</b> org.springframework.web.bind.annotation.RequestParam;
<b>import</b> org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = <font>"/kafka"</font><font>)
<b>public</b> <b>class</b> KafkaController {
<b>private</b> <b>final</b> Producer producer;
@Autowired
<b>public</b> KafkaController(Producer producer) {
<b>this</b>.producer = producer;
}
@PostMapping(value = </font><font>"/publish"</font><font>)
<b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message){
<b>this</b>.producer.sendMessage(message);
}
}
</font>

然后我们可以编写使用Spring的生产者  KafkaTemplate 将消息发送到一个名为的主题  users,  如下所示。

<b>package</b> com.rahul.kafkaspringboot.services;
<b>import</b> org.slf4j.Logger;
<b>import</b> org.slf4j.LoggerFactory;
<b>import</b> org.springframework.beans.factory.annotation.Autowired;
<b>import</b> org.springframework.kafka.core.KafkaTemplate;
<b>import</b> org.springframework.stereotype.Service;
@Service
<b>public</b> <b>class</b> Producer {
<b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);
<b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>;
@Autowired
<b>private</b> KafkaTemplate<String,String> kafkaTemplate;
<b>public</b> <b>void</b> sendMessage(String message){
logger.info(String.format(</font><font>"$$ -> Producing message --> %s"</font><font>,message));
<b>this</b>.kafkaTemplate.send(TOPIC,message);
}
}
</font>

我们还可以编写如下所示的消费者,消费者使用主题用户的消息并将日志输出到控制台。

<b>package</b> com.rahul.kafkaspringboot.services;
<b>import</b> org.slf4j.Logger;
<b>import</b> org.slf4j.LoggerFactory;
<b>import</b> org.springframework.kafka.annotation.KafkaListener;
<b>import</b> org.springframework.stereotype.Service;
@Service
<b>public</b> <b>class</b> Consumer {
<b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Consumer.<b>class</b>);
@KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>)
<b>public</b> <b>void</b> consume(String message){
logger.info(String.format(</font><font>"$$ -> Consumed Message -> %s"</font><font>,message));
}
}
</font>

现在,我们需要一种方法告诉我们的应用程序在哪里找到Kafka服务器并创建一个主题并发布到它。我们可以使用  application.yaml 如下所示的方法。

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

现在,如果我们运行应用程序并点击如下所示的端点,我们就已经发布了一条消息。

如果我们从控制台检查日志,它应该打印发送到发布端点的消息。

总结

在这篇文章中,我们已经看到了Kafka系统中使用的基本术语。我们还看到使用Spring Boot配置Kafka是多么容易。大多数魔法都是由Spring Boot在幕后完成的。一种简单快捷的方法是在application.yml文件中配置与Kafka相关的详细信息,如果我们更改Kafka集群并且必须将服务器指向新的Kafka集群地址,这是很好的。

原文  https://www.jdon.com/51923
正文到此结束
Loading...