参考 官网 。
当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。
在以下情况,消息会重新发送到客户端:
消息重发的实体类的几个属性如下:
属性 | 默认值 | 说明 |
---|---|---|
collisionAvoidanceFactor | 0.15 | 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。 |
maximumRedeliveries | 6 | 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 |
maximumRedeliveryDelay | -1 | 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
initialRedeliveryDelay | 1000L | 初始重发延迟时间 |
redeliveryDelay | 1000L | 重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4) |
useCollisionAvoidance | false | 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。 |
useExponentialBackOff | false | 启用指数倍数递增的方式增加延迟时间。 |
backOffMultiplier | 5 | 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。 |
重发的设置有两种方式,一个是通过代码来设置的:
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);
一个是通过activemq.xml配置文件来设置的:
<broker> <destinationPolicy> <policyMap> <policyEntries> <!-- Set the following policy on all queues using the '>' wildcard --> <policyEntry queue=">"> <deadLetterStrategy> <!-- Use the prefix 'DLQ.' for the destination name, and make the DLQ a queue rather than a topic --> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> </broker>
生产者,跟之前的ptp的代码一样
public static void main(String[] args) { ConnectionFactory connectionFactory; ActiveMQConnection connection = null; Session session = null; Destination destination; MessageProducer producer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = (ActiveMQConnection)connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.policy"); // 创建一个生产者 producer = session.createProducer(destination); // 创建一个消息 message = session.createTextMessage("this is test.policy"); // 发送消息 producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者,我们这边通过policy.setMaximumRedeliveries(2)设置重发次数为2,自动确认模式为AUTO_ACKNOWLEDGE,在监听中抛异常,使消息不能正确的消费。
public static void main(String[] args) { ConnectionFactory connectionFactory; ActiveMQConnection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = (ActiveMQConnection)connectionFactory.createConnection(); RedeliveryPolicy policy = connection.getRedeliveryPolicy(); // 最大重试次数 policy.setMaximumRedeliveries(2); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.policy"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("consumer receive:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } throw new RuntimeException(); } }); } catch (JMSException e) { e.printStackTrace(); } finally { } }
运行生产者代码,数据库数据如下:
再运行消费者代码,可以看出,在打印出三次test.policy后,数据库数据如下,队列变成了ActiveMQ.DLQ。之所以打印三次,是因为原先一次,加上重试两次,就打印三次。
admin页面如下:
如果我们要处理死信队列的内容,也很简单,就相当于消费者消费队列名称是ActiveMQ.DLQ。