转载

Spring整合RabbitMQ-05-MessageConverter(消息转换器)

package com.wyg.rabbitmq.springamqp;

import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert;
import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert;

/**
 * Spring configuration for the RabbitMQ demo: declares the connection factory,
 * the {@link RabbitAdmin}, the {@link RabbitTemplate} and a
 * {@link SimpleMessageListenerContainer} whose listener adapter converts
 * incoming messages according to their content type.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the caching connection factory pointing at the local broker
     * ({@code localhost:5672}, user {@code guest}, virtual host {@code /}).
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("localhost:5672");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    /**
     * Registers a {@link RabbitAdmin} in the container.
     *
     * @param connectionFactory the broker connection factory
     * @return the configured admin
     * @author wyg0405@gmail.com
     * @date 2019/11/25 16:35
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // Author's note: autoStartup has to be true, otherwise the Spring
        // container will not load RabbitAdmin. (NOTE(review): confirm against
        // the Spring AMQP version in use.)
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * Registers a {@link RabbitTemplate} in the container.
     *
     * <p>The {@code @Bean} annotation was missing, so this method was never
     * used to populate the context even though the unit test autowires a
     * {@code RabbitTemplate}.
     *
     * @param connectionFactory the broker connection factory
     * @return a template bound to that connection factory
     * @author wyg0405@gmail.com
     * @date 2019/11/25 16:37
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Registers the listener container that consumes queue {@code test01} and
     * dispatches each message to {@link MyMessageListenerDelegate#consumeMsg}
     * through a {@link MessageListenerAdapter}.
     *
     * @param connectionFactory the broker connection factory
     * @return the configured listener container
     * @author wyg0405@gmail.com
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Several queues may be listened to at once, e.g.
        // container.addQueueNames("test01", "test02", "test03");
        container.addQueueNames("test01");
        // Initial number of concurrent consumers.
        container.setConcurrentConsumers(1);
        // Upper bound on concurrent consumers.
        container.setMaxConcurrentConsumers(5);
        // Rejected messages are not put back on the queue.
        container.setDefaultRequeueRejected(false);
        // Acknowledge automatically.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag strategy: "<queue>_<current millis>".
        container.setConsumerTagStrategy(queue -> queue + "_" + System.currentTimeMillis());

        // Approach two: use a listener adapter around a plain delegate object.
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate());
        // Custom handler method name; the adapter falls back to "handleMessage" when unset.
        adapter.setDefaultListenerMethod("consumeMsg");

        // Alternative converter setups shown by the tutorial (kept for reference):
        //
        // 0. A single custom text converter:
        //      adapter.setMessageConverter(new MyTextMessageConvert());
        //
        // 1. JSON -> the delegate receives a Map parameter:
        //      Jackson2JsonMessageConverter json = new Jackson2JsonMessageConverter();
        //      adapter.setMessageConverter(json);
        //
        // 2. JSON -> Java object (class taken from the __TypeId__ header):
        //      Jackson2JsonMessageConverter json = new Jackson2JsonMessageConverter();
        //      DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
        //      mapper.setTrustedPackages("*");
        //      json.setJavaTypeMapper(mapper);
        //      adapter.setMessageConverter(json);
        //
        // 3. JSON -> Java object with alias-to-class mapping:
        //      Map<String, Class<?>> map = new HashMap<>();
        //      map.put("order", Order.class);
        //      map.put("user", User.class);
        //      mapper.setTrustedPackages("*");
        //      mapper.setIdClassMapping(map);
        //      json.setJavaTypeMapper(mapper);
        //      adapter.setMessageConverter(json);

        // 4. Global converter that picks a delegate by content type (active setup).
        adapter.setMessageConverter(buildContentTypeConverter());

        container.setMessageListener(adapter);
        return container;
    }

    /**
     * Builds the content-type based converter: text, JSON, PNG and PDF payloads
     * are each routed to their own delegate converter.
     */
    private static ContentTypeDelegatingMessageConverter buildContentTypeConverter() {
        ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();

        MyTextMessageConvert textConverter = new MyTextMessageConvert();
        converter.addDelegate("text", textConverter);
        // Was misspelled "appliction/text", so messages sent with the content
        // type "application/text" never reached the text converter.
        converter.addDelegate("application/text", textConverter);
        converter.addDelegate("text/plain", textConverter);

        // "*" trusts every package for type-id based deserialization; fine for
        // a demo, but restrict it when consuming messages from untrusted producers.
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter("*");
        converter.addDelegate("json", jsonConverter);
        converter.addDelegate("application/json", jsonConverter);

        // png
        MyPngMesssageConvert pngConverter = new MyPngMesssageConvert();
        converter.addDelegate("png", pngConverter);
        converter.addDelegate("image/png", pngConverter);

        // pdf ("application/pdf" is the registered MIME type, "pdf" the tutorial's shorthand)
        MyPDFMessageConvert pdfConverter = new MyPDFMessageConvert();
        converter.addDelegate("pdf", pdfConverter);
        converter.addDelegate("application/pdf", pdfConverter);

        return converter;
    }

}

