ActiveMQ有点对点和发布订阅两种方式,这两种的消息存储还是有稍微一点区别。
队列的存储比较简单,就是先进先出(FIFO),只有当该消息已被消费和确认可以删除消息存储。如果没有被确认,其他消费者是不能获取消息的。
看看下面的例子:
生产者发送了10条消息:
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.persistence"); // 创建一个生产者 producer = session.createProducer(destination); // 创建消息 for (int i = 0; i < 10; i++) { Message message = session.createTextMessage("this is test.persistence" + i); // 发送消息 producer.send(message); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者1消费5条,但是暂时没确认
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.persistence"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收消息 for (int i = 0; i < 5; i++) { Message message = consumer.receive(); TimeUnit.SECONDS.sleep(1); System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); if (i == 4) { TimeUnit.SECONDS.sleep(10); message.acknowledge(); } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者2此时就阻塞在获取消息上面,直到消费者1确认后才接收到消息
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("test.persistence"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收消息 for (int i = 0; i < 5; i++) { Message message = consumer.receive(); System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); if (i == 4) { message.acknowledge(); } } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
在发布订阅中,每个消费者都会获取到消息的拷贝,为了节约空间,broker只存储了一份消息,并存储了每个消费者所消费的信息,这样每个消费者虽然有不同的消费进度,最终还是能一次获取到消息。如果消息被所有订阅者消费完了,broker就可以删除这个消息。
ActiveMQ提供了多种存储方式,比如AMQ、KahaDB、JDBC、内存。
参考官网
AMQ是早期版本的默认持久化存储方式,基于文件的事务存储,对于消息的存储进行了调优,速度还是非常快的。默认大小32M。当消息被成功使用时,就会被标记为清理或者存档,这个操作将在下个清理时发送。基本配置如下(其他参数详见官网):
<broker persistent="true" xmlns="http://activemq.apache.org/schema/core"> ... <persistenceAdapter> <amqPersistenceAdapter/> </persistenceAdapter> ... </broker>
5.4以后默认的持久化存储方式,也是基于文件的,与AMQ不同的是,KahaDB采用了B-Tree存储的布局。拥有高性能和可扩展性等特点。基本配置如下:
<broker brokerName="broker" persistent="true" useShutdownHook="false"> ... <persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/> </persistenceAdapter> ... </broker>
JDBC有三个表,两个表存储消息,还有一个表当做锁用,保证只能一个代理访问broker。配置如下:
先把默认的kahaDB注释掉,再用下面的替换,注意这个配置是在broker下面。
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>
然后再增加数据库配置,注意,这个配置是broker外面,跟其他bea同级。
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="123456"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
在启动activemq.bat,之前,先建数据库,并且编码设置为latin1,不然ACTIVEMQ_ACKS表会创建失败导致启动不了。然后根据报错信息一次在lib下加入commons-dbcp-1.4.jar、commons-pool-1.5.4.jar、mysql-connector-java-5.1.36.jar包。如下所示,帮我们建了三个表:
activemq_acks用于保存持久化订阅信息。
activemq_lock,用于broker集群时的Master选举。
activemq_msgs,用于存储消息信息。
代码参考之前的点对点
演示步骤如下:
1、 启动生产者,发送消息
2、 查看表数据如下:
3、 启动消费者,消息消费后表数据被删除
代码参考之前的发布订阅
演示步骤如下:
1、 启动生产者,发送消息。
2、 查看表数据,并没有持久化,所以发布订阅默认不持久化的。
3、 启动消费者,没有消息被消费。
为了让消息可以被消费者消费,我们可以这样做:
消费者代码如下
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; // 这边的Topic Topic destination; TopicSubscriber consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 设置ClientId connection.setClientID("ZhangSan"); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("test.topic.mysql"); // 创建一个消费者,调用createDurableSubscriber方法 consumer = session.createDurableSubscriber(destination,"my"); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { } }
启动两个消费者者,activemq_acks表数据如下,已经有了两个数据
activemq的subscribers界面如下,也有两个数据,如果我们消费者下线了,就会到Offline Durable Topic Subscribers列表
修改消费者的代码后,生产者发送消息的时候,如果消费者在线,就直接消费,如果不在线,上线后还可以继续消费,下图是消费者消费了几次后,LAST_ACKED_ID变成了4。
发布订阅默认不持久化,所以生产者代码可以这样修改,加下面一句话
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
生产者发送消息后,activemq_msgs表数据如下,与点对点不一样的是,这个消息被消费后,不会被删除。
由于JDBC的性能相对比较差,所以activemq还提供了日志类型的jdbc,确保了JMS事务的一致性。因为它整合了非常快的信息写入与缓存技术,它可以显着提高性能。配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beans> <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/> </persistenceAdapter> </broker> <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> </beans>
虽然性能比jdbc快,但是他不支持master/slave。
把消息存储在内存中,所以没有持久化的功能,因此要保证内存足够大,来缓存消息。
<?xml version="1.0" encoding="UTF-8"?> <beans> <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker> </beans>