转载

Dubbo源码分析(十)同步调用与异步调用

默认情况下,我们通过Dubbo调用一个服务,需得等服务端执行完全部逻辑,方法才得以返回。这个就是同步调用。

但大家是否考虑过另外一个问题,Dubbo底层网络通信采用Netty,而Netty是异步的;那么它是怎么将请求转换成同步的呢?

首先我们来看请求方,在 DubboInvoker 类中,它有三种不同的调用方式。

protected Result doInvoke(final Invocation invocation) throws Throwable {
	
	try {
		boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
		boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
		int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
		
		//忽略返回值
		if (isOneway) {
			boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
			currentClient.send(inv, isSent);
			RpcContext.getContext().setFuture(null);
			return new RpcResult();
		//异步调用
		} else if (isAsync) {
			ResponseFuture future = currentClient.request(inv, timeout);
			RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
			return new RpcResult();
		//同步调用
		} else {
			RpcContext.getContext().setFuture(null);
			return (Result) currentClient.request(inv, timeout).get();
		}
	}
}
复制代码

可以看到,上面的代码有三个分支,分别是:忽略返回值调用、异步调用和同步调用。我们重点先看 return (Result) currentClient.request(inv, timeout).get();

关于上面这句代码,它包含两个动作:先调用 currentClient.request 方法,通过Netty发送请求数据;然后调用其返回值的 get 方法,来获取返回值。

1、发送请求

这一步主要是将请求方法封装成Request对象,通过Netty将数据发送到服务端,然后返回一个 DefaultFuture 对象。

public ResponseFuture request(Object request, int timeout) throws RemotingException {

	//如果客户端已断开连接
	if (closed) {
		throw new RemotingException(".......");
	}
	//封装请求信息
	Request req = new Request();
	req.setVersion("2.0.0");
	req.setTwoWay(true);
	req.setData(request);
	
	//构建DefaultFuture对象
	DefaultFuture future = new DefaultFuture(channel, req, timeout);
	try {
		//通过Netty发送网络数据
		channel.send(req);
	} catch (RemotingException e) {
		future.cancel();
		throw e;
	}
	return future;
}
复制代码

如上代码,逻辑很清晰。关于看它的返回值是一个 DefaultFuture 对象,我们再看它的构造方法。

public DefaultFuture(Channel channel, Request request, int timeout) {
	this.channel = channel;
	this.request = request;
	this.id = request.getId();
	this.timeout = timeout > 0 ? timeout : 
				channel.getUrl().getPositiveParameter("timeout", 1000);
	//当前Future和请求信息的映射
	FUTURES.put(id, this);
	//当前Channel和请求信息的映射
	CHANNELS.put(id, channel);
}
复制代码

在这里,我们必须先对Future有所了解。Future模式是多线程开发中非常常见的一种设计模式,在这里我们返回这个对象后,调用其get方法来获得返回值。

2、获取返回值

我们接着看get方法。

public Object get(int timeout) throws RemotingException {
	//设置默认超时时间
	if (timeout <= 0) {
		timeout = Constants.DEFAULT_TIMEOUT;
	}
	//判断 如果操作未完成
	if (!isDone()) {
		long start = System.currentTimeMillis();
		lock.lock();
		try {
			//通过加锁、等待
			while (!isDone()) {
				done.await(timeout, TimeUnit.MILLISECONDS);
				if (isDone() || System.currentTimeMillis() - start > timeout) {
					break;
				}
			}
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		} finally {
			lock.unlock();
		}
		if (!isDone()) {
			throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
		}
	}
	//返回数据
	return returnFromResponse();
}

//获取返回值response
private Object returnFromResponse() throws RemotingException {
	Response res = response;
	if (res == null) {
		throw new IllegalStateException("response cannot be null");
	}
	if (res.getStatus() == Response.OK) {
		return res.getResult();
	}
	if (res.getStatus() == 30 || res.getStatus() == 31) {
		throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
	}
	throw new RemotingException(channel, res.getErrorMessage());
}
复制代码

