消息队列中间件是分布式系统中的重要组件,主要解决应用耦合、异步消息、流量削锋等问题。可帮助实现高性能,高可用,可伸缩和最终一致性的架构
在消息队列方面,除了 ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ,Kafka等,还有很多其他的竞争者。这篇文章我们不会去讲解它们之间的区别,仅只详细的介绍一下 ActiveMQ,以及它在 .NET 中的使用
比如有以下场景:现在很多网站或App注册时都采用了验证码的机制,因此,当服务器收到客户端发起获取验证码的请求,有以下处理方式
那么,以上几种方式哪种更好呢?
因此,如果是正式上线的版本(比如项目初期用于验证市场的版本,往往会为了速度而不考虑架构,这时可能会选择第一种或第二种方案),且峰值较高的服务,选用第三种方案无疑是最好的。因为对于上线的服务,稳定性是非常重要的
对于发送短信这样的任务(对实时性要求不是那么高),使用消息队列是非常合适的。将任务交由消息队列之后,发送短信具体要做的事情主服务就不需要干涉了。如果需要,主服务订阅任务的处理结果即可(发送成功或者失败)。这样,主服务就可以继续处理其他客户端的请求,并且,有消息队列的参与,主服务的压力就没有那么重了
当然,实际项目中,这样的场景还有很多,比如记录日志,我们都知道,写文件(磁盘I/O)很耗时。因此现在很多大型的服务,都有专门的日志服务器来处理其他服务器发送过来的日志,这时候我们可以使用 Kafka 来做这样的事情(因为它就是为了处理日志而生的)
比如现如今的微服务、分布式集群等,各个节点之间的通信,就可以使用消息队列来处理。具体使用什么方式,可更具场景从以下两种选择
后面在给出案例时会具体讲解这两种模式
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。另外,在很多大型的网站或服务中,也都会使用到它
它具有以下特性
它的优势
这方面的教程在网上有很多,我们在这就不提供了,只提供一些移动端友好的链接以帮助朋友安装配置
首先,需要在Apache官网 上下载 .NET 的驱动,也可以通过以下链接下载
mirrors.hust.edu.cn/apache/acti…
要在项目中使用 ActiveMQ,需要引入上面下载的包中的两个 dll 文件: Apache.NMS.ActiveMQ.dll
和 Apache.NMS.dll
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。
每条消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时
P2P的特点:
因此,如果希望发送的每个消息都会被成功处理的话,则应该P2P模式
示例代码的基类如下
public abstract class ActiveMQBase { protected IConnectionFactory factory; protected IConnection connection; protected ISession session; public virtual void Init() { try { //初始化工厂, 端口默认为61616,指定其他会抛异常 factory = new ConnectionFactory("tcp://localhost:61616"); connection = factory.CreateConnection(); connection.Start(); session = connection.CreateSession(); } catch (Exception e) { Console.WriteLine($"Error: {e.Message}"); } } public abstract void Run(); // 释放相关资源 public virtual void Release() { try { if (session != null) session.Close(); if (connection != null) connection.Close(); } finally { session = null; connection = null; factory = null; } } } 复制代码
生产者(Producer)如下
public class ActiveMQP2PDemoProducer : ActiveMQBase { private IMessageProducer messageProducer; private ActiveMQQueue demoQueue; public override void Init() { base.Init(); try { // 指定队列,以实现点对点的通信 demoQueue = new ActiveMQQueue("DEMO_QUEUE"); // 创建生产者对象 messageProducer = session.CreateProducer(demoQueue); } catch (Exception e) { Console.WriteLine($"Error: {e.Message}"); } } public override void Run() { while (true) { Console.WriteLine("请输入消息,exit 退出"); string line = Console.ReadLine(); if (line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) { break; } // 创建一条文本消息,在 MessageProvider 中存在多个创建消息的方法 // 在实际项目中灵活选择即可 ITextMessage message = messageProducer.CreateTextMessage(line); // 发送消息,可调用其他的重载,以设置是否持久化、优先级等特性 messageProducer.Send(message); } } public override void Release() { base.Release(); try { if (demoQueue != null) demoQueue.Dispose(); if (messageProducer != null) messageProducer.Close(); } finally { demoQueue = null; messageProducer = null; } } } 复制代码
消费者(Consumer)如下
public class ActiveMQP2PDemoComsumer : ActiveMQBase { private IMessageConsumer messageConsumer; private ActiveMQQueue demoQueue; public override void Init() { base.Init(); try { demoQueue = new ActiveMQQueue("DEMO_QUEUE"); // 创建消息的消费者 messageConsumer = session.CreateConsumer(demoQueue); // 添加监听,当消息来临时,会触发此事件 messageConsumer.Listener += this.MessageConsumer_Listener; } catch (Exception e) { Console.WriteLine($"Error: {e.Message}"); } } private void MessageConsumer_Listener(IMessage message) { // 解析接收到的消息 if (message is ITextMessage msg) { Console.WriteLine($"Received Message: {msg.Text}"); } } public override void Run() { // 此处用于阻止控制台结束,以保证消息可被正确处理 Console.WriteLine("请输入消息,exit 退出"); string line = Console.ReadLine(); } public override void Release() { base.Release(); try { if (demoQueue != null) demoQueue.Dispose(); if (messageConsumer != null){ messageConsumer.Listener -= this.MessageConsumer_Listener; messageConsumer.Close(); } } finally { demoQueue = null; messageConsumer = null; } } } 复制代码
使用方式如下
// 生产者初始化 ActiveMQP2PBase demo = new ActiveMQP2PDemoProducer(); // 消费者初始化代码则为: ActiveMQP2PBase demo = new ActiveMQP2PDemoComsumer(); demo.Init(); demo.Run(); demo.Release(); 复制代码
在 ActiveMQ 管理界面可以看到如下,表示生产者发送的消息,都已经被消费者消费了
Pub/Sub模式:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic, 系统将这些消息传递给多个订阅者,可以认为生产者与消费者之间是多对多的关系
Pub/Sub的特点
因此,如果允许发送的消息可以被一个或多个消费者消费、或者可以不被消费,那么可以采用 Pub/Sub 模型
在 C# 中,它与 P2P 的使用区别不大,只需要将上述代码生产者和消费者初始化代码中
demoQueue = new ActiveMQQueue("DEMO_QUEUE"); 复制代码
这部分换成
demoTopic = new ActiveMQTopic("DEMO_TOPIC"); 复制代码
在管理员界面可以看到如下数据
通过示例可以看出,P2P 是基于 Queue 的,而 Pub/Sub 模式则是基于 Topic 的。
在 Pub/Sub 模式下,可以实现多对多的通信,即可以有多个生产者,也可以有多个消费者,一旦有消息到来,它们会都会收到消息。
而P2P模式下,它可以允许有多个生产者,也可以有多个消费者。与 Pub/Sub 不同的是,如果有多个消费者,如果有消息到来,这些消费者会轮流着去消费该消息,而不是每个消费者都收到消息。即一条消息只会有一个消费者
由于在 C# 中,这两种模式的使用方式差别很小,而运行之后产生的行为却差别较大。因此,在实际项目中,我们需要注意这两者之间的区别,以免带来不必要的困惑
ActiveMQ服务器宕机怎么办如果我们想要在服务器宕机之后恢复数据,则需要对消息进行持久化
在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。它们的最大限制在配置文件的 <systemUsage>
节点中配置
但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ 会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除(即重启之后不会从临时文件中恢复消息)
因此,为了保证数据的可靠性
这同样是持久化消息的问题。对于这种情况,我们可以
默认的情况下,非持久化消息是异步发送的;而持久化消息是同步发送的。遇到慢一点的硬盘,发送消息的速度也会很慢
但如果开启事务的情况下,消息都会异步发送,效率会有非常大的提升。所以在发送持久化消息时,我们应该务必开启事务。并且我们也建议发送非持久化消息时也开启事务
可通过 ConnectionFactory.RedeliveryPolicy
属性设置
CollisionAvoidancePercent
:默认值 0.15, 设置防止冲突范围的正负百分比,只有启用 UseCollisionAvoidance
参数时才生效 MaximumRedeliveries
:默认值 6, 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传 InitialRedeliveryDelay
:默认值 1000, 初始重发延迟时间 UseCollisionAvoidance
:默认值 false
, 启用防止冲突功能 UseExponentialBackOff
:默认值 false
, 启用指数倍数递增的方式增加延迟时间 BackOffMultiplier
:默认值 5, 重连时间间隔递增倍数,只有值大于1和启用 UseExponentialBackOff
参数时才生效。 在有多个消费者,ActiveMQ 中累积了大量的数据的情况下,有可能会出现只有一个消费者消费、其他消费者不“工作”的情况
这种情况下,我们只需要将 ActiveMQ 的 prefetch
值设置得小一点即可。在 Queue模式时,其默认值为 1000;Topic 下为 32766。可通过 ConnectionFactory.PrefetchPolicy
设置
这篇文章就先讲到这里,后面我们会讲解 ActiveMQ 的一些其他场景,如分布式集群。欢迎持续关注公众号【嘿嘿的学习日记】,Thank you~