点击上方 "IT牧场" ,选择 "设为星标" 技术干货每日送达!
本文基于 Spring Cloud Greenwich SR1
+ spring-cloud-starter-stream-rocketmq 0.9.0
理论兼容: Spring Cloud Finchley+
+ spring-cloud-starter-stream-rocketmq 0.2.2+
MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。
本文探讨Spring Cloud Stream & RocketMQ过滤消息的各种姿势。
在实际项目中,我们可能需要实现消息消费的过滤。
举个例子:实现消息的分流处理:
生产者生产的消息,虽然消息体可能一样,但是header不一样。可编写两个或者更多的消费者,对不同header的消息做针对性的处理!
生产者设置一下header,比如my-header,值根据你的需要填写:
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("my-header","你的header")
.build()
);
return "success";
}
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")
public void receive(String messageBody) {
log.info("通过stream收到了消息:messageBody ={}", messageBody);
}
}
如代码所示,使用 StreamListener
注解的 condition
属性。当 headers['my-header']=='你的header'
条件满足,才会进入到方法体。
该方式只支持RoketMQ,不支持Kafka/RabbitMQ
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
// 注意:只能设置1个tag
.setHeader(RocketMQHeaders.TAGS, "tag1")
.build()
);
return "success";
}
1 接口
public interface MySink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input();
@Input(INPUT2)
SubscribableChannel input2();
}
2 注解
@EnableBinding({MySink.class})
3 配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input1:
consumer:
# 表示input2消费带有tag1的消息
tags: tag1
input2:
consumer:
# 表示input2消费带有tag2或者tag3的消息
tags: tag2||tag3
bindings:
input1:
destination: test-topic
group: test-group1
input2:
destination: test-topic
group: test-group2
4 消费代码
@Service
@Slf4j
public class MyTestStreamConsumer {
/**
* 我消费带有tag1的消息
*
* @param messageBody 消息体
*/
@StreamListener(MySink.INPUT1)
public void receive1(String messageBody) {
log.info("带有tag1的消息被消费了:messageBody ={}", messageBody);
}
/**
* 我消费带有tag1或者tag2的消息
*
* @param messageBody 消息体
*/
@StreamListener(MySink.INPUT2)
public void receive2(String messageBody) {
log.info("带有tag2/tag3的消息被消费了:messageBody ={}", messageBody);
}
}
5 日志:
2019-08-04 19:10:03.799 INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : 带有tag1的消息被消费了:messageBody =消息体
• 该方式只支持RoketMQ,不支持Kafka/RabbitMQ • 用了sql,就不要用Tag
RocketMQ支持使用SQL语法过滤消息。官方文档: http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/
Spring Clous Stream RocketMQ也为此特性提供了支持。
默认情况下,RocketMQ的SQL过滤支持是关闭的,要想使用SQL 92过滤消息,需要:
1 在 conf/broker.conf
添加
enablePropertyFilter = true
2 启动RocketMQ
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
@Autowired
private Source source;
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("index", 1000)
.build()
);
return "success";
}
1 接口
public interface MySink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input();
@Input(INPUT2)
SubscribableChannel input2();
}
2 注解
@EnableBinding({MySink.class})
3 配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input1:
consumer:
sql: 'index < 1000'
input2:
consumer:
sql: 'index >= 1000'
bindings:
input1:
destination: test-topic
group: test-group1
input2:
destination: test-topic
group: test-group2
4 消费代码
@Service
@Slf4j
public class MyTestStreamConsumer {
/**
* 我消费带有tag1的消息
*
* @param messageBody 消息体
*/
@StreamListener(MySink.INPUT1)
public void receive1(String messageBody) {
log.info("index > 1000的消息被消费了:messageBody ={}", messageBody);
}
/**
* 我消费带有tag1或者tag2的消息
*
* @param messageBody 消息体
*/
@StreamListener(MySink.INPUT2)
public void receive2(String messageBody) {
log.info("index <=1000 的消息被消费了:messageBody ={}", messageBody);
}
}
5 日志
2019-08-04 19:58:59.787 INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : index <=1000 的消息被消费了:messageBody =消息体
org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
• Filter Messages By SQL92 In RocketMQ [1] • RocketMQ 错误:The broker does not support consumer to filter message by SQL92 [2]
最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取!
• 001:《Java并发与高并发解决方案》学习笔记; • 002:《深入JVM内核——原理、诊断与优化》学习笔记; • 003:《Java面试宝典》 • 004:《Docker开源书》 • 005:《Kubernetes开源书》 • 006:《DDD速成(领域驱动设计速成)》 • 007: 全部 • 008: 加技术讨论群
[1]
Filter Messages By SQL92 In RocketMQ: http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/
[2]
RocketMQ 错误:The broker does not support consumer to filter message by SQL92: https://blog.csdn.net/u010690828/article/details/84337688
想知道更多?长按/扫码关注我吧↓↓↓ >>>技术讨论群<<< 喜欢就点个 "在看" 呗^_^