转载

使用JMS完成消息通信

前言

对于WEB系统,向用户发送邮件、短信、站内信等几乎是必备的基础功能,但这些任务相对于所见即所得的立即响应式的请求对实时性的要求并不高,同时任务处理的量还很大。在复杂多系统的情形下,还要考虑多个子系统的通信问题。无论是从实际业务需求还是从软件工程的设计角度出发,消息通信都有必要成为一个独立的模块。本文以一个非常简单的业务场景为例,即系统给用户发送通知邮件,来讲一讲如何构建简单的消息通信。

引入JMS

在上一次的博客中我们讲述了消息队列,消息队列是消息通信系统的重要组成部分。J2EE为运行在jvm虚拟机上的程序间的通信早就制定了一套标准,也就是我们提到的JMS标准。但JMS并不涉及到具体实现,我们在本文中采用应用最为广泛的ActiveMQ为例。

首先我们需要在pom.xml中引入相关依赖。

<!-- JMS -->   <dependency>    <groupId>org.springframework</groupId>    <artifactId>spring-jms</artifactId>    <version>${spring.version}</version>   </dependency>   <dependency>    <groupId>org.apache.activemq</groupId>    <artifactId>activemq-core</artifactId>    <version>5.7.0</version>   </dependency>  

配置JMS和ActiveMQ

然后我们要在ApplicationContext.xml文件中作出相关配置。

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">  <description>JMS简单应用配置</description>  <!-- ActiveMQ 连接工厂 -->  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">   <property name="brokerURL" value="${jms.broker_url}" />  </bean>  <!-- Spring Caching 连接工厂 -->  <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">   <property name="targetConnectionFactory" ref="connectionFactory" />   <property name="sessionCacheSize" value="10" />  </bean>  <!-- Queue定义 -->  <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">   <constructor-arg value="q.notify" />  </bean>  <!-- Topic定义 -->  <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">   <constructor-arg value="t.notify" />  </bean>  <!-- Spring JMS Template -->  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">   <property name="connectionFactory" ref="cachingConnectionFactory" />  </bean>  <!-- 使用Spring JmsTemplate的消息生产者 -->  <bean id="notifyMessageProducer" class="jseed.util.modules.jms.NotifyMessageProducer">   <property name="jmsTemplate" ref="jmsTemplate" />   <property name="notifyQueue" ref="notifyQueue" />   <property name="notifyTopic" ref="notifyTopic" />  </bean>  <!-- 异步接收Queue消息Container -->  <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">   <property name="connectionFactory" ref="connectionFactory" />   <property name="destination" ref="notifyQueue" />   <property name="messageListener" ref="notifyMessageListener" />   <property name="concurrentConsumers" value="10" />  </bean>  <!-- 异步接收Topic消息Container -->  <bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">   <property name="connectionFactory" ref="connectionFactory" />   <property name="destination" ref="notifyTopic" />   <property name="messageListener" ref="notifyMessageListener" />  </bean>  <!-- 异步接收消息处理类 -->  <bean id="notifyMessageListener" class="jseed.util.modules.jms.NotifyMessageListener" /> </beans> 

对于上文我们将一一解释:

  • ConnectionFactory:用于产生到JMS服务器的链接

  • targetConnectionFactory:真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供,在这里是ActiveMQ

  • JmsTemplate:实现消息发送的工具,由Spring提供

  • Destination:用来表示目的地的接口在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。,没有任何方法定义,只是用来做一个标识而已。

实现生产者和消费者

在这里生产者负责生成包含邮件收件人和内容等信息的消息并存入队列,而消费者则负责从这些信息中国生成邮件并发送出去。

  • 生产者代码

