转载

ActiveMQ - 死信队列

参考 官网 。

当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。

在以下情况,消息会重新发送到客户端:

  • 使用事务的时候,调用了session的rollback方法
  • 使用事务的时候,在会话关闭后再commit
  • 使用CLIENT_ACKNOWLEDGE消息确认机制的时候,调用session的recover方法
  • 客户端连接超时

RedeliveryPolicy

消息重发的实体类的几个属性如下:

属性 默认值 说明
collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
maximumRedeliveries 6 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay -1 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay 1000L 初始重发延迟时间
redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
useCollisionAvoidance false 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

重发的设置有两种方式,一个是通过代码来设置的:

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

一个是通过activemq.xml配置文件来设置的:

<broker>
  
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- Set the following policy on all queues using the '>' wildcard -->
        <policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              Use the prefix 'DLQ.' for the destination name, and make
              the DLQ a queue rather than a topic
            -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  
</broker>

消息重发示例

生产者,跟之前的ptp的代码一样

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    ActiveMQConnection connection = null;
    Session session = null;
    Destination destination;
    MessageProducer producer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = (ActiveMQConnection)connectionFactory.createConnection();

        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createQueue("test.policy");
        // 创建一个生产者
        producer = session.createProducer(destination);
        // 创建一个消息
        message = session.createTextMessage("this is test.policy");
        // 发送消息
        producer.send(message);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者,我们这边通过policy.setMaximumRedeliveries(2)设置重发次数为2,自动确认模式为AUTO_ACKNOWLEDGE,在监听中抛异常,使消息不能正确的消费。

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    ActiveMQConnection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;
    boolean useTransaction = false;
    try {
        // 创建一个ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 创建一个Connection
        connection = (ActiveMQConnection)connectionFactory.createConnection();
        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
        // 最大重试次数
        policy.setMaximumRedeliveries(2);
        // 启动消息传递的连接
        connection.start();
        // 创建一个session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 创建一个destination,把消息发送到test.queue
        destination = session.createQueue("test.policy");
        // 创建一个消费者
        consumer = session.createConsumer(destination);
        // 接收一个消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("consumer receive:" + ((TextMessage) message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                throw new RuntimeException();
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {

    }
}

运行生产者代码,数据库数据如下:

ActiveMQ - 死信队列

再运行消费者代码,可以看出,在打印出三次test.policy后,数据库数据如下,队列变成了ActiveMQ.DLQ。之所以打印三次,是因为原先一次,加上重试两次,就打印三次。

ActiveMQ - 死信队列

admin页面如下:

ActiveMQ - 死信队列

如果我们要处理死信队列的内容,也很简单,就相当于消费者消费队列名称是ActiveMQ.DLQ。

原文  https://segmentfault.com/a/1190000022608226
正文到此结束
Loading...