转载

温故之消息队列ActiveMQ

消息队列中间件是分布式系统中的重要组件,主要解决应用耦合、异步消息、流量削锋等问题。可帮助实现高性能,高可用,可伸缩和最终一致性的架构

在消息队列方面,除了 ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ,Kafka等,还有很多其他的竞争者。这篇文章我们不会去讲解它们之间的区别,仅只详细的介绍一下 ActiveMQ,以及它在 .NET 中的使用

消息队列应用场景

异步任务

比如有以下场景:现在很多网站或App注册时都采用了验证码的机制,因此,当服务器收到客户端发起获取验证码的请求,有以下处理方式

  1. 在当前线程中立即发送短信(会阻塞当前线程一小会儿)
  2. 新建立一个线程发送短信(在 .NET 中建立一个 Task 就行)
  3. 交由其他的服务来处理这个任务(转发给消息队列,让消息队列处理)

那么,以上几种方式哪种更好呢?

  • 第一种:实时性肯定更好,收到请求立即处理,但它阻塞了当前线程,会造成其他客户端的请求被阻塞(请求少的时候我们可能根本感觉不到);
  • 第二种:在当前进程中建立一个线程来处理,实时性不如第一种,但它不会阻塞其他客户端的请求。不过一个进程中能创建的线程数量有限,因此也有瓶颈
  • 第三种:使用其他特定场景的服务,这种实时性最差(但如果服务器配置好,我们也不一定能感觉到差异),但其是使用的最多的,并且其上线后效果是最好的(稳定性、可伸缩性)

因此,如果是正式上线的版本(比如项目初期用于验证市场的版本,往往会为了速度而不考虑架构,这时可能会选择第一种或第二种方案),且峰值较高的服务,选用第三种方案无疑是最好的。因为对于上线的服务,稳定性是非常重要的

对于发送短信这样的任务(对实时性要求不是那么高),使用消息队列是非常合适的。将任务交由消息队列之后,发送短信具体要做的事情主服务就不需要干涉了。如果需要,主服务订阅任务的处理结果即可(发送成功或者失败)。这样,主服务就可以继续处理其他客户端的请求,并且,有消息队列的参与,主服务的压力就没有那么重了

当然,实际项目中,这样的场景还有很多,比如记录日志,我们都知道,写文件(磁盘I/O)很耗时。因此现在很多大型的服务,都有专门的日志服务器来处理其他服务器发送过来的日志,这时候我们可以使用 Kafka 来做这样的事情(因为它就是为了处理日志而生的)

消息服务

比如现如今的微服务、分布式集群等,各个节点之间的通信,就可以使用消息队列来处理。具体使用什么方式,可更具场景从以下两种选择

  • P2P(Point to Point)点对点模式
  • Publish/Subscribe(Pub/Sub) 发布订阅模式

后面在给出案例时会具体讲解这两种模式

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。另外,在很多大型的网站或服务中,也都会使用到它

它具有以下特性

  • 多种语言和协议编写客户端
    语言:Java、C、C++、C#、Ruby、Perl、Python、PHP;
    应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
  • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring 的最新特性
  • 通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  • 支持多种传输协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
  • 支持通过JDBC和journal提供高速的消息持久化
  • 从设计上保证了高性能的集群,客户端-服务器以及点对点的通信
  • 支持Ajax
  • 支持与Axis的整合
  • 可以很容易调用内嵌 JMS provider 进行测试

它的优势

  • 稳定性:失败重连机制,持久化服务, 容错机制, 多种恢复机制
  • 高效性:支持多种传送协议如TCP, SSL, NIO, UDP等,集群消息在多个代理之间转发防止消息丢失,支持超快的JDBC消息持久化和高效的日志系统
  • 可扩展:ActiveMQ 的高级特性都可以配置的形式来表现,很好的实现例如游标,容错机制,消息group及监控服务,同时扩展了很多成熟的框架
  • 高级特性:消息群组(Message Groups)、虚拟端点(Virtual Destinations)、通配符(Wildcards)、复合端点(Composite Destinations)

ActiveMQ在Windows上的安装配置

这方面的教程在网上有很多,我们在这就不提供了,只提供一些移动端友好的链接以帮助朋友安装配置

  • www.cnblogs.com/yangw/p/591…
  • www.cnblogs.com/chy123/p/87…
  • www.cnblogs.com/donsenChen/…
  • blog.csdn.net/j080624/art…

