本文采用Spring cloud本文为2.1.8RELEASE,version=Greenwich.SR3
本文基于前两篇文章 eureka-server、eureka-client、eureka-ribbon 和 spring-gateway 的实现。
参考
Spring Cloud Stream 是一个构建消息驱动微服务的框架。它通过使用 Spring Integration 来连接消息代理中间件以及实现消息事件驱动的微服务应用。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration,实现了一套轻量级的消息驱动的微服务框架。
spring-cloud-stream
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
spring: application: name: spring-cloud-stream cloud: stream: kafaka: binder: brokers: 192.168.10.196:9092 #kafaka服务地址 zk-noeds: 192.168.10.196:2181 #zk服务地址 auto-create-topics: true bindings: output: #stream默认提供的output destination: stream-kafaka #消息发送到的目的地 content-type: text/plain #消息发送格式,接收端不用指定格式,但是发送端要。 server: port: 1000
KafakaSendService
package com.mm.spring.cloud.springcloudstream.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; @EnableBinding(Source.class) public class KafakaSendService { @Autowired private Source source; public void sendMsg(String msg) { source.output().send(MessageBuilder.withPayload(msg).build()); } }
KafakaProducerController
这个 KafakaProducerController
是方便演示用
package com.mm.spring.cloud.springcloudstream.controller; import com.mm.spring.cloud.springcloudstream.service.KafakaSendService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafakaProducerController { @Autowired private KafakaSendService kafakaSendService; @RequestMapping("/send/{msg}") public void send(@PathVariable String msg) { kafakaSendService.sendMsg(msg); } }
eureka-ribbon
应用 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
spring: cloud: stream: kafka: binder: brokers: 192.168.10.196:9092 auto-create-topics: true bindings: input: destination: stream-kafaka
1.5.3 新增消息处理类: RecieceService
package spring.cloud.demo.eurekaribbon.service; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) public class RecieceService { @StreamListener(Sink.INPUT) public void recieve(Object payload) { System.out.println(payload); } }
按照顺序启动 eureka-server
、 eureka-client
、 eureka-ribbon
、 spring-cloud-stream
应用的服务,然后访问 http://localhost:1000/send/maomao
, 然后查看 eureka-ribbon
的控制台可以看到,如下图显示:
证明 eureka-ribbon
已经接到了消息。
至此,一个简单的 Spring Cloud Stream 的 kafaka 应用就搭建完成了,这个是方式是使用了 Stream 的默认 Source 和 Sink 方式。
Spring Cloud Stream 涉及的内容还是挺多的,这里先简单实现,至此《spring cloud 2.x 版本小白系列教程》就终结了,写这个小白系列教程的目的就是为了大家能对 Spring cloud 有一个初步的认识和了解,后续我这边会更新 Spring cloud 进阶教程,大家敬请期待,感谢小伙伴们的支持,谢谢!!!
gitHub 地址
<center><font color=red>《Srping Cloud 2.X小白教程》目录</font></center>