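MessageConverter: the message converter
When we send a message, the body normally travels as binary data. If we want the framework to convert it for us, or want to plug in a converter of our own, MessageConverter is what we need.
Custom converters: as a rule, implement the MessageConverter interface and override these two methods.
toMessage: converts a Java object into a Message
fromMessage: converts a Message into a Java object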
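To make the two-method contract concrete, here is a minimal sketch that uses only the JDK. SimpleMessage, SimpleConverter and Utf8TextConverter are hypothetical stand-ins invented for this illustration; they are not the Spring AMQP types, and the real MessageConverter.toMessage also takes a MessageProperties argument.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins that only mirror the shape of the contract; NOT the Spring AMQP types. */
final class ConverterContractSketch {

    /** Simplified message: a raw body plus string headers. */
    static final class SimpleMessage {
        final byte[] body;
        final Map<String, String> headers = new HashMap<>();

        SimpleMessage(byte[] body) {
            this.body = body;
        }
    }

    /** The two directions described above: object -> message and message -> object. */
    interface SimpleConverter {
        SimpleMessage toMessage(Object payload);

        Object fromMessage(SimpleMessage message);
    }

    /** Text converter: String <-> UTF-8 bytes. */
    static final class Utf8TextConverter implements SimpleConverter {
        @Override
        public SimpleMessage toMessage(Object payload) {
            SimpleMessage message = new SimpleMessage(payload.toString().getBytes(StandardCharsets.UTF_8));
            message.headers.put("contentType", "text/plain");
            return message;
        }

        @Override
        public Object fromMessage(SimpleMessage message) {
            return new String(message.body, StandardCharsets.UTF_8);
        }
    }

    /** Sends the text through both directions and returns what comes back. */
    static String roundTrip(String text) {
        SimpleConverter converter = new Utf8TextConverter();
        return (String) converter.fromMessage(converter.toMessage(text));
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("order 1001"));
    }
}
```

A converter is only useful when both directions agree on the encoding, which is why the sketch fixes UTF-8 on both sides.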
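Converter types:
JSON converter: Jackson2JsonMessageConverter, which converts Java objects.
DefaultJackson2JavaTypeMapper: a mapper that defines how messages map to Java classes.
Custom binary converters: for example images, PDF, PPT, streaming media.
Code: https://github.com/hmilyos/rabbitmqdemo.git, under the rabbitmq-api project.
First create a Java entity, Order: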
    public class Order {

        private String id;
        private String name;
        private String content;

        public Order() {
        }

        public Order(String id, String name, String content) {
            this.id = id;
            this.name = name;
            this.content = content;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", content='" + content + '\'' +
                    '}';
        }
    }
Next, in the RabbitMQConfig from the previous post, configure a converter that supports JSON:
    @Bean // the parameter name connectionFactory must match the bean method name defined at the top
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
        container.setConcurrentConsumers(1);                     // current number of consumers
        container.setMaxConcurrentConsumers(5);                  // maximum number of consumers
        container.setDefaultRequeueRejected(false);              // whether rejected messages go back to the queue
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);      // acknowledge mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 3 converter with JSON support
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

        return container;
    }
In the delegate used by the adapter, declare a consumer method that takes a Map:

    public void consumeMessage(Map messageBody) {
        log.info("map method, message body: " + messageBody);
    }
That completes the feature. Now write a unit test, and note that the ContentType must be JSON!

    @Test
    public void testSendJsonMessage() throws Exception {
        Order order = new Order();
        order.setId("001");
        order.setName("test1001 order message");
        order.setContent("test1001 order description");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        log.info("order 4 json: " + json);

        MessageProperties messageProperties = new MessageProperties();
        // important: the contentType must be changed to application/json
        messageProperties.setContentType("application/json");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default
        Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }
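Run the unit test and the message is consumed.
To have the message converted into a Java object rather than a Map, change messageContainer to the following: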
    @Bean // the parameter name connectionFactory must match the bean method name defined at the top
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
        container.setConcurrentConsumers(1);                     // current number of consumers
        container.setMaxConcurrentConsumers(5);                  // maximum number of consumers
        container.setDefaultRequeueRejected(false);              // whether rejected messages go back to the queue
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);      // acknowledge mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 4 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: conversion to Java objects
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

        return container;
    }
The delegate method now takes an Order:

    public void consumeMessage(Order order) {
        log.info("order object, message body, id: " + order.getId()
                + ", name: " + order.getName()
                + ", content: " + order.getContent());
    }

The unit test:

    @Test
    public void testSendJavaMessage() throws Exception {
        Order order = new Order();
        order.setId("1002");
        order.setName("test1002 order message");
        order.setContent("test1002 order description");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        log.info("order java: " + json);

        MessageProperties messageProperties = new MessageProperties();
        // important: the contentType must be changed to application/json
        messageProperties.setContentType("application/json");
        // put the fully qualified name of your own entity class here
        messageProperties.getHeaders().put("__TypeId__", "com.hmily.rabbitmqapi.spring.domain.Order");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default
        Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }
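Run the unit test.
If it fails with an exception whose message contains

    If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

the cause is a safety check: when the converter turns the message bytes into a Java object, the type mapper refuses classes from packages it does not trust. A blunt workaround is as follows.
Create a class EnableAllJackson2JavaTypeMapper that extends DefaultJackson2JavaTypeMapper, and configure in it which classes may be converted (for example through the mapper's setTrustedPackages setter); here I simply allow everything.
Then, in the SimpleMessageListenerContainer configuration above, replace new DefaultJackson2JavaTypeMapper() with new EnableAllJackson2JavaTypeMapper(), and the error goes away. As the error message itself says, trusting everything is only appropriate when all producers are trusted.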
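The idea behind the trust check can be sketched with the JDK alone. The class below is a simplified model written for this post, not Spring AMQP's implementation: the name TrustedPackagesSketch and its exact matching rule (exact package match, or "*" for everything) are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of a trusted-package check; not Spring AMQP's code. */
final class TrustedPackagesSketch {

    private final Set<String> trusted = new HashSet<>();

    TrustedPackagesSketch(String... packages) {
        trusted.addAll(Arrays.asList(packages));
    }

    /** A class is accepted if "*" is configured or its package is listed exactly. */
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true; // "trust all", the blunt option used in this post
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }

    public static void main(String[] args) {
        TrustedPackagesSketch strict = new TrustedPackagesSketch("java.util", "java.lang");
        TrustedPackagesSketch trustAll = new TrustedPackagesSketch("*");
        String entity = "com.hmily.rabbitmqapi.spring.domain.Order";
        System.out.println("strict accepts entity: " + strict.isTrusted(entity));
        System.out.println("trustAll accepts entity: " + trustAll.isTrusted(entity));
    }
}
```

A narrower alternative to trusting everything would be to list only the package that holds your entities, so that a forged type header cannot name an arbitrary class.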
To map several types, add a second entity, Packaged:

    public class Packaged {

        private String id;
        private String name;
        private String description;

        public Packaged() {
        }

        public Packaged(String id, String name, String description) {
            this.id = id;
            this.name = name;
            this.description = description;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getDescription() {
            return description;
        }

        public void setDescription(String description) {
            this.description = description;
        }

        @Override
        public String toString() {
            return "Packaged{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", description='" + description + '\'' +
                    '}';
        }
    }
Change messageContainer to the following:

    @Bean // the parameter name connectionFactory must match the bean method name defined at the top
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
        container.setConcurrentConsumers(1);                     // current number of consumers
        container.setMaxConcurrentConsumers(5);                  // maximum number of consumers
        container.setDefaultRequeueRejected(false);              // whether rejected messages go back to the queue
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);      // acknowledge mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 5 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: mapping to several Java types
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        // alias -> class; the alias is what producers put into the __TypeId__ header
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put("order", com.hmily.rabbitmqapi.spring.domain.Order.class);
        idClassMapping.put("packaged", com.hmily.rabbitmqapi.spring.domain.Packaged.class);
        javaTypeMapper.setIdClassMapping(idClassMapping);

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

        return container;
    }
Add a delegate method for Packaged:

    public void consumeMessage(Packaged pack) {
        log.info("package object, message body, id: " + pack.getId()
                + ", name: " + pack.getName()
                + ", content: " + pack.getDescription());
    }

The unit test sends one message of each type:

    @Test
    public void testSendMappingMessage() throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        Order order = new Order();
        order.setId("1001");
        order.setName("1001 order message");
        order.setContent("1001 order description");
        String json1 = mapper.writeValueAsString(order);
        log.info("order java: " + json1);

        MessageProperties messageProperties1 = new MessageProperties();
        // important: the contentType must be changed to application/json
        messageProperties1.setContentType("application/json");
        messageProperties1.getHeaders().put("__TypeId__", "order");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default
        Message message1 = new Message(json1.getBytes(StandardCharsets.UTF_8), messageProperties1);
        rabbitTemplate.send("topic001", "spring.order", message1);

        Packaged pack = new Packaged();
        pack.setId("1002");
        pack.setName("1002 package message");
        pack.setDescription("1002 package description");
        String json2 = mapper.writeValueAsString(pack);
        log.info("pack java: " + json2);

        MessageProperties messageProperties2 = new MessageProperties();
        // important: the contentType must be changed to application/json
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        Message message2 = new Message(json2.getBytes(StandardCharsets.UTF_8), messageProperties2);
        rabbitTemplate.send("topic001", "spring.pack", message2);
    }
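Note that the __TypeId__ header no longer holds the fully qualified class name; it holds the alias registered in idClassMapping above.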
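The alias lookup can be pictured as a plain map from type id to class. The sketch below is a JDK-only illustration under that assumption; TypeIdAliasSketch, its nested Order and Packaged placeholders, and the fallback to Class.forName are hypothetical and do not reproduce Spring AMQP's internals.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of resolving a type-id header to a class; not Spring AMQP's code. */
final class TypeIdAliasSketch {

    /** Placeholder entities standing in for the real Order and Packaged classes. */
    static final class Order {
    }

    static final class Packaged {
    }

    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    void register(String alias, Class<?> type) {
        idClassMapping.put(alias, type);
    }

    /** Looks the id up as an alias first, then falls back to treating it as a class name. */
    Class<?> resolve(String typeId) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("unknown type id: " + typeId, e);
        }
    }

    public static void main(String[] args) {
        TypeIdAliasSketch mapper = new TypeIdAliasSketch();
        mapper.register("order", Order.class);
        mapper.register("packaged", Packaged.class);
        System.out.println(mapper.resolve("order").getSimpleName());
        System.out.println(mapper.resolve("packaged").getSimpleName());
    }
}
```

Aliases keep producers and consumers decoupled: the producer never needs to know the consumer's package layout, only the agreed alias.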
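Because this runs as a JUnit test, the application shuts down as soon as the messages are sent, so one message is left unconsumed.
You can confirm this in the management console.
Now start the project's RabbitmqApiApplication directly, and the remaining message is consumed.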
For custom binary conversion, first write the converter. This one handles images:

    public class ImageMessageConverter implements MessageConverter {

        private static final Logger log = LoggerFactory.getLogger(ImageMessageConverter.class);

        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            // only the receiving direction is supported
            throw new MessageConversionException(" convert error ! ");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            log.info("-----------Image MessageConverter----------");

            // file extension comes from the extName header, defaulting to png
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ? "png" : _extName.toString();

            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "G:/test/file/new/" + fileName + "." + extName;
            File f = new File(path);
            try {
                // write the message body to the target file
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    }
Receiving the message:

    public void consumeMessage(File file) {
        log.info("file object method, message body: " + file.getName());
    }
Declare a global converter that delegates by content type:

    @Bean // the parameter name connectionFactory must match the bean method name defined at the top
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
        container.setConcurrentConsumers(1);                     // current number of consumers
        container.setMaxConcurrentConsumers(5);                  // maximum number of consumers
        container.setDefaultRequeueRejected(false);              // whether rejected messages go back to the queue
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);      // acknowledge mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 6 ext convert
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");

        // global converter: picks a delegate based on the message's content type
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("html/text", textConvert);
        convert.addDelegate("xml/text", textConvert);
        convert.addDelegate("text/plain", textConvert);

        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);

        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);

        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);

        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);

        return container;
    }
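Conceptually, a content-type-delegating converter is a lookup table from content type to converter. The following JDK-only sketch illustrates that idea; ContentTypeDispatchSketch, its use of Function as the converter type, and the pass-through fallback are assumptions made for this illustration, not the behaviour of ContentTypeDelegatingMessageConverter itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of dispatching on content type; not Spring AMQP's code. */
final class ContentTypeDispatchSketch {

    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();

    /** Assumed fallback for this sketch: hand the raw bytes through unchanged. */
    private final Function<byte[], Object> fallback = body -> body;

    void addDelegate(String contentType, Function<byte[], Object> converter) {
        delegates.put(contentType, converter);
    }

    /** Picks the converter registered for the content type, or the fallback. */
    Object convert(String contentType, byte[] body) {
        return delegates.getOrDefault(contentType, fallback).apply(body);
    }

    /** Builds a dispatcher with one text delegate registered under two content types. */
    static ContentTypeDispatchSketch withTextDelegate() {
        ContentTypeDispatchSketch dispatcher = new ContentTypeDispatchSketch();
        Function<byte[], Object> text = body -> new String(body, StandardCharsets.UTF_8);
        dispatcher.addDelegate("text", text);
        dispatcher.addDelegate("text/plain", text);
        return dispatcher;
    }

    public static void main(String[] args) {
        ContentTypeDispatchSketch dispatcher = withTextDelegate();
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(dispatcher.convert("text/plain", body));
        System.out.println(dispatcher.convert("application/pdf", body) instanceof byte[]);
    }
}
```

Registering the same delegate under several content types, as the configuration above does for text and JSON, is just several keys pointing at one value.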
Write a unit test for the image case:

    @Test
    public void testSendExtConverterMessage() throws Exception {
        byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");

        Message message = new Message(body, messageProperties);
        // default exchange, routed straight to image_queue
        rabbitTemplate.send("", "image_queue", message);
    }
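The source image sits in the local folder G:/test/file.
After the test runs, the image is written to the target directory, G:/test/file/new/.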
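The file-writing step of the converter can be tried on its own, without a broker. This is a minimal JDK-only sketch; PayloadFileWriterSketch and its helper methods are hypothetical names, and unlike the converter in this post it creates the target directory first and rethrows I/O failures instead of printing them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical helper showing how a message body can be written to a uniquely named file. */
final class PayloadFileWriterSketch {

    /** Writes the body to targetDir/<random uuid>.<ext>, defaulting the extension to png. */
    static Path write(Path targetDir, byte[] body, String extName) {
        String ext = (extName == null || extName.isEmpty()) ? "png" : extName;
        try {
            Files.createDirectories(targetDir); // no-op if the directory already exists
            Path target = targetDir.resolve(UUID.randomUUID() + "." + ext);
            return Files.write(target, body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Convenience for trying the sketch without touching a fixed path such as G:/test/file/new/. */
    static Path writeToTempDir(byte[] body, String extName) {
        try {
            return write(Files.createTempDirectory("converter-sketch"), body, extName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path written = writeToTempDir(new byte[] {1, 2, 3}, "pdf");
        System.out.println("wrote " + written);
    }
}
```

Creating the directory up front avoids a failure when the target folder does not exist yet, and rethrowing keeps a failed write from being reported to the listener as a valid file.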
The PDF converter follows the same pattern:

    public class PDFMessageConverter implements MessageConverter {

        private static final Logger log = LoggerFactory.getLogger(PDFMessageConverter.class);

        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            // only the receiving direction is supported
            throw new MessageConversionException(" convert error ! ");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            log.info("-----------PDF MessageConverter----------");

            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "G:/test/file/new/" + fileName + ".pdf";
            File f = new File(path);
            try {
                // write the message body to the target file
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    }
Change the unit test to send a PDF instead of the image:

    @Test
    public void testSendExtConverterMessage() throws Exception {
        // byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
        // MessageProperties messageProperties = new MessageProperties();
        // messageProperties.setContentType("image/png");
        // messageProperties.getHeaders().put("extName", "png");
        // Message message = new Message(body, messageProperties);
        // rabbitTemplate.send("", "image_queue", message);

        byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "AliTech101_RD.pdf"));

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/pdf");

        Message message = new Message(body, messageProperties);
        // default exchange, routed straight to pdf_queue
        rabbitTemplate.send("", "pdf_queue", message);
    }

Run it and check that the PDF was processed successfully.