在微服务架构中,不同服务之间通过应用协议进行数据传输。典型的传输方式包括基于 HTTP 协议的 REST 或 SOAP API 和基于 TCP 字节流的 gRPC 等。HTTP 协议的优势在于其广泛的适用性,有非常多的服务器和客户端实现的支持,但 HTTP 协议本身比较简单,只支持请求-响应模式。gRPC 等基于 TCP 的协议使用二进制字节流传输,保证了传输的效率。不过 gRPC 基于 HTTP/2,不支持其他传输层实现。HTTP/2 协议使用了二进制字节流,但是并没有改变 HTTP/1 协议已有的语义,更多的只是改变了传输时的格式。已有应用协议的问题在于单一的交互模式,只支持请求-响应模式,而此模式对于很多应用场景来说是不合适的。典型的例子是消息推送,以 HTTP 协议为例,如果客户端需要获取最新的推送消息,就必须使用轮询。客户端不停的发送请求到服务器来检查更新,这无疑造成了大量的资源浪费。虽然服务器发送事件(Server-Sent Events,SSE)可以用来推送消息,不过 SSE 是一个简单的文本协议,仅提供有限的功能。此外,WebSocket 可以进行双向数据传输,不过它没有提供应用层协议支持。请求-响应模式的另外一个问题是,如果某个请求的响应时间过长,会阻塞之后的其他请求的处理。RSocket 协议的出现,很好的解决了已有协议的这些问题。
RSocket 是一个 OSL 七层模型中 5/6 层的协议,是 TCP/IP 之上的应用层协议。RSocket 可以使用不同的底层传输层,包括 TCP、WebSocket 和 Aeron。TCP 适用于分布式系统的各个组件之间交互,WebSocket 适用于浏览器和服务器之间的交互,Aeron 是基于 UDP 协议的传输方式,这就保证了 RSocket 可以适应于不同的场景。使用 RSocket 的应用层实现可以保持不变,只需要根据系统环境、设备能力和性能要求来选择合适的底层传输方式即可。RSocket 作为一个应用层协议,可以很容易在其基础上定义应用自己的协议。此外,RSocket 使用二进制格式,保证了传输的高效,节省带宽。而且,通过基于反应式流语义的流控制,RSocket 保证了消息传输中的双方不会因为请求的压力过大而崩溃。
RSocket 支持四种不同的交互模式,见表 1。
模式 | 说明 |
---|---|
请求-响应(request/response) | 这是最典型也最常见的模式。发送方在发送消息给接收方之后,等待与之对应的响应消息。 |
请求-响应流(request/stream) | 发送方的每个请求消息,都对应于接收方的一个消息流作为响应。 |
发后不管(fire-and-forget) | 发送方的请求消息没有与之对应的响应。 |
通道模式(channel) | 在发送方和接收方之间建立一个双向传输的通道。 |
下面介绍 RSocket 协议的具体内容。
RSocket 协议在传输时使用帧(frame)来表示单个消息。每个帧中包含的可能是请求内容、响应内容或与协议相关的数据。一个应用消息可能被切分成多个片段(fragment)以包含在一个帧中。根据底层传输协议的不同,一个表示帧长度的字段可能是必须的。由于 TCP 协议没有提供帧支持,所以 RSocket 的帧长度字段是必须的。对于提供了帧支持的传输协议,RSocket 帧只是简单的封装在传输层消息中;对于没有提供帧支持的传输协议,每个 RSocket 帧之前都需要添加一个 24 字节的字段表示帧长度。
在每个 RSocket 帧中,最起始的部分是帧头部,包括 31 字节的流标识符,6 字节的帧类型和 10 字节的标志位。RSocket 协议中的流(stream)表示的是一个操作的单元。每个流有自己唯一的标识符。流标识符由发送方生成。值为 0 的流标识符表示与连接相关的操作。客户端的流标识符从 1 开始,每次递增 2;服务器端的流标识符从 2 开始,每次递增 2。在帧头部之后的内容与帧类型相关。
RSocket 中定义了不同类型的帧,见表 2。
帧类型 | 说明 |
---|---|
SETUP | 由客户端发送来建立连接,流标识符为 0。 |
REQUEST_RESPONSE | 请求-响应模式中的请求内容。 |
REQUEST_FNF | 发后不管模式中的请求内容。 |
REQUEST_STREAM | 请求-响应流模式中的请求内容。 |
REQUEST_CHANNEL | 通道模式中的建立通道的请求。发送方只能发一个 REQUEST_CHANNEL 帧。 |
REQUEST_N | 基于反应式流语义的流控制,相当于反应式流中的 request(n)。 |
PAYLOAD | 表示负载的帧。通过不同的标志位来表示状态。NEXT 标志位表示接收到流中的数据,COMPLETE 标志位表示流结束。 |
ERROR | 表示连接层或应用层错误。 |
CANCEL | 取消当前请求。 |
KEEPALIVE | 表示连接层或应用层错误。 |
KEEPALIVE | 启用租约模式。 |
METADATA_PUSH | 异步元数据推送。 |
RESUME | 在连接中断后恢复传输。 |
RESUME_OK | 恢复传输成功。 |
EXT | 帧类型扩展。 |
RSocket 中的负载分成元数据和数据两种,二者可以使用不同的编码方式。元数据是可选的。帧头部有标志位指示帧中是否包含元数据。某些特定类型的帧可以添加元数据。
根据不同的交互模式,发送方和接收方之间有不同的帧交互,下面是几个典型的帧交互示例:
除了发后不管模式之外,其余模式中的接收方都可以通过 ERROR 帧或 CANCEL 帧来结束流。
RSocket 使用 Reactive Streams 语义来进行流控制(flow control),也就是 request(n)模式。流的发送方通过 REQUEST_N 帧来声明它允许接收方发送的 PAYLOAD 帧的数量。REQUEST_N 帧一旦发出就不能收回,而且所产生的效果是累加的。比如,发送方发送 request(2)和 request(3)帧之后,接收方允许发送 5 个 PAYLOAD 帧。
除了基于 Reactive Streams 语义的流程控制之外,RSocket 还可以使用租约模式。租约模式只是限定了在某个时间段内,请求者所能发送的最大请求数量。
RSocket 提供了不同语言的实现,包括 Java、Kotlin、JavaScript、Go、.NET 和 C++ 等。对 Java 项目来说,只需要添加相应的 Maven 依赖即可。RSocket 的 Java 实现库都在 Maven 分组 io.rsocket 中。其中常用的库包括核心功能库 rsocket-core 和表 3中列出的传输层实现。本文中使用的版本是 1.0.0-RC3。
Maven 实现库名称 | 底层实现 | 支持协议 |
---|---|---|
rsocket-transport-netty | Reactor Netty | TCP 和 WebSocket |
rsocket-transport-akka | Akka | TCP 和 WebSocket |
rsocket-transport-aeron | Aeron | UDP |
在代码清单 1 中,Maven 项目中添加了 RSocket 相关的依赖和基于 Reactor Netty 的传输层实现。
<dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-core</artifactId> <version>1.0.0-RC3</version> </dependency> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>1.0.0-RC3</version> </dependency>
下面介绍 RSocket 中不同模式的使用方式。
首先从最常用的请求-响应模式开始介绍 RSocket 的用法。代码清单 2 给出了一个请求-响应模式的 RSocket 服务器和客户端的示例。RSocketFactory 类用来创建 RSocket 服务器和客户端。
import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.publisher.Mono; public class RequestResponseExample { public static void main(String[] args) { RSocketFactory.receive() .acceptor(((setup, sendingSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8())); } } ))) .transport(TcpServerTransport.create("localhost", 7000)) //指定传输层实现 .start() //启动服务器 .subscribe(); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) //指定传输层实现 .start() //启动客户端 .block(); socket.requestResponse(DefaultPayload.create("hello")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .block(); socket.dispose(); } }
RSocketFactory.receive()
方法返回用来创建服务器的 ServerRSocketFactory
类的对象。 ServerRSocketFactory
的 acceptor()
方法的参数是 SocketAcceptor
接口,该接口只有一个方法 Mono<RSocket>
accept(ConnectionSetupPayload setup, RSocket sendingSocket)
。该方法的参数 setup
表示的是 SETUP
帧的负载内容,而 sendingSocket
表示的是发送请求的 RSocket
对象。该方法返回的 Mono<RSocket>
对象包含的是处理请求的 RSocket
对象。代码中使用 Lambda
表达式实现了 SocketAcceptor
接口。在代码中, SocketAcceptor
的 accept()
方法返回的是抽象类 AbstractRSocket
的匿名实现,只实现了其中的 requestResponse()
方法。具体的请求处理逻辑是在请求数据内容上添加 "ECHO >> "
前缀。接下来的 transport()
方法指定 ServerTransport
接口的实现作为 RSocket
底层的传输层实现(这里使用 TcpServerTransport
类的 create()
方法创建在 localhost
上 7000
端口的 TCP
服务器端)。再通过 start()
方法可以得到表示服务器实例的 Mono
对象。最后的 subscribe()
方法用来触发整个启动过程。
RSocketFactory.connect()
方法用来创建 RSocket
客户端,返回 ClientRSocketFactory
类的对象。接下来的 transport()
方法指定传输层 ClientTransport
实现 (这里通过 TcpClientTransport.create()
方法创建到服务器的 TCP
连接)。再通过其 start()
方法可以得到 Mono<RSocket>
对象。最后调用 block()
方法等待客户端启动并返回 RSocket
对象。
RSocket
对象用来与服务器端进行交互。 RSocket
类的 requestResponse()
方法发送 Payload
接口表示的负载并等待响应。该方法的返回值是表示响应的 Mono<Payload>
对象。对于返回的响应,示例中只是简单地输出到控制台。 DefaultPayload.create()
方法可以简单地创建 Payload
对象。 RSocket
类的 dispose()
方法用来销毁该对象。
请求-响应流模式的用法与请求-响应模式很相似。代码清单 3 给出了请求-响应流模式的示例。服务器端的 AbstractRSocket
类的实现覆写了 requestStream()
方法。对于每个请求的 Payload 对象,都需要返回一个表示响应流的 Flux<Payload>
对象。这里的实现逻辑是把请求数据的字符串变成包含单个字符的流。客户端的 RSocket 对象使用 requestStream()
来发送请求,得到的是 Flux<Payload>
对象。
public class RequestStreamExample { public static void main(String[] args) { RSocketFactory.receive() .acceptor(((setup, sendingSocket) -> Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.fromStream(payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c)) .map(DefaultPayload::create)); } } ))) .transport(TcpServerTransport.create("localhost", 7000)) .start() .subscribe(); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start() .block(); socket.requestStream(DefaultPayload.create("hello")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .blockLast(); socket.dispose(); } }
发后不管模式的用法和之前的两种模式也是相似的。在代码清单 4 中, AbstractRSocket
类的实现覆写了 fireAndForget()
方法,对于请求的 Payload 对象,只需要返回 Mono<Void>
对象即可。客户端 RSocket 对象使用 fireAndForget()
方法发送请求。在发后不管模式中,由于发送方不需要等待接收方的响应,因此当程序结束时,服务器端并不一定接收到了请求。
public class FireAndForgetExample { public static void main(String[] args) { RSocketFactory.receive() .acceptor(((setup, sendingSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Void> fireAndForget(Payload payload) { System.out.println("Receive: " + payload.getDataUtf8()); return Mono.empty(); } } ))) .transport(TcpServerTransport.create("localhost", 7000)) .start() .subscribe(); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start() .block(); socket.fireAndForget(DefaultPayload.create("hello")).block(); socket.fireAndForget(DefaultPayload.create("world")).block(); socket.dispose(); } }
通道模式同样以相似的方式实现。在代码清单 5 中,服务器端的 AbstractRSocket
类的实现覆写了 requestChannel()
方法,对于请求的 Publisher<Payload>
对象,返回 Flux<Payload>
对象。客户端 RSocket
对象使用 requestChannel()
方法发送请求并处理响应。
public class RequestChannelExample { public static void main(String[] args) { RSocketFactory.receive() .acceptor(((setup, sendingSocket) -> Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads).flatMap(payload -> Flux.fromStream( payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c)) .map(DefaultPayload::create))); } } ))) .transport(TcpServerTransport.create("localhost", 7000)) .start() .subscribe(); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start() .block(); socket.requestChannel(Flux.just("hello", "world", "goodbye").map(DefaultPayload::create)) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .blockLast(); socket.dispose(); } }
Spring 框架提供了对 RSocket 的集成,作为 spring-messaging 模块支持的一种消息传输方式。对于 Spring Boot 应用来说,只需要添加对 spring-boot-starter-rsocket
的依赖即可。本文使用的 Spring Boot 是 2.2.0.M6 版本,对应于 Spring 框架的 5.2.0.RC2
版本。代码清单 6 给出了 Spring Boot 应用中使用 RSocket 需要添加的 Maven
依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> <version>2.2.0.M6</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.3.0.RC1</version> <scope>test</scope> </dependency>
在 Spring 应用的 application.properties
文件中,添加 spring.rsocket.server.port=7100
来设置内置
RSocket 服务器的端口。代码清单 7 中的 EchoController
是代码清单 2 中的请求-响应模式的实现。方法 echo()
接收 String 类型的请求,并返回
Mono<String>作为响应。 @MessageMapping("echo")
注解指定了所处理消息的目的地。与 Spring 集成之后,RSocket
的使用变得很简洁。
@Controller public class EchoController { @MessageMapping("echo") public Mono<String> echo(String input) { return Mono.just("ECHO >> " + input); } }
下面添加测试用例来进行测试。代码清单 8 中的 AbstractTest
类是一个抽象的 RSocket
测试类。其中包含的 createRSocketRequester()
方法用来创建发送 RSocket 请求的 RSocketRequester
对象。 RSocketRequester.Builder
类用来创建 RSocketRequester
对象,其中 dataMimeType()
方法指定负载中数据的 MIME
类型, connect()
方法指定连接的传输层实现。
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.util.MimeTypeUtils; abstract class AbstractTest { @Value("${spring.rsocket.server.port}") private int serverPort; @Autowired private RSocketRequester.Builder builder; RSocketRequester createRSocketRequester() { return builder.dataMimeType(MimeTypeUtils.TEXT_PLAIN) .connect(TcpClientTransport.create(serverPort)).block(); } }
代码清单 9 中的 EchoServerTest
类测试代码清单 7 中的 EchoController
。在测试中,首先使用 createRSocketRequester()
方法来创建 RSocketRequester
对象,再使用 route()
方法来指定消息的目的地,最后使用 data()
方法指定负载中的数据。 retrieveMono()
方法用来发送请求。通过
StepVerifier 类来验证返回的 Mono<String>
对象。
@SpringBootTest class EchoServerTest extends AbstractTest { @Test @DisplayName("Test echo server") void testEcho() { RSocketRequester requester = createRSocketRequester(); Mono<String> response = requester.route("echo") .data("hello") .retrieveMono(String.class); StepVerifier.create(response) .expectNext("ECHO >> hello") .expectComplete() .verify(); } }
除了请求-响应模式之外,其他模式也可以在 Spring 中使用。代码清单 10 中的 StringSplitController
类使用了请求-响应流模式。
@Controller public class StringSplitController { @MessageMapping("stringSplit") public Flux<String> stringSplit(String input) { return Flux.fromStream(input.codePoints().mapToObj(c -> String.valueOf((char) c))); } }
代码清单 11 是代码清单 10 对应的测试用例。
@SpringBootTest public class StringSplitTest extends AbstractTest { @Test @DisplayName("Test string split") void testStringSplit() { RSocketRequester requester = createRSocketRequester(); Flux<String> response = requester.route("stringSplit") .data("hello") .retrieveFlux(String.class); StepVerifier.create(response) .expectNext("h", "e", "l", "l", "o") .expectComplete() .verify(); } }
如果需要使用 WebSocket 作为传输层实现,只需要替换 RSocket 服务器和客户端使用的传输层 Java 类即可,其他代码并不需要改动。在代码清单
12 中, WebsocketServerTransport
类是 WebSocket 服务器端实现,而 WebsocketClientTransport
类是客户端的实现。除了 transport()
方法使用的参数不同之外,其他的代码都与代码相同。
public class WebSocketRequestResponseExample { public static void main(String[] args) { RSocketFactory.receive() .acceptor(((setup, sendingSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8())); } } ))) .transport(WebsocketServerTransport.create("localhost", 7000)) .start() .subscribe(); RSocket socket = RSocketFactory.connect() .transport(WebsocketClientTransport.create("localhost", 7000)) .start() .block(); socket.requestResponse(DefaultPayload.create("hello")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .block(); socket.dispose(); } }
对于使用 Spring WebFlux 的应用,可以配置使用 RSocket 作为 WebSocket 服务器端实现。在代码清单 13 中的 Spring
配置中,在路径 "/ws"
上启用了基于 RSocket 的 WebSocket 支持。
@Configuration @EnableWebFlux public class Config { @Bean RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider( RSocketMessageHandler messageHandler) { return new RSocketWebSocketNettyRouteProvider("/ws", messageHandler.responder()); } static class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider { private final String mappingPath; private final SocketAcceptor socketAcceptor; RSocketWebSocketNettyRouteProvider(String mappingPath, SocketAcceptor socketAcceptor) { this.mappingPath = mappingPath; this.socketAcceptor = socketAcceptor; } @Override public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) { ServerTransport.ConnectionAcceptor acceptor = RSocketFactory.receive() .acceptor(this.socketAcceptor) .toConnectionAcceptor(); return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(acceptor)); } } }
由于 RSocket 有自己的二进制协议,在浏览器端的实现需要使用 RSocket 提供的 JavaScript 客户端与服务器端交互。在 Web 应用中使用 RSocket 提供的
NodeJS 模块 rsocket-websocket-client
即可。本文中包含一个 React 应用,连接到使用 RSocket
的 WebSocket 服务器并发送消息。代码清单 14中的 MessageService
类负责与服务器交互。首先创建一个 JavaScript 实现的 RSocket 客户端中的 RSocketClient
对象,并使用 RSocketWebSocketClient
作为传输层实现。在连接成功之后,使用 RSocketClient
的 requestStream()
方法发送消息并处理响应。这里的处理逻辑是调用提供的消息回调方法 messageCallback
。
import {RSocketClient, MAX_STREAM_ID} from 'rsocket-core'; import RSocketWebSocketClient from 'rsocket-websocket-client'; export default class MessageService { constructor(connectCallback, messageCallback) { this._client = new RSocketClient({ setup: { keepAlive: 10000, dataMimeType: 'text/plain', metadataMimeType: 'text/plain', }, transport: new RSocketWebSocketClient({ // eslint-disable-next-line no-restricted-globals url: `ws://${location.host}/ws` }), }); this._connectCallback = connectCallback; this._messageCallback = messageCallback; } connect() { this._client.connect().then( (socket) => { this._socket = socket; this._connectCallback(null); }, (error) => this._connectCallback(error), ); } send(message) { this._socket.requestStream({ data: message, }).subscribe({ onNext: (payload) => this._messageCallback(null, payload.data), onError: (error) => this._messageCallback(error), onSubscribe: (_subscription) => _subscription.request(MAX_STREAM_ID), }); } }
下面介绍 RSocket 相关的高级话题。
由于 RSocket 使用二进制协议,所以调试 RSocket 应用消息比 HTTP/1 协议要复杂一些。从 RSocket
帧的二进制内容无法直接得知帧的含义。需要辅助工具来解析二进制格式消息。对 Java 应用来说,只需要把日志记录器 io.rsocket.FrameLogger
设置为 DEBUG
级别,就可以看到每个 RSocket 帧的内容。代码清单 15 给出了类型为 REQUEST_RESPONSE
帧的调试信息。除此之外,还可以使用 Wireshark 工具及其 RSocket 插件。
2019-09-17 05:02:59.906 DEBUG 15532 --- [actor-tcp-nio-1] io.rsocket.FrameLogger : sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 23 Metadata: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| fe 00 00 05 04 65 63 68 6f |.....echo | +--------+-------------------------------------------------+----------------+ Data: +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
在代码清单 2 的 AbstractRSocket
类的实现中,对于接收的 Payload 对象是直接使用的。这是因为
RSocket 默认对请求的负载进行了拷贝。这样的做法在实现上虽然简单,但会带来性能上的损失,增加响应时间。为了提高性能,可以通过 ServerRSocketFactory
类或 ClientRSocketFactory
类的 frameDecoder()
方法来指定 PayloadDecoder 接口的实现。 PayloadDecoder.ZERO_COPY
是内置提供的零拷贝实现类。当使用了负载零拷贝之后,负载的内容不再被拷贝。需要通过 Payload 对象的 release()
方法来手动释放负载对应的内存,否则会造成内存泄漏。如果使用
Spring Boot 提供的 RSocket 支持, PayloadDecoder.ZERO_COPY
默认已经被启用,并由 Spring 负责相应的内存释放。
RSocket 协议支持四种不同的交互模式,适用于不同类型的应用场景。相对于传统的 HTTP 协议来说,RSocket 提供了更多的灵活性。RSocket 可以使用 TCP、WebSocket 和 Aeron 作为传输层实现,简化了应用层协议的开发。此外,RSocket 提供的基于反应式流语义的流控制,保证了应用之间消息传递的健壮性。在分布式系统集成中,RSocket 是很好的选择。
获取代码