转载

springBoot整合RabbitMQ(新手整合请勿喷)

整合前先在springboot引入rabbitMqJAR包,版本号可以为自己自定义,本项目是跟随springboot的版本

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

然后就开始搭建配置项,在springboot项目里的application.properties增加rabbitMQ配置

# rabbitMQ配置项
# rabbitmq访问域名
spring.rabbitmq.host=127.0.0.1
# rabbitmq端口号
spring.rabbitmq.port=5672
# rabbitMq账号
spring.rabbitmq.username=
# rabbitMq密码
spring.rabbitmq.password=
# 开启confirms回调 P-> exchange
spring.rabbitmq.publisher-confirms=true
#开启returnedMessage回调Exchange->Queue
spring.rabbitmq.publisher-returns=true
#设置手动确认(ack)Queue->C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
spring.rabbitmq.template.mandatory=true
#开启消费者重试
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数(重试5次还不行则会把消息删掉,默认是不限次数的,次数建议控制在10次以内)
spring.rabbitmq.listener.simple.retry.max-attempts=5
#重试间隔时间
spring.rabbitmq.listener.simple.retry.initial-interval=3000
spring.rabbitmq.virtual-host=/

然后搭建rabbitMQ配置 RabbitMQConfig

@Configuration
public class RabbitMQConfig {

    private Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * 接受数据自动的转换为Json
     */
    @Bean("messageConverter")
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean("rabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());

        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(!ack) {
                    logger.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                    String routingKey) {
                logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
                        replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

    @Bean("rabbitAdmin")
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        System.err.println("RabbitAdmin启动了。。。");
        // 设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

然后定义初始化监听器方法 MQListenerConfig

@Configuration
public class MQListenerConfig {

    @Bean
    public MessageListenerConfig messageListenerConfig(RabbitAdmin admin,
            CachingConnectionFactory rabbitConnectionFactory)
            throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
        MessageListenerConfig messageListenerConfig = new MessageListenerConfig();
        messageListenerConfig.init(admin, rabbitConnectionFactory);
        return messageListenerConfig;
    }
}

初始化监听方法以注解形式获取消费者的队列以及监听器

@Component
public class MessageListenerConfig {

    public void init(RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory)
            throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {

        Map<String, AbstractConsumer> map = SpringUtil.getBeansOfType(AbstractConsumer.class);//查询AbstractConsumer父类下的子类
        List<AbstractConsumer> abstractConsumerList = new ArrayList<AbstractConsumer>(map.values());//将上面的子类转换为List集合
        SendMQService sendMQService = SpringUtil.getBean(RabbitServiceImpl.class);//获取rabbitMqService接口
        this.init(abstractConsumerList, 0, admin, rabbitConnectionFactory,sendMQService);//初始化参数
    }

    private void init(List<AbstractConsumer> clazzList, int index, RabbitAdmin admin,
            CachingConnectionFactory rabbitConnectionFactory,SendMQService sendMQService) {

        if (EmptyUtils.isEmpty(clazzList) || clazzList.size() <= index) {
            return;
        }

        AbstractConsumer abstractConsumer = clazzList.get(index);

        RabbitMq rabbitMq = abstractConsumer.getClass().getAnnotation(RabbitMq.class);// 根据反射获取rabbitMQ注解信息

        if (rabbitMq == null) {
            this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService);
        }

        String queueString = rabbitMq.queues(); // 队列
        String routingKeyString = rabbitMq.routingKey(); // 交换器
        String exchangeString = rabbitMq.exchange(); // 路由规则
        int count = rabbitMq.consumersPerQueue(); // 每个队列的消费者数量

        DirectMessageListenerContainer container = new DirectMessageListenerContainer(rabbitConnectionFactory);
        Queue queue = new Queue(queueString);// 声明队列
        admin.declareQueue(queue);// 初始化队列

        if (EmptyUtils.isNotEmpty(exchangeString) && EmptyUtils.isNotEmpty(routingKeyString)) {
            AbstractMQService mqService = (AbstractMQService) SpringUtil.getBean(rabbitMq.exchangeTypes() + AbstractMQService.SERVICE_NAME);
            AbstractExchange exchange = mqService.initExchange(exchangeString);
            admin.declareExchange(exchange);

            Binding binding = mqService.initBinding(queue, exchange, routingKeyString);// 初始化不同队列的数据
            admin.declareBinding(binding);
        }

        MessageListenerAdapter adapter = new MessageListenerAdapter(abstractConsumer);
        adapter.setEncoding("utf-8");
        container.setConsumersPerQueue(rabbitMq.consumersPerQueue());
        container.setQueues(queue);// 监听器配置队列
        container.setMessageListener(adapter);
        container.setAutoDeclare(true);
        container.setAcknowledgeMode(rabbitMq.mode());
        container.setConsumersPerQueue(count);
        // 启动对应的适配器
        container.start();
        sendMQService.addContainer(queueString, container);
        this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService);
    }
}

