在本文中,我们将向您介绍 Spring Cloud Stream
,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如 RabbitMQ
、 Apache Kafka
等)连接。
Spring Cloud Stream
构建在现有Spring框架(如 Spring Messaging
和 Spring Integration
)之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的 message broker
紧密耦合。此外,有时对某些用例进行扩展是困难的。
Spring Cloud Stream
背后的想法是一个非常典型的 Spring Boot
概念—— 抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
。这意味着您可以通过更改依赖项和配置文件来更改 message broker
。可以在这里找到目前已经支持的各种消息代理。
本文将使用 RabbitMQ
作为 message broker
。在此之前,让我们了解一下 broker
(代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。
在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个 Service-A
内部调用 Service-B
和 Service-C
来完成一个请求:
是的,还会有其他组件,比如 Spring Cloud Eureka
、 Spring Cloud Zuul
等等,但我们还是专注关心这类架构的特有问题。
假设由于某种原因 Service-B
需要更多的时间来响应。也许它正在执行 I/O操作
或长时间的 DB事务
,或者进一步调用其它导致 Service-B
变得更慢的服务,这些都使其无法更具效率。
现在,我们可以启动更多的 Service-B
实例来解决这个问题,这样很好,但是 Service-A
实际上是响应很快的,它需要等待 Service-B
的响应来进一步处理。这将导致 Service-A
无法接收更多的请求,这意味着我们还必须启动 Service-A
的多个实例。
另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着 Service-A
不直接通过 HTTP
调用 Service-B
或 Service-C
,而是将请求或事件发布给 message broker
(消息代理)。 Service-B
和 Service-C
将成为 message broker
(消息代理)上此事件的订阅者。
与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:
Service-A
不需要了解 Service-B
和 Service-C
。它只需要连接到 message broker
并发布事件。事件如何进一步编排取决于代理设置。通过这种方式, Service-A
可以独立地运行,这是微服务的核心概念之一。 高级消息队列协议(AMQP)
是 RabbitMQ
用于消息传递的协议。虽然 RabbitMQ
支持其他一些协议,但是 AMQP
由于兼容性和它提供的大量特性而更受欢迎。
因此发布者将消息发布到 RabbitMQ
中称为 Exchange
(交换器)。 Exchange
(交换器)接收消息并将其路由到一个或多个 Queues
(队列)。路由算法依赖于 Exchange
(交换器)类型和 routing
(路由)key/header(与消息一起传递)。将 Exchange
(交换器)连接到 Queues
(队列)的这些规则称为 bindings
(绑定)。
绑定可以有4种类型:
routing key
(路由键)将 Exchange
(交换器)类型直接路由到特定的 Queues
(队列)。 Exchange
(交换器)中的所有 Queues
(队列)。 routing key
(路由键)匹配将消息路由到(0、1或更多)的 Queues
(队列)。 Topic
(主题)交换类型,但是它是基 routing header
(路由头)而不是 routing key
(路由键)来路由的。 来源:www.cloudamqp.com/
通过 Exchange
(交换器)和 Queues
(队列)发布和消费消息的整个过程是通过一个 Channel
(通道)完成的。
有关路由的详细信息,请访问此链接。
我们可以从这里下载并安装基于我们的操作系统的二进制文件。
然而,在本文中,我们将使用 cloudamqp.com
提供的免费云安装。只需注册服务并登录即可。
在主仪表板上单击 创建新实例
:
然后给你的实例起个名字,然后进入下一步:
然后选择一个可用区:
最后,查看实例信息,点击右下角的 创建实例
:
就是这样。现在在云中运行了一个 RabbitMQ
实例。有关实例的更多信息,请转到您的仪表板并单击 新创建的实例
:
我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:
我们将在Spring应用程序中使用 AMQP URL
连接到这个实例,所以请在某个地方记下它。
您还可以通过单击左上角的 RabbitMQ manager
来查看管理器控制台。这将采用它来管理的您的 RabbitMQ
实例。
现在我们的设置已经准备好了,让我们创建我们的服务:
RabbitMQ
使用 Spring Initializr
创建一个脚手架项目。这将是我们的 producer
项目,我们将使用 REST
端点发布消息。
选择您喜欢的 Spring Boot
版本,添加 Web
和 Cloud Stream
依赖项,生成 Maven
项目:
请注意 cloud-stream
依赖项。这也需要像 RabbitMQ
、 Kafka
等绑定器依赖项才能工作。
由于我们将使用 RabbitMQ
,添加以下 Maven
依赖项:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> 复制代码
或者,我们也可以将两者结合起来使用 spring-cloud-starter-stream-rabbit
:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> 复制代码
使用同样的方法,创建消费者项目,但仅使用 spring-cloud-starter-stream-rabbit
依赖项。
如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个 HelloBinding
接口,其中包含我们的消息机制 greetingChannel
:
interface HelloBinding { @Output("greetingChannel") MessageChannel greeting(); } 复制代码
因为这将发布消息,所以我们使用 @Output
注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个 Channel
(通道)。
现在,让我们创建一个 REST
,它将消息推送到这个 Channel
(通道)
@RestController public class ProducerController { private MessageChannel greet; public ProducerController(HelloBinding binding) { greet = binding.greeting(); } @GetMapping("/greet/{name}") public void publish(@PathVariable String name) { String greeting = "Hello, " + name + "!"; Message<String> msg = MessageBuilder.withPayload(greeting) .build(); this.greet.send(msg); } } 复制代码
上面,我们创建了一个 ProducerController
类,它有一个 MessageChannel
类型的属性 greet
。这是通过我们前面声明的方法在构造函数中初始化的。
注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。
然后,我们有一个简单的 REST
接口,它接收 PathVariable
的 name
,并使用 MessageBuilder
创建一个 String
类型的消息。最后,我们使用 MessageChannel
上的 .send()
方法来发布消息。
现在,我们将在的主类中添加 @EnableBinding
注解,传入 HelloBinding
告诉 Spring
加载。
@EnableBinding(HelloBinding.class) @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 复制代码
最后,我们必须告诉 Spring
如何连接到 RabbitMQ
(通过前面的 AMQP URL
),并将 greetingChannel
连接到一可用的消费者。
这两个都是在 application.properties
中定义的:
spring.rabbitmq.addresses=<amqp url> spring.cloud.stream.bindings.greetingChannel.destination = greetings server.port=8080 复制代码
现在,我们需要监听之前创建的通道 greetingChannel
。让我们为它创建一个绑定:
public interface HelloBinding { String GREETING = "greetingChannel"; @Input(GREETING) SubscribableChannel greeting(); } 复制代码
与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用 SubscribableChannel
和 @Input
注解连接到 greetingChannel
,消息数据将被推送这里。
现在,让我们创建处理数据的方法:
@EnableBinding(HelloBinding.class) public class HelloListener { @StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(msg); } } 复制代码
在这里,我们创建了一个 HelloListener
类,在 processHelloChannelGreeting
方法上添加 @StreamListener
注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加 @EnableBinding
启用了 HelloBinding
。
同样,我们在这里使用 @EnableBinding
,而不是主类,以便告诉我们如何使用。
看看我们的主类,我们没有任何修改:
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } } 复制代码
在 application.properties
配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外
spring.rabbitmq.addresses=<amqp url> spring.cloud.stream.bindings.greetingChannel.destination=greetings server.port=9090 复制代码
让我们同时启动生产者和消费者服务。首先,让我们通过点击端点 http://localhost:8080/greet/john
来生产消息。
在消费者日志中看到消息内容:
我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):
$ mvn spring-boot:run -Dserver.port=9091 复制代码
现在,当我们点击生产者 REST
端点生产消息时,我们看到两个消费者都收到了消息:
这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在 application.properties
中创建一个消费者组。消费者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group 复制代码
现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:
这一切也可以在 RabbitMQ
管理器控制台看到:
在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用 Spring Cloud Stream
实现它。我们使用 RabbitMQ
作为消息代理,但是我们也可以使用其他流行的代理,比 如Kafka
,只需更改配置和依赖项。
与往常一样,本文使用的示例代码可以在GitHub获得完整的 源代码 。
原文: stackabuse.com/spring-clou…
作者:Dhananjay Singh
译者:李东