之前我有 一系列关于RabbitMQ文章 介绍了RabbitMQ的用法,本篇我们介绍如何在Spring Boot中集成RabbitMQ。本篇主要内容如下:
工程名称: rabbitmq
需要在我们的工程中的pom.xml引入新jar
<!-- spring boot: 此版本使用的amqp-client的4.x版本,所有需要注释掉上面的amqp-client配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置application-boot.yml
spring: # 配置rabbitMQspring: rabbitmq: host: 10.240.80.134 username: spring-boot password: spring-boot virtual-host: spring-boot-vhost
下面的代码都在这个工程中
@Configuration public class RabbitConfigure2 { // 队列名称 public final static String SPRING_BOOT_QUEUE = "spring-boot-queue-2"; // 交换机名称 public final static String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-2"; // 绑定的值 public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-2"; }
在spring boot默认会生成AmqpAdmin和AmqpTemplate 供我们和RabbitMQ交互。
AmqpTemplate 的默认实例是RabbitTemplate,AmqpAdmin 默认实例是RabbitAdmin,通过源码发现其内部实现实际是RabbitTemplate。所以AmqpAdmin和AmqpTemplate两者本质是相同的
发送者代码: SendMsg2
@Component public class SendMsg2 { // 此接口默认实现只有一个,且是RabbitAdmin,通过源码发现其内部实现实际是RabbitTemplate。所以AmqpAdmin和AmqpTemplate当前两者本质是相同的 @Autowired private AmqpAdmin amqpAdmin; // 此接口的默认实现是RabbitTemplate,目前只有一个实现, @Autowired private AmqpTemplate amqpTemplate; /** * 发送消息 * * @param msgContent */ public void send_2(String msgContent) { amqpTemplate.convertAndSend(RabbitConfigure2.SPRING_BOOT_EXCHANGE, RabbitConfigure2.SPRING_BOOT_BIND_KEY, msgContent); } }
通过@RabbitListener注解要接收消息的方法,在此注解中可以定义
接收者代码: ReceiveMsg2
@Component public class ReceiveMsg2 { /** * === 在RabbitMQ上创建queue,exchange,binding 方法二:直接在@RabbitListener声明 begin === * 接收 * @param content */ @RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding( value = @Queue(value = RabbitConfigure2.SPRING_BOOT_QUEUE+"3", durable = "true", autoDelete="true"), exchange = @Exchange(value = RabbitConfigure2.SPRING_BOOT_EXCHANGE, type = ExchangeTypes.TOPIC), key = RabbitConfigure2.SPRING_BOOT_BIND_KEY) ) public void receive_2(String content) { // ... System.out.println("[ReceiveMsg-2] receive msg: " + content); } }
启动类: SpringBootRabbitApplication2
测试类: SimpleTest2
@RunWith(SpringRunner.class) @SpringBootTest(classes= SpringBootRabbitApplication2.class, value = "spring.profiles.active=boot") public class SimpleTest2 { @Autowired private ReceiveMsg2 receiveMsg2; @Autowired private SendMsg2 sendMsg2; @Test public void sendAndReceive_2(){ String testContent = "send msg via spring boot -2"; sendMsg2.send_2(testContent); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } }
[ReceiveMsg-2] receive msg: send msg via spring boot -2
在第一个例子中我们在@RabbitListener注解上声明队列、交换机、绑定关系,这里我们使用@Bean来声明这张对象。详细见RabbitConfigure1
@Configuration public class RabbitConfigure1 { // 队列名称 public final static String SPRING_BOOT_QUEUE = "spring-boot-queue-1"; // 交换机名称 public final static String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-1"; // 绑定的值 public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-1"; // === 在RabbitMQ上创建queue,exchange,binding 方法一:通过@Bean实现 begin === /** * 定义队列: * @return */ @Bean Queue queue() { return new Queue(SPRING_BOOT_QUEUE, false); } /** * 定义交换机 * @return */ @Bean TopicExchange exchange() { return new TopicExchange(SPRING_BOOT_EXCHANGE); } /** * 定义绑定 * @param queue * @param exchange * @return */ @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(SPRING_BOOT_BIND_KEY ); }
接收类,还是使用@RabbitListener,但是只需要配置queues ,queues 除了支持完全匹配还支持正则匹配
消息接收类: ReceiveMsg1
@Component public class ReceiveMsg1 { /** * 获取信息: * queue也可以支持RabbitMQ中对队列的模糊匹配 * @param content */ @RabbitListener(queues = RabbitConfigure1.SPRING_BOOT_QUEUE) public void receive_1(String content) { // ... System.out.println("[ReceiveMsg-1] receive msg: " + content); } }
SimpleTest 输出
[ReceiveMsg-1] receive msg: send msg via spring boot - 1
之前的例子里,我们无论发送和接收消息,消息内容都是使用String,这里我们可以通过设置MessageConverter设置发送和接收消息的内容的方法参数是对象。
MsgContent1 和 MsgContent2 最简单的POJO类,只有私有属性和set/get方法
和上一个demo类似,除了使用@Bean声明队列、交换机、绑定关系外,还多了方法jackson2JsonMessageConverter()初始化MessageConverter ,此对象会被注入到RabbitListenerContainer容器中,用于转化发送和收到的消息
RabbitMsgConvertConfigure
@Configuration public class RabbitMsgConvertConfigure { /** * 定义消息转换实例 * @return */ @Bean MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } …. }
发送消息,这里直接发送对象
消息发送者: SendMsgConvertMsg
@Component public class SendMsgConvertMsg { @Autowired private AmqpTemplate amqpTemplate; /** * 发送消息 * * @param msgContent */ public void sendMsgContent1(MsgContent1 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent ); } /** * 发送消息 * @param msgContent */ public void sendMsgContent2(MsgContent2 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent); } }
@RabbitListener定义在类表示此类是消息监听者并设置要监听的队列
@RabbitHandler:在类中可以定义多个@RabbitHandler,spring boot会根据不同参数传送到不同方法处理
消息接收者: ReceiveMsgConvertMsg
@Component // @RabbitListener除了可以作用在方法,也可以作用在类上。在后者的情况下,需要在处理的方法使用@RabbitHandler。一个类可以配置多个@RabbitHandler @RabbitListener(queues = RabbitMsgConvertConfigure.SPRING_BOOT_QUEUE) public class ReceiveMsgConvertMsg { /** * 获取信息: * queue也可以支持RabbitMQ中对队列的模糊匹配 * @param content */ @RabbitHandler public void receiveMsgContent1(MsgContent1 content) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: " + content); } @RabbitHandler public void receiveMsgContent2(MsgContent2 msgContent2) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: " + msgContent2); } }
测试类: MsgConvertTest
输出结果
[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: [ name = send msg via spring boot - msg convert - MsgContent1; age = 27 ] [ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: [ id = 83; content = send msg via spring boot - msg convert - MsgContent1 ]
所有的详细代码见github代码,请尽量使用 tag v0.17,不要使用master,因为master一直在变,不能保证文章中代码和github上的代码一直相同