本文主要研究一下rsocket-java
rsocket-core-0.12.1-sources.jar!/io/rsocket/RSocket.java
public interface RSocket extends Availability, Closeable { /** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> fireAndForget(Payload payload); /** * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. */ Mono<Payload> requestResponse(Payload payload); /** * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ Flux<Payload> requestStream(Payload payload); /** * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. */ Flux<Payload> requestChannel(Publisher<Payload> payloads); /** * Metadata-Push interaction model of {@code RSocket}. * * @param payload Request payloads. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> metadataPush(Payload payload); @Override default double availability() { return isDisposed() ? 0.0 : 1.0; } }
定义double availability()方法
)及Closeable( 定义了Mono<Void> onClose()方法
)接口 @Test public void testFireAndForget() throws InterruptedException { //NOTE SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Void> fireAndForget(Payload payload) { System.out.printf("fire-forget: %s%n", payload.getDataUtf8()); return Mono.empty(); } })) .transport(TcpServerTransport.create("localhost", 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 8080)) .start() .block(); socket .fireAndForget(DefaultPayload.create("Hello")) .block(); socket.dispose(); TimeUnit.SECONDS.sleep(5); }
类似udp,无需ack,比较适合metrics上报、访问日志上报等
@Test public void testRequestResponse(){ //NOTE SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload p) { return Mono.just(p); } })) .transport(TcpServerTransport.create("localhost", 8080)) .start() .subscribe(); //NOTE CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 8080)) .start() .block(); socket .requestResponse(DefaultPayload.create("Hello")) .map(Payload::getDataUtf8) .onErrorReturn("error") .doOnNext(System.out::println) .block(); socket.dispose(); }
类似http,但是优于http,因为它是异步的,而且是multiplexed
@Test public void testRequestStream(){ //SERVER RSocketFactory.receive() .acceptor(new SocketAcceptor() { @Override public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) { return Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.interval(Duration.ofMillis(100)) .map(aLong -> DefaultPayload.create("Interval: " + aLong)); } }); } }) .transport(TcpServerTransport.create("localhost", 7000)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 7000)) .start() .block(); socket .requestStream(DefaultPayload.create("Hello")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .then() .doFinally(signalType -> socket.dispose()) .then() .block(); }
类似Request-Response( 返回Mono
),只不过返回的是Flux
@Test public void testRequestChannel(){ //SERVER RSocketFactory.receive() .acceptor(new SocketAcceptor(){ @Override public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { return Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads) .map(Payload::getDataUtf8) .map(s -> "Echo: " + s) .map(DefaultPayload::create); } }); } }) .transport(TcpServerTransport.create("localhost", 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 8080)) .start() .block(); socket .requestChannel( Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello"))) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .doFinally(signalType -> socket.dispose()) .then() .block(); }
类似websocket,可以双向通信
@Test public void testMetadataPush() throws InterruptedException { //SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Void> metadataPush(Payload payload) { System.out.printf("metadataPush: %s%n", payload.getDataUtf8()); return Mono.empty(); } })) .transport(TcpServerTransport.create("localhost", 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", 8080)) .start() .block(); socket .metadataPush(DefaultPayload.create("hello","version=1.0.0+")) .block(); socket.dispose(); TimeUnit.SECONDS.sleep(5); }