消息中间件主要用于应用解耦、异步处理、流量削峰等场景,实现高可用、高性能、可伸缩和最终一致性。但是不同的消息中间件实现方式不同,如RabbitMQ有exchange;Kafka有Topic、partition等概念。在实际环境中如果需要替换中间件,代码都需要重构,工具量巨大,也就是说中间件与系统耦合了,SpringCloud Stream提供了一种解耦合的方式。
SpringCloud Stream通过Binder连接应用和中间件,应用通过Input把消息发送到中间件,通过Output从中间件获取消息。通过Binder与中间件交互,应用不需要关注中间件类型,只需要关注binder提供的接口实现业务逻辑即可。
RabbitMQ安装
docker run -it -d --name rabbitmq / -p 5672:5672 -p 15672:15672 / -e RABBITMQ_DEFAULT_USER=admin / -e RABBITMQ_DEFAULT_PASS=admin / registry.cn-beijing.aliyuncs.com/buyimoutianxia/rabbitmq:V3.7.25 复制代码
2.2.1 导入依赖坐标
<!--stream producer依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码
2.2.2 配置application.yml
server:
port: 8500
spring:
application:
name: streamproduder-8500
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
output:
destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class)
public class MyProducer implements CommandLineRunner {
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("hello, my friend ...").build());
}
}
复制代码
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
复制代码
http://localhost:15672
-->Exchanges中看到创建的 test-producer
<!--stream consumer依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
复制代码
server:
port: 8501
spring:
application:
name: streamconsumer-8501
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
input:
destination: test-producer #消息接收队列名称
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
com.xyz.stream.binding.MyConsumer
@EnableBinding(Sink.class)
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void input(String message) {
System.out.println("接收的消息:" + message);
}
}
复制代码
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class)
public class MyProducer {
@Autowired
private MessageChannel output;
public void send(Object messsage) {
output.send(MessageBuilder.withPayload(messsage).build());
}
}
复制代码
com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestProducer {
@Autowired
private MyProducer myProducer;
@Test
public void test() {
myProducer.send("test method ...");
}
}
复制代码
com.xyz.stream.channel.MyProcess
public interface MyProcess {
/**
* 自定义消息生产者channel
*/
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
/**
* 自定义消息消费者channel
*/
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
复制代码
com.xyz.stream.bingding.MyProducer
@EnableBinding(MyProcess.class)
public class MyProducer {
@Autowired
private MessageChannel myoutput;
public void send(Object messsage) {
myoutput.send(MessageBuilder.withPayload(messsage).build());
}
}
复制代码
server:
port: 8500
spring:
application:
name: streamproduder-8500
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
output:
destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
myoutput:
destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
com.xyz.stream.channel.MyProcess
public interface MyProcess {
/**
* 自定义消息生产者channel
*/
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
/**
* 自定义消息消费者channel
*/
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
复制代码
com.xyz.stream.binding.MyConsumer
@EnableBinding(MyProcess.class)
public class MyConsumer {
@StreamListener(MyProcess.MYINPUT)
public void input(String message) {
System.out.println("接收的消息:" + message);
}
}
复制代码
server:
port: 8501
spring:
application:
name: streamconsumer-8501
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
input:
destination: test-producer #消息接收队列名称
myinput:
destination: costom-topic #消息接收队列名称
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
通常在生产环境下,服务都不会以单点的方式运行。当一个服务启动多个实例的时候,这些实例都会绑定到同一个topic上。默认情况下,生产者产生一条消息发送到topic时,这条消息会被消费者的多个实例都接收,但在有些业务场景下,我们希望这个消息只能被一个消费者接收,就需要在消费者中通过配置消费者组的方式来实现这样的功能。
spring:
application:
name: streamconsumer-8501
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
input:
destination: test-producer #消息接收队列名称
myinput:
destination: costom-topic #消息接收队列名称
group: mygroup #设置消费者组
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
为了满足相同特征的生产数据能够被同一个消费者实例所接收,需要使用消息分区的功能。
spring.cloud.stream.bindings.output.producer.partitionKeyExpression
和 spring.cloud.stream.bindings.output.producer.partitionCount
2个配置标签 spring:
application:
name: streamproduder-8500
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
output:
destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
myoutput:
destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange
producer:
partitionKeyExpression: payload #分区关键字
partitionCount: 2 #分区总数量
binders: #绑定器
defaultRabbit:
type: rabbit
复制代码
spring.cloud.stream.bindings.input.consumer.partitioned
、 spring.cloud.stream.instanceIndex
、 spring.cloud.stream.instanceCount
配置 spring:
application:
name: streamconsumer-8501
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
input:
destination: test-producer #消息接收队列名称
myinput:
destination: costom-topic #消息接收队列名称
group: mygroup #设置消费者组
consumer:
partitioned: true #消费者端开启对分区的支持
binders: #绑定器
defaultRabbit:
type: rabbit
instanceIndex: 0
instanceCount: 2
复制代码
spring:
application:
name: streamconsumer-8502
rabbitmq: #rabbiqmq连接信息
host: localhost
port: 5672
username: admin
password: admin
cloud:
stream:
bindings: #stream channel
input:
destination: test-producer #消息接收队列名称
myinput:
destination: costom-topic #消息接收队列名称
group: mygroup #设置消费者组
consumer:
partitioned: true #消费者端开启对分区的支持
binders: #绑定器
defaultRabbit:
type: rabbit
instanceIndex: 1
instanceCount: 2
复制代码
com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestProducer {
@Autowired
private MyProducer myProducer;
@Test
public void test() {
for (int i = 0; i < 5; i++) {
myProducer.send(i);
}
}
}
复制代码