注意
SimpleMessageListenerContainer
可以进行动态设置,比如在运行中可以动态修改其消费者数量的大小、接收消息的模式等。很多基于 RabbitMQ 的定制化后端管理控制台在进行动态设置的时候,也是根据这一特性去实现的。
package com.wyg.rabbitmq.springamqp;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;

/**
 * Spring configuration that declares the RabbitMQ connection factory and a
 * {@link SimpleMessageListenerContainer} consuming from the queues
 * {@code test01}, {@code test02} and {@code test03}.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the connection factory pointing at {@code localhost:5672},
     * virtual host {@code /}, with the {@code guest}/{@code guest} credentials.
     *
     * @return a configured {@link CachingConnectionFactory}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    /**
     * Registers the {@link SimpleMessageListenerContainer} bean.
     *
     * <p>The container uses {@link AcknowledgeMode#AUTO}, so the listener must
     * not acknowledge deliveries itself; it only prints each message.
     *
     * @param connectionFactory the connection factory the container consumes through
     * @return the configured listener container
     * @author wyg0405@gmail.com
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Listen on several queues at once.
        container.addQueueNames("test01", "test02", "test03");
        // Initial number of concurrent consumers.
        container.setConcurrentConsumers(1);
        // Upper bound on the number of concurrent consumers.
        container.setMaxConcurrentConsumers(5);
        // Rejected messages are not put back on the queue.
        container.setDefaultRequeueRejected(false);
        // Automatic acknowledgement: the container acks/nacks for the listener.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag strategy: "<queue>_<current time in millis>".
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + System.currentTimeMillis();
            }
        });
        // Message listener.
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // Decode the payload explicitly as UTF-8 and print it.
                String msg = new String(message.getBody(), StandardCharsets.UTF_8);
                System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue()
                    + ",消息:" + msg + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());
                // No channel.basicAck(...) here: in AcknowledgeMode.AUTO the container
                // acknowledges the delivery after this method returns, and acking the
                // same delivery tag twice makes the broker close the channel.
            }
        });
        return container;
    }
}
package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 * Spring Boot test that publishes a few messages so the configured
 * {@link SimpleMessageListenerContainer} has something to consume.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    /**
     * Publishes three messages to the exchange {@code springdemo.direct} with
     * routing key {@code orderRoutingKey}.
     *
     * <p>Per the original author's note, the queues {@code test01},
     * {@code test02} and {@code test03} are already bound to that exchange with
     * this routing key (the bindings are not declared in this test).
     *
     * <p>NOTE(review): the test only publishes; it makes no assertions about
     * what the listener receives.
     */
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        for (int i = 0; i < 3; i++) {
            // Encode explicitly as UTF-8 rather than relying on the platform
            // default charset, so the bytes do not depend on the JVM's locale.
            byte[] payload = ("第" + i + "条消息").getBytes(StandardCharsets.UTF_8);
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", payload);
        }
    }
}