Spring整合RabbitMQ
如果我是使用者,我应该只需要关注发送信息和接收信息。基于此可以使用Spring框架来继承RabbitMQ,从而简化RabbitMQ的操作。
1.在Maven工程中添加依赖
<!-- 添加Spring整合Rabbit依赖的包 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.2.RELEASE</version> </dependency>
2.修改Spring的配置文件
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <bean id="fooMessageListener" class="cn.itding.mq.rabbitmq.spring.FooMessageListener"/> <!-- 配置连接 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/> <!-- 配置RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="foo.bar"/> <!-- 配置RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 配置队列名称 --> <rabbit:queue name="myQueue"/> <!-- 配置Topic类型的交换器 --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding pattern="foo.*" queue="myQueue"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 配置监听器 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="fooMessageListener" queue-names="myQueue" /> </rabbit:listener-container> </beans>
public class SendMessage { public static void main(String[] args) { ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RabbitTemplate template = classPathXmlApplicationContext.getBean(RabbitTemplate.class); template.convertAndSend("Hello World"); classPathXmlApplicationContext.close(); } }
public class FooMessageListener implements MessageListener { @Override public void onMessage(Message message) { String messageBody = new String(message.getBody()); System.out.println("接收到消息:" + messageBody); } }
异步处理的案例
某天产品人员说“系统要增加一个用户注册功能,注册成功后用户能收到邮件通知”,开发人员觉得这个不难,于是写了一个注册页面,点击提交按钮后保存用户的注册信息,然后发送邮件,最后返回用户注册成功。过了一段时间,产品人员说“点击注册后响应太慢,能不能优化一下”。开发人员首先想到的是利用多线程,将保存注册和邮件发送并行执行。又没过多久,产品人员说“现在注册响应是快了,但是用户反馈没有收到注册成功个的邮件通知,能不能在发送邮件的时候先保存所发送的邮件内容,如果邮件发送失败则进行补发”。
如果有专门提供邮件发送的部门,开发人员直接使用他们提供的服务岂不是更好,而这个部门真是使用了消息的异步处理来完成邮件的可靠发送。
下面代码实现:
1.添加Maven依赖
<!-- 添加Spring整合Rabbit依赖的包 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.2.RELEASE</version> </dependency>
2.修改Spring配置文件
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <bean id="mailMessageListener" class="cn.itding.mq.rabbitmq.async.MailMessageListener"/> <!-- 配置连接 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/> <!-- 对象转JSON格式传递 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 配置RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="mailExchange" routing-key="mail.test" message-converter="jsonMessageConverter"/> <!-- 配置RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 配置队列名称 --> <rabbit:queue name="mailQueue"/> <!-- 配置Topic类型的交换器 --> <rabbit:topic-exchange name="mailExchange"> <rabbit:bindings> <rabbit:binding pattern="mail.*" queue="mailQueue"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 配置监听器 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="mailMessageListener" queue-names="mailQueue" /> </rabbit:listener-container> </beans>
3.发送消息
public class Mail { private String form; // 发件人 private String to; // 收件人 private String subject; // 邮件主题 private String content; // 邮件内容 public String getForm() { return form; } public void setForm(String form) { this.form = form; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getSubject() { return subject; } public void setSubject(String subject) { this.subject = subject; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return "Mail{" + "form='" + form + '/'' + ", to='" + to + '/'' + ", subject='" + subject + '/'' + ", content='" + content + '/'' + '}'; } } public class Business { // 用户注册 public void userRegister() { // 校验用户填写信息的完整性 // 将用户相关信息保存到数据库 // 注册成功后,发送一条消息表示发送邮件 ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("asyncContext.xml"); RabbitTemplate template = classPathXmlApplicationContext.getBean(RabbitTemplate.class); Mail mail = new Mail(); mail.setTo("123456789@qq.com"); mail.setSubject("我的一封邮件"); mail.setContent("我的邮件内容"); template.convertAndSend(mail); classPathXmlApplicationContext.close(); } public static void main(String[] args) { Business business = new Business(); business.userRegister(); } }
4.消费消息
public class MailMessageListener implements MessageListener{ @Override public void onMessage(Message message) { String body = new String(message.getBody()); ObjectMapper mapper = new ObjectMapper(); Mail mail = null; try { mail = mapper.readValue(body, Mail.class); } catch (IOException e) { e.printStackTrace(); } System.out.println("接收到的邮件信息:" + mail); sendEmail(mail); } public void sendEmail(Mail mail) { // 调用Java Mail API } }