转载

Spring整合RabbitMQ-03-SimpleMessageListenerContainer

SimpleMessageListenerContainer

  1. 简单消息监听容器,这个类非常强大,我们可以对他进行很多设置,对于消息的配置项,这个类都可以满足;
  2. 监听队列(多个对列),自动启动,自动声明功能
  3. 设置事务特性,事务管理器,事务属性。事务容量(并发),是否开启事务,回滚消息等
  4. 设置消费者的数量,最大最小数量,批量消费等
  5. 设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
  6. 设置消费者标签生成策略,是否独占模式,消费者属性
  7. 设置具体的监听器,消息转换器等等

注意

SimpleMessageListenerContainer 可以进行动态设置,比如在运行中可以动态修改其消费者数量的大小,接收消息的模式等。很多基于RabbitMQ的制定化后端管理控制台在进行动态设置的时候,也是根据这一特性去实现的。

代码

package com.wyg.rabbitmq.springamqp;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;

/**
 * RabbitAdmin
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */

@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }
    
    /**
     * SimpleMessageListenerContainer注入
     * 
     * @param connectionFactory
     * @return
     * @throws @author
     *             wyg0405@gmail.com
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 监听多个queue
        container.addQueueNames("test01", "test02", "test03");
        // 设置当前消费者数量
        container.setConcurrentConsumers(1);
        // 设置最大的消费者数量
        container.setMaxConcurrentConsumers(5);
        // 设置不要重回队列
        container.setDefaultRequeueRejected(false);
        // 设置自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 设置消费端tag策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + System.currentTimeMillis();
            }
        });

        // 设置监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // 消息处理
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg
                    + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);;
            }
        });

        return container;
    }

}

单元测试

package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

 
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02",
        // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", ("第" + i + "条消息").getBytes());

        }
    }

}
原文  https://segmentfault.com/a/1190000021164406
正文到此结束
Loading...