整合前先在springboot引入rabbitMqJAR包,版本号可以为自己自定义,本项目是跟随springboot的版本
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
然后就开始搭建配置项,在springboot项目里的application.properties增加rabbitMQ配置
# rabbitMQ配置项 # rabbitmq访问域名 spring.rabbitmq.host=127.0.0.1 # rabbitmq端口号 spring.rabbitmq.port=5672 # rabbitMq账号 spring.rabbitmq.username= # rabbitMq密码 spring.rabbitmq.password= # 开启confirms回调 P-> exchange spring.rabbitmq.publisher-confirms=true #开启returnedMessage回调Exchange->Queue spring.rabbitmq.publisher-returns=true #设置手动确认(ack)Queue->C spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100 spring.rabbitmq.template.mandatory=true #开启消费者重试 spring.rabbitmq.listener.simple.retry.enabled=true #最大重试次数(重试5次还不行则会把消息删掉,默认是不限次数的,次数建议控制在10次以内) spring.rabbitmq.listener.simple.retry.max-attempts=5 #重试间隔时间 spring.rabbitmq.listener.simple.retry.initial-interval=3000 spring.rabbitmq.virtual-host=/
然后搭建rabbitMQ配置 RabbitMQConfig
@Configuration public class RabbitMQConfig { private Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class); @Autowired private CachingConnectionFactory connectionFactory; /** * 接受数据自动的转换为Json */ @Bean("messageConverter") public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate() { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter()); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(messageConverter()); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(!ack) { logger.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; } @Bean("rabbitAdmin") public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); System.err.println("RabbitAdmin启动了。。。"); // 设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置) rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
然后定义初始化监听器方法 MQListenerConfig
@Configuration public class MQListenerConfig { @Bean public MessageListenerConfig messageListenerConfig(RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException { MessageListenerConfig messageListenerConfig = new MessageListenerConfig(); messageListenerConfig.init(admin, rabbitConnectionFactory); return messageListenerConfig; } }
初始化监听方法以注解形式获取消费者的队列以及监听器
@Component public class MessageListenerConfig { public void init(RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory) throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException { Map<String, AbstractConsumer> map = SpringUtil.getBeansOfType(AbstractConsumer.class);//查询AbstractConsumer父类下的子类 List<AbstractConsumer> abstractConsumerList = new ArrayList<AbstractConsumer>(map.values());//将上面的子类转换为List集合 SendMQService sendMQService = SpringUtil.getBean(RabbitServiceImpl.class);//获取rabbitMqService接口 this.init(abstractConsumerList, 0, admin, rabbitConnectionFactory,sendMQService);//初始化参数 } private void init(List<AbstractConsumer> clazzList, int index, RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory,SendMQService sendMQService) { if (EmptyUtils.isEmpty(clazzList) || clazzList.size() <= index) { return; } AbstractConsumer abstractConsumer = clazzList.get(index); RabbitMq rabbitMq = abstractConsumer.getClass().getAnnotation(RabbitMq.class);// 根据反射获取rabbitMQ注解信息 if (rabbitMq == null) { this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService); } String queueString = rabbitMq.queues(); // 队列 String routingKeyString = rabbitMq.routingKey(); // 交换器 String exchangeString = rabbitMq.exchange(); // 路由规则 int count = rabbitMq.consumersPerQueue(); // 每个队列的消费者数量 DirectMessageListenerContainer container = new DirectMessageListenerContainer(rabbitConnectionFactory); Queue queue = new Queue(queueString);// 声明队列 admin.declareQueue(queue);// 初始化队列 if (EmptyUtils.isNotEmpty(exchangeString) && EmptyUtils.isNotEmpty(routingKeyString)) { AbstractMQService mqService = (AbstractMQService) SpringUtil.getBean(rabbitMq.exchangeTypes() + AbstractMQService.SERVICE_NAME); AbstractExchange exchange = mqService.initExchange(exchangeString); admin.declareExchange(exchange); Binding binding = mqService.initBinding(queue, exchange, routingKeyString);// 初始化不同队列的数据 admin.declareBinding(binding); } MessageListenerAdapter adapter = new MessageListenerAdapter(abstractConsumer); adapter.setEncoding("utf-8"); container.setConsumersPerQueue(rabbitMq.consumersPerQueue()); container.setQueues(queue);// 监听器配置队列 container.setMessageListener(adapter); container.setAutoDeclare(true); container.setAcknowledgeMode(rabbitMq.mode()); container.setConsumersPerQueue(count); // 启动对应的适配器 container.start(); sendMQService.addContainer(queueString, container); this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService); } }
初始交换机以及绑定关系接口
public interface AbstractMQService { static final String SERVICE_NAME = "MQService"; /** * 初始化交换机 * @return */ public AbstractExchange initExchange(String exchangeName); /** * 初始化绑定关系 * @param routeKey * @return */ public Binding initBinding(Queue queue,AbstractExchange exchange,String routeKey); }
初始交换机以及绑定关系实现类分别为DirectMQServiceImpl,FanoutMQServiceImpl,TopicMQServiceImpl
@Service("directMQService") public class DirectMQServiceImpl implements AbstractMQService { @Override public AbstractExchange initExchange(String exChangeName) { DirectExchange exchange = new DirectExchange(exChangeName); return exchange; } @Override public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) { DirectExchange exchange = (DirectExchange) exChange; DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue); DirectExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange); return routKeyConfigurer.with(routeKey); } }
@Service("fanoutMQService") public class FanoutMQServiceImpl implements AbstractMQService { @Override public AbstractExchange initExchange(String exChangeName) { FanoutExchange exchange = new FanoutExchange(exChangeName); return exchange; } @Override public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) { FanoutExchange exchange = (FanoutExchange) exChange; DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue); Binding binding = bindConfigurer.to(exchange); return binding; } }
@Service("topicMQService") public class TopicMQServiceImpl implements AbstractMQService { @Override public AbstractExchange initExchange(String exChangeName) { TopicExchange exchange = new TopicExchange(exChangeName); return exchange; } @Override public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) { TopicExchange exchange = (TopicExchange) exChange; DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue); TopicExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange); return routKeyConfigurer.with(routeKey); } }
自定义注解
@Target(value = { ElementType.FIELD, ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) public @interface RabbitMq { /** * 队列 * * @return */ public String queues() default ""; /** * 交换器 * * @return */ public String exchange() default ""; /** * 路由规则 * * @return */ public String routingKey() default ""; /** * 是否持久化 * * @return */ public boolean isPersistence() default true; /** * 确认模式 * * @return */ public AcknowledgeMode mode() default AcknowledgeMode.MANUAL; /** * 每个队列消费者数量 * * @return */ public int consumersPerQueue() default 1; /** * 交换类型 * * @return */ public String exchangeTypes() default ExchangeTypes.DIRECT; }
自定义消费者 AbstractConsumer,此消费者用于通用,每多一个消费者只需继承,然后处理业务逻辑即可
public abstract class AbstractConsumer extends MessagingMessageListenerAdapter { protected static final String MQ_CORRELATIONDATA_KEY = "spring_returned_message_correlation"; public static final String MQ_CACHE_MQ_KEY = "rabbitMQ.queues:"; public static final Integer FAIL_MAX_COUNT = 5; private RedisService redisService = SpringUtil.getBean(RedisService.class); @Override public void onMessage(Message message, Channel channel) throws IOException { MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); String correlationId = (String) message.getMessageProperties().getHeaders().get(MQ_CORRELATIONDATA_KEY); String queues = messageProperties.getConsumerQueue(); String cacheKey = new StringBuilder().append(MQ_CACHE_MQ_KEY).append(queues).append(":").append(correlationId).toString(); Integer failCount = (Integer)redisService.get(cacheKey); try { this.handleMessage(new String(message.getBody(), "UTF-8")); channel.basicAck(deliveryTag, false); redisService.del(new StringBuilder().append(correlationId).toString()); } catch (Exception e) { if(failCount > FAIL_MAX_COUNT) { return; } redisService.incr(cacheKey, 1, new Long(CacheTime.CACHE_EXP_THIRTY_SECONDS)); channel.basicNack(deliveryTag, false, false); } } public abstract void handleMessage(String message); }
有什么不完美的地方请各位多多指教~!,新手第一次入坑