转载

Spring Cloud Stream 基础应用实战

点击上方" 程序员历小冰 ",选择“置顶或者星标”

你的关注意义重大!

本文摘自笔者出版的书籍《Spring Cloud 微服务架构进阶》

SpringCloudStream 应用模型下图所示。 Spring Cloud Stream由一个中间件中立的核组成。 应用通过Spring Cloud Stream插入的input和output通道与外界交流。 通道通过指定中间件的Binder实现与外部代理连接。

业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。

Spring Cloud Stream 基础应用实战

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

目前只提供了RabbitMQ和Kafka的Binder实现

小节主要讲述 SpringCloudStream 的编程模型。 SpringCloudStream 提供了一系列的预先定义的注解来声明input和output channel。

你可以通过给一个应用的配置类(configuration class)添加 @EnableBinding 注解来将一个 Spring 应用转变成 SpringCloudStream 应用。 @EnableBinding 注解本身[带了] @Configuration 元注解并且触发 SpringCloudStream 框架的配置。

@EnableBinding 注解可以[携带]具有提供可绑定组件函数的接口类作为参数(比如说消息信道)。 @EnableBinding 注解只能使用在你的 Configuration 类上,你可以尽可能多的提供你需要的接口作为该注解的参数,比如说 @EnableBinding(value={Order.class,Payment.class}OrderPayment 接口都可以声明 @Input@Output Channel。

SpringCloudStream 应用中,一个接口可以通过 @Input @Output 函数来声明随意数目的input和output channels。

使用这个接口当作 @EnableBinding 的参数可以触发 SpringCloudStream 框架创建三个信道,名字分别为 orders , hotDrinkscoldDrinks

使用 @Input @Output 注解,你可以给每个信道一个自定义的名称,就像下面这个例子一样。

在这个例子,信道名称就是 inboundOrders

SpringCloudStream 提供了预先定义的三中接口来定义input channel和output channel。

Source 用来声明输出型channel。

Sink 用来声明输入型channel。

Processor 用来声明既可以输出也可以输入型的channel。

对于任何一个bound interface, SpringCloudStream 将会生成一个该接口的bean。调用这些bean的被 @Input@Output 修饰的方法可以返回相应的bound channel。在下面例子中,当调用 SendingBean 对象的 hello 方法时会给output channel发送一个信息。它调用注入的 Source bean来获取目标target。

Bound channels也可以直接被注入。

如果在声明channel时自定义了channel的名称,那么这个名称将会替换方法的名称,在注入时发挥作用。比如下面这个例子:

这个channel可以通过下面这个方式来进行注入。

你可以通过使用 SpringIntegration 注解或者 SpringCloudStream @StreamListener 注解来编写 SpringCloudStream 应用。 @StreamListener 注解基于 SpringMessaging 注解来建模(比如说 @MessageMapping @JmsListener @RabbitListener )。 除此之外,该注解添加了content类型管理和类型强制特性。

Spring Integration支持

因为 SpringCloudStream 是基于 SpringIntegration ,Stream完全继承了Integration的架构和基础组件。比如说,你可以把 Source 的output channel绑定到 MessageSource

或者你也可以通过transformer来使用一个processor channel。

使用@StreamListener

作为Spring Integration的补充, SpringCloudStream 提供了它自己的 @StreamListener 注解,该注解基于Spring Messaging注解(比如说 @MessageMapping@JmsListener@RabbitListener )。 @StreamListener 注解提供了处理inbound message的更加简便的模型。 SpringCloudStream 提供了可扩展的 MessageConverter 机制来处理数据转化,并将转化后的数据分配给相应的被 @StreamListener 修饰的方法。下面这个例子展示了一个处理外部 Vote 事件的应用。

@StreamListener 和Spring Integration的 @ServiceActivator 的区别可以在下面这个例子中展现。一个inbound的 Message 对象有一个string类型的payload和一个值为 application/jsoncontentType 。在使用 @StreamListener 时, MessageConverter 原理会使用 contentType 来解析 String payload并赋值给 Vote 对象。 就像其他的Spring Messaging方法一样,被 @StreamListener 注解的方法的参数可以使用 @Payload@Headers@Header 进行注解。 对于会返回数据的方法,你必须使用 @SendTo 注解来指定该返回数据发送到哪个output channel。

使用@StreamListener来分配消息

SpringCloudStream 支持将消息分配到多个 @StreamListener 修饰的方法。 为了能使用该分配机制,一个方法必须首先满足下列条件:

  • 方法不能有返回值。

  • 方法必须是单独一类消息的处理函数(响应式编程的方法并不支持)

使用注解的 condition 属性中的SpEL表达式可以首先上述的消息分配机制。所有匹配了该 condition 的方法都会在同一个线程中被调用,但是方法调用相对顺序不能保证。 下面就是一个 @StreamListener 分配消息的例子。在这个例子中,所有携带值为 footype 头部的消息都会被分配给 receiveFoo 方法,所有携带值为 bartype 头部的消息都会被分配给 receiveBar 方法。

代码实例

Stream 可以声明自定义的channel,通过使用 @Input@Output 来声明channel的名称和方向。 @Input 是声明输入方向的channel,而 @Output 是声明输出方向的channel。

@EnableBinding 注解用于声明某个类所需要绑定的channel实例,其 value 属性值是需要绑定的channel的定义类。

使用者首先需要使用@EnableBinding注解实现对消息通道的绑定,该注解中还传入了一个参数 MessageInput.classMessageInput 是一个接口,该接口是对输入消息通道绑定的定义。然后在 InputController 类中定义了 listener 方法,并在该方法上添加了 @StreamListener 注解,该注解表示该方法为消息中间件上数据流的事件监听器, MessageInput.INPUT_MESSAGE 参数表示这是input消息通道上的监听处理器。

而输出型channel只需要使用 @Autowired 注解进行自动注入。然后就可以使用实例的函数进行消息的发送。

stream 通过 application.yml 进行配置,比如说上述代码中 MessageInputMessageOutput 接口分别使用 @Input@Output 定义了输入和输出的消息通道的绑定信息。一个是 input ,一个是 output 。配置文件中的bindings字段就对应上述的绑定信息,比如说下面的配置文件中,bindings字段下一共有两个binding配置,分别是input和output,与代码中的名称一致。 content-type 表明binding接受或者发送消息的类型, binder 则声明该binding所对应的绑定器。

binders 字段声明了项目中所有的绑定器信息,由于 stream 支持多种消息队列,所以将与消息队列交互的实现抽象成 Binder ,不同的 Binder 对应不同的消息队列。 type 就是指明绑定器的类型,比如说rabbit或者kafka。 environment 中是配置了与绑定器交互的消息队列的基本信息,比如说网络信息,认证信息,分区信息等。

-关注我

Spring Cloud Stream 基础应用实战

推荐阅读

Guava的布隆过滤器

超详细的Guava RateLimiter限流原理解析

原文  http://mp.weixin.qq.com/s?__biz=Mzg2NjE5NDQyOA==&mid=2247483862&idx=1&sn=bb5d008265dc9de5078c3ced56fe0ae4
正文到此结束
Loading...