前面我们在k8s上部署了一个kafka集群,同时也部署了kafka-manager对kafka集群进行监控管理。 本篇将先基于spring boot框架编写一个简单的kafka的生产者和消费者的程序,并在此基础上理解kafka的相关组件。
从 https://start.spring.io/ 创建一个名称为kafka-demo的spring boot工程,这里基于的是Spring Boot 2.1.6,依赖中选择添加Spring for Apache Kafka。 项目创建完毕后,首先需要在spring boot的配置文件application.yml通过配置spring.kafka.bootstrap-servers指定kafka代理地址。
spring:
kafka:
bootstrap-servers: kafka.kafka.svc.cluster.local:9092
consumer.group-id: myGroup
接下来在Spring boot配置文件application.yml通过配置spring.kafka.bootstrap-servers指定访问kafa在k8s中Service的地址:
上面的配置指定了bootstrap-servers为kafka.kafka.svc.cluster.local:9092,这就要求如果是本地调试的话需要本地开发网络和k8s集群内部网络互通,否则只能将Spring Boot程序构建成docker镜像并部署到k8s集群内才可进行测试。 上面的配置同时还配置了Consumer的group-id为myGroup。
Spring Boot已经对Kafka的支持做了开箱即用,从生成的项目中可以可以看到引入的依赖如下:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
使用kafka-manager在Kafka上创建一个名称为testTopic的Topic,分区数(Partitions)为3,副本数(Replication Factor)为3。
简单起见我们直接使用创建项目时生成的KafkademoApplicationTests作为Producer,加入Producer代码:
package com.example.kafkademo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkademoApplicationTests {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void producer() {
kafkaTemplate.send("testTopic", "msgValue");
}
}
运行上面的单元测试,如果前面配置没有问题的话,就可以向testTopic中发送了一条消息。 KafkaTemplate是Spring Boot自动配置的,直接使用即可。
简单起见,在KafkademoApplication中添加Consumer代码:
package com.example.kafkademo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class KafkademoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkademoApplication.class, args);
}
@KafkaListener(topics = "testTopic")
public void processMessage(String content) {
System.out.println("Recieved msg :" + content);
}
}
消费者程序KafkademoApplication启动后,会立即消费之前生产者投递的消息,并打印Recieved msg : msgValue。
前面我们在Kubernetes上部署了一个3节点的Kafka集群和监控和管理工具kafka-manager,并使用Spring Boot编写了简单的生产者和消费者程序。 下面一起来理解一下目前出现的一些与kafka相关的组件:
Broker: Broker是kafa的服务端进程。 一个kafka的服务端由多个Broker组成。 前面使用helm部署的kafka集群,Broker以Statefulset的形式运行:
kubectl get pod -n kafka -l app=kafka
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
即一个Kafka集群由多个Broker组成,多个Broker会被部署到不同的机器上(这里通过k8s StatefulSet实现)以实现高可用。 Broker作为Kafka的服务端处理客户端发送的请求,同时对消息进行持久化。
Topic是发布订阅的主题,可以针对具体业务创建一个或多个Topic。 Topic相对于Kafka集群可以理解为一个”逻辑整体“,Topic会被分区(Partition),即数据会被分成多份,不同的分区可分布在不同的Broker上,每个分区(Partition)还可被创建副本(Replication)。
例如前面我们使用kafka-managerKafka上创建一个名称为testTopic的Topic,分区数(Partitions)为3,副本数(Replication Factor)为3。
Producer是向Topic发送消息的客户端程序。 前面例子中我们使用KafkademoApplicationTests作为生成者,向testTopic发送了值为msgValue的消息: kafkaTemplate.send("testTopic", "msgValue");。
Producer向Topic发送的消息只会被发送到Topic的一个分区中,而每个分区即成为一组有序的消息,所以分区里需要有一个编号,从0开始,被成为分区位移。
Consumer是订阅Topic,并消费消息的客户端程序。 前面例子中的Producer程序如下:
@KafkaListener(topics = "testTopic")
public void processMessage(String content) {
System.out.println("Recieved msg :" + content);
}
在Kafka中多个Consumer可以组成一个组来消费多个主题。 这些主题中的每个分区只会被Consumer Group内的一个Consumer实例消费。 但是一个Consumer可以从多个分区中消费消息。
前面例子中我们配置了consumer.group-id为myGroup,所以每启动一个kafka-demo这个spring boot应用的实例都将作为一个Consumer实例被加入到myGroup这个Consumer Group中。 前面我们在启动kafka-demo的时候,Spring Boot的日志中会打印partitions assigned: [testTopic-0, testTopic-1, testTopic-2],可以看出只启动一个Consumer实例的时候,testTopic的3个分区都被分配给了这个Consumer实例。
下面来做个试验:
启动第一个kafka-demo实例,testTopic的分区都被分配给了这个实例,partitions assigned: [testTopic-0, testTopic-1, testTopic-2]。
继续启动第二个kafka-demo实例,此时testTopic的3个分区会被2个Consumer实例重新分配,第二个实例被分配了0和1两个分区,partitions assigned: [testTopic-0, testTopic-1],前面启动的第一个实例被分配了2这个分区partitions assigned: [testTopic-2]
继续启动第三个kafka-demo实例,此时testTopic的3个分区被3个Consumer实例重新分配,每个Consumer实例个分配到一个分区。
这个就是Kafka Consumer的Rebalance即如果消费者组中有新的消费者加入时,之前组内消费者负责的分区会重新平衡,因此假如一个消费者挂掉后,Kafka可以把这个消费者之前负责的分区转移给消费者组内其他限制的消费者。 Consumer的Rebalance即使一个Consumer Group下所有的Consumer针对如何订阅分配一个Topic下的所有分区达成共识的过程,Rebalance在coordinator的协助下完成,在Rebalance过程中Consumer Group下所有的Consumer将停止消费消息,因此如果频繁进行Rebalance将会对Consumer消费消息的吞吐量影响较大。 需要注意在以下情况下会发生Consumer Rebalance: Consumer Group中的Consumer数量发生变化; Consumer Group订阅的Topic数量或某个Topic的分区数发生变化。
作者:青蛙小白
原文:https://blog.frognew.com/2019/07/spring-boot-kafka.html