转载

Spring整合RabbitMQ

Spring整合RabbitMQ

如果我是使用者,我应该只需要关注发送信息和接收信息。基于此可以使用Spring框架来继承RabbitMQ,从而简化RabbitMQ的操作。

1.在Maven工程中添加依赖

<!-- 添加Spring整合Rabbit依赖的包 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.2.RELEASE</version>
        </dependency>

2.修改Spring的配置文件

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <bean id="fooMessageListener" class="cn.itding.mq.rabbitmq.spring.FooMessageListener"/>

    <!-- 配置连接 -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
                               virtual-host="/" requested-heartbeat="60"/>
    <!-- 配置RabbitTemplate -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange"
                     routing-key="foo.bar"/>
    <!-- 配置RabbitAdmin -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- 配置队列名称 -->
    <rabbit:queue name="myQueue"/>
    <!-- 配置Topic类型的交换器 -->
    <rabbit:topic-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding pattern="foo.*" queue="myQueue"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 配置监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="fooMessageListener" queue-names="myQueue" />
    </rabbit:listener-container>
</beans>
  • MessageListenerContainer:用来监听容器,为消息入队提供异步处理。
  • RabbitTemplate:用来发送和接收消息。
  • RabbitAdmin:用来声明队列、交换器、绑定。
  1. 发送消息
public class SendMessage {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        RabbitTemplate template = classPathXmlApplicationContext.getBean(RabbitTemplate.class);
        template.convertAndSend("Hello World");
        classPathXmlApplicationContext.close();
    }
}
  1. 接收消息
public class FooMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String messageBody = new String(message.getBody());
        System.out.println("接收到消息:" + messageBody);
    }
}

异步处理的案例

某天产品人员说“系统要增加一个用户注册功能,注册成功后用户能收到邮件通知”,开发人员觉得这个不难,于是写了一个注册页面,点击提交按钮后保存用户的注册信息,然后发送邮件,最后返回用户注册成功。过了一段时间,产品人员说“点击注册后响应太慢,能不能优化一下”。开发人员首先想到的是利用多线程,将保存注册和邮件发送并行执行。又没过多久,产品人员说“现在注册响应是快了,但是用户反馈没有收到注册成功个的邮件通知,能不能在发送邮件的时候先保存所发送的邮件内容,如果邮件发送失败则进行补发”。

如果有专门提供邮件发送的部门,开发人员直接使用他们提供的服务岂不是更好,而这个部门真是使用了消息的异步处理来完成邮件的可靠发送。

下面代码实现:

1.添加Maven依赖

<!-- 添加Spring整合Rabbit依赖的包 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.2.RELEASE</version>
</dependency>

2.修改Spring配置文件

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <bean id="mailMessageListener" class="cn.itding.mq.rabbitmq.async.MailMessageListener"/>

    <!-- 配置连接 -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
                               virtual-host="/" requested-heartbeat="60"/>
    <!-- 对象转JSON格式传递 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 配置RabbitTemplate -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="mailExchange"
                     routing-key="mail.test" message-converter="jsonMessageConverter"/>
    <!-- 配置RabbitAdmin -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- 配置队列名称 -->
    <rabbit:queue name="mailQueue"/>
    <!-- 配置Topic类型的交换器 -->
    <rabbit:topic-exchange name="mailExchange">
        <rabbit:bindings>
            <rabbit:binding pattern="mail.*" queue="mailQueue"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 配置监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="mailMessageListener" queue-names="mailQueue" />
    </rabbit:listener-container>
</beans>

3.发送消息

public class Mail {
    private String form; // 发件人
    private String to; // 收件人
    private String subject; // 邮件主题
    private String content; // 邮件内容

    public String getForm() {
        return form;
    }

    public void setForm(String form) {
        this.form = form;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Mail{" +
                "form='" + form + '/'' +
                ", to='" + to + '/'' +
                ", subject='" + subject + '/'' +
                ", content='" + content + '/'' +
                '}';
    }
}

public class Business {
    // 用户注册
    public void userRegister() {
        // 校验用户填写信息的完整性
        // 将用户相关信息保存到数据库
        // 注册成功后,发送一条消息表示发送邮件
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("asyncContext.xml");
        RabbitTemplate template = classPathXmlApplicationContext.getBean(RabbitTemplate.class);
        Mail mail = new Mail();
        mail.setTo("123456789@qq.com");
        mail.setSubject("我的一封邮件");
        mail.setContent("我的邮件内容");
        template.convertAndSend(mail);
        classPathXmlApplicationContext.close();
    }

    public static void main(String[] args) {
        Business business = new Business();
        business.userRegister();
    }
}

4.消费消息

public class MailMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        String body = new String(message.getBody());
        ObjectMapper mapper = new ObjectMapper();
        Mail mail = null;
        try {
            mail = mapper.readValue(body, Mail.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("接收到的邮件信息:" + mail);
        sendEmail(mail);
    }

    public void sendEmail(Mail mail) {
        // 调用Java Mail API
    }
}
原文  https://segmentfault.com/a/1190000022994152
正文到此结束
Loading...