消息中间件主要用于应用解耦、异步处理、流量削峰等场景,实现高可用、高性能、可伸缩和最终一致性。但是不同的消息中间件实现方式不同,如RabbitMQ有exchange;Kafka有Topic、partition等概念。在实际环境中如果需要替换中间件,代码都需要重构,工具量巨大,也就是说中间件与系统耦合了,SpringCloud Stream提供了一种解耦合的方式。
SpringCloud Stream通过Binder连接应用和中间件,应用通过Input把消息发送到中间件,通过Output从中间件获取消息。通过Binder与中间件交互,应用不需要关注中间件类型,只需要关注binder提供的接口实现业务逻辑即可。
RabbitMQ安装
docker run -it -d --name rabbitmq / -p 5672:5672 -p 15672:15672 / -e RABBITMQ_DEFAULT_USER=admin / -e RABBITMQ_DEFAULT_PASS=admin / registry.cn-beijing.aliyuncs.com/buyimoutianxia/rabbitmq:V3.7.25 复制代码
2.2.1 导入依赖坐标
<!--stream producer依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 复制代码
2.2.2 配置application.yml
server: port: 8500 spring: application: name: streamproduder-8500 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel output: destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange binders: #绑定器 defaultRabbit: type: rabbit 复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class) public class MyProducer implements CommandLineRunner { @Autowired private MessageChannel output; @Override public void run(String... args) throws Exception { output.send(MessageBuilder.withPayload("hello, my friend ...").build()); } } 复制代码
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } } 复制代码
http://localhost:15672
-->Exchanges中看到创建的 test-producer
<!--stream consumer依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 复制代码
server: port: 8501 spring: application: name: streamconsumer-8501 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel input: destination: test-producer #消息接收队列名称 binders: #绑定器 defaultRabbit: type: rabbit 复制代码
com.xyz.stream.binding.MyConsumer
@EnableBinding(Sink.class) public class MyConsumer { @StreamListener(Sink.INPUT) public void input(String message) { System.out.println("接收的消息:" + message); } } 复制代码
@SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } } 复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class) public class MyProducer { @Autowired private MessageChannel output; public void send(Object messsage) { output.send(MessageBuilder.withPayload(messsage).build()); } } 复制代码
com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class TestProducer { @Autowired private MyProducer myProducer; @Test public void test() { myProducer.send("test method ..."); } } 复制代码
com.xyz.stream.channel.MyProcess
public interface MyProcess { /** * 自定义消息生产者channel */ String MYOUTPUT = "myoutput"; @Output("myoutput") MessageChannel myoutput(); /** * 自定义消息消费者channel */ String MYINPUT = "myinput"; @Input("myinput") SubscribableChannel myinput(); } 复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(MyProcess.class) public class MyProducer { @Autowired private MessageChannel myoutput; public void send(Object messsage) { myoutput.send(MessageBuilder.withPayload(messsage).build()); } } 复制代码
server: port: 8500 spring: application: name: streamproduder-8500 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel output: destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange myoutput: destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange binders: #绑定器 defaultRabbit: type: rabbit 复制代码
com.xyz.stream.channel.MyProcess
public interface MyProcess { /** * 自定义消息生产者channel */ String MYOUTPUT = "myoutput"; @Output("myoutput") MessageChannel myoutput(); /** * 自定义消息消费者channel */ String MYINPUT = "myinput"; @Input("myinput") SubscribableChannel myinput(); } 复制代码
com.xyz.stream.binding.MyConsumer
@EnableBinding(MyProcess.class) public class MyConsumer { @StreamListener(MyProcess.MYINPUT) public void input(String message) { System.out.println("接收的消息:" + message); } } 复制代码
server: port: 8501 spring: application: name: streamconsumer-8501 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel input: destination: test-producer #消息接收队列名称 myinput: destination: costom-topic #消息接收队列名称 binders: #绑定器 defaultRabbit: type: rabbit 复制代码
通常在生产环境下,服务都不会以单点的方式运行。当一个服务启动多个实例的时候,这些实例都会绑定到同一个topic上。默认情况下,生产者产生一条消息发送到topic时,这条消息会被消费者的多个实例都接收,但在有些业务场景下,我们希望这个消息只能被一个消费者接收,就需要在消费者中通过配置消费者组的方式来实现这样的功能。
spring: application: name: streamconsumer-8501 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel input: destination: test-producer #消息接收队列名称 myinput: destination: costom-topic #消息接收队列名称 group: mygroup #设置消费者组 binders: #绑定器 defaultRabbit: type: rabbit 复制代码
为了满足相同特征的生产数据能够被同一个消费者实例所接收,需要使用消息分区的功能。
spring.cloud.stream.bindings.output.producer.partitionKeyExpression
和 spring.cloud.stream.bindings.output.producer.partitionCount
2个配置标签 spring: application: name: streamproduder-8500 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel output: destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange myoutput: destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange producer: partitionKeyExpression: payload #分区关键字 partitionCount: 2 #分区总数量 binders: #绑定器 defaultRabbit: type: rabbit 复制代码
spring.cloud.stream.bindings.input.consumer.partitioned
、 spring.cloud.stream.instanceIndex
、 spring.cloud.stream.instanceCount
配置 spring: application: name: streamconsumer-8501 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel input: destination: test-producer #消息接收队列名称 myinput: destination: costom-topic #消息接收队列名称 group: mygroup #设置消费者组 consumer: partitioned: true #消费者端开启对分区的支持 binders: #绑定器 defaultRabbit: type: rabbit instanceIndex: 0 instanceCount: 2 复制代码
spring: application: name: streamconsumer-8502 rabbitmq: #rabbiqmq连接信息 host: localhost port: 5672 username: admin password: admin cloud: stream: bindings: #stream channel input: destination: test-producer #消息接收队列名称 myinput: destination: costom-topic #消息接收队列名称 group: mygroup #设置消费者组 consumer: partitioned: true #消费者端开启对分区的支持 binders: #绑定器 defaultRabbit: type: rabbit instanceIndex: 1 instanceCount: 2 复制代码
com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class TestProducer { @Autowired private MyProducer myProducer; @Test public void test() { for (int i = 0; i < 5; i++) { myProducer.send(i); } } } 复制代码