package com.wyg.rabbitmq.springamqp; import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert; import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert; /** * RabbitConfig * * @author wyg0405@gmail.com * @date 2019-11-25 15:11 * @since JDK1.8 * @version V1.0 */ @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses("localhost:5672"); cachingConnectionFactory.setUsername("guest"); cachingConnectionFactory.setPassword("guest"); cachingConnectionFactory.setVirtualHost("/"); return cachingConnectionFactory; } /** * RabbitAdmin注入容器 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 16:35 */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); /* * autoStartup 必须要设为 true ,否则Spring容器不会加载RabbitAdmin类 */ rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * RabbitTemplate注入 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 16:37 */ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * SimpleMessageListenerContainer注入 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 17:16 */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 监听多个queue container.addQueueNames("test01"); //container.addQueueNames("test01", "test02", "test03"); // 设置当前消费者数量 container.setConcurrentConsumers(1); // 设置最大的消费者数量 container.setMaxConcurrentConsumers(5); // 设置不要重回队列 container.setDefaultRequeueRejected(false); // 设置自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置消费端tag策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 方式二,使用适配器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate()); // 自定处理消息方法,不设置默认为handleMessage adapter.setDefaultListenerMethod("consumeMsg"); // 自定义消息转换器 // adapter.setMessageConverter(new MyTextMessageConvert()); // 1.自定义消息转换器 Json,实际上json对应的是Map类型参数 /* Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jsonMessageConverter);*/ // 2.Java对象 /*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); defaultJackson2JavaTypeMapper.setTrustedPackages("*"); jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jsonMessageConverter);*/ // 3.Java对象多映射关系 /*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> map = new HashMap<>(); map.put("order", Order.class); map.put("user", User.class); defaultJackson2JavaTypeMapper.setTrustedPackages("*"); defaultJackson2JavaTypeMapper.setIdClassMapping(map); jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jsonMessageConverter);*/ // 4.全局转换器 ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter(); MyTextMessageConvert textMessageConvert = new MyTextMessageConvert(); converter.addDelegate("text", textMessageConvert); converter.addDelegate("appliction/text", textMessageConvert); converter.addDelegate("text/plain", textMessageConvert); Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter("*"); converter.addDelegate("json", jsonMessageConverter); converter.addDelegate("application/json", jsonMessageConverter); // png converter.addDelegate("png", new MyPngMesssageConvert()); converter.addDelegate("image/png", new MyPngMesssageConvert()); // pdf converter.addDelegate("pdf", new MyPDFMessageConvert()); adapter.setMessageConverter(converter); container.setMessageListener(adapter); return container; } } 复制代码
package com.wyg.rabbitmq.springamqp; import java.io.File; import java.util.Map; import com.wyg.rabbitmq.springamqp.convert.Order; import com.wyg.rabbitmq.springamqp.convert.User; /** * * @author wyg0405@gmail.com * @date 2019-11-29 14:23 * @since JDK1.8 * @version V1.0 */ public class MyMessageListenerDelegate { // 默认方法 public void handleMessage(byte[] body) { System.out.println("默认处理方法,message:" + new String(body)); } // 自定义处理方法 public void consumeMsg(byte[] msgBody) { System.out.println("自定义处理方法,message:" + new String(msgBody)); } // 自定义处理String类型消息方法 public void consumeMsg(String msgBody) { System.out.println("自定义处理String消息方法,message:" + new String(msgBody)); } public void method01(String msgBody) { System.out.println("method01处理String消息方法,message:" + new String(msgBody)); } public void method02(String msgBody) { System.out.println("method02处理String消息方法,message:" + new String(msgBody)); } public void method03(String msgBody) { System.out.println("method03处理String消息方法,message:" + new String(msgBody)); } // json 对应map类型参数 public void consumeMsg(Map msgBody) { System.out.println("自定义json处理方法,message:" + msgBody.toString()); } // Java对象 对应Order类型参数 public void consumeMsg(Order msgBody) { System.out.println("自定义Order对象处理方法,message:" + msgBody.toString()); } // Java对象 对应User类型参数 public void consumeMsg(User user) { System.out.println("自定义User对象处理方法,message:" + user.toString()); } // File类型的参数 public void consumeMsg(File file) { System.out.println("自定义File对象处理方法,message:" + file.getPath()); } } 复制代码
package com.wyg.rabbitmq.springamqp.convert; import java.io.Serializable; /** * * @author wyg0405@gmail.com * @date 2019-12-01 15:47 * @since JDK1.8 * @version V1.0 */ public class Order implements Serializable { private static final long serialVersionUID = -4975357142857575433L; private String id; private String content; private double price; public Order() {} public Order(String id, String content, double price) { this.id = id; this.content = content; this.price = price; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { final StringBuilder sb = new StringBuilder("{"); sb.append("/"id/":/"").append(id).append('/"'); sb.append(",/"content/":/"").append(content).append('/"'); sb.append(",/"price/":").append(price); sb.append('}'); return sb.toString(); } } 复制代码
package com.wyg.rabbitmq.springamqp.convert; import java.io.Serializable; /** * * @author wyg0405@gmail.com * @date 2019-12-01 17:20 * @since JDK1.8 * @version V1.0 */ public class User implements Serializable { private static final long serialVersionUID = 2959945432292661959L; private String id; private String name; private int age; public User() {} public User(String id, String name, int age) { this.id = id; this.name = name; this.age = age; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { final StringBuilder sb = new StringBuilder("{"); sb.append("/"id/":/"").append(id).append('/"'); sb.append(",/"name/":/"").append(name).append('/"'); sb.append(",/"age/":").append(age); sb.append('}'); return sb.toString(); } } 复制代码
自定义文本消息转换器
package com.wyg.rabbitmq.springamqp.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; /** * 自定义消息转换器 * * @author wyg0405@gmail.com * @date 2019-12-01 15:11 * @since JDK1.8 * @version V1.0 */ public class MyTextMessageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if (null != contentType && contentType.contains("text")) { // 转换为String return new String(message.getBody()); } return message.getBody(); } } 复制代码
自定义png转换器
package com.wyg.rabbitmq.springamqp.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; /** * * @author wyg0405@gmail.com * @date 2019-12-01 19:32 * @since JDK1.8 * @version V1.0 */ public class MyPngMesssageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return null; } @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println("---------PNGMessageConvert---------"); String contentType = message.getMessageProperties().getContentType(); String extName = (String)message.getMessageProperties().getHeaders().get("extName"); String filename = System.currentTimeMillis() + extName; // 将文件流读到本地 String filePath="E://"+filename; File file= new File(filePath); try { Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } } 复制代码
自定义pdf转换器
package com.wyg.rabbitmq.springamqp.convert; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; /** * * @author wyg0405@gmail.com * @date 2019-12-01 19:34 * @since JDK1.8 * @version V1.0 */ public class MyPDFMessageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return null; } @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println("---------PDFMessageConvert---------"); String contentType = message.getMessageProperties().getContentType(); String extName = (String)message.getMessageProperties().getHeaders().get("extName"); String filename = System.currentTimeMillis() + extName; // 将文件流读到本地 String filePath = "E://" + filename; File file = new File(filePath); try { Files.copy(new ByteArrayInputStream(message.getBody()), file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } } 复制代码
单元测试
package com.wyg.rabbitmq.springamqp; import java.io.*; import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.wyg.rabbitmq.springamqp.convert.Order; import com.wyg.rabbitmq.springamqp.convert.User; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitConfigTest { @Autowired RabbitAdmin rabbitAdmin; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private SimpleMessageListenerContainer simpleMessageListenerContainer; @Test public void testSimpleMessageListenerContainerSendMsg() { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey for (int i = 0; i < 3; i++) { rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", ("第" + i + "条消息").getBytes()); } } @Test public void sendTextMsg() { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey for (int i = 0; i < 3; i++) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/text"); String body = "第" + i + "条消息"; Message msg = new Message(body.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } } @Test public void sendJsonMsg() throws JsonProcessingException { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String json = mapper.writeValueAsString(order); System.out.println("order to json:" + json); MessageProperties messageProperties = new MessageProperties(); // 这里 ContentType 一定要写成 "application/json" messageProperties.setContentType("application/json"); Message msg = new Message(json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendJavaTypeMsg() throws JsonProcessingException { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String order2Json = mapper.writeValueAsString(order); System.out.println("order to json:" + order2Json); MessageProperties messageProperties = new MessageProperties(); // 这里 ContentType 一定要写成 "application/json" messageProperties.setContentType("application/json"); // __TypeId__ 为固定形式 messageProperties.getHeaders().put("__TypeId__", "com.wyg.rabbitmq.springamqp.convert.Order"); Message msg = new Message(order2Json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendJavaTypeMappingMsg() throws JsonProcessingException { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey // 发送Order类型 Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String order2Json = mapper.writeValueAsString(order); System.out.println("order to json:" + order2Json); MessageProperties messageProperties = new MessageProperties(); // 这里 ContentType 一定要写成 "application/json" messageProperties.setContentType("application/json"); // __TypeId__ 为固定形式, order标签 messageProperties.getHeaders().put("__TypeId__", "order"); Message msg = new Message(order2Json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); // 发送User类型 User user = new User("111", "jack", 20); String user2Json = mapper.writeValueAsString(user); System.out.println("User to Json:" + user2Json); MessageProperties messageProperties2 = new MessageProperties(); // __TypeId__ 为固定形式, user标签 messageProperties2.getHeaders().put("__TypeId__", "user"); Message usermsg = new Message(user2Json.getBytes(), messageProperties2); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", usermsg); } @Test public void sendPNGMsg() throws IOException { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey String filePath="C://Users//wyg04//Desktop//image//green.png"; byte[] allBytes = Files.readAllBytes(Paths.get(filePath)); MessageProperties messageProperties = new MessageProperties(); // 这里 ContentType 一定要写成 "image/png"或"png" messageProperties.setContentType("png"); // 拓展名 messageProperties.getHeaders().put("extName", ".png"); Message msg = new Message(allBytes, messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendPDFMsg() throws IOException { // 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02", // "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey String filePath="C://Users//wyg04//Desktop//深度工作.pdf"; byte[] allBytes = Files.readAllBytes(Paths.get(filePath)); MessageProperties messageProperties = new MessageProperties(); // 这里 ContentType 一定要写成 "pdf" messageProperties.setContentType("pdf"); // 拓展名 messageProperties.getHeaders().put("extName", ".pdf"); Message msg = new Message(allBytes, messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } } 复制代码