默认情况下,我们通过Dubbo调用一个服务,需得等服务端执行完全部逻辑,方法才得以返回。这个就是同步调用。
但大家是否考虑过另外一个问题,Dubbo底层网络通信采用Netty,而Netty是异步的;那么它是怎么将请求转换成同步的呢?
首先我们来看请求方,在 DubboInvoker
类中,它有三种不同的调用方式。
protected Result doInvoke(final Invocation invocation) throws Throwable { try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000); //忽略返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); //异步调用 } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); //同步调用 } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } } 复制代码
可以看到,上面的代码有三个分支,分别是:忽略返回值调用、异步调用和同步调用。我们重点先看 return (Result) currentClient.request(inv, timeout).get();
关于上面这句代码,它包含两个动作:先调用 currentClient.request
方法,通过Netty发送请求数据;然后调用其返回值的 get
方法,来获取返回值。
这一步主要是将请求方法封装成Request对象,通过Netty将数据发送到服务端,然后返回一个 DefaultFuture
对象。
public ResponseFuture request(Object request, int timeout) throws RemotingException { //如果客户端已断开连接 if (closed) { throw new RemotingException("......."); } //封装请求信息 Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //构建DefaultFuture对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { //通过Netty发送网络数据 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } 复制代码
如上代码,逻辑很清晰。关于看它的返回值是一个 DefaultFuture
对象,我们再看它的构造方法。
public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000); //当前Future和请求信息的映射 FUTURES.put(id, this); //当前Channel和请求信息的映射 CHANNELS.put(id, channel); } 复制代码
在这里,我们必须先对Future有所了解。Future模式是多线程开发中非常常见的一种设计模式,在这里我们返回这个对象后,调用其get方法来获得返回值。
我们接着看get方法。
public Object get(int timeout) throws RemotingException { //设置默认超时时间 if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } //判断 如果操作未完成 if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { //通过加锁、等待 while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } //返回数据 return returnFromResponse(); } //获取返回值response private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == 30 || res.getStatus() == 31) { throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } 复制代码
如上代码,我们重点来看 get
方法。我们总结下它的运行流程:
那么我们就会想到两个问题,response在哪里被赋值、await在哪里被通知。
在Netty读取到网络数据后,其中会调用到 HeaderExchangeHandler
中的方法,我们来看一眼就明白了。
public class HeaderExchangeHandler implements ChannelHandlerDelegate { //处理返回信息 static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } } } 复制代码
上面说的很清楚,如果response 不为空,并且不是心跳数据,就调用 DefaultFuture.received
,在这个方法里面,主要就是根据返回信息的ID找到对应的Future,然后通知。
public static void received(Channel channel, Response response) try { //根据返回信息中的ID找到对应的Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { //通知方法 future.doReceived(response); } else { logger.warn("......"); } } finally { //处理完成,删除Future CHANNELS.remove(response.getId()); } } 复制代码
future.doReceived(response);
就很简单了,它就回答了我们上面的那两个小问题。赋值response和await通知。
private void doReceived(Response res) { lock.lock(); try { //赋值response response = res; if (done != null) { //通知方法 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } 复制代码
通过以上方式,Dubbo就完成了同步调用。我们再总结下它的整体流程:
DefaultFuture.get()
如果想使用异步调用的方式,我们就得配置一下。在消费者端配置文件中
<dubbo:reference id="infoUserService" interface="com.viewscenes.netsupervisor.service.InfoUserService" async="true"/> 复制代码
然后我们再看它的实现方法
if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } 复制代码
可以看到,它同样是通过 currentClient.request
返回的Future对象,但并未调用其get方法;而是将Future对象封装成FutureAdapter,然后设置到 RpcContext.getContext()
RpcContext是Dubbo中的一个上下文信息,它是一个 ThreadLocal 的临时状态记录器。我们重点看它的 setFuture
方法。
public class RpcContext { private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; private Future<?> future; public void setFuture(Future<?> future) { this.future = future; } } 复制代码
既然它是基于ThreadLocal机制实现,那么我们在获取返回值的时候,通过ThreadLocal获取到上下文信息对象,再拿到其Future对象就好了。这个时候,我们客户端应该这样来做
userService.sayHello("Jack"); Future<Object> future = RpcContext.getContext().getFuture(); System.out.println("服务返回消息:"+future.get()); 复制代码
这样做的好处是,我们不必等待在单一方法上,可以调用多个方法,它们会并行的执行。比如像官网给出的例子那样:
// 此调用会立即返回null fooService.findFoo(fooId); // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future Future<Foo> fooFuture = RpcContext.getContext().getFuture(); // 此调用会立即返回null barService.findBar(barId); // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future Future<Bar> barFuture = RpcContext.getContext().getFuture(); // 此时findFoo和findBar的请求同时在执行,客户端不需要启动多线程来支持并行,而是借助NIO的非阻塞完成 // 如果foo已返回,直接拿到返回值,否则线程wait住,等待foo返回后,线程会被notify唤醒 Foo foo = fooFuture.get(); // 同理等待bar返回 Bar bar = barFuture.get(); // 如果foo需要5秒返回,bar需要6秒返回,实际只需等6秒,即可获取到foo和bar,进行接下来的处理。 复制代码