一般认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。
随着系统的发展,各个模块越来越庞大、业务逻辑越来越复杂,必然要做服务化和业务拆分的,这个时候各个系统之间的交互,RPC是首选。但是随着系统的继续发展,一些功能涉及几十个服务的调用,这时候需要消息中间件来解决问题。
消息中间件主要解决分布式系统之间消息的传递,同时为分布式系统中其他子系统提供了伸缩性和扩展性。为系统带来了:
1.低耦合,不管是程序还是模块之间,使用消息中间件进行间接通信。
2.异步通信能力,使得子系统之间得以充分执行自己的逻辑而无需等待。
3.高并发能力,将高峰期大量的请求存储下来慢慢交给后台进行处理,比如适用于秒杀业务。
RPC和消息中间件的场景的差异很大程度上在于就是“依赖性”和“同步性”:
RPC是强依赖,典型的同步方式,像本地调用。消息中间件方式属于异步方式。消息队列是系统级、模块级的通信。RPC是对象级、函数级通信。
业务上的必须环节一般用RPC,对于一些不影响流程的不是强依赖的可以考虑消息队列,如发送短信,统计数据,解耦应用。
1.异步处理;2.应用解耦;3.限流;4.日志处理;5消息通讯
JMS规范包含以下6个要素:
1.连接工厂;2.JMS连接;3.JMS会话;4.JMS目的(Broker);5.JMS生产者;6.JMS消费者
1.Point-to-Point点对点:
生产者发布消息到队列queue上,若没有对应的消费者则消息保留;若queue上有多个消费者的时候,消息只会被一个消费者消费。
2.Topic/主题(发布与订阅)(广播):
生产者发布消息到主题,主题会向所有消费者(订阅者)发送消息;若没有消费者在线,则消息丢失也就类似广播,没了就没了。
官网 http://activemq.apache.org/ac... :8161/admin为后台管理平台可查询队列情况及消息条数等等。
查看activemq.xml可查看ActiveMQ应用的缺省端口为61616,8161为管理平台端口。
一:原生API编程(最灵活,重要)
消费者:看代码很明显是基于JMS规范的要素来编程的,要通信必须要建立连接。
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsConsumer { /*默认连接用户名*/ private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; /* 默认连接密码*/ private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; /* 默认连接地址*/ private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { /* 连接工厂*/ ConnectionFactory connectionFactory; /* 连接*/ Connection connection = null; /* 会话*/ Session session; /* 消息的目的地*/ Destination destination; /* 消息的消费者*/ MessageConsumer messageConsumer; /* 实例化连接工厂*/ connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { /* 通过连接工厂获取连接*/ connection = connectionFactory.createConnection(); /* 启动连接*/ connection.start(); /* 创建session*/ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /* 创建一个名为HelloWorldQueue消息队列*/ //destination = session.createTopic("HelloWorldTopic"); destination = session.createQueue("HelloWorldQueue"); /* 创建消息消费者*/ messageConsumer = session.createConsumer(destination); Message message; while((message = messageConsumer.receive())!=null){ System.out.println("收到消息"+((TextMessage)message).getText()); } } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
生产者:同样的生产者也是基于JMS规范要素。
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsProducer { /*默认连接用户名*/ private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; /* 默认连接密码*/ private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; /* 默认连接地址*/ private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM = 5; public static void main(String[] args) { /* 连接工厂*/ ConnectionFactory connectionFactory; /* 连接*/ Connection connection = null; /* 会话*/ Session session; /* 消息的目的地*/ Destination destination; /* 消息的生产者*/ MessageProducer messageProducer; /* 实例化连接工厂*/ connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKEURL); try { /* 通过连接工厂获取连接*/ connection = connectionFactory.createConnection(); /* 启动连接*/ connection.start(); /* 创建session * 第一个参数表示是否使用事务,第二次参数表示是否自动确认*/ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /* 创建一个名为HelloWorldQueue消息队列*/ //destination = session.createTopic("HelloWorldTopic"); destination = session.createQueue("HelloWorldQueue"); /* 创建消息生产者*/ messageProducer = session.createProducer(destination); /* 循环发送消息*/ for(int i=0;i<SENDNUM;i++){ String msg = "发送消息"+i+" "+System.currentTimeMillis(); TextMessage textMessage = session.createTextMessage(msg); System.out.println("标准用法:"+msg); messageProducer.send(textMessage); } } catch (Exception e) { e.printStackTrace(); }finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
先启动ActiveMQ再执行代码demo就可以了解对应的特性,如上代码为点对点模型,不管消费者在生产者前后启动的都能接受到消息,毕竟生产者发布的消息无对应的消费者消费时,队列会保存消息。
同样的也可以测试下主题模式,如上放开注释即可。
生产者配置:
<!-- ActiveMQ 连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="" password="" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100"></property> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 队列模式--> <property name="pubSubDomain" value="false"></property> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 发布订阅模式--> <property name="pubSubDomain" value="true"></property> </bean> <!--Spring JmsTemplate 的消息生产者 end-->
Queue生产者:直接注入队列模式的bean即可使用
@Component public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); //TODO 应答 return msg; } }); } }
Topic生产者:
@Component public class TopicSender { @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); } }
消费者配置:
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="" password="" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100"></property> </bean> <!-- 消息消费者 start--> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener> <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener> </jms:listener-container> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener> <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener> </jms:listener-container> <!-- 消息消费者 end -->
队列消费者:
@Component public class QueueReceiver1 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver1 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
主题消费者:
@Component public class TopicReceiver1 implements MessageListener { public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
SpringBoot整合可在官网查询对应配置