初始交换机以及绑定关系接口

public interface AbstractMQService {

    static final String SERVICE_NAME = "MQService";

    /**
     * 初始化交换机
     * @return
     */
    public AbstractExchange initExchange(String exchangeName);

    /**
     * 初始化绑定关系
     * @param routeKey
     * @return
     */
    public Binding initBinding(Queue queue,AbstractExchange exchange,String routeKey);
}

初始交换机以及绑定关系实现类分别为DirectMQServiceImpl,FanoutMQServiceImpl,TopicMQServiceImpl

@Service("directMQService")
public class DirectMQServiceImpl implements AbstractMQService {

    @Override
    public AbstractExchange initExchange(String exChangeName) {
        DirectExchange exchange = new DirectExchange(exChangeName);
        return exchange;
    }

    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        DirectExchange exchange = (DirectExchange) exChange;
        DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
        DirectExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange);
        return routKeyConfigurer.with(routeKey);
    }

}
@Service("fanoutMQService")
public class FanoutMQServiceImpl implements AbstractMQService {

    @Override
    public AbstractExchange initExchange(String exChangeName) {
        FanoutExchange exchange = new FanoutExchange(exChangeName);
        return exchange;
    }

    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        FanoutExchange exchange = (FanoutExchange) exChange;
        DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
        Binding binding = bindConfigurer.to(exchange);
        return binding;
    }

}
@Service("topicMQService")
public class TopicMQServiceImpl implements AbstractMQService {

    @Override
    public AbstractExchange initExchange(String exChangeName) {
        TopicExchange exchange = new TopicExchange(exChangeName);
        return exchange;
    }

    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        TopicExchange exchange = (TopicExchange) exChange;
        DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
        TopicExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange);
        return routKeyConfigurer.with(routeKey);
    }

}

自定义注解

@Target(value = { ElementType.FIELD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface RabbitMq {

    /**
     * 队列
     * 
     * @return
     */
    public String queues() default "";

    /**
     * 交换器
     * 
     * @return
     */
    public String exchange() default "";

    /**
     * 路由规则
     * 
     * @return
     */
    public String routingKey() default "";

    /**
     * 是否持久化
     * 
     * @return
     */
    public boolean isPersistence() default true;

    /**
     * 确认模式
     * 
     * @return
     */
    public AcknowledgeMode mode() default AcknowledgeMode.MANUAL;

    /**
     * 每个队列消费者数量
     * 
     * @return
     */
    public int consumersPerQueue() default 1;

    /**
     * 交换类型
     * 
     * @return
     */
    public String exchangeTypes() default ExchangeTypes.DIRECT;
}

自定义消费者 AbstractConsumer,此消费者用于通用,每多一个消费者只需继承,然后处理业务逻辑即可

public abstract class AbstractConsumer extends MessagingMessageListenerAdapter {

    protected static final String MQ_CORRELATIONDATA_KEY = "spring_returned_message_correlation";

    public static final String MQ_CACHE_MQ_KEY = "rabbitMQ.queues:";

    public static final Integer FAIL_MAX_COUNT = 5;

    private RedisService redisService = SpringUtil.getBean(RedisService.class);

    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();

        String correlationId = (String) message.getMessageProperties().getHeaders().get(MQ_CORRELATIONDATA_KEY);
        String queues = messageProperties.getConsumerQueue();
        String cacheKey = new StringBuilder().append(MQ_CACHE_MQ_KEY).append(queues).append(":").append(correlationId).toString();
        Integer failCount = (Integer)redisService.get(cacheKey);
        try {
            this.handleMessage(new String(message.getBody(), "UTF-8"));
            channel.basicAck(deliveryTag, false);

            redisService.del(new StringBuilder().append(correlationId).toString());
        } catch (Exception e) {
            if(failCount > FAIL_MAX_COUNT) {
                return;
            }
            redisService.incr(cacheKey, 1, new Long(CacheTime.CACHE_EXP_THIRTY_SECONDS));
            channel.basicNack(deliveryTag, false, false);
        }
    }

    public abstract void handleMessage(String message);

}

有什么不完美的地方请各位多多指教~!,新手第一次入坑

原文  https://blog.51cto.com/11152994/2489181
正文到此结束
Loading...