花了一周多的时间(周末去掉..捂脸)在工作之余写了两篇关于rabbitMq的内容,一篇是原生版的,一篇是springboot版的。初学者最好是看一下原声版更清晰一点,如果急于应用也可以直接看本文。本文内容较多,看完了五大消息模型的应用后还有进阶篇连着在一起,研究不太彻底请多多指教,好了,不打扰你们看了!
原生版传送门
官方参考文档
Server:又称之为Broker,接受客户端的连接,实现AMQP实体服务。
Connection:连接,应用程序与Broker的网络连接。
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Message:消息,服务器和应用程序之间传送的数据,由Message Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body就是消息体内容。
Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。
Exchange:交换机,只有转发能力不具备存储消息能力,根据路由键转发消息到绑定的队列。
Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。
Routing key:一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:也可以称之为Message Queue(消息队列),保存消息并将它们转发到消费者。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
</dependencies>
复制代码
spring:
rabbitmq:
host: 127.0.0.1
username: admin123
password: 123456
virtual-host: /test
复制代码
关于一些方法的使用,参数属性说明都在代码中有注释
P(producer/ publisher):生产者,如寄快递
C(consumer):消费者,如收快递
红色区域:队列,如快递区,等待消费者拿快递
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
package com.ao.springbootamqp.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMqConfig {
/*队列*/
public static final String TEST_QUEUE = "simple-amqp_queue";
/**声明队列
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE)
public Queue testQueue() {
return new Queue(TEST_QUEUE, true);
}
复制代码
package com.ao.springbootamqp.service;
import com.ao.springbootamqp.config.RabbitMqConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.UUID;
@Component
@Slf4j
public class RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
/*发送消息到队列*/
public String sendQueue(Object payload){
return baseSend("", RabbitMqConfig.TEST_QUEUE, payload, null, null);
}
/**
* MQ 公用发送方法
*
* @param exchange 交换机
* @param routingKey 队列
* @param payload 消息体
* @param messageId 消息id(唯一性)
* @param messageExpirationTime 持久化时间
* @return 消息编号
*/
public String baseSend(String exchange, String routingKey, Object payload, String messageId, Long messageExpirationTime) {
/*若为空,则自动生成*/
if (messageId == null) {
messageId = UUID.randomUUID().toString();
}
String finalMessageId = messageId;
/*设置消息属性*/
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
/*消息属性中写入消息id*/
message.getMessageProperties().setMessageId(finalMessageId);
/*设置消息持久化时间*/
if (!StringUtils.isEmpty(messageExpirationTime)){
message.getMessageProperties().setExpiration(messageExpirationTime.toString());
}
/*设置消息持久化*/
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
/*构造消息体,转换json数据格式*/
Message message = null;
try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(payload);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentEncoding(MessageProperties.CONTENT_TYPE_JSON);
message = new Message(json.getBytes(), messageProperties);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
/*表示当前消息唯一性*/
CorrelationData correlationData = new CorrelationData(finalMessageId);
/**
* public void convertAndSend(String exchange, String routingKey, Object message,
* MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
* exchange: 路由
* routingKey: 绑定key
* message: 消息体
* messagePostProcessor: 消息属性处理器
* correlationData: 表示当前消息唯一性
*/
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
return finalMessageId;
}
}
复制代码
@SpringBootTest
class RabbitMqTest {
@Autowired
private RabbitMqService rabbitMqService;
@Test
public void tt(){
String s = "顺丰快递";
rabbitMqService.sendQueue(s);
}
}
复制代码
可以看到,消息已经成功发送到服务器上啦,里面消息的属性也正是我们设置好的。因为消息已经发送到服务器上啦,所以待会启动消费者便可以消费了
@Component
public class RecService {
/*队列*/
public static final String TEST_QUEUE = "simple-amqp_queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("我收到了=-=" + msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
P(producer/ publisher):生产者,如寄快递
C1、C2(consumer):消费者,如收快递
红色区域:队列,如快递区,等待消费者拿快递
@SpringBootTest
class RabbitMqTest {
@Autowired
private RabbitMqService rabbitMqService;
@Test
public void tt(){
for (int i = 0;i < 10; i++){
String s = "消息" + i;
rabbitMqService.sendQueue(s);
}
}
}
复制代码
@Component
public class RecService1 {
/*队列*/
public static final String TEST_QUEUE = "work-amqp-queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("消费者1收到=-=" + msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
可以看到,消费一样多,如果想能者多劳模式,添加配置如下:
#指定一个请求能够处理多少个消息
listener:
simple:
#测试消费者1值为3,消费者2值为1
prefetch: 1
复制代码
或者在消费者添加channel.basicQos(1)即可。这就告诉RabbitMq不要一直向消费者发送消息,而是要等待消费者的确认了前一个消息
@Component
public class RecService1 {
/*队列*/
public static final String TEST_QUEUE = "work-amqp-queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message,Channel channel){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("消费者1收到=-=" + msg);
channel.basicQos(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
重新启动两个消费者,再循环发送10条消息,查看控制台如下:
在这种订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
P:生产者,如寄快递
X: 交换机,相当于快递公司
红色区域:队列,如快递区,等待消费者拿快递
C1、C2:消费者,如收快递
在RabbitMqConfig修改如下配置,声明队列1和队列2,并把交换机与这两个队列进行绑定
/*交换机*/
public static final String TEST_EXCHANGE = "fanout_amqp_exchange";
/*声明一个fanout交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.fanoutExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "fanout_amqp_queue_1";
/*队列2*/
public static final String TEST_QUEUE_2 = "fanout_amqp_queue_2";
/**声明队列1
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
return new Queue(TEST_QUEUE_1, true);
}
/*声明队列2*/
@Bean(TEST_QUEUE_2)
public Queue testQueue2() {
return new Queue(TEST_QUEUE_2, true);
}
/*队列1与路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("")
.noargs();
}
/*队列2与路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("")
.noargs();
}
复制代码
在 RabbitMqService 添加发送方式: 发送到交换机
/*发送到交换器*/
public String sendExchange(Object payload,String routingKey){
return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, null);
}
复制代码
@Test
public void t1(){
String s = "广播快递";
rabbitMqService.sendExchange(s,"");
}
复制代码
可以看到已经发送成功
@RabbitListener(queues = TEST_QUEUE)
复制代码
在这种订阅模式中,生产者发布消息,消费者有选择性的接收消息。队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的routing key
P:生产者,如寄快递
X: 交换机,相当于快递公司
红色区域:队列,如快递区,等待消费者拿快递
C1、C2:消费者,如收快递
error、info这些就是我们讲的RoutingKey
修改RabbitMqConfig配置, 主要是在交换机与这两个队列进行绑定时候指定routingkey,队列1只接收顺丰快递,队列2只接收京东快递
/*交换机*/
public static final String TEST_EXCHANGE = "direct_amqp_exchange";
/*声明一个direct交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "direct_amqp_queue_1";
/*队列2*/
public static final String TEST_QUEUE_2 = "direct_amqp_queue_2";
/**声明队列
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
return new Queue(TEST_QUEUE_1, true);
}
@Bean(TEST_QUEUE_2)
public Queue testQueue2() {
return new Queue(TEST_QUEUE_2, true);
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF")
.noargs();
}
/*队列2路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("JD")
.noargs();
}
复制代码
@Test
public void t2(){
String s = "京东快递";
String s1 = "顺丰快递";
rabbitMqService.sendExchange(s,"JD");
rabbitMqService.sendExchange(s1,"SF");
}
复制代码
改一下相应的队列名再启动,按道理来说消费者1应该收到顺丰快递,消费者2应该收到京东快递,结果如下:
结果符合预期。
Topic
类型的 Exchange
与 Direct
相比,都是可以根据 RoutingKey
把消息路由到不同的队列。只不过 Topic
类型 Exchange
可以让队列在绑定 Routing key
的时候使用 通配符
!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
修改RabbitMqConfig与direct基本一样,只修改了一下队列名和交换机,routingkey改成 队列1只接收顺丰快递,队列2任何快递都接收
/*声明一个direct交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.topicExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF.kd")
.noargs();
}
/*队列2路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("#.kd")
.noargs();
}
复制代码
@Test
public void t2(){
String s = "EMS快递";
String s1 = "顺丰快递";
String s2 = "京东快递";
rabbitMqService.sendExchange(s,"EMS.kd");
rabbitMqService.sendExchange(s1,"SF.kd");
rabbitMqService.sendExchange(s2,"JD.kd");
}
复制代码
结果如下,符合预期!
温馨提示:以下代码示例都以路由模式进行演示。
实现RabbitMQ消息的可靠要保证以下3点:
RabbitMQ消息确认机制:RabbitMQ消息确认有2种:消息发送确认,消费接收确认。消息发送确认是确认生产者将消息发送到Exchange,Exchange分发消息至Queue的过程中,消息是否可靠投递。第一步是否到达Exchange,第二步确认是否到达Queue。
交换机,队列,消息进行持久化:防止消息发送到了broker,还没等到消费者消费 ,broker就挂掉了
消费者确认机制: 模式有3种: none(没有任何的应答会被发送) , auto(自动应答) , manual(手动应答) 。为了保证消息可靠性,我们设置手动应答,这是为什么呢?采用自动应答的方式,每次消费端收到消息后,不管是否处理完成,Broker都会把这条消息置为完成,然后从Queue中删除。如果消费端消费时,抛出异常,消费端没有成功消费该消息,从而造成消息丢失。手动应答方式可以调用basicAck、basicNack、basicReject方法,只有在消息得到正确处理下,再发送ACK。
spring:
rabbitmq:
host: 127.0.0.1
username: admin123
password: 123456
virtual-host: /test
# 确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
publisher-confirms: true
# 实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发
publisher-returns: true
listener:
# 消息消费确认,可以手动确认
simple:
acknowledge-mode: manual
复制代码
增加实现ConfirmCallBack接口和实现ReturnCallback接口代码
// 消息发送到交换器Exchange后触发回调
private final RabbitTemplate.ConfirmCallback confirmCallback =
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
//成功业务逻辑
log.info("消息投递到及交换机成功啦!!!");
} else {
//失败业务逻辑
log.info("消息投递到及交换机失败啦!!");
}
}
};
// 如果消息从交换器发送到对应队列失败时触发
private final RabbitTemplate.ReturnCallback returnCallback =
new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//失败业务逻辑
log.info("message=" + message.toString());
log.info("replyCode=" + replyCode);
log.info("replyText=" + replyText);
log.info("exchange=" + exchange);
log.info("routingKey=" + routingKey);
}
};
复制代码
在rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData)之前增加如下代码:
rabbitTemplate.setConfirmCallback(this.confirmCallback);
rabbitTemplate.setReturnCallback(this.returnCallback);
复制代码
为了方便测试,用controller发送消息。消息路由不到合适的Exchange,Confirm机制回送的ACK会返回false,走异常处理,进行一些业务逻辑,如重试或者补偿等手段
@RestController
public class TestController {
@Autowired
private RabbitMqService sender;
@PostMapping("/tt")
public String sendMsg(String msg){
sender.sendExchange(msg,"");
return "ok";
}
}
复制代码
这里在发送消息的时候,指定一个不存在的routingkey,模拟失败回调
sender.sendExchange(msg,"XXX");
复制代码
这个在上文的代码中有提到,略。
前面提到有这3种手动应答方式basicAck、basicNack、basicReject,那么先了解一下。
当multiple为false,只确认当前的消息。当multiple为true,批量确认所有比当前deliveryTag小的消息。deliveryTag是用来标识Channel中投递的消息。RabbitMQ保证在每个Channel中,消息的deliveryTag是从1递增。
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.transmit(new Ack(deliveryTag, multiple));
this.metricsCollector.basicAck(this, deliveryTag, multiple);
}
复制代码
当消费者消费消息时出现异常了,那么可以使用这种方式。当requeue为true,失败消息会重新进入Queue,一般结合重试机制使用,当重试次数超过最大值,丢弃该消息)或者是死信队列+重试队列。当requeue为false,丢弃该消息。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.transmit(new Nack(deliveryTag, multiple, requeue));
this.metricsCollector.basicNack(this, deliveryTag);
}
复制代码
和basicNack用法一样。
先把手动确定注释掉
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("消费者1收到=-=" + msg);
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag,false);
}
复制代码
消息变成unacked
停止消费者程序,消息又变成ready,这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次称为Ready
加入如下配置,消费者重试是在listener下配置retry相关参数,生产者重试是在template下配置retry相关参数,别搞混了
listener:
# 消息消费确认,可以手动确认
simple:
acknowledge-mode: manual
#是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
retry:
enabled: true
#初始重试间隔为1s
initial-interval: 1000
#重试的最大次数
max-attempts: 3
#重试间隔最多1s
max-interval: 1000
#每次重试的因子是1.0 等差
multiplier: 1.0
复制代码
模拟消费者消费出异常啦,加入int i=1/0;
可以看到,重试了3次消费
如果listener.retry次数尝试完并还是抛出异常,那该怎么办?可以通过配置MessageRecoverer对异常消息进行处理,默认有两个实现:
RepublishMessageRecoverer:将消息重新发送到指定队列,需手动配置。测试一下:
在RabbitMqConfig增加如下:先声明一个重试的交换机(RETRY_EXCHANGE)和一个声明重试队列(RETRY_QUEUE),然后进行绑定,routingkey为:retry
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, "retry");
}
复制代码
增加一个消费者,如下:
@RabbitListener(queues = RETRY_QUEUE)
public void t3(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("重试消费者收到了=-=" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
复制代码
重试次数用完了(因重试的最大次数配置为3),测试结果如下:
RejectAndDontRequeueRecoverer:如果不手动配置MessageRecoverer,会默认使用这个,实现仅仅是将异常打印抛出,源码如下(测试略):
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
}
}
复制代码
由重试机制可能会造成延迟,从而造成重复消费的问题,比如说支付,推送短信,邮件等。
消息发送方在发送时在消息头加入唯一ID,比如UUID,订单号,时间戳,traceId等。在上文对消息的封装已经处理过:
接收方接受消息后先获取消息头的唯一ID,判断redis内是否已经包含唯一ID,如果包含说明已经消费成功,直接不处理消息。如果redis内不包含唯一ID,处理消息,成功后把唯一ID存入缓存
死信,顾名思义就是无法被消费的消息,如消费者出现某种异常导致消息没有被消费,就会将消息重新投递到另一个Exchange(Dead Letter Exchanges),该Exchange再根据routingKey重定向到另一个队列,在这个队列重新处理该消息。
消息被拒绝(basic.reject或basic.nack)并且requeue=false.
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)
消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到了retry重新入队的上限次数
消息的TTL(Time To Live)-存活时间已经过期
队列长度限制被超越(队列满,queue的" x-max-length "参数)
本例使用第三种。
声明一个死信交换机(DL_EXCHANGE)和死信队列(DL_QUEUE),然后进行绑定,并且声明业务队列(TEST_QUEUE_1)时加入 x-dead-letter-exchange 和 x-dead-letter-routing-key 的参数,代码如下:
/*业务交换机*/
public static final String TEST_EXCHANGE = "test_amqp_exchange";
/*声明业务交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "test_amqp_queue_1";
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
// x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF")
.noargs();
}
/*死信交换机*/
public static final String DL_EXCHANGE = "deadLetterExchange";
/*声明死信交换机*/
@Bean(DL_EXCHANGE)
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange(DL_EXCHANGE).durable(true).build();
}
/*死信队列*/
public static final String DL_QUEUE = "deadLetterQueue";
/*声明死信队列*/
@Bean(DL_QUEUE)
public Queue deadLetterQueue() {
return new Queue(DL_QUEUE,true);
}
/*死信队列绑定死信交换机*/
@Bean
Binding bindingDead(@Qualifier(DL_QUEUE) Queue queue,
@Qualifier(DL_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dlk")
.noargs();
}
复制代码
//业务消费者
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
try {
int i = 1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e){
System.out.println("消费者1出错啦");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}}
//死信消费者
@RabbitListener(queues = DL_QUEUE)
public void t3( Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("死信队列收到了=-=" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
复制代码
大概流程就是消息被业务消费者消费,此时业务消费者挂掉了,就走catch代码basicNack,mq收到了nack就会把消息重新投递到业务队列x-dead-letter-exchange绑定的死信交换机,然后根据业务队列x-dead-letter-routing-key绑定的死信路由键匹配到死信队列,然后最终被死信消费者消费了。
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
订单在十分钟之内未支付则自动取消。
用户进行退款,卖家在三天内没有进行处理,则短信通知卖家或通知所驻的平台。
死信(DLX)上文已经了解过了,那么什么是TTL呢?RabbitMQ可以针对Queue设置x-message-ttl 或者 针对Message设置setExpiration ,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)。
通过队列属性设置,队列中所有消息都有相同的过期时间。
缺点:如果使用这种方式设置消息的TTL,当延时时间梯度比较多的话,比如1分钟,2分钟,5分钟,12分钟……需要创建很多交换机和队列来路由消息。
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
//声明过期时间5秒
args.put("x-message-ttl", 5000);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
//x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
复制代码
对消息进行单独设置,每条消息TTL可以不同。
缺点:如果单独设置消息的TTL,则可能会造成队列中的消息阻塞,因为队列是先进先出的,前一条消息没有出队(没有被消费),后面的消息无法投递。消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。
官网下载
可解决单独对消息设置TTL,延时时长短的优先处理
下面演示的是死信+TTL,代码还是以上文死信队列的为主
在业务队列增加x-message-ttl配置,设置一秒;消费者删除业务消费者(模拟消息没被消费而过期),只留下死信消费者;其余不变。
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
//声明过期时间5秒
args.put("x-message-ttl", 1000);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
//x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
复制代码
可以看到,时间 在1秒后被死信消费者消费
注释掉队列的过期时间,然后修改一下发送方法,如下;
/*发送到交换器*/
public String sendExchange(Object payload,String routingKey,Long messageExpirationTime){
return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, messageExpirationTime);
}
复制代码
controller如下,消费者不变
@Autowired
private RabbitMqService sender;
@PostMapping("/tt")
public String sendMsg(String msg){
sender.sendExchange(msg,"SF",5000L);
System.out.println("【5秒过期时间测试】发送时间是:"+LocalDateTime.now());
return "ok";
}
复制代码
可以看到消息5秒后被死信消费者消费