// 下列三对操作的语义是相同的 // Condition.await() Object.wait() // Condition.signal() Object.notify() // Condition.signalAll() Object.notifyAll() public class BlockedQueue<T> { private static final int MAX_SIZE = 10; // 可重入锁 private final Lock lock = new ReentrantLock(); // 条件变量:队列不满 private final Condition notFull = lock.newCondition(); // 条件变量:队列不空 private final Condition notEmpty = lock.newCondition(); // 队列实际存储:栈 private final Stack<T> stack = new Stack<>(); // 入队 public void enq(T t) { // 先获得互斥锁,类似于管程中的入口 lock.lock(); try { while (stack.size() >= MAX_SIZE) { // 队列已满,等待队列不满,才可入队 notFull.await(); } // 入队后,通知队列不空,可出队 stack.push(t); notEmpty.signalAll(); } catch (InterruptedException ignored) { } finally { lock.unlock(); } } // 出队 public T deq() { // 先获得互斥锁,类似于管程中的入口 lock.lock(); try { while (stack.isEmpty()) { // 队列已空,等待队列不空,才可出队 notEmpty.await(); } // 出队后,通知队列不满,可入队 T pop = stack.pop(); notFull.signalAll(); return pop; } catch (InterruptedException ignored) { } finally { lock.unlock(); } return null; } }
在TCP协议层面,发送完RPC请求后,系统线程是不会等待RPC的响应结果的,需要RPC框架完成 异步转同步 的操作
protected Result doInvoke(final Invocation invocation) throws Throwable { ... return (Result) currentClient .request(inv, timeout) // 发送RPC请求,默认返回DefaultFuture .get(); // 等待RPC返回结果 }
当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行
// 锁和条件变量 private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); // RPC结果 private volatile Response response; // 回调 private volatile ResponseCallback callback;
// RPC结果是否已经返回 public boolean isDone() { return response != null; } // 调用方通过该方法等待RPC结果 public Object get(int timeout) throws RemotingException { ... if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } ... } return returnFromResponse(); }
// RPC结果返回时调用该方法 private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signalAll(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
转载请注明出处:http://zhongmingmao.me/2019/05/07/java-concurrent-condition/
访问原文「 Java并发 -- Condition 」获取最佳阅读体验并参与讨论