转载

Spring 5与Spring cloud的响应式编程之旅

全新的Spring Cloud Finchley GA版本是Reactive(响应式/反应式)微服务之旅的一个重要里程碑。下面是Spring的Josh Long有关Reactive Spring Cloud介绍文章:

Spring框架5.0发布于2017年9月,这是首个引入新的Reactive编程支持的版本,它建立在Pivotal Reactor项目的基础上,我们的响应流(reactive stream)是兼容响应运行(reactive runtime)的,Spring Framework 5还包含大量新功能,这里不一一列出,这里只专注于响应式支持的功能。

什么是响应式编程(Reactive Programming)?Spring为什么要和这有关系?嗯,当你构建网络服务时,这就很重要啦。

简而言之,Spring中服务集成的底层基础设施已经更新,可以完全接受响应式编程。那么,什么是响应式编程?响应式编程又称反应式式编程,当你开始通过网络传输更多数据时,比如更频繁的REST API​​调用等会使得IO缓冲区饱和,IO会发生堵塞,产生时间延迟。

IO本身不是问题,传统IO的使用却是阻塞的 - 线程必须等待InputStream生成新的bytes。(通常循环执行read()读取放入byte缓冲区)。当线程发生等待时,这个线程就不能做其他任何事情。线程很昂贵!

想想如何使用Java或任何其他具有相同线程处理方法的平台实现传统服务器?如果Web服务器tomcat的线程池最大有100个线程,当第101个请求到达时,那么tomcat在完成处理现有100个请求之前,将没有多余线程提供第101个请求的处理。当然如果第101个请求到来之前其他请求已经处理完成(并且释放占有的线程),那就太好了!可能就不需要响应式编程了。如果在新请求到达之前更快地释放线程,并且在这些线程中花费的时间主要是用于输入/输出,那么就不需要响应式编程。

当使用微服务时,遭遇大数据以及长期会话(例如在websockets和服务器发送的事件以及任何其他长期维持服务器状态)等情况时,就会遭遇到IO堵塞了。

这种线程与IO的耦合其实是不必要的。几十年来,操作系统一直支持“后台管理”IO,并在前台应该参与时通知前台。实际上,Java 1.4(从2000年代早期开始)就支持NIO(Channels)了,它为我们提供了一种异步IO机制。

在这个世界中,有专门管理IO的线程并在应该需要时回调用代码。如果IO这里存在延迟,那么该线程可以自由移动并处理其他请求。它没有被IO的延迟堵塞。你不需要从编写代码从InputStream读取字节内容,字节内容会被异步推送给你。由此你高效率地反转了与数据源的交互方式。

许多项目比如来自@ NetflixOSS的RxJava、来自@Pivotal的@ProjectReactor、来自Eclipse的@vertx_project和来自@lightbend的@akkateam,都在寻求提供支持这种新异步的响应式编程模型。它们都存在共同点,这些共同点都被编入了Reactive Streams规范中了,这些项目都支持这个规范。

Reactive Streams规范支持发布者向订阅者发布内容。订阅者调用方法onNextIT来消费使用这些被发布的内容。订阅者订阅时,会给出一个Subscription对象,它用来表示可以处理多少条记录。最后一位 - 能够准确指定订户准备一次处理多少记录的能力 - 也就是流量控制,发布者因此不会在流量上压垮订阅者,提升了流处理的稳定性。在响应式编程的背景下,流量控制也称为背压backpressure。

有一个接口Processor,它只是一个桥梁; 它实现了发布者和订阅者两个。Project Reactor支持两种Publisher特殊化:

1. Flux: 发送0-N项内容。

2. Mono:只发送单项内容,或者没有。

这是对IO使用方式的基本思考,因此需要在上面的每一层集成入这种新的思考方式,包括 数据访问层、安全层、Spring Boot和微服务层。

Spring框架5.0还包括一个名为Spring WebFlux的全新的响应式Web模块(甚至支持Netty项目),它甚至提供新的函数性响应端点支持。

Spring WebFlux建立在Reactive Streams规范的基础上,因此可以与任何其他支持库互操作。使用响应式Spring Webflux可以和Lightbend的Akka Streams(以及Scala)进行交互。

新的Spring WebFlux组件模型首先是响应式和异步的。它支持如websockets和服务器发送事件等异步,使用方式与传统上处理同步情况的方式相同。想要在几纳秒内发送一条包含10条记录的简短JSON数据?那就用一个Publisher来做!

新增模块支持响应式编程

为支持响应式编程,新版本新增了一个名为新的响应式HTTP客户端WebClient。

Spring Data Kay支持通过模板和存储库对具有异步IO支持的数据访问技术进行响应性数据访问。比如下面这个代码可以实现Reactive Spring Data MongoDB的使用:

interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> {

		Flux<Reservation> findByEmail(String email);
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
class Reservation {
		@Id
		private String id;
		private String email;
}

Spring Security 5支持对传统用例(如下所示)和OAuth的响应式身份验证和授权:

  @Bean
  MapReactiveUserDetailsService authentication() {
    // don't do this! this is a hardcoded username and password and it
    // would literally pain Spring Security lead @rob_winch to see this!
    //
    return new MapReactiveUserDetailsService(
      User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build());
  }

  @Bean
  SecurityWebFilterChain authorization(ServerHttpSecurity security) {
  //@formatter:off
  return security
  .csrf().disable()
  .httpBasic()
  .and()
  .authorizeExchange()
    .pathMatchers("/proxy").authenticated()
    .anyExchange().permitAll()
  .and()
  .build();
  //@formatter:on
  }

无论选择使用Spring WebFlux还是Spring MVC,Spring Boot 2将所有这些结合在一起:构建REST端点;使用Actuator;管理安全性以及其他等 。

从代码库更改的角度来看,这也意味着Spring Cloud团队需要进行多方面落地,这使得这个版本变得非常重要。

响应式编程与现有功能对接

新版本将响应式编程与现有功能进行了无缝对接:服务注册,发现,安全性,CDC(T)和测试,消息传递,微代理支持,断路器等等。我们来看一些例子。

您可以使用新的响应式模块WebClient,它支持之前Spring Cloud的DiscoveryClient支持的任何服务注册功能(Netflix Eureka,Hashicorp Consul,Apache Zookeeper,Cloud Foundry等)。

@Bean
WebClient client(LoadBalancerExchangeFilterFunction eff) {
  return WebClient.builder().filter(eff).build();
}

这样你就使用这个响应式服务注册表WebClient了。在以下示例中,reservation-service是存在服务注册表中已经被注册的服务,不是实际的主机名。

Publisher<String> emails = client
	.get()
	.uri("http://reservation-service/reservations")
	.retrieve()
	.bodyToFlux(Reservation.class)
	.map(Reservation::getEmail);

你可以使用Spring Cloud Stream中的响应功能来分别消费Kafka或RabbitMQ中的主题或队列消息,

@Configuration  
@EnableBinding(Sink.class)
public class MyStreamListener {

  @StreamListener
  public void incoming (@Input(Sink.INPUT) Flux<String> names ) {
    names
     .map ( x-> new Reservation( null, x))
     .flatMap ( this.reservationRepository::save )
     .subscribe( x -> log.info( "saved " + x.toString()));
   }
 }
 

你可以使用响应式Publisher对象来通过Hystrix断路器保护和隔离可能发生错误的服务调用。

在以下示例中,我使用响应WebClient调用HTTP可能会失败,如果失败了,我们希望能够提供对一个回退Publisher用来返回结果,这样我的代码不会抛出任何异常。它优雅地降级了。那个断路器很聪明,它有自己的状态监控,如果连续多次尝试调用失败,断路器最终将直接切换到那个回退Publisher。如果下游服务重新联机(如果使用Cloud Foundry将会重新启动),那么它最终会将重新注册到注册表,注册表将发出心跳事件,并且心跳事件将把注册表中服务的本地视图删除,客户端将看到注册表中有新的实例,它将重置断路器,关闭它,并让下一个呼叫成功通过。

Publisher<String> emails = client
  .get()
  .uri("http://reservation-service/reservations")
  .retrieve()
  .bodyToFlux(Reservation.class)
  .map(Reservation::getEmail);

Publisher<String> fallback = HystrixCommands
  .from( emails )
  .eager()
  .commandName("emails")
  .fallback ( Flux.just ("EEK!") )
  .build();

响应式新篇章

Spring开启响应式编程的新开端,首先从这种新开端受益的是两个新项目:Spring Cloud Gateway和Spring Cloud Function。

Spring Cloud Gateway是我们全新的响应式API网关。它建立在Spring的响应支持之上。它的功能是路由客户端请求到下游服务。这其实响应应式编程的一个完美用例(需求)。

以下是使用Spring Cloud Gateway将请求代理:9999/proxy到服务(通过服务注册表解析和负载平衡)和速率受限的示例。(注意:此配置可以存在于Spring Cloud Config Server中的(可刷新的)配置中,或者你可以创建的任何源Flux<Route>。)

此示例将每个经过身份验证的用户限制为每秒100个请求。您不需要通过Spring Security来协助网关做这件事,但是配置中已经暗暗地这样做了:

@Bean
RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) {
  return rlb
    .routes()
    .route( spec ->
      spec
       .path("/rl")
       .flters( fs -> fs
         .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() ))
         .setPath("/reservations")
       )
       .uri("lb://reservation-service/")
    )
    .build();
}


@Bean // 100 reqs per second, burstable to 150
RedisRateLimiter redisRateLimiter (){
  return new RedisRateLimiter(100, 150);
}

函数即服务

Spring Cloud Function是我们新的 function-as-a-service(函数即服务)的抽象。它将plain-'-functions函数调整为不同的“函数即服务”运行时所需的类型。它可用于AWS Lambda,Microsoft Azure,当然还有我们自己的Project Riff,Project Riff是Apache 2许可的基于Kubernetes的多语言“函数即服务”运行环境。

使用起来可能不容易!您需要创建java.util.function.Function<I,O>实例。包括I和O,在这种情况下,可能是Publisher<X>:

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;

@SpringBootApplication
public class UppercaseApplication {

		@Bean
		Function<Flux<String>, Flux<String>> uppercase() {
				return incoming -> incoming.map(String::toUpperCase);
		}

		public static void main(String[] args) {
				SpringApplication.run(UppercaseApplication.class, args);
		}
}

正如你现在所希望的那样,响应式编程已经在Spring中真正实现了!Spring Cloud是最终全面支持的响应性编程的项目。

[该贴被admin于2018-07-04 20:10修改过]

原文  https://www.jdon.com/49561
正文到此结束
Loading...