交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。
交换机有四种类型:Direct, topic, Headers and Fanout
* Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去. * Topic:按规则转发消息(最灵活) * Headers:设置 header attribute 参数类型的交换机 * Fanout:转发消息到所有绑定队列(广播模式) 复制代码
下面介绍常用的三种模式的基础用法。
Pom 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 复制代码
application.properties 配置文件
# rabbitmq连接参数 spring.rabbitmq.host= # mq ip地址 spring.rabbitmq.port=5672 # 端口 默认5672 spring.rabbitmq.username=admin # 用户名 spring.rabbitmq.password=admin # 密码 # 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数) #spring.rabbitmq.publisher-confirms=true # 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数) #spring.rabbitmq.publisher-returns=true # 开启消费者手动确认 ACK 默认auto #spring.rabbitmq.listener.direct.acknowledge-mode=manual #spring.rabbitmq.listener.simple.acknowledge-mode=manual 复制代码
direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中
/** * Rabbit 配置类 * @author peng */ @Configuration public class DirectRabbitConfig { @Bean DirectExchange directExchange(){ // 注册一个 Direct 类型的交换机 默认持久化、非自动删除 return new DirectExchange("directExchange"); } @Bean Queue infoQueue(){ // 注册队列 return new Queue("infoMsgQueue"); } @Bean Queue warnQueue(){ return new Queue("warnMsgQueue"); } @Bean Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) { // 将队列以 info-msg 为绑定键绑定到交换机 return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg"); } @Bean Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) { return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg"); } } 复制代码
/** * 生产者 * @author peng */ @Component public class DirectSender { @Autowired private AmqpTemplate rabbitTemplate; public void sendInfo() { String content = "I am Info msg!"; // 将消息以info-msg绑定键发送到directExchange交换机 this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content); System.out.println("########### SendInfoMsg : " + content); } public void sendWarn() { String content = "I am Warn msg!"; this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content); System.out.println("########### SendWarnMsg : " + content); } public void sendWarn(int i) { String content = "I am Warn msg! " + i; this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content); System.out.println("########### SendWarnMsg : " + content); } public void sendError() { String content = "I am Error msg!"; this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content); System.out.println("########### SendErrorMsg : " + content); } } 复制代码
消费者1 /** * @author peng */ @Component // 标记此类为Rabbit消息监听类,监听队列infoMsgQueue @RabbitListener(queues = "infoMsgQueue") public class DirectReceiver1 { // 定义处理消息的方法 @RabbitHandler public void process(String message) { System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message); } } 消费者2 @Component @RabbitListener(queues = "warnMsgQueue") public class DirectReceiver2 { @RabbitHandler public void process(String message) { System.out.println("########### DirectReceiver2 Receive warnMsg:" + message); } } 复制代码
@RunWith(value=SpringJUnit4ClassRunner.class) @SpringBootTest public class DirectTest { @Autowired private DirectSender directSender; @Test public void send() { directSender.sendInfo(); directSender.sendWarn(); directSender.sendError(); } } 结果 ########### SendInfoMsg : I am Info msg! ########### SendWarnMsg : I am Warn msg! ########### DirectReceiver2 Receive warnMsg:I am Warn msg! ########### DirectReceiver1 Receive InfoMsg:I am Info msg! InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。 WarnMsg 同理 ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃 复制代码
消费者3 @Component @RabbitListener(queues = "warnMsgQueue") public class DirectReceiver3 { @RabbitHandler public void process(String message) { System.out.println("########### DirectReceiver3 Receive warnMsg:" + message); } } // 一对多 @Test public void oneToMany() { for (int i = 0; i< 100 ; i++){ directSender.sendWarn(i); } } 结果 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7 消费者2 和 消费者3 均匀(条数上)的消费了消息 复制代码
/** * 生产者3 * @author peng */ @Component public class DirectSender2 { @Autowired private AmqpTemplate rabbitTemplate; public void sendWarn(int i) { String content = "I am Warn msg! " + i +" fromSend2"; this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content); System.out.println("########### SendWarnMsg : " + content); } } // 多对多 @Test public void manyToMany() { for (int i = 0; i< 10 ; i++){ directSender.sendWarn(i); directSender2.sendWarn(i); } } 结果 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1 ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2 ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1 消费者2和消费者3分别接受了生产者1 和生产者2的消息 复制代码
fanout类型的Exchange路由规则非常简单,会发送给所有绑定到该交换机的队列上。会忽略路由键
配置类
/** * @author peng */ @Configuration public class FanoutRabbitConfig { @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } @Bean Queue queue1(){ return new Queue("fanout.1"); } @Bean Queue queue2(){ return new Queue("fanout.2"); } @Bean Queue queue3(){ return new Queue("fanout.3"); } @Bean Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue1).to(fanoutExchange); } @Bean Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } @Bean Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue3).to(fanoutExchange); } } 复制代码
生产者
@Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; this.rabbitTemplate.convertAndSend("fanoutExchange", "", context); System.out.println("######## Sender : " + context); } } 复制代码
消费者
消费者1 /** * @author peng */ @Component @RabbitListener(queues = "fanout.1") public class FanoutReceiver1 { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver 1 : " + message); } } 消费者2 @Component @RabbitListener(queues = "fanout.2") public class FanoutReceiver2 { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver 2 : " + message); } } 消费者3 @Component @RabbitListener(queues = "fanout.3") public class FanoutReceiver3 { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver 3 : " + message); } } 复制代码
测试
@RunWith(value=SpringJUnit4ClassRunner.class) @SpringBootTest public class FanoutTest { @Autowired private FanoutSender fanoutSender; @Test public void send() { fanoutSender.send(); } } 结果 ######## Sender : hi, fanout msg fanout Receiver 1 : hi, fanout msg fanout Receiver 2 : hi, fanout msg fanout Receiver 3 : hi, fanout msg 复制代码