全新的Spring Cloud Finchley GA版本是Reactive(响应式/反应式)微服务之旅的一个重要里程碑。下面是Spring的Josh Long有关Reactive Spring Cloud介绍文章:
Spring框架5.0发布于2017年9月,这是首个引入新的Reactive编程支持的版本,它建立在Pivotal Reactor项目的基础上,我们的响应流(reactive stream)是兼容响应运行(reactive runtime)的,Spring Framework 5还包含大量新功能,这里不一一列出,这里只专注于响应式支持的功能。
什么是响应式编程(Reactive Programming)?Spring为什么要和这有关系?嗯,当你构建网络服务时,这就很重要啦。
简而言之,Spring中服务集成的底层基础设施已经更新,可以完全接受响应式编程。那么,什么是响应式编程?响应式编程又称反应式式编程,当你开始通过网络传输更多数据时,比如更频繁的REST API调用等会使得IO缓冲区饱和,IO会发生堵塞,产生时间延迟。
IO本身不是问题,传统IO的使用却是阻塞的 - 线程必须等待InputStream生成新的bytes。(通常循环执行read()读取放入byte缓冲区)。当线程发生等待时,这个线程就不能做其他任何事情。线程很昂贵!
想想如何使用Java或任何其他具有相同线程处理方法的平台实现传统服务器?如果Web服务器tomcat的线程池最大有100个线程,当第101个请求到达时,那么tomcat在完成处理现有100个请求之前,将没有多余线程提供第101个请求的处理。当然如果第101个请求到来之前其他请求已经处理完成(并且释放占有的线程),那就太好了!可能就不需要响应式编程了。如果在新请求到达之前更快地释放线程,并且在这些线程中花费的时间主要是用于输入/输出,那么就不需要响应式编程。
当使用微服务时,遭遇大数据以及长期会话(例如在websockets和服务器发送的事件以及任何其他长期维持服务器状态)等情况时,就会遭遇到IO堵塞了。
这种线程与IO的耦合其实是不必要的。几十年来,操作系统一直支持“后台管理”IO,并在前台应该参与时通知前台。实际上,Java 1.4(从2000年代早期开始)就支持NIO(Channels)了,它为我们提供了一种异步IO机制。
在这个世界中,有专门管理IO的线程并在应该需要时回调用代码。如果IO这里存在延迟,那么该线程可以自由移动并处理其他请求。它没有被IO的延迟堵塞。你不需要从编写代码从InputStream读取字节内容,字节内容会被异步推送给你。由此你高效率地反转了与数据源的交互方式。
许多项目比如来自@ NetflixOSS的RxJava、来自@Pivotal的@ProjectReactor、来自Eclipse的@vertx_project和来自@lightbend的@akkateam,都在寻求提供支持这种新异步的响应式编程模型。它们都存在共同点,这些共同点都被编入了Reactive Streams规范中了,这些项目都支持这个规范。
Reactive Streams规范支持发布者向订阅者发布内容。订阅者调用方法onNextIT来消费使用这些被发布的内容。订阅者订阅时,会给出一个Subscription对象,它用来表示可以处理多少条记录。最后一位 - 能够准确指定订户准备一次处理多少记录的能力 - 也就是流量控制,发布者因此不会在流量上压垮订阅者,提升了流处理的稳定性。在响应式编程的背景下,流量控制也称为背压backpressure。
有一个接口Processor,它只是一个桥梁; 它实现了发布者和订阅者两个。Project Reactor支持两种Publisher特殊化:
1. Flux: 发送0-N项内容。
2. Mono:只发送单项内容,或者没有。
这是对IO使用方式的基本思考,因此需要在上面的每一层集成入这种新的思考方式,包括 数据访问层、安全层、Spring Boot和微服务层。
Spring框架5.0还包括一个名为Spring WebFlux的全新的响应式Web模块(甚至支持Netty项目),它甚至提供新的函数性响应端点支持。
Spring WebFlux建立在Reactive Streams规范的基础上,因此可以与任何其他支持库互操作。使用响应式Spring Webflux可以和Lightbend的Akka Streams(以及Scala)进行交互。
新的Spring WebFlux组件模型首先是响应式和异步的。它支持如websockets和服务器发送事件等异步,使用方式与传统上处理同步情况的方式相同。想要在几纳秒内发送一条包含10条记录的简短JSON数据?那就用一个Publisher来做!
新增模块支持响应式编程
为支持响应式编程,新版本新增了一个名为新的响应式HTTP客户端WebClient。
Spring Data Kay支持通过模板和存储库对具有异步IO支持的数据访问技术进行响应性数据访问。比如下面这个代码可以实现Reactive Spring Data MongoDB的使用:
interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> { Flux<Reservation> findByEmail(String email); } @Document @AllArgsConstructor @NoArgsConstructor @Data class Reservation { @Id private String id; private String email; }
Spring Security 5支持对传统用例(如下所示)和OAuth的响应式身份验证和授权:
@Bean MapReactiveUserDetailsService authentication() { // don't do this! this is a hardcoded username and password and it // would literally pain Spring Security lead @rob_winch to see this! // return new MapReactiveUserDetailsService( User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build()); } @Bean SecurityWebFilterChain authorization(ServerHttpSecurity security) { //@formatter:off return security .csrf().disable() .httpBasic() .and() .authorizeExchange() .pathMatchers("/proxy").authenticated() .anyExchange().permitAll() .and() .build(); //@formatter:on }
无论选择使用Spring WebFlux还是Spring MVC,Spring Boot 2将所有这些结合在一起:构建REST端点;使用Actuator;管理安全性以及其他等 。
从代码库更改的角度来看,这也意味着Spring Cloud团队需要进行多方面落地,这使得这个版本变得非常重要。
响应式编程与现有功能对接
新版本将响应式编程与现有功能进行了无缝对接:服务注册,发现,安全性,CDC(T)和测试,消息传递,微代理支持,断路器等等。我们来看一些例子。
您可以使用新的响应式模块WebClient,它支持之前Spring Cloud的DiscoveryClient支持的任何服务注册功能(Netflix Eureka,Hashicorp Consul,Apache Zookeeper,Cloud Foundry等)。
@Bean WebClient client(LoadBalancerExchangeFilterFunction eff) { return WebClient.builder().filter(eff).build(); }
这样你就使用这个响应式服务注册表WebClient了。在以下示例中,reservation-service是存在服务注册表中已经被注册的服务,不是实际的主机名。
Publisher<String> emails = client .get() .uri("http://reservation-service/reservations") .retrieve() .bodyToFlux(Reservation.class) .map(Reservation::getEmail);
你可以使用Spring Cloud Stream中的响应功能来分别消费Kafka或RabbitMQ中的主题或队列消息,
@Configuration @EnableBinding(Sink.class) public class MyStreamListener { @StreamListener public void incoming (@Input(Sink.INPUT) Flux<String> names ) { names .map ( x-> new Reservation( null, x)) .flatMap ( this.reservationRepository::save ) .subscribe( x -> log.info( "saved " + x.toString())); } }
你可以使用响应式Publisher对象来通过Hystrix断路器保护和隔离可能发生错误的服务调用。
在以下示例中,我使用响应WebClient调用HTTP可能会失败,如果失败了,我们希望能够提供对一个回退Publisher用来返回结果,这样我的代码不会抛出任何异常。它优雅地降级了。那个断路器很聪明,它有自己的状态监控,如果连续多次尝试调用失败,断路器最终将直接切换到那个回退Publisher。如果下游服务重新联机(如果使用Cloud Foundry将会重新启动),那么它最终会将重新注册到注册表,注册表将发出心跳事件,并且心跳事件将把注册表中服务的本地视图删除,客户端将看到注册表中有新的实例,它将重置断路器,关闭它,并让下一个呼叫成功通过。
Publisher<String> emails = client .get() .uri("http://reservation-service/reservations") .retrieve() .bodyToFlux(Reservation.class) .map(Reservation::getEmail); Publisher<String> fallback = HystrixCommands .from( emails ) .eager() .commandName("emails") .fallback ( Flux.just ("EEK!") ) .build();
响应式新篇章
Spring开启响应式编程的新开端,首先从这种新开端受益的是两个新项目:Spring Cloud Gateway和Spring Cloud Function。
Spring Cloud Gateway是我们全新的响应式API网关。它建立在Spring的响应支持之上。它的功能是路由客户端请求到下游服务。这其实响应应式编程的一个完美用例(需求)。
以下是使用Spring Cloud Gateway将请求代理:9999/proxy到服务(通过服务注册表解析和负载平衡)和速率受限的示例。(注意:此配置可以存在于Spring Cloud Config Server中的(可刷新的)配置中,或者你可以创建的任何源Flux<Route>。)
此示例将每个经过身份验证的用户限制为每秒100个请求。您不需要通过Spring Security来协助网关做这件事,但是配置中已经暗暗地这样做了:
@Bean RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) { return rlb .routes() .route( spec -> spec .path("/rl") .flters( fs -> fs .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() )) .setPath("/reservations") ) .uri("lb://reservation-service/") ) .build(); } @Bean // 100 reqs per second, burstable to 150 RedisRateLimiter redisRateLimiter (){ return new RedisRateLimiter(100, 150); }
函数即服务
Spring Cloud Function是我们新的 function-as-a-service(函数即服务)的抽象。它将plain-'-functions函数调整为不同的“函数即服务”运行时所需的类型。它可用于AWS Lambda,Microsoft Azure,当然还有我们自己的Project Riff,Project Riff是Apache 2许可的基于Kubernetes的多语言“函数即服务”运行环境。
使用起来可能不容易!您需要创建java.util.function.Function<I,O>实例。包括I和O,在这种情况下,可能是Publisher<X>:
package com.example.uppercase; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import reactor.core.publisher.Flux; import java.util.function.Function; @SpringBootApplication public class UppercaseApplication { @Bean Function<Flux<String>, Flux<String>> uppercase() { return incoming -> incoming.map(String::toUpperCase); } public static void main(String[] args) { SpringApplication.run(UppercaseApplication.class, args); } }
正如你现在所希望的那样,响应式编程已经在Spring中真正实现了!Spring Cloud是最终全面支持的响应性编程的项目。
[该贴被admin于2018-07-04 20:10修改过]