参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目 cloud-stream-consumer
消费端 和 cloud-stream-consumer
生产端
public interface StreamReceive { @Input("MQRece") SubscribableChannel mqReceive(); }
添加一个 StreamReceive
接口,定义 @input
通道
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } }
添加消息监听,接受消息定义为 byte[]
添加 application.properties
配置文件信息
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876 spring.cloud.stream.bindings.MQRece.destination=message-topic spring.cloud.stream.bindings.MQRece.group=rece-group server.port=19999
为MQRece通道添加主题 message-topic
,组名 rece-group
到此Stream 客户端消费就完成了,本节需要把@SendTo注解用起来,需要新建一个MessageChannel进行产生消息
public interface MsgBackPush { @Output("back-push") MessageChannel backPush(); }
然后在 ReceiveListener
添加@SendTo
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } }
新增通道配置 application.properties
spring.cloud.stream.bindings.back-push.destination=back-topic spring.cloud.stream.bindings.back-push.group=back-group
SpringBoot启动类记得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
@SpringBootApplication @EnableBinding(value = {StreamReceive.class,MsgBackPush.class}) public class CloudStreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamConsumerApplication.class, args); } }
到此,cloud-stream-consumer这个demo就完成了
接下来看看 cloud-stream-producer
public interface StreamPush { @Output("MQPush") MessageChannel mqPush(); }
定义一个通道名为 MQPush
,进行消息生产
public interface ProducerReceive { @Input("producer-receive") SubscribableChannel producerReceive(); }
定义一个通道名为 producer-receive
,进行回执消息的消费
@Component @Slf4j public class ProducerListener { @StreamListener("producer-receive") public void producerReceive(byte[] bytes){ log.info("come back message:"+new String(bytes)); } }
具体回执消息处理逻辑,再来看看 application.properties
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876 spring.cloud.stream.bindings.MQPush.destination=message-topic spring.cloud.stream.bindings.MQPush.group=push-group spring.cloud.stream.bindings.producer-receive.destination=back-topic spring.cloud.stream.bindings.producer-receive.group=back-group server.port=20000
为通道设置topic和group,新建一个Http接口测试一下成果
@SpringBootApplication @EnableBinding(value = {StreamPush.class,ProducerReceive.class}) @RestController public class CloudStreamProducerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamProducerApplication.class, args); } @Autowired private StreamPush streamPush; @GetMapping("/sendMessage") public String sendMessage(){ streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build()); return "ok"; } }
访问 http://localhost:20000/sendMessage
,结果图如下
cloud-stream-consumer日志输出
cloud-stream-producer日志输出
学习@ServiceActivator这个注解,上面的项目 cloud-stream-consumer
ReceiveListener类中添加
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); // 抛出异常 if(1==1){ throw new RuntimeException("Message consumer failed!"); } return "ok".getBytes(); } @Autowired private MsgBackPush msgBackPush; @ServiceActivator(inputChannel = "message-topic.rece-group.errors") public void error(Message<?> message){ log.info("消费者消费消息失败:"+message); msgBackPush.backPush().send(MessageBuilder.withPayload("消息消费失败".getBytes()).build()); } }
通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:
访问 http://localhost:20000/sendMessage
,结果图如下
cloud-stream-consumer日志输出
cloud-stream-producer日志输出