Kafka是一个开源分布式的流处理平台,一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka由Scala和Java编写,2012年成为Apache基金会下顶级项目。
官网 戳这里。
下载并解压(注意需要Kafka与Spring Boot版本对应,可以参考 这里 ):
tar -xvf kafka_2.12-2.5.0.tgz cd kafka_2.12-2.5.0
接着启动ZooKeeper与Kafka:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
Kafka需要用到ZooKeeper,需要在启动Kafka之前启动ZooKeeper(ZooKeeper是一个开源的分布式应用程序协调服务,是Hadoop的组件,主要用于解决分布式应用中的一些数据管理问题)。
Kafka默认使用9092端口,部署在服务器上需要注意防火墙以及安全组的处理。
考虑到Spring Boot在2.3.0M1中(截至本文写作日期2020.07.14Spring Boot已更新到2.4.0M1)首次采用Gradle而不是Maven来构建项目,换句话说日后Spring Boot的构建工具将从Maven迁移到Gradle,Spring Boot团队给出的主要原因是可以减少项目构建所花费的时间,详情可以 戳这里 瞧瞧。
另外由于另一个基于JVM的语言Kotlin的日渐崛起,后端开始逐渐有人采用Kotlin(尽管不多,不过语法糖真的香,JetBrains家的语言配合IDE,爽得飞起),因此本示例项目将采用两种方式搭建:
选择的依赖如下(当然您喜欢的话可以在 pom.xml
或者 build.gradle.kts
里面加,对于Kotlin不需要 Lombok
):
Java版:
Kotlin版:
serialize
:序列化/反序列化实体类 Constant.java
/ Constant.kt
:常量类 Consumer.java
/ Consumer.kt
:消费者类 Entity.java
/ Entity.kt
:实体类 Producer.java
/ Product.kt
:生产者类 TestApplicationTests
:测试类 包含Topic与GroupId,Java版:
public class Constants { public static final String TOPIC = "TestTopic"; public static final String GROUP_ID = "TestGroupId"; }
Kotlin版:
object Constants { const val TOPIC = "TestTopic" const val GROUP_ID = "TestGroupId" }
@AllArgsConstructor @NoArgsConstructor @Data @Builder public class Entity { private long id; private String name; private int num; }
说一下Lombok的几个注解:
@AllArgsConstructor
/ @NoArgsConstructor
:生成所有参数/无参数构造方法 @Data
:等价于 @Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode
,自动生成 Setter+Getter+toString()+equals()+hashCode()
,还有 @RequireArgsConstructor
为类的每一个 final
或非空字段生成一个构造方法 @Builder
:可以通过建造者模式创建对象 Kotlin版:
class Entity { var id: Long = 0 var name: String = "" var num: Int = 0 constructor() constructor(id:Long,name:String,num:Int) { this.id = id this.name = name this.num = num } }
@Component @Slf4j //防止出现Field injection not recommended警告,代替了原来的直接在字段上@Autowired @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class Producer { private final KafkaTemplate<String, Entity> kafkaTemplate; public void send(Entity entity) { //发送消息 //类型一般为String+自定义消息内容,String代表消息Topic,这里消息内容用Entity表示 ListenableFuture<SendResult<String, Entity>> future = kafkaTemplate.send(Constants.TOPIC, entity); //回调函数 future.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable throwable) { log.info("Send message failed"); } @Override public void onSuccess(SendResult<String, Entity> stringEntitySendResult) { log.info("Send message success"); } }); } }
这里的 send
有两个参数,对应于 sendResult<>
中的参数类型,第一个为消息的Topic,第二个为消息体,一般使用String或者Json。
Kotlin版:
@Component class Producer { @Autowired private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null private val log = LoggerFactory.getLogger(this.javaClass) fun send(entity: Entity) { val future = kafkaTemplate!!.send(Constants.TOPIC,entity); future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{ override fun onSuccess(result : SendResult<String?,Entity?>?) { log.info("Send success"); } override fun onFailure(e:Throwable) { log.info("Send failed"); } }) } }
@Component @Slf4j public class Consumer { @KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID) public void consume(Entity entity) { log.info("Consume a entity, id is "+entity.getId()); } }
使用 @KafkaListener
注解,第一个参数表示需要消费的消息的Topic,可以是 String []
,第二个是消费者组的id。生产者的消息Topic必须与消费者的Topic保持一致否则不能消费,这里简单处理打印日志。
Kotlin版:
@Component class Consumer { private val log = LoggerFactory.getLogger(this.javaClass) @KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID) fun consume(entity: Entity) { log.info("Consume a entity, id is "+entity.id.toString()) } }
这里自定义了序列化/反序列化类,序列化/反序列化类需要实现 org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口,其中 T
是想要序列化的类型,这里是 Entity
。序列化接口反编译如下:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } byte[] serialize(String var1, T var2); default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { } }
反序列化反编译接口如下:
public interface Deserializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } T deserialize(String var1, byte[] var2); default T deserialize(String topic, Headers headers, byte[] data) { return this.deserialize(topic, data); } default void close() { } }
也就是只需要实现其中的 serialize/deserialize
方法即可。这里序列化/反序列化用到了自带的Jackson:
@Slf4j public class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> { public byte [] serialize(String topic, Entity entity) { try { return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity); } catch (JsonProcessingException e) { e.printStackTrace(); log.error("Can not serialize entity in Serializer"); } return null; } }
反序列化:
@Slf4j public class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> { public Entity deserialize(String topic,byte [] data) { try { return data == null ? null : new ObjectMapper().readValue(data,Entity.class); } catch (IOException e) { e.printStackTrace(); log.error("Can not deserialize entity in Deserializer"); } return null; } }
Kotlin版:
class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?> { private val log = LoggerFactory.getLogger(this.javaClass) override fun serialize(topic: String?, data: Entity?): ByteArray? { try { return if (data == null) null else ObjectMapper().writeValueAsBytes(data) } catch (e:JsonProcessingException) { e.printStackTrace() log.error("Can not serialize entity in Serializer") } return null } }
class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?> { private val log = LoggerFactory.getLogger(this.javaClass) override fun deserialize(topic: String?, data: ByteArray?): Entity? { try { return ObjectMapper().readValue(data, Entity::class.java) } catch (e:IOException) { e.printStackTrace() log.error("Can not deserialize entity in Deserializer") } return null } }
application.properties
:
# 地址,本地直接localhost,部署可以使用公网ip spring.kafka.bootstrap-servers=localhost:9092 # 消费者组id spring.kafka.consumer.group-id=TestGroupId spring.kafka.consumer.auto-offset-reset=earliest # 消费者键反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消费者值反序列化类 spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer # 生产者键序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 生产者值序列化类 spring.kafka.producer.value-serializer=com.test.serialize.Serializer
对于 auto-offest-rest
,该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下怎么处理,有四个取值:
earliest
:当各分区有已提交的 offest
时,从提交的 offest
开始消费,无提交的 offest
时,从头开始消费 latest
(默认):当各分区有已提交的 offest
时,从提交的 offest
开始消费,无提交的 offest
时,消费新产生的该分区下的数据 none
:各分区都存在已提交的 offest
时,从 offest
后消费,只要有一个分区不存在已提交的 offest
,则抛出异常 exception
:其他情况将抛出异常给消费者 对于序列化/反序列化,String可以使用自带的序列化/反序列化类:
org.apache.kafka.common.serialization.StringSerializer org.apache.kafka.common.serialization.StringDeserializer
至于Json可以使用:
org.springframework.kafka.support.serializer.JsonSerializer org.springframework.kafka.support.serializer.JsonDeserializer
其他自定义的请实现 org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口。
yml版:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: TestGroupId auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.test.serialize.Deserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.test.serialize.Serializer
@SpringBootTest @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) class TestApplicationTests { private final Producer producer; @Test void contextLoads() { Random random = new Random(); for (int i = 0; i < 1000; i++) { long id = i+1; String name = UUID.randomUUID().toString(); int num = random.nextInt(); producer.send(Entity.builder().id(id).name(name).num(num).build()); } } }
生产者发送1000条消息。
Kotlin版:
@SpringBootTest class TestApplicationTests { @Autowired private val producer:Producer? = null @Test fun contextLoads() { for(i in 0..1000) { val id = (i + 1).toLong() val name = java.util.UUID.randomUUID().toString() val num = (0..100000).random() producer!!.send(Entity(id,name,num)) } } }
控制台输出如下:
所有消息被成功发送并且被成功消费。
最后可以去验证一下Kafka的Topic列表,可以看到配置文件中的Topic的值( TestTopic
),进入Kafka目录:
bin/kafka-topics.sh --list --zookepper localhost:2181
1、 CSDN-Kafka优点
2、 简书-Spring Boot 2.x 快速集成整合消息中间件 Kafka
3、 简书-springboot 之集成kafka