转载

[Java并发-9]Lock和Condition(下) Dubbo如何用管程实现异步转同步?

在上一篇文章中,我们讲到 Java SDK 并发包里的 Lock 有别于 synchronized 隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。那今天我们接着再来详细聊聊 Java SDK 并发包里的 Condition。

Condition 实现了管程模型里面的条件变量

在之前我们详细讲过, Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。

在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。

这里我们温故知新下前面的内容。

public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      // 入队后, 通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      // 出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

不过,这里你需要注意,Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的, 不要相互使用。

下面我们就来看看在知名项目 Dubbo 中,Lock 和 Condition 是怎么用的。不过在开始介绍源码之前,我还先要介绍两个概念:同步和异步。

通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步

Dubbo 源码分析

其实在编程领域,异步的场景还是挺多的,比如 TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用, 在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的 。可能你会觉得奇怪,平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?

其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下 Dubbo 的相关源码。

对于下面一个简单的 RPC 调用,默认情况下 sayHello() 方法,是个同步方法,也就是说,执行 service.sayHello(“dubbo”) 的时候,线程会停下来等结果。

DemoService service = 初始化部分省略
String message = 
  service.sayHello("dubbo");
System.out.println(message);

不过为了理清前后关系,还是有必要分析一下调用 DefaultFuture.get() 之前发生了什么。DubboInvoker 的 108 行调用了 DefaultFuture.get()。这一行先调用了 request(inv, timeout) 方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。

public class DubboInvoker{
  Result doInvoke(Invocation inv){
    // 下面这行就是源码中 108 行
    // 为了便于展示,做了修改
    return currentClient 
      .request(inv, timeout)
      .get();
  }
}

DefaultFuture 这个类是很关键,代码精简之后内容如下。

不过在看代码之前,你还是有必要重复一下我们的需求:

  1. 当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;
  2. 当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。

不知道你有没有似曾相识的感觉,这需求其实就是经典的 等待 - 通知机制 吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。

// 创建锁与条件变量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
    while (!isDone()) {
      done.await(timeout);
      long cur=System.nanoTime();
      if (isDone() || 
          cur-start > timeout){
        break;
      }
    }
  } finally {
    lock.unlock();
  }
  if (!isDone()) {
    throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
  return response != null;
}
// RPC 结果返回时调用该方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signal();
    }
  } finally {
    lock.unlock();
  }
}

调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。

当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。

小结

Lock&Condition 是管程的一种实现,所以能否用好 Lock 和 Condition 要看你对管程模型理解得怎么样。管程的技术前面我们已经专门用了一篇文章做了介绍,你可以结合着来学,理论联系实践,有助于加深理解。

Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。

但如果你对实现感兴趣,Java SDK 并发包里锁和条件变量是如何实现的,可以参考《Java 并发编程的艺术》一书的第 5 章《Java 中的锁》,里面详细介绍了实现原理。

原文  https://segmentfault.com/a/1190000019152276
正文到此结束
Loading...