- 发送端确认
生产者能知道自己的消息是否成功发送到交换机或者队列
- 消费端确认
消费者成功消费消息后,发送确认标识使消息从MQ中删除
如何判断消息发送成功或失败 ?
- 确认消息不能路由到任何队列时,确认发送失败
- 消息可以路由到队列时,当需要发送的队列都发送成功后,进行消息确认成功.对于持久化的队列,意味着已经写入磁盘,对于镜像队列,意味着所有镜像都接受成功.
消费端如何告知rabbitmq消息消费成功或失败?
- 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
- 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
- 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
发送端确认实例
# 消息发送到交换器确认
spring.rabbitmq.publisher-confirms=true
# 消息发送到队列确认
spring.rabbitmq.publisher-returns=true
复制代码
- 创建两个监听类分别实现RabbitTemplate的ConfirmCallback和ReturnCallback接口
实现ConfirmCallback接口,当消息发送到交换机的回调
实现ReturnCallback接口,当消息路由不到指定队列时回调
消息发送到交换机监听类
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("Success... 消息成功发送到交换机! correlationData:{}", correlationData);
} else {
log.info("Fail... 消息发送到交换机失败! correlationData:{}", correlationData);
}
}
}
/**
* 消息未路由到队列监听类
* @author by peng
* @date in 2019-06-01 21:32
*/
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("Fail... message:{},从交换机exchange:{},以路由键routingKey:{}," +
"未找到匹配队列,replyCode:{},replyText:{}",
message, exchange, routingKey, replyCode, replyText);
}
}
复制代码
- 重新注入RabbitTemplate,并设置两个监听类
@Configuration
public class RabbitConfig {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
rabbitTemplate.setReturnCallback( new SendReturnCallback());
return rabbitTemplate;
}
}
复制代码
生产者
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendConfirmSuccess() {
String content = "Message sent to exist exchange!";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
System.out.println("########### SendConfirmSuccess : " + content);
}
public void sendConfirmError() {
String content = "Message sent to not exist exchange!";
this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
System.out.println("########### SendConfirmError : " + content);
}
public void sendReturn() {
String content = "Message sent to exist exchange! But no queue to routing to";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
System.out.println("########### SendWReturn : " + content);
}
}
// 消费者
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {
@RabbitHandler
public void process(String message) {
System.out.println("########### Receiver Msg:" + message);
}
}
复制代码
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
@Autowired
private Sender sender;
@Test
public void sendConfirmSuccess() {
sender.sendConfirmSuccess();
结果:成功发送并消费了消息,并输出监听日志
########### SendConfirmSuccess : Message sent to exist exchange!
Success... 消息成功发送到交换机! correlationData:null
########### Receiver Msg:Message sent to exist exchange!
}
@Test
public void sendConfirmError() {
sender.sendConfirmError();
结果:消息发送失败,并输入监听日志
########### SendConfirmError : Message sent to not exist exchange!
Fail... 消息发送到交换机失败! correlationData:null
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40)
}
@Test
public void sendReturn() {
sender.sendReturn();
结果:消息发送到交换机,但路由不到队列(不存在队列匹配路由键)
########### SendWReturn : Message sent to exist exchange! But no queue to routing to
Success... 消息成功发送到交换机! correlationData:null
Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),从交换机exchange:directConfirmExchange,以路由键routingKey:not-exist,未找到匹配队列,replyCode:312,replyText:NO_ROUTE
}
}
复制代码
消费端确认
# 消费者消息确认--手动 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
@RabbitHandler
public void process(String content, Channel channel, Message message) {
try {
System.out.println("########### message:" + message);
// 业务处理成功后调用,消息会被确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 业务处理失败后调用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("########### Receiver Msg:" + content);
}
}
复制代码
https://juejin.im/post/5cf28d755188251c064814fe