在上一章节,我们曾提到这样一个问题: 当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?
为了解决这个问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。
在服务引用的过程中,我们最终会将一个或多个服务提供者Invoker封装成服务目录对象,但最后还要将它合并转换成Cluster Invoker对象。 Invoker invoker = cluster.join(directory);
这里的cluster就是扩展点自适应类,在Dubbo中默认是Failover,所以上面代码会调用到:
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } } 复制代码
上面的代码很简单,所以最后的Invoker对象指向的是 FailoverClusterInvoker
实例。它也是一个Invoker,它继承了抽象的 AbstractClusterInvoker
。
我们看下 AbstractClusterInvoker
类中的invoke方法。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> { public Result invoke(final Invocation invocation) throws RpcException { LoadBalance loadbalance = null; //调用服务目录,获取所有的服务提供者Invoker对象 List<Invoker<T>> invokers = directory.list(invocation); if (invokers != null && !invokers.isEmpty()) { //加载负载均衡组件 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class). getExtension(invokers.get(0).getUrl(). getMethodParameter(invocation.getMethodName(), "loadbalance", "random")); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //调用子类实现 ,不同的集群容错机制 return doInvoke(invocation, invokers, loadbalance); } } 复制代码
以上代码也很简单,我们分为三个步骤来看
关于负载均衡我们后续再深入了解,这是只知道它负责从多个Invoker中选取一个返回就行。
Dubbo为我们提供了多种集群容错机制。主要如下:
FailoverClusterInvoker在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。
FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。
FailsafeClusterInvoker 当调用过程中出现异常时,仅会打印异常,而不会抛出异常。
FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。
BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。
FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。我们重点看它的 doInvoke
方法。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; //检查invokers是否为空 checkInvokers(copyinvokers, invocation); //获取重试次数 这里默认是3次 int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries",2) + 1; if (len <= 0) { len = 1; } //异常信息对象 RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); Set<String> providers = new HashSet<String>(len); //循环调用 失败重试len次 for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); //重新获取服务提供者列表 copyinvokers = list(invocation); //再次检查 checkInvokers(copyinvokers, invocation); } //通过loadbalance选取一个Invoker Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { //调用服务 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn(""); } return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } //重试失败 throw new RpcException(""); } 复制代码
我们可以看到,它的重点是invoker的调用是在一个循环方法中。只要不return,就会一直调用,重试 len 次。我们总结下它的过程:
FailfastClusterInvoker就很简单了,它只会进行一次调用,失败后立即抛出异常。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { throw (RpcException) e; } throw new RpcException("...."); } } 复制代码
FailsafeClusterInvoker跟上面这个差异不大,它调用失败后并不抛出异常。而是打印异常信息并返回一个空的结果对象。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); } } 复制代码
FailbackClusterInvoker 会在调用失败后,也是打印异常信息并返回一个空的结果对象,但是还没结束,它还会偷偷开启一个定时任务,再次去调用。
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); //添加失败信息 addFailed(invocation, this); return new RpcResult(); } } 复制代码
我们可以看到,调用失败后,除了打印异常信息和返回空结果对象之外,还有一个方法 addFailed
它就是开启定时任务的地方。
首先,定义一个包含2个线程的线程池对象。
Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
然后,延迟5秒后,每隔5秒调用 retryFailed
方法,直到调用成功。
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { try { //重试方法 retryFailed(); } catch (Throwable t) { logger.error("Unexpected error occur at collect statistic", t); } } }, 5000, 5000, TimeUnit.MILLISECONDS); } } } //ConcurrentHashMap 添加失败任务 failed.put(invocation, router); } 复制代码
最后,我们需要注意 failed.put(invocation, router);
它将当前失败的任务添加到failed,它是一个ConcurrentHashMap对象。
重试的逻辑也不复杂,从failed对象中获取失败的记录,调用即可。
void retryFailed() { //如果为空,说明已经没有了失败的任务 if (failed.size() == 0) { return; } //遍历failed,对失败的调用进行重试 Set<Entry<Invocation, AbstractClusterInvoker<?>>> failedSet = failed.entrySet(); for (Entry<Invocation, AbstractClusterInvoker<?>> entry : failedSet) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { // 再次进行调用 invoker.invoke(invocation); // 调用成功后,从 failed 中移除 invoker failed.remove(invocation); } catch (Throwable e) { logger.error("......", e); } } } 复制代码
如上代码,其中的重点是调用成功后,要将invocation移除。当再次调用到这个方法,开头的条件判断成立,就直接返回,不再继续调用。
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { final List<Invoker<T>> selected; //获取最大并行数 默认为2 final int forks = getUrl().getParameter("forks", 2); //超时时间 final int timeout = getUrl().getParameter("timeout", 1000); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); //选择Invoker 并添加到selected for (int i = 0; i < forks; i++) { Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); //阻塞队列 先进先出 final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); for (final Invoker<T> invoker : selected) { executor.execute(new Runnable() { @Override public void run() { try { //调用服务 将结果放入队列 Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { //如果异常调用次数大于等于最大并行数 int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } } }); } try { //从队列中获取结果 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException("...."); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException(e.getMessage(), e); } } 复制代码
以上代码的重点就是阻塞队列LinkedBlockingQueue。如果有结果放入,poll方法会立即返回,完成整个调用。我们再总结下整体流程:
BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; //循环调用服务 for (Invoker<T> invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } //异常 if (exception != null) { throw exception; } return result; } 复制代码