复制代码

MyMessageListenerDelegate

package com.wyg.rabbitmq.springamqp;

import java.io.File;
import java.util.Map;

import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 * Plain delegate invoked by the listener adapter. The adapter calls the method
 * whose name was configured as default listener method ({@code consumeMsg} in
 * this demo) and whose parameter type matches the converted payload; every
 * handler simply prints what it received.
 *
 * <p>Raw {@code byte[]} payloads are decoded as UTF-8 rather than with the
 * platform default charset, so the output does not depend on the machine the
 * consumer runs on.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-29 14:23
 * @since JDK1.8
 * @version V1.0
 */
public class MyMessageListenerDelegate {

    /** Default handler name used by the adapter when no method name is configured. */
    public void handleMessage(byte[] body) {
        System.out.println("默认处理方法,message:" + decode(body));
    }

    /** Custom handler for payloads that were left as raw bytes. */
    public void consumeMsg(byte[] msgBody) {
        System.out.println("自定义处理方法,message:" + decode(msgBody));
    }

    /** Custom handler for payloads converted to {@link String}. */
    public void consumeMsg(String msgBody) {
        System.out.println("自定义处理String消息方法,message:" + msgBody);
    }

    /** Additional String handler; not referenced by the configuration visible here. */
    public void method01(String msgBody) {
        System.out.println("method01处理String消息方法,message:" + msgBody);
    }

    /** Additional String handler; not referenced by the configuration visible here. */
    public void method02(String msgBody) {
        System.out.println("method02处理String消息方法,message:" + msgBody);
    }

    /** Additional String handler; not referenced by the configuration visible here. */
    public void method03(String msgBody) {
        System.out.println("method03处理String消息方法,message:" + msgBody);
    }

    /** JSON payloads without type information arrive as a {@link Map}. */
    public void consumeMsg(Map<?, ?> msgBody) {
        System.out.println("自定义json处理方法,message:" + msgBody);
    }

    /** JSON payloads mapped to an {@link Order}. */
    public void consumeMsg(Order msgBody) {
        System.out.println("自定义Order对象处理方法,message:" + msgBody);
    }

    /** JSON payloads mapped to a {@link User}. */
    public void consumeMsg(User user) {
        System.out.println("自定义User对象处理方法,message:" + user);
    }

    /** Binary payloads (png/pdf) that a converter has already written to a {@link File}. */
    public void consumeMsg(File file) {
        System.out.println("自定义File对象处理方法,message:" + file.getPath());
    }

    /** Decodes a raw message body with a fixed charset instead of the JVM default. */
    private static String decode(byte[] payload) {
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }
}

复制代码

Order

package com.wyg.rabbitmq.springamqp.convert;

import java.io.Serializable;

/**
 * Simple order payload used by the JSON conversion examples.
 *
 * <p>{@link #toString()} renders a JSON-like string; field values are inserted
 * verbatim (no JSON escaping), so it is meant for logging only.
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 15:47
 * @since JDK1.8
 * @version V1.0
 */
public class Order implements Serializable {

    private static final long serialVersionUID = -4975357142857575433L;
    private String id;
    private String content;
    private double price;

    /** No-arg constructor, needed for JSON deserialization. */
    public Order() {}

