本文的重点是使用Java构建RESTFul API,同时受益于反应式编程模型。但与大多数关于此主题的其他文章不同,本文不会急于直接编写代码。它将指导您完成此编程范例的主干,以便您对其有充分的了解。然后使用该知识构建API。
该系列由两部分组成。第一部分介绍了反应系统和反应式编程,并清除了这些术语之间的混淆。
然后介绍了反应式编程的基础知识,并将传统的并发模型与消息/事件驱动的并发性进行了比较。
第二部分是关于使用Spring WebFlux来弄脏和构建RESTFul API,并向读者介绍第四代反应框架。
反应系统
反应式系统架构是构建响应式系统的架构风格。这是在反应性宣言中定义的,我们将简要介绍宣言中的每个关键项目,同时使用日常系统行为解释其含义。
反应系统响应迅速:系统及时响应,提供一致的服务质量。
这意味着当负载高和低时系统以一致的方式运行。结果是用户开始建立您的系统的信心,并继续与系统做生意。
反应系统具有弹性:系统在出现故障时保持响应。
这意味着系统能够隔离故障,包含故障并在必要时使用复制来缓解故障并继续为用户提供服务。
事情经常出错。但是,如果我们拥有一个具有弹性的系统,那么它就能变得如此敏捷。
反应系统具有弹性:系统在不同的工作负载下保持响应。
这意味着系统可以对负载的变化做出反应。这再次链接到响应性,因为您可以看到没有弹性和弹性就无法实现响应性。
反应系统是消息驱动的:系统依赖于异步消息传递。它将失败作为消息传递。并且它在必要时应用反压来控制消息流。
这意味着系统对事件/消息作出反应,并且仅在资源处于活动状态时消耗资源。
反应系统和反应式编程是两回事。一种是架构风格,另一种是编程范例,可用于实现反应系统的某些特征,但不是全部。
反应式编程
现在,我们对反应系统有了很好的理解。是时候让我们深入了解反应式编程概念。
反应式编程是异步编程范例的一个分支,它允许信息驱动逻辑而不是依赖于执行的线程
现在,这听起来像维基百科或一些学术研究论文。它抛出的术语似乎有些可怕。用简单的语言:
反应式编程允许应用程序基于消息/事件在任意时间发生操作,而不是执行驱动的线程。
现在您可以看到消息驱动和响应特性可以从响应式编程中受益。但并非所有,要实现所有这些目标,我们需要更广泛的工具范围。这些不属于本文的范围。
事件、事件、还是事件
不同类型的事件:
反应式编程和事件
这些类型的事件的共同因素是这些事件在任意时间发生,因此如果我们在程序中有一个执行线程,那些线程必须等待这些事件完成。
如果系统有更多客户端正在等待这些操作的结果,那么我们需要更多线程来服务器客户端。
这就是反应式编程脱颖而出的地方。它不是等待这些事件完成,而是提供类似观察器的机制,让这些事件驱动执行。
结果是处理大量这些事件的线程和资源数量减少。
我们需要了解的最重要的事实是:
反应式编程不会使应用程序更快,但它允许应用程序以更少的资源为更多客户端提供服务。
如果我们需要扩展,我们可以横向扩展(使用更少的资源),这使得它成为构建现代微服务的完美范例。这就是反应式编程如何加强反应系统的弹性特性。
反应式编程库的特点
了解反应式编程的特征是解开可能性的关键:
结合上述所有内容,我们可以很好地了解使用反应式编程原理开发的应用程序。
这些应用程序支持处理无限数量的事件,事件驱动和了解他们正在处理的环境,并可以对这些环境中的更改做出反应。
为什么它很重要,有什么好处,还不清楚?
让我们深入了解更多细节并进行讨论。
我记得有一次我和其他开发人员谈论过咖啡的反应性编程,他的问题是。
“使用它有什么好处?”
“与我们今天使用的相比,它给桌面带来了哪些好处?”
这些都是关于任何新技术的完全有效的问题。为了回答这些问题,我们需要退一步思考我们用于使用Java构建应用程序的工具。
用Java编写的Web API通常部署到像Tomcat,WebSphere等servlet容器中。这些容器使用Servlet API进行操作,这些操作提供阻塞I / O. 因此流行的框架如Spring,Spring Web MVC也继承了这种阻塞行为。
数据库操作阻止I / O调用,JPA,JDBC都以这种方式运行。
这些阻塞操作会阻止请求线程,直到该操作完成。因此,更多请求会导致更多阻塞的线程等待I / O操作完成。结果如下。
它不仅仅是I / O操作,Java并发工具的普通公民也拥有阻塞行为:
FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call() { return searcher.search(target); }}); executor.execute(future);
//Waits if necessary for the computation to complete, and then retrieves its result.String result = future.get();
阻塞I / O方法的问题在于,使用阻塞API,我们无法支持反应系统。它是执行驱动,同步的线程。这两个原因使得资源消耗很大。下一步将是找到一种方法来找到执行操作异步非阻塞方式的更有效方法。
Java 8带来了CompletableFuture,它是异步编程的真正异步非阻塞功能。但是如果你想要编写更多结果并进行流处理,代码就会变得难以阅读,并且缺乏流畅的操作API。
非阻塞方法
Java稍后引入了java.nio包,通过引入一个概念调用Selector来解决这种阻塞行为,它可以监视多个通道。
这允许单线程监视许多输入通道。以及将数据加载到ByteBuffers而不是阻塞的Streams的关键概念。因此,ByteBuffers将提供可用的数据。
以下是使用NIO功能的服务器演示。它是使用NIO实现的echo服务器的简单实现,但是足以获得非阻塞方法基础的示例。
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NIOServer { private static final int PORT = 8888; private static final int BUFFER_SIZE = 1024; private static Selector selector = null; public static void main(String[] args) { logger("Starting NIOServer"); try { InetAddress hostIP = InetAddress.getLocalHost(); logger(String.format("Trying to accept connections on %s:%d", hostIP.getHostAddress(), PORT)); // create selector via open(); selector = Selector.open(); // create a server socket channel ServerSocketChannel server = ServerSocketChannel.open(); // get the server socket ServerSocket serverSocket = server.socket(); InetSocketAddress address = new InetSocketAddress(hostIP, PORT); // bind the server socket to address serverSocket.bind(address); // configure socket to be non-blocking server.configureBlocking(false); // register selector interest for accept event. server.register(selector, SelectionKey.OP_ACCEPT); while (true) { // get a channel from selector, this will block until a channel get selected selector.select(); // get keys for that channel Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> i = selectedKeys.iterator(); // go through selection keys one by one and see any of those events are ready // if ready process that while (i.hasNext()) { SelectionKey key = i.next(); if (key.isAcceptable()) { processAcceptEvent(server, key); } else if (key.isReadable()) { processReadEvent(key); } i.remove(); } } } catch (IOException e) { logger(e.getMessage()); e.printStackTrace(); } } /** * Handle the accept event * * @param socket Server socket channel * @param key Selection key * @throws IOException In case of error while accept the connection */<font> <b>private</b> <b>static</b> <b>void</b> processAcceptEvent(ServerSocketChannel socket, SelectionKey key) throws IOException { logger(</font><font>"Connection Accepted"</font><font>); </font><font><i>// Accept the connection and make it non-blocking</i></font><font> SocketChannel socketChannel = socket.accept(); socketChannel.configureBlocking(false); </font><font><i>// Register interest in reading this channel</i></font><font> socketChannel.register(selector, SelectionKey.OP_READ); } </font><font><i>/** * Handle the read event * * @param key Selection key for the channel. * @throws IOException */</i></font><font> <b>private</b> <b>static</b> <b>void</b> processReadEvent(SelectionKey key) throws IOException { logger(</font><font>"Handling ReadEvent"</font><font>); </font><font><i>// create a ServerSocketChannel to read the request</i></font><font> SocketChannel client = (SocketChannel) key.channel(); </font>
这种方法背后的基本原理是,Selector可以在多个通道中注册它的兴趣,当这些事件发生时,主线程通过调用匹配的处理逻辑来响应这些事件。
唯一的阻塞代码是第39行:
<font><i>//从选择器获取一个通道,这将阻塞直到一个通道被选中</i></font><font> selector.select(); </font>
select()方法阻塞,直到选择一个通道。例如,直到发生新连接。
事件循环
上面的模式等于我们在JavaScript世界中称为事件循环event loop的模式。Javascript是单线程运行时,因此它必须找到支持多个任务的方法,而不必创建多个线程。
当NodeJS出现并开始以较少的内存占用和CPU时间来处理繁重的负载时,Java社区意识到这是解决这一系列问题的更具可扩展性的方法。众所周知,多线程应用程序难以开发,难以维护。
反应性库包
现在我们已经很好地理解了旧Java世界的同步阻塞行为以及使用事件循环进行非阻塞的新方法,我们可以开始进入Reactive世界。
首先,Microsoft为.NET框架创建了反应式扩展。并通过JavaScript跟进单线程,非阻塞,异步语言,它对反应库有真正的需求,因此RxJ就存在了。
您可以在大多数流行的编程语言中找到Rx库。反应式编程的当前库提供以下内容。