转载

SpringBoot 中使用RabbitMQ(二)消息确认

  • 发送端确认 生产者能知道自己的消息是否成功发送到交换机或者队列
  • 消费端确认 消费者成功消费消息后,发送确认标识使消息从MQ中删除

如何判断消息发送成功或失败 ?

  • 确认消息不能路由到任何队列时,确认发送失败
  • 消息可以路由到队列时,当需要发送的队列都发送成功后,进行消息确认成功.对于持久化的队列,意味着已经写入磁盘,对于镜像队列,意味着所有镜像都接受成功.

消费端如何告知rabbitmq消息消费成功或失败?

  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

发送端确认实例

  • 添加配置
# 消息发送到交换器确认
spring.rabbitmq.publisher-confirms=true
# 消息发送到队列确认
spring.rabbitmq.publisher-returns=true
复制代码
  • 创建两个监听类分别实现RabbitTemplate的ConfirmCallback和ReturnCallback接口 实现ConfirmCallback接口,当消息发送到交换机的回调 实现ReturnCallback接口,当消息路由不到指定队列时回调
消息发送到交换机监听类
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
   
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Success... 消息成功发送到交换机! correlationData:{}", correlationData);
        } else {
            log.info("Fail... 消息发送到交换机失败! correlationData:{}", correlationData);
        }
    }
}

/**
 * 消息未路由到队列监听类
 * @author by peng
 * @date in 2019-06-01 21:32
 */
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Fail... message:{},从交换机exchange:{},以路由键routingKey:{}," +
                        "未找到匹配队列,replyCode:{},replyText:{}",
                message, exchange, routingKey, replyCode, replyText);
    }
}
复制代码
  • 重新注入RabbitTemplate,并设置两个监听类
@Configuration
public class RabbitConfig {
    
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
        rabbitTemplate.setReturnCallback( new SendReturnCallback());
        return rabbitTemplate;
    }
}
复制代码
  • 定义生产者和消费者
生产者
@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendConfirmSuccess() {
        String content = "Message sent to exist exchange!";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
        System.out.println("########### SendConfirmSuccess : " + content);
    }
    
    public void sendConfirmError() {
        String content = "Message sent to not exist exchange!";
        this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
        System.out.println("########### SendConfirmError : " + content);
    }
    
    public void sendReturn() {
        String content = "Message sent to exist exchange! But no queue to routing to";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
        System.out.println("########### SendWReturn : " + content);
    }
   
}

// 消费者
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### Receiver Msg:" + message);
    }
}
复制代码
  • 测试类
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
    @Autowired
    private Sender sender;
    @Test
    public void sendConfirmSuccess() {
        sender.sendConfirmSuccess();
        
        结果:成功发送并消费了消息,并输出监听日志
        ########### SendConfirmSuccess : Message sent to exist exchange!
        Success... 消息成功发送到交换机! correlationData:null
        ########### Receiver Msg:Message sent to exist exchange!
    }
    @Test
    public void sendConfirmError() {
        sender.sendConfirmError();
        结果:消息发送失败,并输入监听日志
        ########### SendConfirmError : Message sent to not exist exchange!
        Fail... 消息发送到交换机失败! correlationData:null
        Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40)
    }
    @Test
    public void sendReturn() {
        sender.sendReturn();
        结果:消息发送到交换机,但路由不到队列(不存在队列匹配路由键)
        ########### SendWReturn : Message sent to exist exchange! But no queue to routing to
        Success... 消息成功发送到交换机! correlationData:null
        Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),从交换机exchange:directConfirmExchange,以路由键routingKey:not-exist,未找到匹配队列,replyCode:312,replyText:NO_ROUTE
    }
    
}
复制代码

消费端确认

  • 添加配置
# 消费者消息确认--手动 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
  • 消费者代码
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
    
    @RabbitHandler
    public void process(String content, Channel channel, Message message) {
        try {
            System.out.println("########### message:" + message);
            // 业务处理成功后调用,消息会被确认消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 业务处理失败后调用
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("########### Receiver Msg:" + content);
    }
}
复制代码
原文  https://juejin.im/post/5cf28d755188251c064814fe
正文到此结束
Loading...