转载

SpringBoot实现RabbitMQ发布订阅模式之实践步骤

SpringBoot实现RabbitMQ发布订阅模式之实践步骤

1、消息发送方(发布者)

1)添加maven依赖

<!-- springboot rabbitmq 使用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)配置RabbitMQ配置(application.properties)

#RabbitMQ 服务配置,不写默认走本地ip
spring.rabbitmq.host=192.168.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)创建发送方法

package com.example.provide.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.example.provide.dto.UserDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

import static org.springframework.integration.jmx.JmxHeaders.OPERATION_NAME;

/*
 * @auth yuesf
 * @data 2019/11/4
 */
@Component
public class Sender {

    private static final Logger logger = LoggerFactory.getLogger(Sender.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void storeInfoWindQSend(Object object) {
        String message = JSON.toJSONString(object);
        logger.info("RabbitMQ: 发送消息={}", message);
        rabbitTemplate.convertAndSend("demo.direct.exchange", "demo.direct", message);
        logger.info("发送消息完成 message={}", message);
    }
}

2、消息接收方(订阅者)

1)添加maven依赖

<!-- springboot rabbitmq 使用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)配置RabbitMQ配置(application.properties)

#RabbitMQ 服务配置,不写默认走本地ip
spring.rabbitmq.host=192.168.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3)声明RabbitMQ

示例中使用的直连交换机,声明一个交换机,一个队列。交换机与队列绑定关系

package com.example.consume.listener;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * Rabbitmq的配置示例
 * @auth yuesf
 * @data 2019/11/4
 */
@Configuration
public class RabbitConfigDemo {
    /**
     * 示例交换机
     *
     * @return
     */
    @Bean
    public DirectExchange demoExchange() {
        return new DirectExchange("demo.direct.exchange", true, false);
    }

    /**
     * 示例队列
     *
     * @return
     */
    @Bean
    public Queue demoQueue() {
        return new Queue("demo.queue", true, false, false);
    }

    /**
     * 交换机与队列的绑定关系
     *
     * @param demoQueue
     * @param demoExchange
     * @return
     */
    @Bean
    public Binding bindingDemoQueue(@Qualifier("demoQueue") Queue demoQueue,
            @Qualifier("demoExchange") DirectExchange demoExchange) {
        return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo.direct");
    }
}

4)监听方法

package com.example.consume.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
 * @auth yuesf
 * @data 2019/11/4
 */
@Component
public class RabbitDemoListener {

    private static final Logger logger = LoggerFactory.getLogger(RabbitDemoListener.class);

    @RabbitListener(queues = "demo.queue")
    public void goodsListenerProcess(Object message) {
        logger.info("接收消息 message={}", message);
    }
}
原文  http://mp.weixin.qq.com/s?__biz=MzU5MjgxNjAwMQ==&mid=2247483958&idx=2&sn=846bf9fdc4c2a7b288a7a2246745c935
正文到此结束
Loading...