ActiveMQ在C#中的使用

首先,需要在Apache官网 上下载 .NET 的驱动,也可以通过以下链接下载

mirrors.hust.edu.cn/apache/acti…

要在项目中使用 ActiveMQ,需要引入上面下载的包中的两个 dll 文件: Apache.NMS.ActiveMQ.dllApache.NMS.dll

P2P模式案例

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。

每条消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时

P2P的特点:

  • 每条消息只有一个消费者(即一旦被消费,消息就会被移除消息队列):在运行了多个消费者之后,一条消息只会有一个消费者收到,其他的消费者是不可以收到的
  • 接收者在成功接收消息之后需向队列应答成功:我们可以通过指定应答模式来更改,默认是自动应答模式

因此,如果希望发送的每个消息都会被成功处理的话,则应该P2P模式

示例代码的基类如下

public abstract class ActiveMQBase {
    protected IConnectionFactory factory;
    protected IConnection connection;
    protected ISession session;

    public virtual void Init() {
        try {
            //初始化工厂, 端口默认为61616,指定其他会抛异常
            factory = new ConnectionFactory("tcp://localhost:61616");
            connection = factory.CreateConnection();
            connection.Start();
            session = connection.CreateSession();
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }
    
    public abstract void Run();

    // 释放相关资源
    public virtual void Release() {
        try {
            if (session != null) session.Close();
            if (connection != null) connection.Close();
        } finally {
            session = null;
            connection = null;
            factory = null;
        }
    }
}
复制代码

生产者(Producer)如下

public class ActiveMQP2PDemoProducer : ActiveMQBase {
    private IMessageProducer messageProducer;
    private ActiveMQQueue demoQueue;

    public override void Init() {
        base.Init();
        try {
            // 指定队列,以实现点对点的通信 
            demoQueue = new ActiveMQQueue("DEMO_QUEUE");
            // 创建生产者对象
            messageProducer = session.CreateProducer(demoQueue);
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }
    
    public override void Run() {
        while (true) {
            Console.WriteLine("请输入消息,exit 退出");
            string line = Console.ReadLine();
            if (line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) {
                break;
            }
            // 创建一条文本消息,在 MessageProvider 中存在多个创建消息的方法
            // 在实际项目中灵活选择即可
            ITextMessage message = messageProducer.CreateTextMessage(line);
            // 发送消息,可调用其他的重载,以设置是否持久化、优先级等特性
            messageProducer.Send(message);
        }
    }

    public override void Release() {
        base.Release();
        try {
            if (demoQueue != null) demoQueue.Dispose();
            if (messageProducer != null) messageProducer.Close();
        } finally {
            demoQueue = null;
            messageProducer = null;
        }
    }
}
复制代码

消费者(Consumer)如下

public class ActiveMQP2PDemoComsumer : ActiveMQBase {
    private IMessageConsumer messageConsumer;
    private ActiveMQQueue demoQueue;

    public override void Init() {
        base.Init();
        try {
            demoQueue = new ActiveMQQueue("DEMO_QUEUE");
            // 创建消息的消费者
            messageConsumer = session.CreateConsumer(demoQueue);
            // 添加监听,当消息来临时,会触发此事件
            messageConsumer.Listener += this.MessageConsumer_Listener;
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }


    private void MessageConsumer_Listener(IMessage message) {
        // 解析接收到的消息
        if (message is ITextMessage msg) {
            Console.WriteLine($"Received Message: {msg.Text}");
        }
    }

    public override void Run() {
        // 此处用于阻止控制台结束,以保证消息可被正确处理
        Console.WriteLine("请输入消息,exit 退出");
        string line = Console.ReadLine();
    }

    public override void Release() {
        base.Release();
        try {
            if (demoQueue != null) demoQueue.Dispose();
            if (messageConsumer != null){
               messageConsumer.Listener -= this.MessageConsumer_Listener;
               messageConsumer.Close();
            }
        } finally {
            demoQueue = null;
            messageConsumer = null;
        }
    }
}
复制代码

使用方式如下

// 生产者初始化
ActiveMQP2PBase demo = new ActiveMQP2PDemoProducer();
// 消费者初始化代码则为: ActiveMQP2PBase demo = new ActiveMQP2PDemoComsumer();
demo.Init();
demo.Run();
demo.Release();
复制代码

在 ActiveMQ 管理界面可以看到如下,表示生产者发送的消息,都已经被消费者消费了

温故之消息队列ActiveMQ

Pub/Sub模式

Pub/Sub模式:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic, 系统将这些消息传递给多个订阅者,可以认为生产者与消费者之间是多对多的关系

Pub/Sub的特点