    /**
     * @param id      order identifier
     * @param content free-text description of the order
     * @param price   order price
     */
    public Order(String id, String content, double price) {
        this.id = id;
        this.content = content;
        this.price = price;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    /**
     * Returns {@code {"id":"<id>","content":"<content>","price":<price>}}.
     *
     * <p>The quote escapes had been mangled from {@code \"} to {@code /"} in the
     * published listing, which made the method uncompilable; they are restored here.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('"');
        sb.append(",\"content\":\"").append(content).append('"');
        sb.append(",\"price\":").append(price);
        sb.append('}');
        return sb.toString();
    }
}

复制代码

User

package com.wyg.rabbitmq.springamqp.convert;

import java.io.Serializable;

/**
 * Simple user payload used by the JSON conversion examples.
 *
 * <p>{@link #toString()} renders a JSON-like string; field values are inserted
 * verbatim (no JSON escaping), so it is meant for logging only.
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 17:20
 * @since JDK1.8
 * @version V1.0
 */
public class User implements Serializable {

    private static final long serialVersionUID = 2959945432292661959L;
    private String id;
    private String name;
    private int age;

    /** No-arg constructor, needed for JSON deserialization. */
    public User() {}

    /**
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     */
    public User(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Returns {@code {"id":"<id>","name":"<name>","age":<age>}}.
     *
     * <p>The quote escapes had been mangled from {@code \"} to {@code /"} in the
     * published listing, which made the method uncompilable; they are restored here.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('"');
        sb.append(",\"name\":\"").append(name).append('"');
        sb.append(",\"age\":").append(age);
        sb.append('}');
        return sb.toString();
    }
}

复制代码

MyTextMessageConvert

自定义文本消息转换器

package com.wyg.rabbitmq.springamqp.convert;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * Custom message converter for textual payloads.
 *
 * <p>Bytes are encoded and decoded with the charset named by the message's
 * {@code contentEncoding} property when one is set, and with UTF-8 otherwise.
 * The previous implementation used the JVM default charset, which gives
 * different results on machines with different defaults.
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 15:11
 * @since JDK1.8
 * @version V1.0
 */
public class MyTextMessageConvert implements MessageConverter {

    /** Charset used when the message properties carry no content encoding. */
    private static final java.nio.charset.Charset DEFAULT_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Converts {@code object.toString()} into a message body.
     *
     * @param object            the payload; must not be {@code null}
     * @param messageProperties properties attached to the created message unchanged
     * @return a message whose body is the encoded string form of {@code object}
     * @throws MessageConversionException if {@code object} is {@code null} or the
     *                                    declared content encoding is not a usable charset
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        if (object == null) {
            throw new MessageConversionException("Cannot convert a null payload to a text message");
        }
        byte[] body = object.toString().getBytes(resolveCharset(messageProperties));
        return new Message(body, messageProperties);
    }

    /**
     * Converts a message to a {@link String} when its content type contains
     * {@code "text"}; any other message is returned as its raw {@code byte[]} body.
     *
     * @param message the incoming message
     * @return the decoded text, or the untouched body bytes
     * @throws MessageConversionException if the declared content encoding is not a usable charset
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        MessageProperties properties = message.getMessageProperties();
        String contentType = properties.getContentType();
        if (contentType == null || !contentType.contains("text")) {
            return message.getBody();
        }
        // Text content: decode to String.
        return new String(message.getBody(), resolveCharset(properties));
    }

    /** Picks the charset named in {@code contentEncoding}, falling back to UTF-8. */
    private static java.nio.charset.Charset resolveCharset(MessageProperties properties) {
        String encoding = properties == null ? null : properties.getContentEncoding();
        if (encoding == null || encoding.isEmpty()) {
            return DEFAULT_CHARSET;
        }
        try {
            return java.nio.charset.Charset.forName(encoding);
        } catch (IllegalArgumentException e) {
            // Covers both IllegalCharsetNameException and UnsupportedCharsetException.
            throw new MessageConversionException("Unsupported content encoding: " + encoding, e);
        }
    }

}

复制代码

MyPngMesssageConvert

自定义png转换器

package com.wyg.rabbitmq.springamqp.convert;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/**
 * Custom converter for PNG payloads: writes the message body to a new file on
 * local disk and hands that {@link File} to the listener.
 *
 * <p>Only the inbound direction is supported.
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 19:32
 * @since JDK1.8
 * @version V1.0
 */
public class MyPngMesssageConvert implements MessageConverter {

    /** Directory the received files are written to (hard-coded by the demo). */
    private static final String OUTPUT_DIR = "E://";

