当我们需要广播的时候,我们采用发布订阅模式。生产者往broker发送1,2,3,4,5,6,消费者1和消费者2都收到1,2,3,4,5,6。
当我们需要队列的时候,我们采用点对点模式,点对点模式中如果为了加快消息消费,我们就加多个消费者。生产者往broker发送1,2,3,4,5,6,消费者1收到1,3,5,消费者2收到2,4,6。
现在有个场景是这样的,在发布订阅模式中,由于生产者产生消息的速度超过了消费者1消费的速度,使消息一直积压,于是就想能不能既用发布订阅模式分发消息又用点对点模式加快消息消费呢?
activemq提供了一个虚拟topics的功能,首先元素的开头必须是 VirtualTopic.
开头的,消费端的格式是 Consumer.<consumer name>.VirtualTopic.<VirtualTopicName>.
,比如生产者的是 VirtualTopic.topic
,那消费者A是 Consumer.a.VirtualTopic.topic
,a表示消费端组的名称,名字一样的话,会类似ptp一样消费。
生产者主要代码是:
Topic destination = session.createTopic("VirtualTopic.topic");
生产者这边发送6个数据:
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; MessageProducer producer = null; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue Topic destination = session.createTopic("VirtualTopic.topic"); // 创建一个生产者 producer = session.createProducer(destination); // 创建一个消息 for (int i = 0; i < 6; i++) { // 创建一个消息 Message message = session.createTextMessage("this is VirtualTopic:" + i); // 发送消息 producer.send(message); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者a-1和消费者a-2,同时消费数据,这边仅贴消费者a-1的代码
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("Consumer.a.VirtualTopic.topic"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer-a-1 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者b自己消费消息:
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createQueue("Consumer.b.VirtualTopic.topic"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer-b-2 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
运行结果如下:
消费者a-1消费了024
消费者a-2消费了135
消费者b消费了012345