转载

Spring Cloud Stream实现消息过滤消费

点击上方 "IT牧场" ,选择 "设为星标" 技术干货每日送达!

TIPS

本文基于 Spring Cloud Greenwich SR1spring-cloud-starter-stream-rocketmq 0.9.0

理论兼容: Spring Cloud Finchley+ + spring-cloud-starter-stream-rocketmq 0.2.2+

MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。

本文探讨Spring Cloud Stream & RocketMQ过滤消息的各种姿势。

在实际项目中,我们可能需要实现消息消费的过滤。

举个例子:实现消息的分流处理:

生产者生产的消息,虽然消息体可能一样,但是header不一样。可编写两个或者更多的消费者,对不同header的消息做针对性的处理!

condition

生产者

生产者设置一下header,比如my-header,值根据你的需要填写:


 

@Autowired

private Source source;


public String testStream() {

this.source.output()

.send(

MessageBuilder

.withPayload("消息体")

.setHeader("my-header","你的header")

.build()

);

return "success";

}

消费者


 

@Service

@Slf4j

public class TestStreamConsumer {

@StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")

public void receive(String messageBody) {

log.info("通过stream收到了消息:messageBody ={}", messageBody);

}

}

如代码所示,使用 StreamListener 注解的  condition 属性。当  headers['my-header']=='你的header' 条件满足,才会进入到方法体。

Tags

TIPS

该方式只支持RoketMQ,不支持Kafka/RabbitMQ

生产者


 

@Autowired

private Source source;


public String testStream() {

this.source.output()

.send(

MessageBuilder

.withPayload("消息体")

// 注意:只能设置1个tag

.setHeader(RocketMQHeaders.TAGS, "tag1")

.build()

);

return "success";

}

消费者

1 接口


 

public interface MySink {

String INPUT1 = "input1";

String INPUT2 = "input2";


@Input(INPUT1)

SubscribableChannel input();


@Input(INPUT2)

SubscribableChannel input2();

}

2 注解

@EnableBinding({MySink.class})

3 配置


 

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 127.0.0.1:9876

bindings:

input1:

consumer:

# 表示input2消费带有tag1的消息

tags: tag1

input2:

consumer:

# 表示input2消费带有tag2或者tag3的消息

tags: tag2||tag3

bindings:

input1:

destination: test-topic

group: test-group1

input2:

destination: test-topic

group: test-group2

4 消费代码


 

@Service

@Slf4j

public class MyTestStreamConsumer {

/**

* 我消费带有tag1的消息

*

* @param messageBody 消息体

*/

@StreamListener(MySink.INPUT1)

public void receive1(String messageBody) {

log.info("带有tag1的消息被消费了:messageBody ={}", messageBody);

}


/**

* 我消费带有tag1或者tag2的消息

*

* @param messageBody 消息体

*/

@StreamListener(MySink.INPUT2)

public void receive2(String messageBody) {

log.info("带有tag2/tag3的消息被消费了:messageBody ={}", messageBody);

}

}

5 日志:

2019-08-04 19:10:03.799  INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : 带有tag1的消息被消费了:messageBody =消息体

Sql 92

TIPS

该方式只支持RoketMQ,不支持Kafka/RabbitMQ 用了sql,就不要用Tag

RocketMQ支持使用SQL语法过滤消息。官方文档: http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

Spring Clous Stream RocketMQ也为此特性提供了支持。

开启SQL 92支持

默认情况下,RocketMQ的SQL过滤支持是关闭的,要想使用SQL 92过滤消息,需要:

1 在 conf/broker.conf 添加

enablePropertyFilter = true

2 启动RocketMQ

nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

生产者


 

@Autowired

private Source source;


public String testStream() {

this.source.output()

.send(

MessageBuilder

.withPayload("消息体")

.setHeader("index", 1000)

.build()

);

return "success";

}

消费者

1 接口


 

public interface MySink {

String INPUT1 = "input1";

String INPUT2 = "input2";


@Input(INPUT1)

SubscribableChannel input();


@Input(INPUT2)

SubscribableChannel input2();

}

2 注解

@EnableBinding({MySink.class})

3 配置


 

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 127.0.0.1:9876

bindings:

input1:

consumer:

sql: 'index < 1000'

input2:

consumer:

sql: 'index >= 1000'

bindings:

input1:

destination: test-topic

group: test-group1

input2:

destination: test-topic

group: test-group2

4 消费代码


 

@Service

@Slf4j

public class MyTestStreamConsumer {

/**

* 我消费带有tag1的消息

*

* @param messageBody 消息体

*/

@StreamListener(MySink.INPUT1)

public void receive1(String messageBody) {

log.info("index > 1000的消息被消费了:messageBody ={}", messageBody);

}


/**

* 我消费带有tag1或者tag2的消息

*

* @param messageBody 消息体

*/

@StreamListener(MySink.INPUT2)

public void receive2(String messageBody) {

log.info("index <=1000 的消息被消费了:messageBody ={}", messageBody);

}

}

5 日志

2019-08-04 19:58:59.787  INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : index <=1000 的消息被消费了:messageBody =消息体

相关代码

org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

参考文档

Filter Messages By SQL92 In RocketMQ [1] RocketMQ 错误:The broker does not support consumer to filter message by SQL92 [2]

干货分享

最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取!

001:《Java并发与高并发解决方案》学习笔记; 002:《深入JVM内核——原理、诊断与优化》学习笔记; 003:《Java面试宝典》 004:《Docker开源书》 005:《Kubernetes开源书》 006:《DDD速成(领域驱动设计速成)》 007: 全部 008: 加技术讨论群

近期热文

References

[1] Filter Messages By SQL92 In RocketMQ:  http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

[2] RocketMQ 错误:The broker does not support consumer to filter message by SQL92:  https://blog.csdn.net/u010690828/article/details/84337688

想知道更多?长按/扫码关注我吧↓↓↓ Spring Cloud Stream实现消息过滤消费 >>>技术讨论群<<< 喜欢就点个 "在看" 呗^_^

原文  http://mp.weixin.qq.com/s?__biz=MzI4ODQ3NjE2OA==&mid=2247485379&idx=2&sn=2b27dfa34d6b6de2d842095377f4e9f6
正文到此结束
Loading...