    /** Extension used when the message carries no {@code extName} header. */
    private static final String DEFAULT_EXTENSION = ".png";

    /**
     * Outbound conversion is not implemented.
     *
     * @throws MessageConversionException always; previously this returned
     *                                    {@code null}, which only fails later and less clearly
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException("MyPngMesssageConvert only supports inbound conversion (fromMessage)");
    }

    /**
     * Stores the message body as {@code <OUTPUT_DIR><current millis><extName>}.
     *
     * @param message the incoming message; the optional {@code extName} header
     *                supplies the file extension (including the dot)
     * @return the file the body was written to
     * @throws MessageConversionException if the header contains a path separator
     *                                    or the file cannot be written
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("---------PNGMessageConvert---------");
        Object header = message.getMessageProperties().getHeaders().get("extName");
        // A missing header used to produce a file name ending in "null".
        String extName = header == null ? DEFAULT_EXTENSION : header.toString();
        // The header comes from the producer: never let it steer the target directory.
        if (extName.indexOf('/') >= 0 || extName.indexOf('\\') >= 0) {
            throw new MessageConversionException("Illegal extName header: " + extName);
        }

        // Write the payload to local disk.
        File file = new File(OUTPUT_DIR + System.currentTimeMillis() + extName);
        try {
            Files.copy(new ByteArrayInputStream(message.getBody()), file.toPath());
        } catch (IOException e) {
            // Do not hand a File that was never written to the listener.
            throw new MessageConversionException("Failed to write PNG payload to " + file, e);
        }
        return file;
    }
}

复制代码

MyPDFMessageConvert

自定义pdf转换器

package com.wyg.rabbitmq.springamqp.convert;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * Custom converter for PDF payloads: writes the message body to a new file on
 * local disk and hands that {@link File} to the listener.
 *
 * <p>Only the inbound direction is supported.
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 19:34
 * @since JDK1.8
 * @version V1.0
 */
public class MyPDFMessageConvert implements MessageConverter {

    /** Directory the received files are written to (hard-coded by the demo). */
    private static final String OUTPUT_DIR = "E://";

    /** Extension used when the message carries no {@code extName} header. */
    private static final String DEFAULT_EXTENSION = ".pdf";

