Kafka是由LinkedIn开发并开源的分布式消息系统,因其分布式及高吞吐率而被广泛使用,现已与Cloudera Hadoop,Apache Storm,Apache Spark集成,具备许多优秀的性能:高吞吐、分布式、跨平台、实时性以及伸缩性,本文我们就来看看如何将Spring Cloud Bus和Kafka进行整合。
本文是Spring Cloud系列的第二十八篇文章,了解前二十七篇文章内容有助于更好的理解本文:
1. 使用Spring Cloud搭建服务注册中心
2. 使用Spring Cloud搭建高可用服务注册中心
3. Spring Cloud中服务的发现与消费
4. Eureka中的核心概念
5. 什么是客户端负载均衡
6. Spring RestTemplate中几种常见的请求方式
7. RestTemplate的逆袭之路,从发送请求到负载均衡
8. Spring Cloud中负载均衡器概览
9. Spring Cloud中的负载均衡策略
10. Spring Cloud中的断路器Hystrix
11. Spring Cloud自定义Hystrix请求命令
12. Spring Cloud中Hystrix的服务降级与异常处理
13. Spring Cloud中Hystrix的请求缓存
14. Spring Cloud中Hystrix的请求合并
15. Spring Cloud中Hystrix仪表盘与Turbine集群监控
16. Spring Cloud中声明式服务调用Feign
17. Spring Cloud中Feign的继承特性
18. Spring Cloud中Feign配置详解
19. Spring Cloud中的API网关服务Zuul
20. Spring Cloud Zuul中路由配置细节
21. Spring Cloud Zuul中异常处理细节
22. 分布式配置中心Spring Cloud Config初窥
23. Spring Cloud Config服务端配置细节(一)
24. Spring Cloud Config服务端配置细节(二)之加密解密
25. Spring Cloud Config客户端配置细节
26. Spring Cloud Bus之RabbitMQ初窥
27. Spring Cloud Bus整合RabbitMQ
Kafka现在是Apache上的开源项目,直接到官网下载即可( http://kafka.apache.org/ ),这个不用我多说。
下载成功之后,是一个压缩文件,解压该文件,我们可以看到一个bin目录,进入到bin目录中,bin目录下的.sh文件都是Linux/Unix下的shell脚本,在Linux/Unix环境下直接运行这些脚本即可,bin目录中还有一个windows目录,该目录下存储的都是windows中的批处理文件。我们在运行时根据自己的操作系统选择合适的命令去执行,本文以windows为例。解压后为了后面的命令操作方便,我将windows文件配置到环境变量中,我的是 D:/Program/kafka_2.11-0.11.0.1/bin/windows
,然后在cmd中进入到解压目录下,执行 zookeeper-server-start.bat ./config/zookeeper.properties
命令,表示启动zookeeper(由于Kafka依赖的zookeeper,所以我们要先启动zookeeper再启动Kafka),如下:
zookeeper在启动的过程中需要用到zookeeper.properties配置文件,这个文件中定义了zookeeper的端口为2181。zookeeper启动成功之后,接下来我们要启动Kafka,执行 kafka-server-start.bat ./config/server.properties
命令,如下:
两者都启动成功之后,我们可以执行如下命令 kafka-console-producer.bat --broker-list localhost:9092 --topic test
来发送一条消息,该命令可以启动Kafka基于命令行的消息生产客户端,启动成功之后,我们就可以直接在命令行发送消息了,如下:
消息发送了,当然要有人来接收,接下来我们来创建消息接收端,执行 kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
命令,启动成功之后,我们就可以收到刚刚发送的消息了,如下:
Spring Cloud Bus和Kafka的整合非常简单,如果我们使用了默认配置,就可以从RabbitMQ无缝切换过来,只需要修改一下我们之前config-server和config-client的依赖,将spring-cloud-starter-bus-amqp改为spring-cloud-starter-bus-kafka,如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> </dependency>
修改之后,我们分别启动eureka、config-server和config-client,测试方式还是和上文一样,该有的/bus/refresh接口的功能还是不变。这里我就不再赘述了。
好了,Kafka我们就先说这么多。有问题欢迎留言讨论。
更多JavaEE资料请关注公众号: