转载

Java 连接 ActiveMQ

编辑推荐:

文章主要介绍了发布 / 订阅消息传递域的特点,并启动 ActiveMQ,运行程序,最后对Topic 模式队列和Queue 模式队列的两种模式进行对比。

来自于csdn,,由火龙果软件Alice编辑、推荐。

Java 连接 ActiveMQ

发布订阅消息传递域中,目的地被称为主题(topic)

发布 / 订阅消息传递域的特点如下:

(1)生产者将消息发布到 topic 中,每个消息可以有多个消费者,属于 1:N 关系

(2)生产者和消费者之间有时间上的相关性,订阅某一个主题的消费者只能消费 自它订阅之后发布的消息

(3)生产者生产时,topic 不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

Java 连接 ActiveMQ

生产端代码

package com.java.elasticsearch.activemq;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import javax.jms.*;
 /**
 * @author Woo_home
 * @create by 2020/5/3 13:51
 */
 public class JmsProduceTopic {
 // 定义 MQ 连接地址
 private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
 // 定义主题名称
 private static final String TOPIC_NAME = "topic-activemq";
 public static void main(String[] args) throws JMSException {
 // 1、创建连接工厂
 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
 // 2、通过连接工厂,获得连接 Connection
 Connection connection = activeMQConnectionFactory.createConnection();
 connection.start();
 ///3、创建会话
 // 两个参数,第一个叫事务 / 第二个叫签收
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 // 4、创建目的地(具体是队列还是主题 Topic)
 Topic topic = session.createTopic(TOPIC_NAME);
 // 5、创建消息的生产者
 MessageProducer messageProducer = session.createProducer(topic);
 // 6、通过使用 MessageProducer 生产 3 条消息发送到 MQ 的队列里面
 for (int i = 0; i < 3; i++) {
 // 创建消息
 TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
// 理解为一个字符串
 // 7、通过 MessageProducer 发送给 MQ
 messageProducer.send(textMessage);
 }
 // 8、关闭资源
 messageProducer.close();
 session.close();
 connection.close();
 System.out.println("****** 消息发布到 MQ 完成 ******");
 }
 }

消费端代码

package com.java.elasticsearch.activemq;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import javax.jms.*;
 import java.io.IOException;
 /**
 * @author Woo_home
 * @create by 2020/5/3 13:54
 */
 public class JmsConsumerTopic {
 // 定义 MQ 连接地址
 private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
 // 定义主题名称
 private static final String TOPIC_NAME = "topic-activemq";
 public static void main(String[] args) throws JMSException, IOException {
 // 1、创建连接工厂
 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
 // 2、通过连接工厂,获得连接 Connection
 Connection connection = activeMQConnectionFactory.createConnection();
 connection.start();
 ///3、创建会话
 // 两个参数,第一个叫事务 / 第二个叫签收
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 // 4、创建目的地(具体是队列还是主题 Topic)
 Topic topic = session.createTopic(TOPIC_NAME);
 // 5、创建消费者
 MessageConsumer messageConsumer = session.createConsumer(topic);
 // 通过监听的方式来消费消息 MessageConsumer messageConsumer = session.createConsumer(queue)
 messageConsumer.setMessageListener((message) -> {
 if (null != message && message instanceof TextMessage) {
 TextMessage textMessage = (TextMessage) message;
 try {
 System.out.println("****** 消费者接收到Topic消息 ****** :" + textMessage.getText());
 } catch (JMSException e) {
 e.printStackTrace();
 }
 }
 });
 System.in.read();
 // 关闭资源
 messageConsumer.close();
 session.close();
 connection.close();
 }
 }

启动 ActiveMQ

启动 ActiveMQ 之后访问 http://localhost:8161/ admin /topics .jsp ,此时的 Topic 中什么也没有

Java 连接 ActiveMQ

运行程序

先运行消费端代码

Java 连接 ActiveMQ

Java 连接 ActiveMQ

再次刷新 Topic 界面时就有了我们定义的 topic-activemq http://localhost:8161/admin/topics.jsp 而且显示有一个消费者在等待

Java 连接 ActiveMQ

为了演示 Topic 发布订阅,我们运行多个消费端程序

Java 连接 ActiveMQ

Java 连接 ActiveMQ

刷新 Topic 界面,此时已有三个消费者

Java 连接 ActiveMQ

再运行生产端代码

Java 连接 ActiveMQ

Java 连接 ActiveMQ

观察我们的消费端控制台输出

Java 连接 ActiveMQ

Java 连接 ActiveMQ

Java 连接 ActiveMQ

此时 Topic 界面发现 3 个消费者并且都已入队,消费了 9 条消息

Java 连接 ActiveMQ

两种模式的区别

两大模式比较

Java 连接 ActiveMQ

原文  http://www.uml.org.cn/zjjs/202007091.asp?artid=23477
正文到此结束
Loading...