之前一篇文章已经讲解了阻塞队列SynchronousQueue的大部分内容,其中默认的非公平策略还未说明,本文就紧接上文继续讲解其中的非公平策略下的内部实现,顺便简单说明其涉及到的线程池部分的使用
回顾一下,SynchronousQueue通过两个内部类实现了公平策略和非公平策略的无缓存阻塞队列,每种操作都需要对应的互补操作同时进行才能完成,例如,入队操作必然对应出队操作,在不涉及超时和中断的情况下,必须等待另一个线程进行出队操作,两两匹配才能执行,否则就阻塞等待
之前已经对公平策略下的内部类实现TransferQueue做了详细的说明,今天就非公平策略下的内部实现类TransferStack进行说明
不同于公平策略下的操作,只有一种状态需要注意:
SNode基于栈的节点实现,变量与QNode有些不同,其中match在两个操作匹配上之后可以通过这个变量找到其匹配的节点,节点类型mode在使用上也有所不同,下面使用到时会进行说明,其他参数可参考TransferQueue的QNode说明
static final class SNode { // next指向栈中下一个元素 volatile SNode next; // next node in stack // 和当前节点匹配的节点 volatile SNode match; // the node matched to this // 等待线程 volatile Thread waiter; // to control park/unpark // 节点内容 Object item; // data; or null for REQUESTs // 节点类型 int mode; // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } // CAS更新next字段 boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } /** * Tries to match node s to this node, if so, waking up thread. * Fulfillers call tryMatch to identify their waiters. * Waiters block until they have been matched. * * @param s the node to match * @return true if successfully matched to s */ // 尝试s节点与当前节点进行匹配,成功则唤醒等待线程继续执行 // 在使用到时才能理解,同时可参考我举例上的图示说明部分 boolean tryMatch(SNode s) { // match == null 表明当前节点未被其他节点匹配上 // cas更新match字段为s if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; // 当前节点等待线程未被其他线程操作 if (w != null) { // waiters need at most one unpark // 唤醒等待线程同时将waiter置空 waiter = null; LockSupport.unpark(w); } return true; } // 判断当前节点是否已与s进行匹配 return match == s; } /** * Tries to cancel a wait by matching node to itself. */ // 尝试取消操作 将match置为this void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } // 判断tryCancel是否操作成功 boolean isCancelled() { return match == this; } // 获取match和next在对象中的偏移量 // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
变量部分需要注意的就在于这3种类型,其中FULFILLING需要注意的是这个变量不是直接进行使用的,而是与其他两种操作进行位操作时使用
为什么是2呢?因为2在二进制中表示10,在高位上有个1,而REQUEST与DATA的0和1二进制只在最低位上用二进制0和1表示即可,我们可以通过FULFILLING与其他两种类型位操作使得高位不同来判断节点是否已经被其他节点匹配互补上,此时还能通过最低位判断出此节点是什么操作,当然,由于是栈结构,主要在栈顶元素,这里通过高位的不同来判断出是去匹配节点操作还是帮助匹配的两个节点进行一些操作,在后面要说明的transfer部分你会看到有3个条件分支执行,第3个即为帮助已经确定匹配的两个节点进行一些操作以便尽快完成出栈让自己继续执行匹配操作
/** Node represents an unfulfilled consumer */ // 数据请求操作 如take操作 代表未被匹配上的消费者 static final int REQUEST = 0; /** Node represents an unfulfilled producer */ // 数据保存操作 如put操作 代表未被匹配上的生产者 static final int DATA = 1; /** Node is fulfilling another unfulfilled DATA or REQUEST */ // 有节点与其匹配,相当于已经有互补操作,使用上不是直接使用,可参考后面的源码部分 static final int FULFILLING = 2; /** The head (top) of the stack */ // 栈顶指针 volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } }
CAS更新栈顶指针,比较简单
boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); }
判断m对应的节点是否已经被匹配,和FULFILLING进行位与操作,判断m对应的栈节点处于FULFILLING状态,即已经匹配上了,在transfer里与栈顶节点非相同操作时会入栈一个节点,此节点的mode和普通节点不一样,会通过FULFILLING|mode操作更新mode,故这里最低位来区分是保存数据还是请求数据,高位来区分此节点是否是已经找到匹配节点的节点,当然,只在此次操作中使用,具体参见下面方法的说明
/** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
创建或重置SNode节点,如果为空则创建新的SNode节点,不为空则重置节点的mode和next属性
/** * Creates or resets fields of a node. Called only from transfer * where the node to push on stack is lazily created and * reused when possible to help reduce intervals between reads * and CASes of head and to avoid surges of garbage when CASes * to push nodes fail due to contention. */ static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; }
类似于在公平模式下的TransferQueue.transfer,入队和出队操作,统一使用一个方法,即实现接口中的transfer方法来完成,需要明白的是保存的是每次操作这个动作,当然,与TransferQueue.transfer有所不同的在于这里有3个条件分支,按顺序含义如下:
需要注意的就是上面多次提醒的mode变量部分,需要好好理解
/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 节点类型,是put还是take操作,即是保存数据还是请求数据 int mode = (e == null) ? REQUEST : DATA; for (;;) { // 获取栈顶指针 SNode h = head; // 栈为空 // 或栈顶节点和当前操作节点为相同操作 if (h == null || h.mode == mode) { // empty or same-mode // 设置超时时间且超时时间小于等于0 if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) // 栈顶非空且栈顶节点为取消操作状态 // 出栈,尝试将栈顶节点更新 casHead(h, h.next); // pop cancelled node else return null; // 创建节点,尝试更新栈顶节点 } else if (casHead(h, s = snode(s, e, h, mode))) { // 通过awaitFulfill方法自旋阻塞找到匹配操作的节点,这个下面进行说明 // 可以类比公平模式下的awaitFulfill SNode m = awaitFulfill(s, timed, nanos); // 取消或超时 if (m == s) { // wait was cancelled // 清理节点,取消本次操作 clean(s); return null; } // 栈顶节点更新为s的next元素 // 执行到这一步时应该是栈顶两个节点进行了匹配 // 出栈栈顶2个节点元素,帮助更新栈顶元素为第三个节点元素即为s.next // 当然,也可能另一个栈顶节点线程帮助更新了 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 判断下,如果当前是请求数据,即take操作,返回m.item值,即返回匹配节点的item // 当前是保存数据,即put操作,返回s.item return (E) ((mode == REQUEST) ? m.item : s.item); } // 与栈顶节点非相同操作,栈顶元素非匹配互补节点 } else if (!isFulfilling(h.mode)) { // try to fulfill // 栈顶元素处于取消操作状态 if (h.isCancelled()) // already cancelled // 尝试出栈更新栈顶元素 casHead(h, h.next); // pop and retry // 入栈新创建的节点,同时FULFILLING|mode 位与操作 // s的mode为10或者11 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 进入这里表明s已经为栈顶节点,而且s.next是其匹配节点 // 循环直到匹配上 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 空则可能被其他线程匹配上了则更新头节点为null,重新进入外层循环 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node // 这里s节点需置空,因为比较特殊,mode不同于普通节点 // 重新循环时根据情况重新创建节点 s = null; // use new node next time break; // restart main loop } // SNode mn = m.next; // 尝试m与s进行匹配,实际上是更新m节点的match为s,同时唤醒m的等待线程 if (m.tryMatch(s)) { // 成功则出栈栈顶两个元素,即更新栈顶节点 casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 未匹配上,可能被其他节点匹配上了,尝试更新s的next指针,再继续匹配 s.casNext(m, mn); // help unlink } } // 不满足上边两个条件,即此时栈顶为匹配节点,还未匹配完成,这里帮忙完成匹配出栈操作 // 注意,这里只是帮助更新head和next并不做其他操作,参考上面方法的处理 } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
与TransferQueue.awaitFulfill类似,在当前操作同之前操作相同时,未设置操作时间同时未被外部线程中断则需阻塞等待匹配节点唤醒当前阻塞的线程,整体上非常相似,由于match的存在使得判断对应的匹配节点要比TransferQueue.awaitFulfill简单许多
/** * Spins/blocks until node s is matched by a fulfill operation. * * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 获取超时时间点 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前线程 Thread w = Thread.currentThread(); // shouldSpin判断是否需要进行自旋,下一个方法进行说明 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 判断当前线程是否中断,外部中断操作,相当于取消本次操作 if (w.isInterrupted()) // 尝试将s节点的match设置为s自己,这样判断的时候就知道这个节点是被取消的 s.tryCancel(); SNode m = s.match; // match非空则表示当前节点已经被匹配match匹配上 if (m != null) return m; // 超时配置处理 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 自旋spins if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 设置等待线程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 未设置超时,直接阻塞 else if (!timed) LockSupport.park(this); // 设置超时时间阻塞 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
判断是否需要自旋操作,满足下列情况之一即需要自旋:
/** * Returns true if node s is at head or there is an active * fulfiller. */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); }
清理操作,清理栈节点s的关联关系,同时会清理整个栈节点的取消操作节点,无cleanMe节点,比TransferQueue.clean操作要简单许多
/** * Unlinks s from the stack. */ void clean(SNode s) { // item,waiter 置空 s.item = null; // forget item s.waiter = null; // forget thread // s的下一个节点处于取消操作状态,则past指向past的下一个节点 SNode past = s.next; if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head // 头节点被取消操作则进行将next节点更新为头节点 SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes // 头节点调整完毕,现在将栈节点中每个节点都会进行检查一遍,更新前后节点的关系,将取消操作的节点进行排除 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
参考公平模式下的代码,通过下列最简单的示例进行说明,一个线程take操作,一个线程put操作,画图进行说明
public class SynchronousQueueTest { public static void main(String[] args) { BlockingQueue<String> sq = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(sq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { sq.put("test"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
1.创建非公平策略下的SynchronousQueue,new TransferStack<E>() 无参构造方法默认,变量上没有进行任何操作
2.一线程执行take操作,以先执行take的线程为例子进行说明,此时另一线程put操作还未执行,take操作阻塞等待
3.另一线程执行put操作,通过!isFulfilling判断出当前栈顶未与其他节点匹配,则其尝试与栈顶节点匹配,成功则唤醒之前阻塞等待的take操作,同时处理完成
最终执行return (E) ((mode == REQUEST) ? m.item : s.item),获取操作结果,当然,其中还有一个条件分支可以帮助匹配互补更新操作,这部分自行读者可自行画图理解
线程池使用Executors.newCachedThreadPool()方法创建可缓冲线程池,这里看下源码实现:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
核心线程设置为0,最大线程池设置Integer.MAX_VALUE,存活时间60s,阻塞队列使用SynchronousQueue,默认非公平模式,可缓冲线程池通过复用空闲线程提高效率,当然,如果我们使用这种方式创建线程池可能会带来一些问题
这会造成什么问题呢?
这里最大线程数设置为Integer.MAX_VALUE,可能会创建非常多的线程,甚至导致OOM,所以阿里规范中提及了这部分内容,指出了其中存在的隐患,需要规避资源耗尽的风险,开发人员应直接使用ThreadPoolExecutor来创建线程池,每个参数需要根据自己的需求进行设置
至此,SynchronousQueue的非公平策略的内部实现也已讲解完毕,非公平策略下要注意其对于mode部分状态的处理,通过高位和低位分别区分是否已匹配和是什么类型的操作(生产者还是消费者),理解了这部分,对于非公平模式下的整体操作流程也能很快熟悉,相对来说不是十分复杂,多画图观察代码执行过程能帮助更好的理解
SynchronousQueue作为一个无数据缓冲的阻塞队列,其内部通过两个内部类(队列和栈)分别实现了公平策略和非公平策略下的队列操作,其实我们需要记住的在于其操作必须是成双成对的,在无超时无中断的情况下,一个线程执行入队操作,必然需要另一个线程执行出队操作,此时两操作互相匹配,同时完成操作,这也是其取名为Synchronous(同时发生)的含义吧
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