Reposted

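Dubbo Source Code: Network Communication

This article walks through Dubbo's communication flow, traced by stepping through the source code in a debugger. Corrections are welcome if you spot any mistakes.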

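What does service export do?

  1. Register with ZooKeeper and watch the dynamic-configuration node
  2. Start the server
  3. Create the proxy service
  4. Exporter -> Invoker -> proxyService

What does service reference do?

  1. Register with ZooKeeper and watch the dynamic-configuration node, the provider node, and the router node
  2. Start the client
  3. Create the proxy service
  4. proxyService -> Invoker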

Client request

ConsumerProxyService -> Invoker [DubboInvoker] -> Exchanger [HeaderExchangeClient] -> Transporter [NettyClient] -> encode -> SEND-TO-SERVER (a DefaultFuture is created; the Request carries a unique ID)

Server response

decode -> Transporter [NettyServer] -> chain of Handlers -> thread pool -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback

Exchanger

Exchangers

A facade class offering convenience methods. It first obtains an Exchanger through SPI, then calls that Exchanger to create an ExchangeServer or an ExchangeClient.

Exchanger

An SPI interface whose default implementation is HeaderExchanger. It provides two shortcut methods, one creating an ExchangeServer and one creating an ExchangeClient.

@SPI(HeaderExchanger.NAME)
public interface Exchanger {
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
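
The lookup done by the facade can be pictured as a name-keyed registry: the URL carries an `exchanger` parameter, and `header` is the fallback. The sketch below is a simplified stand-in and not Dubbo's ExtensionLoader. `SpiLookupSketch`, `REGISTRY`, `resolve`, and the `custom` extension are hypothetical names, and the URL is reduced to a plain parameter map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for SPI lookup; not Dubbo's ExtensionLoader. */
final class SpiLookupSketch {
    /** Minimal stand-in for the Exchanger extension point. */
    interface Exchanger {
        String name();
    }

    static final String DEFAULT_NAME = "header"; // mirrors @SPI(HeaderExchanger.NAME)

    // name -> factory, playing the role of the extension descriptor files
    private static final Map<String, Supplier<Exchanger>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("header", () -> () -> "header");
        REGISTRY.put("custom", () -> () -> "custom"); // assumed second extension, for illustration only
    }

    /** Picks the extension named by the "exchanger" parameter, else the default. */
    static Exchanger resolve(Map<String, String> urlParams) {
        String name = urlParams.getOrDefault("exchanger", DEFAULT_NAME);
        Supplier<Exchanger> factory = REGISTRY.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(resolve(params).name());
        params.put("exchanger", "custom");
        System.out.println(resolve(params).name());
    }
}
```

The point of the design is that callers only ever talk to the facade; which implementation they get is decided by configuration carried on the URL.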

ExchangeServer

Used on the server side. The default implementation is HeaderExchangeServer, which calls the Transporter internally to start the server.

public interface ExchangeServer extends Server {
    Collection<ExchangeChannel> getExchangeChannels();

    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}

ExchangeClient

Used on the client side. The default implementation is HeaderExchangeClient. Its core method is request, which sends the request through the Transporter layer.

public interface ExchangeClient extends Client, ExchangeChannel {
}

ExchangeChannel

The default implementation is HeaderExchangeChannel, which is held as a field of HeaderExchangeClient.

Transporter

Transporters

A facade class offering convenience methods. It first obtains a Transporter through SPI, then calls that Transporter to create a Server or a Client.

Transporter

An SPI interface whose default implementation is NettyTransporter. It provides two shortcut methods, one creating a Server and one creating a Client.

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

Server

Used on the server side. The default implementation is NettyServer, which starts the server. Its core method is doOpen.

public class NettyServer extends AbstractServer implements Server {
}

Client

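Used on the client side. The default implementation is NettyClient. Its send method (inherited through AbstractClient) sends messages, and doOpen configures the Netty Bootstrap that is then used to connect to the server.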

public class NettyClient extends AbstractClient {
}

Starting the server

DubboProtocol#export =>
DubboProtocol#openServer => 
DubboProtocol#createServer =>
Exchangers#bind => 
NettyServer#doOpen

In the end, NettyServer#doOpen starts a server through Netty.

DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
        => HeaderExchanger#bind(url, requestHandler)
            => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
                
// The Transporters#bind call expands to
Transporters#bind
    => NettyTransporter#bind(url, handler)
        => return new NettyServer(url, handler)
            =>  NettyServer#doOpen [doOpen is called while NettyServer is being constructed]

The handler passed to NettyServer is new DecodeHandler(new HeaderExchangeHandler(handler)), and the server side ultimately returns a HeaderExchangeServer. In addition, the NettyServer constructor wraps the handler a few more times:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {}

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

So the handler field in NettyServer ends up as the chain MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler (created by AllDispatcher) -> DecodeHandler -> HeaderExchangeHandler
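
The nesting order is easier to see in a toy model. The sketch below rebuilds the same decorator chain with a hypothetical `Handler` interface that records each layer it passes through. The class names in the strings only echo Dubbo's; none of the real classes are used, and the thread-pool hand-off done by the dispatcher layer is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the decorator chain; names only echo Dubbo's classes. */
final class HandlerChainSketch {
    interface Handler {
        void received(String message, List<String> trace);
    }

    /** A decorator that records its own name and then delegates inward. */
    static Handler wrap(String name, Handler next) {
        return (message, trace) -> {
            trace.add(name);
            next.received(message, trace);
        };
    }

    /** Builds the chain in the same nesting order as HeaderExchanger plus ChannelHandlers. */
    static Handler build() {
        Handler requestHandler = (message, trace) -> trace.add("requestHandler");
        // HeaderExchanger: new DecodeHandler(new HeaderExchangeHandler(handler))
        Handler inner = wrap("DecodeHandler", wrap("HeaderExchangeHandler", requestHandler));
        // ChannelHandlers: MultiMessageHandler(HeartbeatHandler(dispatch(handler)))
        Handler dispatched = wrap("AllChannelHandler", inner);
        return wrap("MultiMessageHandler", wrap("HeartbeatHandler", dispatched));
    }

    /** Sends one message through the chain and returns the layers visited, in order. */
    static List<String> trace(String message) {
        List<String> trace = new ArrayList<>();
        build().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", trace("ping")));
    }
}
```

The outermost wrapper is the first to see an inbound message, which is why the last wrapper applied (MultiMessageHandler) appears first in the chain.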

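Connecting the client to the server

The call chain is very long and deeply buried, so some steps are omitted here. It runs at application startup, when an Invoker is generated for each Reference object.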

RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers => 
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen

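In the end, NettyClient#doOpen sets up the Netty client, and the connection to the server is established as part of constructing the NettyClient.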

Exchangers#connect
    => HeaderExchanger#connect(url, handler)
        => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
                
// The Transporters#connect call expands to
Transporters#connect
    => NettyTransporter#connect(url, handler)
        => return new NettyClient(url, handler)
            =>  NettyClient#doOpen [doOpen is called while NettyClient is being constructed]

The handler passed to NettyClient is new DecodeHandler(new HeaderExchangeHandler(handler)). The client side ultimately returns a HeaderExchangeClient, whose client field wraps the NettyClient.

However, DubboProtocol#buildReferenceCountExchangeClient wraps the HeaderExchangeClient in one more layer, so the client held by the Invoker is actually a ReferenceCountExchangeClient.

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

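ReferenceCountExchangeClient behaves much like HeaderExchangeClient. It is a thin wrapper that adds one important field, referenceCount, which counts how many references share the underlying connection so that the connection is only really closed once the last reference releases it.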
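
A minimal sketch of that reference-counting idea, assuming a shared connection that several invokers hold at once. `RefCountedClientSketch`, `Connection`, and `retain` are hypothetical names, not Dubbo's API; only the counting logic is the point.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted wrapper around a shared connection. */
final class RefCountedClientSketch {
    /** Stand-in for the wrapped client; it only tracks whether it was closed. */
    static final class Connection {
        private volatile boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    private final Connection delegate;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    RefCountedClientSketch(Connection delegate) {
        this.delegate = delegate;
        referenceCount.incrementAndGet(); // the creator holds the first reference
    }

    /** Called each time another user starts sharing this connection. */
    void retain() {
        referenceCount.incrementAndGet();
    }

    /** Releases one reference; the real connection closes only when none remain. */
    void close() {
        if (referenceCount.decrementAndGet() <= 0) {
            delegate.close();
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        RefCountedClientSketch client = new RefCountedClientSketch(connection);
        client.retain();
        client.close();
        System.out.println("closed after first release: " + connection.isClosed());
        client.close();
        System.out.println("closed after last release: " + connection.isClosed());
    }
}
```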

Client sends a request

Consumer proxy class ->
InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke [obtains the LoadBalance] ->
FailoverClusterInvoker#doInvoke [handles retries] ->
ProtocolFilterWrapper#invoke [runs the Filter chain] ->
AbstractInvoker#invoke [sets the attachments] ->
DubboInvoker#doInvoke [hands over to the Exchange layer] ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request [returns a CompletableFuture] ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush [sends the message to the server]

DubboInvoker#doInvoke is where interaction with the Exchange layer begins. The core code is as follows:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);

    // pick one of the shared clients in round-robin order
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    // return=false means a one-way call, which avoids creating an unnecessary Future
    if (isOneway) {
        // sent=true: return only after the client has written the message; otherwise return immediately
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
        return asyncRpcResult;
    }
}
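
The client selection at the top of doInvoke is a plain round-robin over a fixed array. The sketch below isolates that step with strings standing in for clients; `RoundRobinSketch` is a hypothetical name. It also swaps `%` for `Math.floorMod`, which is a defensive variation of my own and not what the excerpt does: with `%`, the index would turn negative once the int counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin picker over a fixed array of shared clients. */
final class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        this.clients = clients.clone();
    }

    /** Same idea as clients[index.getAndIncrement() % clients.length]. */
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the result non-negative even after the counter overflows
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch picker = new RoundRobinSketch("c0", "c1", "c2");
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.next());
        }
    }
}
```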

The request then passes down through the client wrappers:

ReferenceCountExchangeClient#request =>
HeaderExchangeClient#request =>
HeaderExchangeChannel#request

// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

A few points in this method are worth noting:

  1. The Request constructor generates a unique, incrementing ID that identifies the request
  2. The channel#send call goes through NettyChannel#getOrAddChannel. NettyChannel keeps a ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP cache that maps each io.netty.channel.Channel to its NettyChannel
  3. The channel#send call eventually reaches NettyChannel#send, which is what actually sends the message to the server
  4. The returned DefaultFuture is a CompletableFuture
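
Points 1 and 4 together form a correlation table: an ID generator plus a map from ID to pending future. The sketch below shows that mechanism in isolation using only the JDK. `CorrelationSketch`, `newRequest`, and `futureOf` are hypothetical names; the real DefaultFuture also tracks channels and timeouts, which are omitted here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical request/response correlation table, in the spirit of DefaultFuture. */
final class CorrelationSketch {
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    /** Allocates the next request ID and registers a pending future under it. */
    static long newRequest() {
        long id = INVOKE_ID.getAndIncrement();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    /** Returns the pending future for an ID, or null if none is registered. */
    static CompletableFuture<Object> futureOf(long id) {
        return FUTURES.get(id);
    }

    /** Completes and removes the future matching the response ID; false if none is pending. */
    static boolean received(long responseId, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(responseId);
        if (future == null) {
            return false; // late, duplicate, or unknown response
        }
        future.complete(result);
        return true;
    }

    public static void main(String[] args) {
        long id = newRequest();
        CompletableFuture<Object> future = futureOf(id);
        received(id, "pong");
        System.out.println(future.join());
    }
}
```

Removing the entry on receipt keeps the map from growing and makes a second response with the same ID a harmless no-op.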

// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        // send the message to the server
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // with sent=true configured, the client waits until the message has been written before returning
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

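The send path above shows no encoding step. That is because the encoder and decoder were already installed in the pipeline when the Netty client was initialized: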

// NettyClient.java 
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

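Outbound messages first pass through the encoder, InternalEncoder#encode. InternalEncoder extends Netty's MessageToByteEncoder and delegates to a Codec2. Codec2 is an SPI interface; for the dubbo protocol the implementation in use is DubboCodec.

// NettyClient#doOpen
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

// AbstractEndpoint.java
protected static Codec2 getChannelCodec(URL url) {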
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
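
To make "encoding" concrete, here is a simplified frame codec: a fixed 16-byte header followed by the body. The layout (magic, flag, status, request ID, body length) is loosely modeled on the header that Dubbo's exchange codec writes, but treat the field details as an assumption. `FrameCodecSketch` is a hypothetical name, and the body is a plain UTF-8 string rather than a serialized invocation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified frame codec; not Dubbo's DubboCodec. */
final class FrameCodecSketch {
    static final short MAGIC = (short) 0xdabb;
    static final int HEADER_LENGTH = 16;

    /** Encodes: magic(2) flag(1) status(1) requestId(8) bodyLength(4) body(n). */
    static byte[] encode(long requestId, String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + payload.length);
        buf.putShort(MAGIC);
        buf.put((byte) 0);          // flag byte, left empty in this sketch
        buf.put((byte) 0);          // status byte, left empty in this sketch
        buf.putLong(requestId);     // lets the receiver match a response to its request
        buf.putInt(payload.length); // lets the receiver know where the frame ends
        buf.put(payload);
        return buf.array();
    }

    /** Reads the request ID from bytes 4..11 after checking the magic number. */
    static long decodeRequestId(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        return buf.getLong(4);
    }

    /** Reads the body using the length stored in bytes 12..15. */
    static String decodeBody(byte[] frame) {
        int length = ByteBuffer.wrap(frame).getInt(12);
        return new String(frame, HEADER_LENGTH, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = encode(42L, "hello");
        System.out.println(decodeRequestId(frame) + " " + decodeBody(frame));
    }
}
```

Carrying the body length in the header is what lets a decoder on a TCP stream tell when it has a complete message and when it must wait for more bytes.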

Server handles the request

As noted above, the handler field in NettyServer points to MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler. The code with which NettyServer starts the server is as follows:

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
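
  1. Inbound bytes first pass through the decoder, InternalDecoder#decode. InternalDecoder extends Netty's ByteToMessageDecoder and delegates to a Codec2. Codec2 is an SPI interface; for the dubbo protocol the implementation in use is DubboCodec

// NettyServer#doOpen
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);

// AbstractEndpoint.java
protected static Codec2 getChannelCodec(URL url) {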
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}

  2. MultiMessageHandler handles batched messages. If the message is a MultiMessage (which implements Iterable), it iterates over it and calls the next handler's received method for each element; otherwise it calls the next handler's received method directly
  3. AllChannelHandler receives the message, wraps the channel, handler, and message in a ChannelEventRunnable whose state is ChannelState.RECEIVED, and submits it to the thread pool
  4. ChannelEventRunnable#run sees that the state is ChannelState.RECEIVED and calls the received method of the next handler, DecodeHandler. This step runs on the thread pool
  5. DecodeHandler#received decodes according to the message type: a Decodeable message is decoded as a whole; for a Request, Request.getData() is decoded; for a Response, Response.getResult() is decoded
  6. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#reply. requestHandler is a field of DubboProtocol, of type ExchangeHandlerAdapter
  7. HeaderExchangeHandler#handleRequest creates a Response whose ID is the ID of the Request, which ties the response to the request
  8. requestHandler#reply looks up the matching DubboExporter in the exporterMap cache, gets the Invoker from it, calls Invoker#invoke, and returns a CompletableFuture
  9. HeaderExchangeHandler#handleRequest takes the returned CompletableFuture and registers a callback on it; the callback puts the result into the Response and sends the Response over the channel
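
The last three steps can be sketched as follows: create the response with the request's ID, invoke the handler, and send the response from a completion callback. Everything here is a hypothetical stand-in (`HandleRequestSketch`, its `Request` and `Response` classes, and the `sent` list that plays the role of the channel); only `CompletableFuture#whenComplete` is the real JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical server-side request handling: the response reuses the request ID. */
final class HandleRequestSketch {
    static final class Request {
        final long id;
        final Object data;

        Request(long id, Object data) {
            this.id = id;
            this.data = data;
        }
    }

    static final class Response {
        final long id;
        Object result;
        String error;

        Response(long id) {
            this.id = id;
        }
    }

    /** Invokes the handler and "sends" the response once its future completes. */
    static void handleRequest(Request req,
                              Function<Object, CompletableFuture<Object>> reply,
                              List<Response> sent) {
        Response res = new Response(req.id); // same ID ties the response to the request
        reply.apply(req.data).whenComplete((result, error) -> {
            if (error == null) {
                res.result = result;
            } else {
                res.error = String.valueOf(error);
            }
            sent.add(res); // stands in for channel.send(res)
        });
    }

    public static void main(String[] args) {
        List<Response> sent = new ArrayList<>();
        handleRequest(new Request(7L, "hello"),
                data -> CompletableFuture.completedFuture(data.toString().toUpperCase()),
                sent);
        System.out.println(sent.get(0).id + " " + sent.get(0).result);
    }
}
```

Because the send happens in a callback, the thread that dispatched the request is free as soon as the invocation has been started, even if the service itself answers later.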
// ChannelEventRunnable.java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            // RECEIVED state: call the next handler's received method, i.e. DecodeHandler
            handler.received(channel, message);
        } catch (Exception e) {}
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
            } catch (Exception e) {}
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
            } catch (Exception e) {}
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {}
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
            } catch (Exception e) {}
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}
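
The hand-off from the I/O thread to the business thread pool can be observed with a small experiment. The sketch below submits a task to a single worker thread and reports which thread ran it. `DispatchSketch` and the thread name `biz-worker` are hypothetical, and the blocking `get()` is there only so the result can be printed; a real dispatcher would not wait for the task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical dispatcher: moves event handling off the calling thread. */
final class DispatchSketch {
    enum ChannelState { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT }

    /** Runs the event on a worker thread and reports which thread handled it. */
    static String dispatch(ChannelState state, String message) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-worker"));
        try {
            // A real handler would branch on the state, as ChannelEventRunnable#run does.
            Future<String> handledOn = pool.submit(
                    () -> state + ":" + message + "@" + Thread.currentThread().getName());
            return handledOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("handled: " + dispatch(ChannelState.RECEIVED, "ping"));
    }
}
```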

The complete server-side call chain looks like this:

InternalDecoder#decode
    => NettyServerHandler#channelRead
        => AbstractPeer#received
            => MultiMessageHandler#received
                => HeartbeatHandler#received
                    => AllChannelHandler#received
                    
                    ------------------ asynchronous: handed to the thread pool ----------------------
                    => ChannelEventRunnable#run
                        => DecodeHandler#received
                            => DecodeHandler#decode
                                => DecodeableRpcInvocation#decode
                        => HeaderExchangeHandler#received
                            => HeaderExchangeHandler#handleRequest
                                => DubboProtocol.requestHandler#reply
                    ------------------ asynchronous -----------------------

                                    ---------------- extension points -------------------
                                    => ProtocolFilterWrapper.invoke
                                    => EchoFilter.invoke
                                        => ClassLoaderFilter.invoke
                                        => GenericFilter.invoke
                                            => TraceFilter.invoke
                                            => MonitorFilter.invoke
                                                => TimeoutFilter.invoke
                                                => ExceptionFilter.invoke
                                                    => InvokerWrapper.invoke
                                    ---------------- extension points -------------------
                                                        => AbstractProxyInvoker#invoke
                                                            => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
                                                                => proxy class#invokeMethod
                                                                    => the actual service method


                            // send the processed result back to the consumer (future#whenComplete)
                            => channel.send(response)
                                => HeaderExchangeChannel
                                    => NettyChannel.send
                                        => NioSocketChannel#writeAndFlush(message)                                                  

Server sends the result

HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 

Client handles the response

The handler passed in when the client starts is the same requestHandler that the server uses:

// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);

// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect =>
    NettyTransporter#connect
        return NettyClient

The NettyClient constructor wraps the handler in the same way:

ChannelHandlers.wrap(handler, url)

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();
    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

So the handler field in NettyClient ends up as MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler, and processing follows exactly the same path as on the server:

  1. The incoming message passes through MultiMessageHandler and HeartbeatHandler and reaches AllChannelHandler
  2. AllChannelHandler wraps the message as new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message) and hands it to the thread pool
  3. The thread pool runs the task, which passes through DecodeHandler to HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#received. DefaultFuture maintains a map from request ID to DefaultFuture, so each Response can be matched to its Request by ID

// DefaultFuture.java
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                t.cancel();
            }
            future.doReceived(response);
        } else {
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

  1. The DefaultFuture is looked up by the response ID
  2. Calling CompletableFuture#complete wakes the user thread blocked in CompletableFuture#get, which then returns the result. This completes the whole call
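
The received method above also cancels a timeoutCheckTask. The sketch below shows one way such a guard can work, using a JDK ScheduledExecutorService: if no response arrives in time, the future is failed; if one does, the timer is cancelled first. This is an assumption-laden stand-in (`TimeoutCheckSketch` and `outcome` are hypothetical, and it uses `java.util.concurrent.TimeoutException`), not the timer or exception type Dubbo itself uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical pending call guarded by a timeout task, loosely after DefaultFuture. */
final class TimeoutCheckSketch {
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-checker");
                t.setDaemon(true); // do not keep the JVM alive just for the timer
                return t;
            });

    final CompletableFuture<Object> future = new CompletableFuture<>();
    private final ScheduledFuture<?> timeoutCheckTask;

    TimeoutCheckSketch(long timeoutMillis) {
        // If nobody completes the future in time, fail it with a TimeoutException.
        timeoutCheckTask = TIMER.schedule(() -> {
            future.completeExceptionally(new TimeoutException("no response in " + timeoutMillis + "ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** A response arrived: cancel the timer, then complete the future. */
    void received(Object result) {
        timeoutCheckTask.cancel(false);
        future.complete(result);
    }

    /** Blocks for the outcome and describes it as a string, for easy checking. */
    String outcome() {
        try {
            return "ok:" + future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "timeout" : "error";
        } catch (InterruptedException | TimeoutException e) {
            return "gave-up";
        }
    }

    public static void main(String[] args) {
        TimeoutCheckSketch answered = new TimeoutCheckSketch(5000);
        answered.received("pong");
        System.out.println(answered.outcome());
        System.out.println(new TimeoutCheckSketch(20).outcome());
    }
}
```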

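Synchronous calls on top of asynchronous futures

Most application code calls services synchronously and rarely calls CompletableFuture#get itself, so how is that case handled? DubboInvoker#doInvoke returns an AsyncRpcResult: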

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // return=false means a one-way call, which avoids creating an unnecessary Future
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // subscribe to responseFuture: when it completes, asyncRpcResult is completed too, which releases the waiting user thread
            asyncRpcResult.subscribeTo(responseFuture);

            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

AsyncToSyncInvoker

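AsyncToSyncInvoker#invoke checks whether the call is synchronous or asynchronous. For a synchronous call it invokes AsyncRpcResult#get to block the user thread, which produces the synchronous behavior.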

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // for a synchronous call, block the user thread on asyncResult#get
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
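
Stripped of the RPC details, the pattern is: always obtain a future from the asynchronous layer, and block on it only when the caller asked for a synchronous call. A minimal sketch under that assumption follows. `SyncOverAsyncSketch`, `invoke`, and `delayedReply` are hypothetical names, and the remote response is simulated by a task that completes on another thread after a short delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sync-over-async wrapper, in the spirit of AsyncToSyncInvoker. */
final class SyncOverAsyncSketch {
    enum InvokeMode { SYNC, ASYNC }

    /** Always gets a future from the async layer; blocks on it only in SYNC mode. */
    static CompletableFuture<Object> invoke(Supplier<CompletableFuture<Object>> asyncCall,
                                            InvokeMode mode) {
        CompletableFuture<Object> result = asyncCall.get();
        if (mode == InvokeMode.SYNC) {
            try {
                // park the caller until another thread completes the future
                result.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the result", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Remote call failed", e);
            }
        }
        return result;
    }

    /** Simulates a response that arrives from another thread after a short delay. */
    static CompletableFuture<Object> delayedReply(Object value, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Object> sync = invoke(() -> delayedReply("pong", 50), InvokeMode.SYNC);
        System.out.println("done on return: " + sync.isDone() + ", value: " + sync.join());
    }
}
```

In SYNC mode the returned future is already complete when invoke returns, so the caller sees an ordinary blocking call; in ASYNC mode the same future is returned immediately and the caller decides when to wait.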

Original: https://juejin.im/post/5da6b966f265da5ba95c3bd7