在开发中我们或许回遇到这样的情况:有一个Web 版的文件浏览器,通过它用户可以在浏览器里查看服务器上的目录和文件。这个项目依赖运维部门提供的文件浏览服务,而这个文件浏览服务只支持消息队列(MQ)方式接入。消息队列在互联网大厂中用的非常多,主要用作流量削峰和系统解耦。在这种接入方式中,发送消息和消费结果这两个操作之间是异步的,你可以参考下面的示意图来理解。
消息队列(MQ)示意图
在这个 Web 项目中,用户通过浏览器发过来一个请求,会被转换成一个异步消息发送给 MQ,等 MQ 返回结果后,再将这个结果返回至浏览器。问题来了:给 MQ 发送消息的线程是处理 Web 请求的线程 T1,但消费 MQ 结果的线程并不是线程 T1,那线程 T1 如何等待 MQ 的返回结果呢?示例代码如下。
class Message{ String id; String content; } // 该方法可以发送消息 void send(Message msg){ // 省略相关代码 } //MQ 消息返回后会调用该方法 // 该方法的执行线程不同于 // 发送消息的线程 void onMessage(Message msg){ // 省略相关代码 } // 处理浏览器发来的请求 Respond handleWebReq(){ // 创建一消息 Message msg1 = new Message("1","{...}"); // 发送消息 send(msg1); // 如何等待 MQ 返回的消息呢? String result = ...; }
上面遇到的问题,在现实世界里比比皆是,只是我们一不小心就忽略了。比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:“您预订的包间服务员正在收拾,请您稍等片刻。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。
我们等待包间收拾完的这个过程和前面的等待 MQ 返回消息本质上是一样的,都是 等待一个条件满足
:就餐需要等待包间收拾完,程序里要等待 MQ 返回消息。
通过类比,我们的程序也需要大堂经理这样的逻辑,但是怎么去设计呢?其实设计方案前人早就搞定了,而且还将其总结成了一个设计模式: Guarded Suspension
。所谓 Guarded Suspension,直译过来就是“保护性地暂停”。那下面我们就来看看,Guarded Suspension 模式是如何模拟大堂经理进行保护性地暂停的。
下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject,内部有一个成员变量——受保护的对象,以及两个成员方法—— get(Predicate<T> p)
和 onChanged(T obj)
Guarded Suspension 模式结构图
GuardedObject 的内部实现非常简单,是管程的一个经典用法,你可以参考下面的示例代码,核心是:get() 方法通过条件变量的 await() 方法实现等待,onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能。
class GuardedObject<T>{ // 受保护的对象 T obj; final Lock lock = new ReentrantLock(); final Condition done = lock.newCondition(); final int timeout=1; // 获取受保护对象 T get(Predicate<T> p) { lock.lock(); try { //MESA 管程推荐写法 while(!p.test(obj)){ done.await(timeout, TimeUnit.SECONDS); } }catch(InterruptedException e){ throw new RuntimeException(e); }finally{ lock.unlock(); } // 返回非空的受保护对象 return obj; } // 事件通知方法 void onChanged(T obj) { lock.lock(); try { this.obj = obj; done.signalAll(); } finally { lock.unlock(); } } }
Guarded Suspension 模式里 GuardedObject 有两个核心方法,一个是 get() 方法,一个是 onChanged() 方法。很显然,在处理 Web 请求的方法 handleWebReq() 中,可以调用 GuardedObject 的 get() 方法来实现等待;在 MQ 消息的消费方法 onMessage() 中,可以调用 GuardedObject 的 onChanged() 方法来实现唤醒。
// 处理浏览器发来的请求 Respond handleWebReq(){ // 创建一消息 Message msg1 = new Message("1","{...}"); // 发送消息 send(msg1); // 利用 GuardedObject 实现等待 GuardedObject<Message> go =new GuardObjec<>(); Message r = go.get( t->t != null); } void onMessage(Message msg){ // 如何找到匹配的 go? GuardedObject<Message> go=??? go.onChanged(msg); }
但是在实现的时候会遇到一个问题,handleWebReq() 里面创建了 GuardedObject 对象的实例 go,并调用其 get() 方等待结果,那在 onMessage() 方法中,如何才能够找到匹配的 GuardedObject 对象呢?
我们可以参考大堂经理识别就餐人的办法,来扩展一下 Guarded Suspension 模式,从而使它能够很方便地解决我们的问题。在程序中,每个发送到 MQ 的消息,都有一个唯一性的属性 id,所以我们可以维护一个 MQ 消息 id 和 GuardedObject 对象实例的关系。
下面的示例代码是扩展 Guarded Suspension 模式的实现,扩展后的 GuardedObject 内部维护了一个 Map,其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 对象实例,同时增加了静态方法 create() 和 fireEvent();create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑。
class GuardedObject<T>{ // 受保护的对象 T obj; final Lock lock = new ReentrantLock(); final Condition done = lock.newCondition(); final int timeout=2; // 保存所有 GuardedObject final static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>(); // 静态方法创建 GuardedObject static <K> GuardedObject create(K key){ GuardedObject go=new GuardedObject(); gos.put(key, go); return go; } static <K, T> void fireEvent(K key, T obj){ GuardedObject go=gos.remove(key); if (go != null){ go.onChanged(obj); } } // 获取受保护对象 T get(Predicate<T> p) { lock.lock(); try { //MESA 管程推荐写法 while(!p.test(obj)){ done.await(timeout, TimeUnit.SECONDS); } }catch(InterruptedException e){ throw new RuntimeException(e); }finally{ lock.unlock(); } // 返回非空的受保护对象 return obj; } // 事件通知方法 void onChanged(T obj) { lock.lock(); try { this.obj = obj; done.signalAll(); } finally { lock.unlock(); } } }
这样利用扩展后的 GuardedObject 来解决我们之前的问题就很简单了,具体代码如下所示。
// 处理浏览器发来的请求 Respond handleWebReq(){ int id= 序号生成器.get(); // 创建一消息 Message msg1 = new Message(id,"{...}"); // 创建 GuardedObject 实例 GuardedObject<Message> go= GuardedObject.create(id); // 发送消息 send(msg1); // 等待 MQ 消息 Message r = go.get( t->t != null); } void onMessage(Message msg){ // 唤醒等待的线程 GuardedObject.fireEvent( msg.id, msg); }
Guarded Suspension 模式本质上是一种等待唤醒机制的实现,只不过 Guarded Suspension 模式将其规范化了。但 Guarded Suspension 模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对 GuardedObject 的功能进行了增强,Dubbo 中 DefaultFuture 这个类也是采用的这种方式,你可以对比着来看,相信对 DefaultFuture 的实现原理会理解得更透彻。当然,你也可以创建新的类来实现对 Guarded Suspension 模式的扩展。
Guarded Suspension 模式也常被称作 Guarded Wait 模式、Spin Lock 模式(因为使用了 while 循环去等待),这些名字都很形象,不过它还有一个更形象的非官方名字:多线程版本的 if。单线程场景中,if 语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着 if 判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if 判断条件的结果是可能发生变化的。所以,用“多线程版本的 if”来理解这个模式会更简单。