消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
RabbitMQ
简介 下载 rabbitMQ : www.rabbitmq.com/download.ht…
安装rabbitmq需要erlang,下载erlang: www.erlang.org/download.ht…
rabbitMQ安装,查看安装文档: www.rabbitmq.com/install-win…
下载完成Erlang后,直接打开文件下一步就可以安装完成了,安装完成ERLANG后再回过来安装RabbitMQ。
4、启动RabbitMQ
如果是安装版的直接在开始那里找到安装目录点击启动即可
5、安装管理工具
参考官方文档: www.rabbitmq.com/management.…
操作起来很简单,只需要在DOS下面,进入安装目录(F:/RabbitMQ Server/rabbitmq_server-3.5.3/sbin)执行如下命令就可以成功安装。
rabbitmq-plugins enable rabbitmq_management 复制代码
可以通过访问 http://localhost:15672 进行测试,默认的登陆账号为:guest,密码为:guest。
如果访问成功了,恭喜,整个rabbitMQ安装完成了。
RabbitMQ安装就不细讲了,大体就这样子,有什么问题可以具体查看其他官网详细的安装文档。
RabbitMQ
是一个由 Erlang 语言开发的 AMQP
的开源实现。
AMQP
:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ
最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
RabbitMQ
使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能, RabbitMQ
已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
多个 RabbitMQ
服务器可以组成一个集群,形成一个逻辑 Broker 。
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
RabbitMQ
支持多种消息队列协议,比如 STOMP、 MQTT
等等。
RabbitMQ
几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
RabbitMQ
提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
所有MQ产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列,生产者(producer)创建消息,然后发布到队列(queue)中去,消息将发送到订阅了该队列的消费者。
RabbitMQ
基本概念 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
网络连接,比如一个TCP连接。
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接, AMQP
命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
10: Broker
表示消息队列服务器实体。
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
不常用,和direct功能接近,不讨论。
上面只是简单的介绍了一下RabbitMQ的三种模式,接下来结合代码实例来看看。
SpringBoot就不多做介绍了,用idea创建SpringBoot项目,pom文件中导入如下包:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 复制代码
然后在applicaition.yml加入如下配置
spring: rabbitmq: host: localhost port: 5672 username: magic password: 123456 复制代码
这样就算是引入RabbitMQ就算成功了,接下来讲一下RabbitMQ三种模式结合SpringBoot。
Direct就是一对一模式,从上面可以知道,RabbitMQ有发送者,交换机,队列,接收者。Direct就是一个发送者对应一个接收者。如果有多个,只会有一个接收到消息。
先创建该模式的队列(Queue)
@Configuration public class OneByOneConfig { @Bean public Queue oneQueue(){ return new Queue("OneByOne"); } } 复制代码
可以看到新建的队列名字叫做OneByOne,接下来创建发送者类。
@Component public class OneByOneSender { @Autowired AmqpTemplate rabbitTemplate; public void send() { String context = "OneByOneSender" + new Date(); System.out.println("OneSender : " + context); this.rabbitTemplate.convertAndSend("OneByOne", context); } } 复制代码
这里面 AmqpTemplate
时 SpringBoot
包装好的用来操作消息队列的。convertAndSend是里面的一个方法,用来发送者匹配交换机,队列以及携带消息的。Drect里面只需要匹配队列以及携带消息即可。
接下来创建接收者
@Component @RabbitListener(queues = "OneByOne") public class OneByOneReceiver { @RabbitHandler public void receiver(String context){ System.out.println("OneByOne-Receiver::::"+context); } } 复制代码
@RabbitListener是用来绑定队列的,该接收者绑定了OneByOne这个队列,下面的@RabbitHandler注解是用来表示该方法是接收者接收消息的方法。
接下来进行测试,SpringBoot有测试的test类
导入OneByOneSender类
@Autowired private OneByOneSender oneByOneSender; @Test public void oneByOneTest(){ oneByOneSender.send(); } 复制代码
运行可以看到结果输出:
OneSender : OneByOneSenderThu Aug 22 17:00:56 CST 2019 OneByOne-Receiver::::OneByOneSenderThu Aug 22 17:00:56 CST 2019 复制代码
可以看到发送者发送打印的输出,以及接收者接收到的消息打印出来的结果。
Fanout模式就是发布订阅模式,发布者发布了消息时候顺便绑定交换器,交换器又是跟队列绑定的,那么跟这个交换器绑定的所有队列都会收到这个消息,相应的绑定了这些队列的所有接收者都会接收到发送的消息。
具体看代码再分析:
创建FanoutConfig类,然后创建队列,交换器,以及绑定队列与交换器。
@Configuration public class FanoutConfig { //队列1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.a"); } //队列2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.b"); } //队列3 @Bean public Queue fanoutQueue3(){ return new Queue("fanout.c"); } //交换器 @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } //绑定交换器和队列1 @Bean Binding bindingFanout1(){ return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } //绑定交换器和队列2 @Bean Binding bindingFanout2(){ return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } //绑定交换器和队列3 @Bean Binding bindingFanout3(){ return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); } } 复制代码
可以看到上面创建了三个队列,到时候再创建三个接收者,那么这三个接收者再Fanout模式下,只要发布者绑定了该 fanoutExchange
交换器,那么他们就应该都可以收到消息。
创建发送者
@Component public class FanoutSender { @Autowired AmqpTemplate rabbitTemplate; public void fanSender1(){ Date date = new Date(); String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date); String message = "FanSender1111:"+dateString; this.rabbitTemplate.convertAndSend("fanoutExchange","",message); } public void fanSender2(){ Date date = new Date(); String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date); String message = "FanSender2222:"+dateString; this.rabbitTemplate.convertAndSend("fanoutExchange","",message); } } 复制代码
创建了两个发送者,分别绑定了fanoutExchange交换器,中间的是交换器选择队列是的条件routerKey,这个在后面的Topic模式中会用到,现在因为是所有队列都会收到,所有就没有条件。
创建三和接收者分别绑定三个队列
@Component @RabbitListener(queues = "fanout.a") public class Fanout1Reciver { @RabbitHandler public void receiver(String message){ System.out.println("FanoutReceiver---fanout.a:"+message); } } @Component @RabbitListener(queues = "fanout.b") public class Fanout2Reciver { @RabbitHandler public void receiver(String message){ System.out.println("FanoutReceiver---fanout.b:"+message); } } @Component @RabbitListener(queues = "fanout.c") public class Fanout3Reciver { @RabbitHandler public void receiver(String message){ System.out.println("FanoutReceiver---fanout.c:"+message); } } 复制代码
可以看到接收者分别绑定了fanout.a/b/c三个队列,接下来进行测试
运行下面代码:
@Autowired FanoutSender fanoutSender; @Test public void fanoutSenderTest(){ fanoutSender.fanSender1(); fanoutSender.fanSender2(); } 复制代码
结果:
FanoutReceiver---fanout.b:FanSender1111:2019-40-234 05:08:32 FanoutReceiver---fanout.c:FanSender1111:2019-40-234 05:08:32 FanoutReceiver---fanout.a:FanSender1111:2019-40-234 05:08:32 FanoutReceiver---fanout.c:FanSender2222:2019-40-234 05:08:32 FanoutReceiver---fanout.b:FanSender2222:2019-40-234 05:08:32 FanoutReceiver---fanout.a:FanSender2222:2019-40-234 05:08:32 复制代码
因为我们的 FanoutSender
发送者没有写输出,所以可以看到上面六条都是接收者的输出,两个发送者分别发送了一条消息,三个接收者都收到了这两个发送者发送的消息。
Topic模式就相当于发布订阅模式交换机和队列之间加上了一定的匹配规则。只有符合规则的消息才能到这个队列中去从而被接收者收到。看代码
创建TopicConfig:
@Configuration public class TopicConfig { @Bean public Queue topicQueue1(){ return new Queue("topic.a"); } @Bean public Queue topicQueue2(){ return new Queue("topic.b"); } @Bean public Queue topicQueue3(){ return new Queue("topic.c"); } @Bean TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding binding1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.msg"); } @Bean public Binding binding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); } @Bean public Binding binding3(){ return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*.z"); } } 复制代码
可以看到创建了三个队列和一个交换器,并且将交换器和队列进行了绑定,在绑定的过程中多了一个条件 with
,这是一种通配符方式的匹配,. 为分隔符,*代表一个,#表示0个或者多个,如上面的topic.#就可已匹配,topic,topic.z,topic.ma.z.z.z等,而topic.*.z就可以匹配topic.m.z,topic.z.z等,而topic.msg就只能匹配topic.msg条件的消息。
创建发送者:
@Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void topicSend1(){ Date date = new Date(); String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date); dateString = "[topic.msg] Send1 msg:" + dateString; System.out.println(dateString); this.rabbitTemplate.convertAndSend("topicExchange","topic.msg",dateString); } public void topicSend2(){ Date date = new Date(); String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date); dateString = "[topic.good.msg] Send2 msg:" + dateString; System.out.println(dateString); this.rabbitTemplate.convertAndSend("topicExchange","topic.good.msg",dateString); } public void topicSend3(){ Date date = new Date(); String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date); dateString = "[topic.m.z] Send3 msg:" + dateString; System.out.println(dateString); this.rabbitTemplate.convertAndSend("topicExchange","topic.m.z",dateString); } } 复制代码
其中
this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString); 复制代码
发送者发送消息时,传的三个参数,第一个时你要传给的交换机,第二个是传给交换机的条件,在Topic模式中,队列与交换机会有一个匹配的条件,如果现在有三个队列和交换机绑定,分别条件是:A: topic.# ,B: topic.msg, C:topic.*.z(#代表多个,*代表一个)。
则上面代码给的key时 topic.good.msg 就只能匹配到A队列中去。如果时topic.msg,那么就匹配到B队列中了,如果是topic.good.z/topic.msg.z 那么会匹配到A和C两个队列中去。
而同时,只要绑定了A,B,C的队列的接收者,如果上面匹配成功,消息就会被发布到队列中,相应的绑定了该队列的接收者就会获取到该消息。
创建接收者:
@Component @RabbitListener(queues = "topic.a") public class Topic1Reciver { @RabbitHandler public void receiver(String message){ System.out.println("topic.A--Receiver::::"+message); } } @Component @RabbitListener(queues = "topic.b") public class Topic2Reciver { @RabbitHandler public void receiver(String msg){ System.out.println("topic.B--Receiver::::"+msg); } } @Component @RabbitListener(queues = "topic.c") public class Topic3Reciver { @RabbitHandler public void receiver(String msg){ System.out.println("topic.C--Receiver::::"+msg); } } 复制代码
三个接收者分别绑定三个队列,看看测试以及结果
@Autowired TopicSender topicSender; @Test public void topicSenderTest(){ topicSender.topicSend1(); topicSender.topicSend2(); topicSender.topicSend3(); } 复制代码
结果:
[topic.msg] Send1 msg:2019-00-234 06:08:00 [topic.good.msg] Send2 msg:2019-00-234 06:08:00 [topic.m.z] Send3 msg:2019-00-234 06:08:00 ---------------------------------------------------- topic.B--Receiver::::[topic.msg] Send1 msg:2019-00-234 06:08:00 topic.C--Receiver::::[topic.m.z] Send3 msg:2019-00-234 06:08:00 topic.A--Receiver::::[topic.msg] Send1 msg:2019-00-234 06:08:00 topic.B--Receiver::::[topic.good.msg] Send2 msg:2019-00-234 06:08:00 topic.B--Receiver::::[topic.m.z] Send3 msg:2019-00-234 06:08:00 复制代码
分割线上面的是发送者的输出,三个发送者分别发送了一条消息,根据发送者传入的key与交换器与队列绑定的匹配规则进行匹配,最终匹配通过的将消息从交换器发到队列中,相应的绑定该队列的接收者就可以获取到这条消息。
在Java中,需要先创建queue队列,接收消息着绑定的是队列,如果队列中有了消息,就会发给绑定它的接收者。然后交换机和队列进行绑定,交换机是一个,所有队列都在交换机上绑定着,发送者发送消息时,把消息给交换机,然后加上限制条件,topic是给满足条件的队列,fanout时给所有绑定交换机的队列。普通模式就是Direct模式也就是1对1模式,发送者直接绑定队列,接收者也绑定队列。互勉~
欢迎关注我的微信公众号,一个喜欢代码和NBA的年轻人,主要用来分享技术收获。