如上代码,我们重点来看 get 方法。我们总结下它的运行流程:

  • 判断超时时间,小于0则设置默认值
  • 判断操作是否已完成,即response是否为空;如果已完成,获取返回值,并返回
  • 如果操作未完成,加锁、等待;获得通知后,再次判断操作是否完成。若完成,获取返回值,并返回。

那么我们就会想到两个问题,response在哪里被赋值、await在哪里被通知。

在Netty读取到网络数据后,其中会调用到 HeaderExchangeHandler 中的方法,我们来看一眼就明白了。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
	
	//处理返回信息
	static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}
复制代码

上面说的很清楚,如果response 不为空,并且不是心跳数据,就调用 DefaultFuture.received ,在这个方法里面,主要就是根据返回信息的ID找到对应的Future,然后通知。

public static void received(Channel channel, Response response) 	
	try {
		//根据返回信息中的ID找到对应的Future
		DefaultFuture future = FUTURES.remove(response.getId());
		if (future != null) {
			//通知方法
			future.doReceived(response);
		} else {
			logger.warn("......");
		}
	} finally {
		//处理完成,删除Future
		CHANNELS.remove(response.getId());
	}
}
复制代码

future.doReceived(response); 就很简单了,它就回答了我们上面的那两个小问题。赋值response和await通知。

private void doReceived(Response res) {
	lock.lock();
	try {
		//赋值response
		response = res;
		if (done != null) {
			//通知方法
			done.signal();
		}
	} finally {
		lock.unlock();
	}
	if (callback != null) {
		invokeCallback(callback);
	}
}
复制代码

通过以上方式,Dubbo就完成了同步调用。我们再总结下它的整体流程:

DefaultFuture.get()

二、异步调用

如果想使用异步调用的方式,我们就得配置一下。在消费者端配置文件中

<dubbo:reference id="infoUserService" 
    	interface="com.viewscenes.netsupervisor.service.InfoUserService" 
	async="true"/>
复制代码

然后我们再看它的实现方法

if (isAsync) {
	ResponseFuture future = currentClient.request(inv, timeout);
	RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
	return new RpcResult();
} 
复制代码

可以看到,它同样是通过 currentClient.request 返回的Future对象,但并未调用其get方法;而是将Future对象封装成FutureAdapter,然后设置到 RpcContext.getContext()

RpcContext是Dubbo中的一个上下文信息,它是一个 ThreadLocal 的临时状态记录器。我们重点看它的 setFuture 方法。

public class RpcContext {
	
	private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
	
	private Future<?> future;
	
	public void setFuture(Future<?> future) {
        this.future = future;
    }
}
复制代码

既然它是基于ThreadLocal机制实现,那么我们在获取返回值的时候,通过ThreadLocal获取到上下文信息对象,再拿到其Future对象就好了。这个时候,我们客户端应该这样来做

userService.sayHello("Jack");

Future<Object> future = RpcContext.getContext().getFuture();

System.out.println("服务返回消息:"+future.get());
复制代码

这样做的好处是,我们不必等待在单一方法上,可以调用多个方法,它们会并行的执行。比如像官网给出的例子那样:

// 此调用会立即返回null
fooService.findFoo(fooId);
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture(); 
 
// 此调用会立即返回null
barService.findBar(barId);
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture(); 
 
// 此时findFoo和findBar的请求同时在执行,客户端不需要启动多线程来支持并行,而是借助NIO的非阻塞完成
 
// 如果foo已返回,直接拿到返回值,否则线程wait住,等待foo返回后,线程会被notify唤醒
Foo foo = fooFuture.get(); 
// 同理等待bar返回
Bar bar = barFuture.get(); 
 
// 如果foo需要5秒返回,bar需要6秒返回,实际只需等6秒,即可获取到foo和bar,进行接下来的处理。
复制代码
原文  https://juejin.im/post/5c98cc08e51d45639b76cb98
正文到此结束
Loading...