官方对 Spring Cloud Stream 的一段介绍: Spring Cloud St ream 是一个用于 构建基于消息的微服务应用框架 。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。
老顾来翻译一下,就是现在的消息中间件比较多,如:RabbitMQ、Kafka、RocketMq等;使用方法也不一样,但是 他们的本质流程是一样 ,都有 发布/订阅、消费组以及消息分区 这三个核心概念。
所以SpringCloud就实现了 一套轻量级的消息驱动的微服务框架 ;通过使用 Spring Cloud Stream,可以 忽略消息中间件的差异 ,有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有 更多的精力关注于核心业务逻辑 的处理。 老顾先带着小伙伴们了解几个概念,这样会更方便理解。
Spring Messaging是Spring Framework中的一个模块,其作用就是 统一消息的编程模型 。
Spring Messaging在消息模型的基础上衍生出了其它的一些功能,如:
1、消息接收参数及返回值处理:消息接收参数处理器 HandlerMethodArgumentResolver 配合@Header, @Payload等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler 配合@SendTo注解使用。
2、消息体 内容转换器MessageConverter ;
3、统一抽象的消息发送模板 AbstractMessageSendingTemplate ;
4、消息通道拦截器 ChannelInterceptor ;
Spring Integration是一个功能强大的 EIP(Enterprise Intergration Patterns,即企业集成模式) 。
是对 Spring Messaging的扩展 ,它提出了不少新的概念,包括 消息的路由 MessageRoute、 消息的分发 MessageDispatcher、 消息的过滤 Filter、 消息的转换 Transformer、 消息的聚合 Aggregator、 消息的分割 Splitter等等。
总结一句话就是对 消息消费时进行额外的处理 。
我们来看一个例子
1、步骤一先创建一个 可订阅 的 消息通道messageChannel
2、定义一个消息 消费者messagehandler ,去 消费通道里面的消息 ;用了lammda表达式实现了,简单的 输出一句话
3、步骤三发送一个消息
消息最终被消息 通道里的 MessageHandler 所消费 ,最后控制台打印出:
接收到: 第一条消息内容 复制代码
我们再进入DirectChannel, 内部有一个对象UnicastingDispatcher , 这个是消息分发器 ,会分发到对应的 消息通道MessageChannel 中;
UnicastingDispatcher 是个 单播的分发器 , 只能选择一个消息通道 。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。
。控制台打印出:
。控制台打印:
SpringCloud Stream(以下简称SCS)在 Spring Integration 的基础上进行了封装 ,提出了 Binder, Binding , @EnableBinding, @StreamListener 等概念 。另外SCS也整合了其他模块
1、与 Spring Boot Actuator整合 , 提供了/bindings, /channelsendpoint
2、与Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties等外部化配置类
3、SCS增强了 消息发送失败的和消费失败情况下的处理逻辑 等功能
SCS 是 Spring Integration 的加强 ,同时 与 Spring Boot 体系 进行了融合,也是 Spring Cloud Bus 的基础。 它屏蔽了底层消息中间件的实现细节 ,希望以 统一的一套API 来进行消息的发送/消费, 底层消息中间件的实现细节由各消息中间件的Binder 完成 。
Binder是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们 分别用于构造生产者和消费者 。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了RocketMQ Binder。
,用于消息的消费和生产。
Binder绑定器是Spring Cloud Stream中一个非常重要的概念。 在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候 ,由于各消息中间件构建的初衷不同, 它们的实现细节上会有较大的差异性 ,这使得我们实现的消息交互逻辑就会非常笨重,因为 对具体的中间件实现细节有太重的依赖 ,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。 通过定义绑定器作为中间层 ,完美地实现了 应用程序与消息中间件细节之间的隔离 。通过向应用程序 暴露统一的Channel通道 ,使得应用程序 不需要再考虑各种不同的消息中间件实现 。当我们需要 升级消息中间件 ,或是更换其他消息中间件产品时,我们要做的就是 更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑 。
在Spring Cloud Stream中的消息通信方式 遵循了发布-订阅模式 ,当一条消息被投递到消息中间件之后, 它会通过共享的Topic主题进行广播 ,消息 消费者在订阅的主题中收到它并触发自身的业务逻辑处理 。这里所提到的 Topic主题是Spring Cloud Stream中的一个抽象概念 ,用来代表发布共享消息给消费者的地方。
在不同的消息中间件中,Topic可能对应着不同的概念,比如:在 RabbitMQ中的它对应了Exchange 、而在 Kakfa中则对应了Kafka中的Topic 。
虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松的进行扩展, 但是这些扩展都是针对不同的应用实例而言的 ,在现实的微服务架构中, 我们每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例 。很多情况下, 消息生产者发送消息给某个具体微服务时,只希望被消费一次 ,按照上面我们启动两个应用的例子, 虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况 。为了解决这个问题, 在Spring Cloud Stream中提供了消费组的概念 。 如果在同一个主题上的应用需要启动多个实例的时候, 我们可以通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名 ,这样这个应用的多个实例在接收到消息的时候, 只会有一个成员真正的收到消息并进行处理 。如下图所示,我们为Service-A和Service-B分别启动了两个实例,并且 根据服务名进行了分组 ,这样当消息进入主题之后, Group-A和Group-B都会收到消息的副本,但是在两个组中都只会有一个实例对其进行消费。
通过引入消费组的概念,我们已经能够在多实例的情况下, 保障每个消息只被组内一个实例进行消费 。通过上面对消费组参数设置后的实验,我们可以观察到, 消费组并无法控制消息具体被哪个实例消费 。也就是说, 对于同一条消息,它多次到达之后可能是由不同的实例进行消费的 。但是对于 一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理 ,比如: 一些用于监控服务 ,为了 统计某段时间内消息生产者发送的报告内容 ,监控服务需要在 自身内容聚合这些数据 ,那么消息生产者可以为消息增加一个固有的特征ID来进行分区, 使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果 ,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。 分区概念的引入就是为了解决这样的问题 :当生产者将消息数据发送给多个消费者实例时, 保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理 。
Spring Cloud Stream 为分区提供了通用的抽象实现 ,用来在消息中间件的 上层实现分区 处理, 所以它对于消息中间件自身是否实现了消息分区并不关心 ,这使得Spring Cloud Stream为 不具备分区功能的消息中间件也增加了分区功能扩展 。