今日内容是消息队列。大佬讲了三个消息队列分别是 Pegion、Kafka 和 RabbitMQ。其中 Pegion 是公司自己研发的组件。今天主要在 SpringBoot 中集成 RabbitMQ , 将消息储存在消息队列中并消费的过程。
Message Broker 是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:
AMQP 是 Advanced Message Queuing Protocol 的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP 定义了这些特性:
本文要介绍的 RabbitMQ 就是以 AMQP 协议实现的一种中间件产品,它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。
下面,我们通过在 SpringBoot 应用中整合 RabbitMQ,并实现一个简单的发送、接收消息的例子来对 RabbitMQ 有一个直观的感受和理解。
在 SpringBoot 中整合 RabbitMQ 是一件非常容易的事,因为之前我们已经介绍过 Starter POMs,其中的 AMQP 模块就可以很好的支持 RabbitMQ,下面我们就来详细说说整合过程:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> 复制代码
spring.application.name=rabbitmq-hello spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=spring spring.rabbitmq.password=123456 复制代码
@Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } } 复制代码
@Component @RabbitListener(queues = "hello") public class Receiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } } 复制代码
@Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } } 复制代码
@SpringBootApplication public class HelloApplication { public static void main(String[] args) { SpringApplication.run(HelloApplication.class, args); } } 复制代码
@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = HelloApplication.class) public class HelloApplicationTests { @Autowired private Sender sender; @Test public void hello() throws Exception { sender.send(); } } 复制代码
完成程序编写之后,下面开始尝试运行。首先确保 RabbitMQ Server 已经开始,然后进行下面的操作:
启动应用主类,从控制台中,我们看到如下内容,程序创建了一个访问127.0.0.1:5672中springcloud的连接。 o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@29836d32 [delegate=amqp://springcloud@127.0.0.1:5672/]
同时,我们通过 RabbitMQ 的控制面板,可以看到 Connection 和 Channels 中包含当前连接的条目。
运行单元测试类,我们可以看到控制台中输出下面的内容,消息被发送到了 RabbitMQ Server 的hello队列中。 Sender : hello Sun Sep 25 11:06:11 CST 2016
切换到应用主类的控制台,我们可以看到类似如下输出,消费者对hello队列的监听程序执行了,并输出了接受到的消息信息。 Receiver : hello Sun Sep 25 11:06:11 CST 2016
大功告成!