转载

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

在本文中,我们将向您介绍 Spring Cloud Stream ,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如 RabbitMQApache Kafka 等)连接。

Spring Cloud Stream 构建在现有Spring框架(如 Spring MessagingSpring Integration )之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的 message broker 紧密耦合。此外,有时对某些用例进行扩展是困难的。

Spring Cloud Stream 背后的想法是一个非常典型的 Spring Boot 概念—— 抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入 。这意味着您可以通过更改依赖项和配置文件来更改 message broker 。可以在这里找到目前已经支持的各种消息代理。

本文将使用 RabbitMQ 作为 message broker 。在此之前,让我们了解一下 broker (代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。

2. 微服务中的消息

在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个 Service-A 内部调用 Service-BService-C 来完成一个请求:

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

是的,还会有其他组件,比如 Spring Cloud EurekaSpring Cloud Zuul 等等,但我们还是专注关心这类架构的特有问题。

假设由于某种原因 Service-B 需要更多的时间来响应。也许它正在执行 I/O操作 或长时间的 DB事务 ,或者进一步调用其它导致 Service-B 变得更慢的服务,这些都使其无法更具效率。

现在,我们可以启动更多的 Service-B 实例来解决这个问题,这样很好,但是 Service-A 实际上是响应很快的,它需要等待 Service-B 的响应来进一步处理。这将导致 Service-A 无法接收更多的请求,这意味着我们还必须启动 Service-A 的多个实例。

另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着 Service-A 不直接通过 HTTP 调用 Service-BService-C ,而是将请求或事件发布给 message broker (消息代理)。 Service-BService-C 将成为 message broker (消息代理)上此事件的订阅者。

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:

  • 提高可伸缩性和可靠性——现在我们知道哪些服务是整个应用程序中的真正瓶颈。
  • 鼓励松散耦合—— Service-A 不需要了解 Service-BService-C 。它只需要连接到 message broker 并发布事件。事件如何进一步编排取决于代理设置。通过这种方式, Service-A 可以独立地运行,这是微服务的核心概念之一。
  • 与遗留系统交互——通常我们不能将所有东西都移动到一个新的技术堆栈中。我们仍然必须使用遗留系统,虽然速度很慢,但是很可靠。

3. RabbitMQ

高级消息队列协议(AMQP)RabbitMQ 用于消息传递的协议。虽然 RabbitMQ 支持其他一些协议,但是 AMQP 由于兼容性和它提供的大量特性而更受欢迎。

3.1 RabbitMQ架构设计

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

因此发布者将消息发布到 RabbitMQ 中称为 Exchange (交换器)。 Exchange (交换器)接收消息并将其路由到一个或多个 Queues (队列)。路由算法依赖于 Exchange (交换器)类型和 routing (路由)key/header(与消息一起传递)。将 Exchange (交换器)连接到 Queues (队列)的这些规则称为 bindings (绑定)。

绑定可以有4种类型:

  • Direct : 它根据 routing key (路由键)将 Exchange (交换器)类型直接路由到特定的 Queues (队列)。
  • Fanout :它将消息路由到绑定 Exchange (交换器)中的所有 Queues (队列)。
  • Topic :它根据完全匹配或部分据 routing key (路由键)匹配将消息路由到(0、1或更多)的 Queues (队列)。
  • Headers :它类似于 Topic (主题)交换类型,但是它是基 routing header (路由头)而不是 routing key (路由键)来路由的。
消息驱动式微服务:Spring Cloud Stream & RabbitMQ

来源:www.cloudamqp.com/

通过 Exchange (交换器)和 Queues (队列)发布和消费消息的整个过程是通过一个 Channel (通道)完成的。

有关路由的详细信息,请访问此链接。

3.2 RabbitMQ 设置

3.2.1 安装

我们可以从这里下载并安装基于我们的操作系统的二进制文件。

然而,在本文中,我们将使用 cloudamqp.com 提供的免费云安装。只需注册服务并登录即可。

在主仪表板上单击 创建新实例 :

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

然后给你的实例起个名字,然后进入下一步:

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

然后选择一个可用区:

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

最后,查看实例信息,点击右下角的 创建实例 :

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

就是这样。现在在云中运行了一个 RabbitMQ 实例。有关实例的更多信息,请转到您的仪表板并单击 新创建的实例 :

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

我们将在Spring应用程序中使用 AMQP URL 连接到这个实例,所以请在某个地方记下它。

您还可以通过单击左上角的 RabbitMQ manager 来查看管理器控制台。这将采用它来管理的您的 RabbitMQ 实例。

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

Project 配置

现在我们的设置已经准备好了,让我们创建我们的服务:

RabbitMQ

使用 Spring Initializr 创建一个脚手架项目。这将是我们的 producer 项目,我们将使用 REST 端点发布消息。

选择您喜欢的 Spring Boot 版本,添加 WebCloud Stream 依赖项,生成 Maven 项目:

消息驱动式微服务:Spring Cloud Stream & RabbitMQ

注意:

请注意 cloud-stream 依赖项。这也需要像 RabbitMQKafka 等绑定器依赖项才能工作。

由于我们将使用 RabbitMQ ,添加以下 Maven 依赖项:

<dependency>  
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码

或者,我们也可以将两者结合起来使用 spring-cloud-starter-stream-rabbit :

<dependency>  
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
复制代码

使用同样的方法,创建消费者项目,但仅使用 spring-cloud-starter-stream-rabbit 依赖项。

4. 创建生产者

如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个 HelloBinding 接口,其中包含我们的消息机制 greetingChannel :

interface HelloBinding {

    @Output("greetingChannel")
    MessageChannel greeting();
}
复制代码

因为这将发布消息,所以我们使用 @Output 注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个 Channel (通道)。

现在,让我们创建一个 REST ,它将消息推送到这个 Channel (通道)

@RestController
public class ProducerController {

    private MessageChannel greet;

    public ProducerController(HelloBinding binding) {
        greet = binding.greeting();
    }

    @GetMapping("/greet/{name}")
    public void publish(@PathVariable String name) {
        String greeting = "Hello, " + name + "!";
        Message<String> msg = MessageBuilder.withPayload(greeting)
            .build();
        this.greet.send(msg);
    }
}
复制代码

上面,我们创建了一个 ProducerController 类,它有一个 MessageChannel 类型的属性 greet 。这是通过我们前面声明的方法在构造函数中初始化的。

注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。

然后,我们有一个简单的 REST 接口,它接收 PathVariablename ,并使用 MessageBuilder 创建一个 String 类型的消息。最后,我们使用 MessageChannel 上的 .send() 方法来发布消息。

现在,我们将在的主类中添加 @EnableBinding 注解,传入 HelloBinding 告诉 Spring 加载。

@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
复制代码
消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ

最后,我们必须告诉 Spring 如何连接到 RabbitMQ (通过前面的 AMQP URL ),并将 greetingChannel 连接到一可用的消费者。

这两个都是在 application.properties 中定义的:

spring.rabbitmq.addresses=<amqp url>

spring.cloud.stream.bindings.greetingChannel.destination = greetings

server.port=8080
复制代码

5. 创建消费者

现在,我们需要监听之前创建的通道 greetingChannel 。让我们为它创建一个绑定:

public interface HelloBinding {

    String GREETING = "greetingChannel";

    @Input(GREETING)
    SubscribableChannel greeting();
}
复制代码

与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用 SubscribableChannel@Input 注解连接到 greetingChannel ,消息数据将被推送这里。

现在,让我们创建处理数据的方法:

@EnableBinding(HelloBinding.class)
public class HelloListener {

    @StreamListener(target = HelloBinding.GREETING)
    public void processHelloChannelGreeting(String msg) {
        System.out.println(msg);
    }
}
复制代码

在这里,我们创建了一个 HelloListener 类,在 processHelloChannelGreeting 方法上添加 @StreamListener 注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加 @EnableBinding 启用了 HelloBinding

同样,我们在这里使用 @EnableBinding ,而不是主类,以便告诉我们如何使用。

看看我们的主类,我们没有任何修改:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
复制代码

application.properties 配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外

spring.rabbitmq.addresses=<amqp url>  
spring.cloud.stream.bindings.greetingChannel.destination=greetings  
server.port=9090
复制代码

6. 全部测试

让我们同时启动生产者和消费者服务。首先,让我们通过点击端点 http://localhost:8080/greet/john 来生产消息。

在消费者日志中看到消息内容:

消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ

我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):

$ mvn spring-boot:run -Dserver.port=9091
复制代码

现在,当我们点击生产者 REST 端点生产消息时,我们看到两个消费者都收到了消息:

消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ

这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在 application.properties 中创建一个消费者组。消费者的配置文件:

spring.cloud.stream.bindings.greetingChannel.group = greetings-group
复制代码

现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:

消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ

这一切也可以在 RabbitMQ 管理器控制台看到:

消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ
消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ

7. 结论

在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用 Spring Cloud Stream 实现它。我们使用 RabbitMQ 作为消息代理,但是我们也可以使用其他流行的代理,比 如Kafka ,只需更改配置和依赖项。

与往常一样,本文使用的示例代码可以在GitHub获得完整的 源代码 。

原文: stackabuse.com/spring-clou…

作者:Dhananjay Singh

译者:李东

消息驱动式微服务:Spring Cloud Stream &amp; RabbitMQ
原文  https://juejin.im/post/5d2201346fb9a07ee4638512
正文到此结束
Loading...