Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
我们创建会话的时候,会有两个参数,第一个是是否开启事务,第二个是消息的确认机制。
当transacted为true的时候,说明会话开启事务,这个时候,后面的acknowledgeMode这个参数就失效了。
类似于JDBC,开启事务的时候,session也需要调用commit跟broker通讯,生产者通过commit方法把消息提交给broker,消费者通过commit方法和broker确认消息已收到。
生产者开启了事务,当消息发送4条的时候执行commit方法。这边以ptp方式为例,topic过程一样,不在累述。
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.ptp.transaction"); // 创建一个生产者 producer = session.createProducer(destination); for (int i = 0; i < 6; i++) { // 创建一个消息 Message message = session.createTextMessage("test.transaction" + i); // 发送消息 producer.send(message); if (i == 3) { session.commit(); } } session.rollback(); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
此时在数据库的数据如下:
由于只提交了前面4个,所以持久化数据库的时候,就只有4条数据。
消费者也开启了事务,只处理3个消息就commit。
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.ptp.transaction"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 int num = 0; while (null != (message = consumer.receive())) { System.out.println("consumer receive:" + ((TextMessage) message).getText()); if (2 == num++) { session.commit(); } } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
此时数据库的数据如下:
由于只处理了三个就提交,所以数据库还有一个数据未提交。
消息的确认机制有4种:
消息的确认机制,就是跟broker说我消息已经收到了,不要再发了。如果没有确认,broker会重复发送,直至确认,或者累计次数达到阈值(默认6)的时候,就停止发送。
使用CLIENT_ACKNOWLEDGE消息确认机制,客户端手动确认的时候,会调用message.acknowledge()的方法来确认消息,我们看看下面的例子。
生产者发送6条数据:
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.ptp.ack"); // 创建一个生产者 producer = session.createProducer(destination); for (int i = 0; i < 6; i++) { // 创建一个消息 Message message = session.createTextMessage("test.ack" + i); // 发送消息 producer.send(message); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
数据库的数据如下,已经有6条数据了。
消费者在消费第三个消息的时候,抛出异常,其他直接调用acknowledge方法。
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.ptp.ack"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 int num = 0; while (null != (message = consumer.receive())) { String text = ((TextMessage) message).getText(); System.out.println("consumer receive:" + text); try{ if("test.ack2".equals(text)){ throw new RuntimeException(); } message.acknowledge(); }catch (Exception e){ } } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
查看数据库,发现数据都被消费了,我们不是让test.ack2这个抛异常不确认吗,为什么还是被消费了?
这是因为消息接收后,会存放在deliveredMessages中,test.ack2没有被确认,就还在deliveredMessages中,当test.ack3调用acknowledge的时候,就会把deliveredMessages都确认了。具体查看ActiveMQMessageConsumer的acknowledge方法:
public void acknowledge() throws JMSException { clearDeliveredList(); waitForRedeliveries(); synchronized(deliveredMessages) { // Acknowledge all messages so far. MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack == null) { return; // no msgs } if (session.getTransacted()) { rollbackOnFailedRecoveryRedelivery(); session.doStartTransaction(); ack.setTransactionId(session.getTransactionContext().getTransactionId()); } pendingAck = null; session.sendAck(ack); // Adjust the counters deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); if (!session.getTransacted()) { deliveredMessages.clear(); } } }