Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,说白了就是个服务器,主要用来存放请求消息的
这篇博客图文并茂,通俗易懂ActiveMQ作用总结
笔者将其精炼了一下,主要有4大应用场景:异步处理,应用解耦,流量削锋,消息通讯
其核心思想都是把用户的请求先存放在MQ中,然后返回用户响应,后台再慢慢去处理MQ中的消息,不需要一条龙业务全部跑完再返回响应,这样的话单位时间内请求数可以更多,响应速度也更快,相当于提高了吞吐量。其实前3种场景都差不多,笔者看来没有绝对的边界,只不过异步处理强调非同时性,应用解耦强调子系统挂掉后MQ体现的作用,流量削锋强调MQ在高并发中体现的作用。消息通讯的业务模式举例子:1.用微信和微信好友聊天 2.微信群聊天
源码地址:
安装好activeMQ,如何安装自行百度
项目适用jdk1.8,采用idea多模块架构,涉及技术有spring, activemq, tomcat
client是模拟消费者,domain是公共工具包,被maven打成jar供其它项目适用,service是模拟消息生产者
启动activemq服务器
双击activemq.bat启动
登陆 http://localhost:8161/admin/queues.jsp ,发现Queues是空的
看一下service的配置activemq_config.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <!-- 这里暴露内部统一使用的MQ地址 --> <bean id="internalTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="internalConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="internalTargetConnectionFactory" /> <property name="maxConnections" value="20" /> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="internalJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="internalConnectionFactory" /> </bean> <!-- 推送给用户信息 创建一个Queue--> <bean id="userServiceQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>user.service.queue</value> </constructor-arg> </bean> <!-- 推送给新闻信息 创建一个Queue--> <bean id="newsServiceQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>news.service.queue</value> </constructor-arg> </bean> <!-- 推送给客户信息 创建一个Queue--> <bean id="clientServiceQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>client.service.queue</value> </constructor-arg> </bean> </beans>复制代码
一共3种推送,每种推送对应1个队列名
PushService是1个通用接口,然后3种推送各对应1个实现,使用tomcat启动service服务
登陆localhost:8080
按照如上填写后,我们点击推送用户信息,出现如下提示框
登陆 http://localhost:8161/admin/queues.jsp ,
发现新增了一个队列,待处理消息数量1,消费者数量0,消息排队1,消息已出列0
我们看看后台执行过程
js通过ajax请求到后台
@RequestMapping(value="/user",method=RequestMethod.POST) @ResponseBody public ResultRespone userPush(User info){ ResultRespone respone = new ResultRespone(); try { userPushService.push(info); respone.setData(info); } catch (Exception e) { e.printStackTrace(); respone = new ResultRespone(false, e.getMessage()); } return respone; }复制代码
调用push()方法
@Autowired @Qualifier("userServiceQueue") private Destination destination; @Override public void push(final Object info) { pushExecutor.execute(new Runnable() { @Override public void run() { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { User p = (User) info; return session.createTextMessage(JSON.toJSONString(p)); } }); } }); }复制代码
这个过程实际上将用户属性值组成的字符串发送到了activemq服务器,到此,生产者的任务就完成了
主要通过3个listener来接收activemq发送过来的消息
看其中一个UserPushListener.java
@Component("userPushListener") public class UserPushListener implements MessageListener { protected static final Logger logger = Logger.getLogger(UserPushListener.class); @Override public void onMessage(Message message) { logger.info("[UserPushListener.onMessage]:begin onMessage."); TextMessage textMessage = (TextMessage) message; try { String jsonStr = textMessage.getText(); logger.info("[UserPushListener.onMessage]:receive message is,"+ jsonStr); if (jsonStr != null) { User info = JSON.parseObject(jsonStr, User.class); System.out.println("==============================接受到的用户信息 开始===================================="); System.out.println(info.toString()); System.out.println("==============================接受到的用户信息 结束===================================="); WebsocketController.broadcast("user", jsonStr); } } catch (JMSException e) { logger.error("[UserPushListener.onMessage]:receive message occured an exception",e); } logger.info("[UserPushListener.onMessage]:end onMessage."); } }复制代码
看一下消费端的配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <!-- 内部统一使用的MQ地址 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="targetConnectionFactory"/> <property name="maxConnections" value="50"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 推送给用户信息 --> <bean id="userPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>user.service.queue</value> </constructor-arg> </bean> <!-- 推送给新闻信息 --> <bean id="newsPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>news.service.queue</value> </constructor-arg> </bean> <!-- 推送给客户信息 --> <bean id="clientPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>client.service.queue</value> </constructor-arg> </bean> <!-- 用户接受推送 --> <bean id="userPushListenerConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="userPushListenerMQ" /> <property name="messageListener" ref="userPushListener" /> </bean> <!-- 新闻接受推送 --> <bean id="newsPushListenerConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="newsPushListenerMQ" /> <property name="messageListener" ref="newsPushListener" /> </bean> <!-- 客户接受推送 --> <bean id="clientPushListenerConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="clientPushListenerMQ" /> <property name="messageListener" ref="clientPushListener" /> </bean> </beans>复制代码
消费端监听了3个队列,所以队列一旦有消息,消费端就会监听到,而且activemq可以确认哪些消息被推送成功了
关闭service服务,启动client服务,观察日志
成功接收到消息,再次查看 http://localhost:8161/admin/queues.jsp
发现user.service.queue这个队列的消息是待处理消息数量0,消费者数量1,消息排队1,消息已出列1,表明消息推送完毕,另外两个新增的队列是客户端监听造成的,可以看出待处理消息的数量都是0