  • 每条消息可以有多个消费者
  • 为了消费消息,订阅者必须保持运行的状态
  • 为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样即使订阅者没有运行,在运行之后它也能接收到发布者的消息

因此,如果允许发送的消息可以被一个或多个消费者消费、或者可以不被消费,那么可以采用 Pub/Sub 模型

在 C# 中,它与 P2P 的使用区别不大,只需要将上述代码生产者和消费者初始化代码中

demoQueue = new ActiveMQQueue("DEMO_QUEUE");
复制代码

这部分换成

demoTopic = new ActiveMQTopic("DEMO_TOPIC");
复制代码

在管理员界面可以看到如下数据

温故之消息队列ActiveMQ

通过示例可以看出,P2P 是基于 Queue 的,而 Pub/Sub 模式则是基于 Topic 的。

在 Pub/Sub 模式下,可以实现多对多的通信,即可以有多个生产者,也可以有多个消费者,一旦有消息到来,它们会都会收到消息。

而P2P模式下,它可以允许有多个生产者,也可以有多个消费者。与 Pub/Sub 不同的是,如果有多个消费者,如果有消息到来,这些消费者会轮流着去消费该消息,而不是每个消费者都收到消息。即一条消息只会有一个消费者

由于在 C# 中,这两种模式的使用方式差别很小,而运行之后产生的行为却差别较大。因此,在实际项目中,我们需要注意这两者之间的区别,以免带来不必要的困惑

实际项目中的一些问题

ActiveMQ服务器宕机怎么办如果我们想要在服务器宕机之后恢复数据,则需要对消息进行持久化

在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。它们的最大限制在配置文件的 <systemUsage> 节点中配置

但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ 会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除(即重启之后不会从临时文件中恢复消息)

因此,为了保证数据的可靠性

  • 尽量使用持久化消息(消息不重要也可以不用持久化)
  • 可以将持久化与非持久化文件的限制调大一点,以保证服务最大可用

丢消息

这同样是持久化消息的问题。对于这种情况,我们可以

  1. 尽量将消息持久化
  2. 如果不想持久化,那么我们应该尽可能的及时处理非持久化的消息
  3. 使用事务,它可以保证消息不会因为连接关闭而丢失

持久化消息比较慢

默认的情况下,非持久化消息是异步发送的;而持久化消息是同步发送的。遇到慢一点的硬盘,发送消息的速度也会很慢

但如果开启事务的情况下,消息都会异步发送,效率会有非常大的提升。所以在发送持久化消息时,我们应该务必开启事务。并且我们也建议发送非持久化消息时也开启事务

自定义 ActiveMQ 的重发策略(Redelivery Policy)

可通过 ConnectionFactory.RedeliveryPolicy 属性设置

  • CollisionAvoidancePercent :默认值 0.15, 设置防止冲突范围的正负百分比,只有启用 UseCollisionAvoidance 参数时才生效
  • MaximumRedeliveries :默认值 6, 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传
  • InitialRedeliveryDelay :默认值 1000, 初始重发延迟时间
  • UseCollisionAvoidance :默认值 false , 启用防止冲突功能
  • UseExponentialBackOff :默认值 false , 启用指数倍数递增的方式增加延迟时间
  • BackOffMultiplier :默认值 5, 重连时间间隔递增倍数,只有值大于1和启用 UseExponentialBackOff 参数时才生效。

多消费者并发处理

在有多个消费者,ActiveMQ 中累积了大量的数据的情况下,有可能会出现只有一个消费者消费、其他消费者不“工作”的情况

这种情况下,我们只需要将 ActiveMQ 的 prefetch 值设置得小一点即可。在 Queue模式时,其默认值为 1000;Topic 下为 32766。可通过 ConnectionFactory.PrefetchPolicy 设置

这篇文章就先讲到这里,后面我们会讲解 ActiveMQ 的一些其他场景,如分布式集群。欢迎持续关注公众号【嘿嘿的学习日记】,Thank you~

温故之消息队列ActiveMQ
原文  https://juejin.im/post/5b59b24bf265da0fb018787c
正文到此结束
Loading...