    /**
     * Outbound conversion is not implemented.
     *
     * @throws MessageConversionException always; previously this returned
     *                                    {@code null}, which only fails later and less clearly
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException("MyPDFMessageConvert only supports inbound conversion (fromMessage)");
    }

    /**
     * Stores the message body as {@code <OUTPUT_DIR><current millis><extName>}.
     *
     * @param message the incoming message; the optional {@code extName} header
     *                supplies the file extension (including the dot)
     * @return the file the body was written to
     * @throws MessageConversionException if the header contains a path separator
     *                                    or the file cannot be written
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("---------PDFMessageConvert---------");
        Object header = message.getMessageProperties().getHeaders().get("extName");
        // A missing header used to produce a file name ending in "null".
        String extName = header == null ? DEFAULT_EXTENSION : header.toString();
        // The header comes from the producer: never let it steer the target directory.
        if (extName.indexOf('/') >= 0 || extName.indexOf('\\') >= 0) {
            throw new MessageConversionException("Illegal extName header: " + extName);
        }

        // Write the payload to local disk.
        File file = new File(OUTPUT_DIR + System.currentTimeMillis() + extName);
        try {
            Files.copy(new ByteArrayInputStream(message.getBody()), file.toPath());
        } catch (IOException e) {
            // Do not hand a File that was never written to the listener.
            throw new MessageConversionException("Failed to write PDF payload to " + file, e);
        }
        return file;
    }

}

复制代码

单元测试

package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 * Integration tests that publish differently typed messages so the listener
 * container's converters can be observed.
 *
 * <p>Every test publishes to exchange {@code springdemo.direct} with routing
 * key {@code orderRoutingKey}; per the original author, queues {@code test01},
 * {@code test02} and {@code test03} are already bound to that exchange with
 * that key. A running broker is required.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {

    private static final String EXCHANGE = "springdemo.direct";
    private static final String ROUTING_KEY = "orderRoutingKey";

    /** Wire charset; fixed so the bytes do not depend on the JVM default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    /** Sends three raw byte[] payloads through the template's own converter. */
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, ("第" + i + "条消息").getBytes(UTF_8));
        }
    }

    /** Sends three text messages with content type {@code application/text}. */
    @Test
    public void sendTextMsg() {
        for (int i = 0; i < 3; i++) {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/text");
            String body = "第" + i + "条消息";
            send(body.getBytes(UTF_8), messageProperties);
        }
    }

    /** Sends an Order as plain JSON (no type header). */
    @Test
    public void sendJsonMsg() throws JsonProcessingException {
        Order order = new Order("1", "吃的喝的", 19);
        ObjectMapper mapper = new JsonMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + json);

        MessageProperties messageProperties = new MessageProperties();
        // The content type has to be exactly "application/json".
        messageProperties.setContentType("application/json");
        send(json.getBytes(UTF_8), messageProperties);
    }

    /** Sends an Order as JSON with its fully qualified class name as type id. */
    @Test
    public void sendJavaTypeMsg() throws JsonProcessingException {
        Order order = new Order("1", "吃的喝的", 19);
        ObjectMapper mapper = new JsonMapper();
        String order2Json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + order2Json);

        MessageProperties messageProperties = new MessageProperties();
        // The content type has to be exactly "application/json".
        messageProperties.setContentType("application/json");
        // "__TypeId__" is the fixed header name carrying the target type.
        messageProperties.getHeaders().put("__TypeId__", "com.wyg.rabbitmq.springamqp.convert.Order");
        send(order2Json.getBytes(UTF_8), messageProperties);
    }

    /**
     * Sends an Order and a User as JSON, identified by the short aliases
     * {@code order} and {@code user}. (The aliases presumably only resolve
     * when the consumer registers an id-to-class mapping - verify against the
     * converter configuration in use.)
     */
    @Test
    public void sendJavaTypeMappingMsg() throws JsonProcessingException {
        ObjectMapper mapper = new JsonMapper();

        // Order
        Order order = new Order("1", "吃的喝的", 19);
        String order2Json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + order2Json);
        MessageProperties messageProperties = new MessageProperties();
        // The content type has to be exactly "application/json".
        messageProperties.setContentType("application/json");
        // "__TypeId__" is the fixed header name; "order" is the alias.
        messageProperties.getHeaders().put("__TypeId__", "order");
        send(order2Json.getBytes(UTF_8), messageProperties);

        // User
        User user = new User("111", "jack", 20);
        String user2Json = mapper.writeValueAsString(user);
        System.out.println("User to Json:" + user2Json);

        MessageProperties messageProperties2 = new MessageProperties();
        // Was missing: without the JSON content type this message is not
        // routed to the JSON converter and never arrives as a User.
        messageProperties2.setContentType("application/json");
        // "__TypeId__" is the fixed header name; "user" is the alias.
        messageProperties2.getHeaders().put("__TypeId__", "user");
        send(user2Json.getBytes(UTF_8), messageProperties2);
    }

    /** Sends a PNG file read from local disk. */
    @Test
    public void sendPNGMsg() throws IOException {
        String filePath = "C://Users//wyg04//Desktop//image//green.png";
        byte[] allBytes = Files.readAllBytes(Paths.get(filePath));

        MessageProperties messageProperties = new MessageProperties();
        // The content type has to be "image/png" or "png".
        messageProperties.setContentType("png");
        // File extension for the consumer side.
        messageProperties.getHeaders().put("extName", ".png");
        send(allBytes, messageProperties);
    }

    /** Sends a PDF file read from local disk. */
    @Test
    public void sendPDFMsg() throws IOException {
        String filePath = "C://Users//wyg04//Desktop//深度工作.pdf";
        byte[] allBytes = Files.readAllBytes(Paths.get(filePath));

        MessageProperties messageProperties = new MessageProperties();
        // The content type has to be "pdf".
        messageProperties.setContentType("pdf");
        // File extension for the consumer side.
        messageProperties.getHeaders().put("extName", ".pdf");
        send(allBytes, messageProperties);
    }

    /** Publishes a pre-built message to the demo exchange with the demo routing key. */
    private void send(byte[] body, MessageProperties properties) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, new Message(body, properties));
    }

}
复制代码
原文  https://juejin.im/post/5de3c6cd6fb9a0715559ab6b
正文到此结束
Loading...