public class NotifyMessageProducer {  private JmsTemplate jmsTemplate;  private Destination notifyQueue;  private Destination notifyTopic;  public void sendQueue(final User user) {   sendMessage(user, notifyQueue);  }  public void sendTopic(final User user) {   sendMessage(user, notifyTopic);  }  /**   * 使用jmsTemplate最简便的封装convertAndSend()发送Map类型的消息.   */  private void sendMessage(User user, Destination destination) {   Map map = new HashMap();   map.put("userName", user.getName());   map.put("email", user.getEmail());   jmsTemplate.convertAndSend(destination, map);  }  public void setJmsTemplate(JmsTemplate jmsTemplate) {   this.jmsTemplate = jmsTemplate;  }  public void setNotifyQueue(Destination notifyQueue) {   this.notifyQueue = notifyQueue;  }  public void setNotifyTopic(Destination nodifyTopic) {   this.notifyTopic = nodifyTopic;  } } 
  • 消费者代码

public class NotifyMessageListener implements MessageListener {  private static Logger logger = LoggerFactory.getLogger(NotifyMessageListener.class);  @Autowired(required = false)  private MailService simpleMailService;  /**   * MessageListener回调函数.   */  @Override  public void onMessage(Message message) {   try {    MapMessage mapMessage = (MapMessage) message;    // 打印消息详情    logger.info("UserName:{}, Email:{}", mapMessage.getString("userName"), mapMessage.getString("email"));    // 发送邮件    if (simpleMailService != null) {     simpleMailService.sendNotificationMail(mapMessage.getString("userName"));    }   } catch (Exception e) {    logger.error("处理消息时发生异常.", e);   }  } } 
  • 邮件模型

public class ApplicationEmail implements Serializable {    public String getAddress() {   return address;  }  public void setAddress(String address) {   this.address = address;  }  public String getCc() {   return cc;  }  public void setCc(String cc) {   this.cc = cc;  }  public String getSubject() {   return subject;  }  public void setSubject(String subject) {   this.subject = subject;  }  public String getContent() {   return content;  }  public void setContent(String content) {   this.content = content;  }  /**收件人**/    private String address;    /**抄送给**/     private String cc;    /**邮件主题**/    private String subject;       /**邮件内容**/    private String content;    /**附件**/     //private MultipartFile[] attachment = new MultipartFile[0];  }   
  • 邮件帮助类(此处采用了java mail依赖包)

public class MailService {  private static Logger logger = LoggerFactory.getLogger(MailService.class);  private JavaMailSender mailSender;  private String textTemplate;  /**   * 发送纯文本的用户修改通知邮件.   */  public void sendNotificationMail(String userName) {   SimpleMailMessage msg = new SimpleMailMessage();   msg.setFrom("suemi94@qq.com");   msg.setTo("suemi994@gmail.com");   msg.setSubject("用户修改通知");   // 将用户名与当期日期格式化到邮件内容的字符串模板   String content = String.format(textTemplate, userName, new Date());   msg.setText(content);   try {    mailSender.send(msg);    if (logger.isInfoEnabled()) {     logger.info("纯文本邮件已发送至{}", StringUtils.join(msg.getTo(), ","));    }   } catch (Exception e) {    logger.error("发送邮件失败", e);   }  }  /**   * 同步发送邮件   *    * @param email   * @throws MessagingException   * @throws IOException   */  public void sendMailBySynchronizationMode(ApplicationEmail email) throws MessagingException, IOException {     Session session=Session.getDefaultInstance(new Properties());    MimeMessage mime= new MimeMessage(session);    MimeMessageHelper helper = new MimeMessageHelper(mime, true, "utf-8");     helper.setFrom("suemi94@qq.com");//发件人     helper.setTo(InternetAddress.parse(email.getAddress()));//收件人     helper.setReplyTo("suemi94@qq.com");//回复到       helper.setSubject(email.getSubject());//邮件主题     helper.setText(email.getContent(), true);//true表示设定html格式      mailSender.send(mime);  }  /**   * Spring的MailSender.   */  public void setMailSender(JavaMailSender mailSender) {   this.mailSender = mailSender;  }  /**   * 邮件内容的字符串模板.   */  public void setTextTemplate(String textTemplate) {   this.textTemplate = textTemplate;  } } 
正文到此结束
Loading...