- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
RabbitMQ 端口说明:5672(AMQP)、25672(clustering,集群节点间通信)、61613(STOMP)、1883(MQTT)
- spring.rabbitmq.addresses=127.0.0.1:5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.publisher-confirms=true
- spring.rabbitmq.virtual-host=/
- package com.rabbitmq.send;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * Spring component that publishes messages through the injected {@link RabbitTemplate}.
- */
- @Component
- public class Sender {
- /** Template supplied by Spring via field injection. */
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * Converts {@code msg} and sends it using {@code "foo"} as the routing key.
- *
- * @param msg payload to publish
- */
- public void send(String msg) {
- // Two-argument convertAndSend: the first argument is the routing key.
- final String routingKey = "foo";
- rabbitTemplate.convertAndSend(routingKey, msg);
- }
- }
- package com.rabbitmq.listener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.messaging.handler.annotation.Payload;
- /**
- * Declares the {@code "foo"} queue and listens on it, logging every String payload received.
- */
- @Configuration
- @RabbitListener(queues = "foo")
- public class Listener {
- private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
- /**
- * Registers the queue named {@code "foo"} as a bean.
- *
- * @return the queue definition
- */
- @Bean
- public Queue fooQueue() {
- return new Queue("foo");
- }
- /**
- * Handles a message with a String payload by writing it to the log.
- *
- * @param foo the message payload
- */
- @RabbitHandler
- public void process(@Payload String foo) {
- // Parameterized logging: the message is only formatted when INFO is enabled.
- LOGGER.info("Listener: {}", foo);
- }
- }
- package com.rabbitmq.controller;
- import com.rabbitmq.send.Sender;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import javax.servlet.http.HttpServletRequest;
- /**
- * REST endpoint that forwards a request parameter to the {@link Sender}.
- */
- @RestController
- public class RabbitmqController {
- /** Message publisher injected by Spring. */
- @Autowired
- private Sender sender;
- /**
- * Handles {@code GET /send} by passing {@code msg} to the sender.
- *
- * @param request the current servlet request (not read by this method)
- * @param msg the text to publish
- * @return the fixed confirmation text {@code "Send OK."}
- */
- @GetMapping("/send")
- public String send(HttpServletRequest request, String msg) {
- final String reply = "Send OK.";
- this.sender.send(msg);
- return reply;
- }
- }
- INFO 5559 --- [cTaskExecutor-1] c.rabbitmq.listener.Listener : Listener: this is a test
- [SimpleAsyncTaskExecutor-1] INFO c.rabbitmq.listener.Listener - Listener: this is a test
- package com.rabbitmq.config;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
- /**
- * Builds the RabbitMQ connection factory and template from the {@code spring.rabbitmq.*}
- * properties, and holds the exchange, routing key and queue names shared by sender and listener.
- */
- @Configuration
- public class AmqpConfig {
- public static final String FOO_EXCHANGE = "callback.exchange.foo";
- public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
- public static final String FOO_QUEUE = "callback.queue.foo";
- @Value("${spring.rabbitmq.addresses}")
- private String addresses;
- @Value("${spring.rabbitmq.username}")
- private String username;
- @Value("${spring.rabbitmq.password}")
- private String password;
- @Value("${spring.rabbitmq.virtual-host}")
- private String virtualHost;
- @Value("${spring.rabbitmq.publisher-confirms}")
- private boolean publisherConfirms;
- /**
- * Creates a {@link CachingConnectionFactory} configured with the injected broker settings.
- *
- * @return the configured connection factory
- */
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory factory = new CachingConnectionFactory();
- factory.setAddresses(addresses);
- factory.setVirtualHost(virtualHost);
- factory.setUsername(username);
- factory.setPassword(password);
- // Must be true for publisher confirm callbacks to be invoked.
- factory.setPublisherConfirms(publisherConfirms);
- return factory;
- }
- /**
- * Creates a new {@link RabbitTemplate} on top of {@link #connectionFactory()}.
- *
- * <p>Prototype scope is used because a confirm callback is registered on the template:
- * with a singleton, the callback in effect would be whichever one was set last.
- *
- * @return a fresh template instance
- */
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- public RabbitTemplate rabbitTemplate() {
- return new RabbitTemplate(connectionFactory());
- }
- }
- package com.rabbitmq.send;
- import com.rabbitmq.config.AmqpConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.util.UUID;
- /**
- * Publishes messages to {@link AmqpConfig#FOO_EXCHANGE} and receives the broker's publisher
- * confirms for them by registering itself as the template's confirm callback.
- */
- @Component
- public class Sender implements RabbitTemplate.ConfirmCallback {
- private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
- private final RabbitTemplate rabbitTemplate;
- /**
- * Stores the template and registers this instance as its confirm callback.
- *
- * @param rabbitTemplate the template used for sending
- */
- @Autowired
- public Sender(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- this.rabbitTemplate.setConfirmCallback(this);
- }
- /**
- * Sends {@code msg} with a freshly generated correlation id and logs that id.
- *
- * @param msg payload to publish
- */
- public void send(String msg) {
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- LOGGER.info("send: {}", correlationData.getId());
- this.rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE, AmqpConfig.FOO_ROUTINGKEY, msg, correlationData);
- }
- /**
- * Publisher-confirm callback.
- *
- * @param correlationData correlation data of the confirmed message; may be null
- * @param ack true when the broker acknowledged the message, false for a nack
- * @param cause optional reason supplied with a nack
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- // Guard against a missing correlation object instead of throwing inside the callback.
- String id = correlationData == null ? null : correlationData.getId();
- if (ack) {
- LOGGER.info("confirm: {}", id);
- } else {
- // A nack means the broker did not accept the message; do not report it as a success.
- LOGGER.warn("confirm nack: {}, cause: {}", id, cause);
- }
- }
- }
- package com.rabbitmq.listener;
- import com.rabbitmq.config.AmqpConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.messaging.handler.annotation.Payload;
- /**
- * Declares the exchange, queue and binding used by the callback example and listens on
- * {@link AmqpConfig#FOO_QUEUE}, logging every String payload received.
- */
- @Configuration
- @RabbitListener(queues = AmqpConfig.FOO_QUEUE)
- public class Listener {
- private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
- /**
- * Declares the exchange and thereby chooses its type.
- *
- * @return a direct exchange named {@link AmqpConfig#FOO_EXCHANGE}
- */
- @Bean
- public DirectExchange defaultExchange() {
- // Exchange types:
- // DirectExchange  - routes to the queue(s) bound with the given routing key
- // TopicExchange   - matches on multiple keywords
- // FanoutExchange  - delivers to every bound queue; no routing key concept
- // HeadersExchange - matches on key-value attributes added to the message
- return new DirectExchange(AmqpConfig.FOO_EXCHANGE);
- }
- /**
- * Declares the queue the listener consumes from.
- *
- * @return a queue named {@link AmqpConfig#FOO_QUEUE}
- */
- @Bean
- public Queue fooQueue() {
- return new Queue(AmqpConfig.FOO_QUEUE);
- }
- /**
- * Binds the queue to the exchange under {@link AmqpConfig#FOO_ROUTINGKEY}.
- *
- * @return the binding definition
- */
- @Bean
- public Binding binding() {
- // Bind the queue to the exchange.
- return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);
- }
- /**
- * Handles a message with a String payload by writing it to the log.
- *
- * @param foo the message payload
- */
- @RabbitHandler
- public void process(@Payload String foo) {
- // Parameterized logging: the message is only formatted when INFO is enabled.
- LOGGER.info("Listener: {}", foo);
- }
- }
这里参考了这篇博文:http://blog.csdn.net/liaokailin/article/details/49559571
- /**
- * Creates a listener container on the foo queue that uses manual acknowledgement: each
- * message body is logged and then acknowledged on the channel it arrived on.
- *
- * @return the configured listener container
- */
- @Bean
- public SimpleMessageListenerContainer messageContainer() {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueues(fooQueue());
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(1);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement mode
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- // Decode explicitly as UTF-8 rather than with the platform default charset,
- // so the logged text does not depend on the JVM's locale settings.
- LOGGER.info("Listener onMessage : {}", new String(body, java.nio.charset.StandardCharsets.UTF_8));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // acknowledge successful consumption
- }
- });
- return container;
- }
- INFO 15122 --- [nio-8080-exec-1] com.jikefriend.rabbitmq.send.Sender : send: c678afb7-af8b-42b2-9370-ea7f9d6004a0
- [http-nio-8080-exec-1] INFO com.jikefriend.rabbitmq.send.Sender - send: c678afb7-af8b-42b2-9370-ea7f9d6004a0
- INFO 15122 --- [ 127.0.0.1:5672] com.jikefriend.rabbitmq.send.Sender : confirm: c678afb7-af8b-42b2-9370-ea7f9d6004a0
- [AMQP Connection 127.0.0.1:5672] INFO com.jikefriend.rabbitmq.send.Sender - confirm: c678afb7-af8b-42b2-9370-ea7f9d6004a0
- INFO 15122 --- [cTaskExecutor-1] c.jikefriend.rabbitmq.listener.Listener : Listener: this is a test
- [SimpleAsyncTaskExecutor-1] INFO c.j.rabbitmq.listener.Listener - Listener: this is a test