springboot整合RabbitMQ 简单使用。
百度 安装和启动。
复制代码
原理:
生产者发送消息给 rabbit(交换机,队列,key)
消费者监听队列 处理结果
延迟队列原理:
"假延迟队列(先进先出策略,先放进去一个30秒,再放进去一个10秒,不会先处理10秒的)"
1个交换机。绑定两个队列。
A队列不设置消费者。
B队列设置消费者。
把消息发送到A队列(带上MessagePostProcessor,设置超时时间等)。
因为A队列没有消费者,所以超过设置时间数据还没消费,数据就会变成死信(Dead-letter)。
然后就根据A队列的配置。自动转发到另一个队列(B)中去了。
B直接消费就可以了。
复制代码
文档:
官方文档: https://www.rabbitmq.com/
复制代码
步骤:
1.引入spring-boot-starter-amqp
2.配置文件编写地址rabbit连接地址
3.创建交换机和队列并绑定。
4.创建生产者和消费者
5.测试
复制代码
1.引入spring-boot-starter-amqp
<!-- rabbit mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
2.配置文件编写地址rabbit连接地址
# TODO 改成自己的地址
spring.rabbitmq.host=
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
复制代码
3.创建交换机和队列并绑定。
/**
* 相关常量
* @author dripy
* @date 2019/12/23 17:37
*/
public class Constant {
/**
* 交换机
*/
public static final String MQ_EXCHANGE = "dripy-test-exchange";
/**
* 普通队列相关
*/
public static final String MQ_QUEUE = "dripy-test-queue";
/**
* 延迟队列相关
*/
public static final String MQ_DELAY_QUEUE = "dripy-test-queue-delay";
}
复制代码
1个交换机,一个队列,一个交换机与队列绑定。
后面死信队列 和 死信队列与交换机绑定是为了测试延迟队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbit MQ 配置
* <p>
* exChange 交换机 ---> key ---> queue
* 把key和queue的值设置成一样吧。
* 交换机找key 。 根据key 找 队列
*
* @author dripy
* @date 2019/12/23 17:31
*/
@Configuration
public class RabbitConfig {
/**
* 交换机
*
* @return
*/
@Bean("sendCommodityExChange")
public DirectExchange directExchange() {
//交换器名称、是否持久化、是否自动删除
return new DirectExchange(Constant.MQ_EXCHANGE, false, false);
}
/**
* 正常队列
*
* @return
*/
@Bean("sendCommodityQueue")
public Queue directQueue() {
//队列名字,是否持久化
return new Queue(Constant.MQ_QUEUE, false);
}
/**
* 交换机与正常队列绑定
*
* @param queue
* @param exchange
* @return
*/
@Bean
Binding binding(@Qualifier("sendCommodityQueue") Queue queue, @Qualifier("sendCommodityExChange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Constant.MQ_QUEUE);
}
/**
* 死信队列
* <p>
* 配置队列中数据超时未消费时, 直接转发到指定队列(正常队列)中去
*
* @return
*/
@Bean("delayqueue")
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
// 当队列超时后。转发到对应的交换机和队列中去
arguments.put("x-dead-letter-exchange", Constant.MQ_EXCHANGE);
arguments.put("x-dead-letter-routing-key", Constant.MQ_QUEUE);
return new Queue(Constant.MQ_DELAY_QUEUE, true, false, false, arguments);
}
/**
* 交换机与死信队列绑定
*
* @param queue
* @param exchange
* @return
*/
@Bean
Binding delayBinding(@Qualifier("delayqueue") Queue queue, @Qualifier("sendCommodityExChange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Constant.MQ_DELAY_QUEUE);
}
}
复制代码
4.创建生产者和消费者
生产者
import com.diandi.rabbit.config.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @author dripy
* @date 2019/12/23 17:34
*/
@Controller
@RequestMapping("/test")
public class TestController {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@PostMapping("/send")
public String sendInfo() {
logger.info("发送消息");
// 把消息发送到指定的 交换机、key(key和队列名称设置成一样。方便理解)
rabbitTemplate.convertAndSend(Constant.MQ_EXCHANGE, Constant.MQ_QUEUE, "直接消费");
// 特别注意。 不要给死信队列 写消费者
// 把消息发送到死信队列中。设置超时时间。
// 当超过 超时时间后 会直接转发到 指定队列中
MessagePostProcessor processor = message -> {
// 暂定5秒
message.getMessageProperties().setExpiration("5000");
return message;
};
rabbitTemplate.convertAndSend(Constant.MQ_EXCHANGE, Constant.MQ_DELAY_QUEUE, "延迟消费", processor);
return "ok";
}
}
复制代码
消费者
package com.diandi.rabbit.listener;
import com.diandi.rabbit.config.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*
* @author dripy
* @date 2019/12/23 17:32
*/
@Component
@RabbitListener(queues = Constant.MQ_QUEUE)
public class TestListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void handler(String message) {
logger.info("发放实物队列收到的消息:{}", message);
}
}
复制代码
https://juejin.im/post/5e00b8fb518825126b262871