Spring boot 整合 RabbitMQ,本示例采用 RabbitTemplate 发送消息,采用 @RabbitListener 接受消息。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
需要在配置文件中设置 RabbitMQ 的服务器、端口号、用户名和密码等信息。
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
在 Spring boot 整合 RabbitMQ 的项目中,为了方便使用 RabbitMQ 的相关操作组件和跟踪消息在发送过程中的状态,可以在项目中自定义注入和配置 Bean 相关组件。下面设置自定义配置的 Bean 组件放到 RabbitmqConfig 配置类中。
package com.example.mgt.examplemgtservicek8stest.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; /** * RabbitMQ 自定义注入配置 Bean 相关组件 * @since 2020/5/15 13:52 */ @Configuration public class RabbitmqConfig { //定义日志 private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class); //自动装配 RabbitMQ 的链接工厂实例 @Autowired private CachingConnectionFactory connectionFactory; //自动装配消息监听器所在的容器工厂配置类实例 @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; //下面为单一消费者实例的配置 @Bean(name="singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ //定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //设置容器工厂所用的实例 factory.setConnectionFactory(connectionFactory); //设置消息在传输中的格式,这里采用 JSON 的格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); //设置并发消费者实例的初始数量为1 factory.setConcurrentConsumers(1); //设置并发消费者实例中最大数量为1 factory.setMaxConcurrentConsumers(1); //设置并发消费者实例中每个实例拉取的消息数量为1个 factory.setPrefetchCount(1); return factory; } //自定义配置 RabbitMQ 发送消息的操作组件 RabbitTemplate @Bean public RabbitTemplate rabbitTemplate(){ //设置:发送消息后进行确认 connectionFactory.setPublisherConfirms(true); //设置:发送消息后返回确认信息 connectionFactory.setPublisherReturns(true); //构造发送消息组件实例对象 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm( CorrelationData correlationData, boolean b, String s) { if(b){ log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s); }else{ System.out.println("消息确认失败"); } } }); // 设置消息收到确认 rabbitTemplate.setMandatory(true); /* rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage( Message message, int i, String s, String s1, String s2) { log.info("消息发送失败"); } });*/ return rabbitTemplate; } }
1、当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;
2、当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;
Spring boot 整合 RabbitMQ 的正式代码编写前,需要创建队列、交换机、路由及绑定操作。以生产者发送一个简单字符串为例。
最先在 RabbitmqConfig 类中创建队列、交换机、路由和绑定操作,如下:
// 定义读取配置文件的环境变量的实例 @Autowired private Environment env; //创建队列 @Bean(name = "basicQueue") public Queue basicQueue(){ return new Queue(env.getProperty("mq.basic.info.queue.name"),true); } //创建交换机:以 DirectExchange 为例 @Bean public DirectExchange basicExchange(){ return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"),true,false); } //绑定 @Bean public Binding basicBinding(){ return BindingBuilder.bind(basicQueue()).to(basicExchange()). with(env.getProperty("mq.basic.info.routing.key.name")); }
补充:上面 env 读取到的变量,都需要在配置文件 application.yml 中设置好。
mq: basic: info: queue: name: lyz.queue exchange: name: lyz.exchange routing: key: name: lyz.key
package com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; /** * 生产者 * @since 2020/5/15 14:33 */ @Component public class BasicPublisher { private static final Logger log = LoggerFactory.getLogger(BasicPublisher.class); //定义 JSON 序列化和反序列化实例 @Autowired private ObjectMapper objectMapper; //定义 RabbitMQ 消息操作组件 @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; public void sendMsg(String message) { if(!Strings.isNullOrEmpty(message)){ try{ //定义传输格式为 JSON 字符串 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //指定消息模型中的交换机 rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name")); //指定消息模型中的路由 rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name")); //将字符串转化为待发送的消息,即一串二进制的数据流 Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build(); //转化并发送消息 rabbitTemplate.convertAndSend(msg); log.info("基本消息模型-生产者-发送消息:{}",message); } catch (Exception e){ log.error("基本消息模型-生产者-发送消息异常:{}",message,e.fillInStackTrace()); } } } }
package com.example.mgt.examplemgtservicek8stest.rabbitmq.consumer; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * 消费者 * @since 2020/5/15 14:57 */ @Component public class BasicConsumer { private static final Logger log = LoggerFactory.getLogger(BasicConsumer.class); @Autowired public ObjectMapper objectMapper; //监听并接受消费队列中的消息-容器工厂 singleListenerContainer 在 RabbitmqConfig 类中定义 @RabbitListener(queues="${mq.basic.info.queue.name}",containerFactory = "singleListenerContainer") //由于消息本质是一串二进制数据流,因而监听接受的消息采用字节数组接收 public void consumeMsg1(@Payload byte[] msg) { try{ String message = new String(msg,"utf-8"); log.info("基本信息模型-消费者-监听到的消息: {}",message); } catch (Exception e){ log.error("基本信息获取失败:",e.fillInStackTrace()); } } }
package com.example.mgt.examplemgtservicek8stest; import com.fasterxml.jackson.databind.ObjectMapper; import com.example.mgt.examplemgtservicek8stest.rabbitmq.publisher.BasicPublisher; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * RabbitMQ 的 JAVA 单元测试类 * @since 2020/5/15 15:13 */ @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RabbitmqTest { private static final Logger log = LoggerFactory.getLogger(RabbitmqTest.class); @Autowired private ObjectMapper objectMapper; @Autowired private BasicPublisher basicPublisher; @Test public void test1() throws Exception{ String msg = "~~~~~~~~~~测试"; basicPublisher.sendMsg(msg); } }