ConcurrentHashMap 用法上与 HashMap 差别不大,但 ConcurrentHashMap 是线程安全的,可以在多线程环境中使用。这篇文章主要会说明 ConcurrentHashMap 专有的一些特点,与 HashMap 类似部分将不再赘述。
本文基于 JDK1.8
ConcurrentHashMap 的代码复杂度高了不少,用到了很多的成员变量和常量,先认识一下(HashMap 已经存在的变量或常量就不再赘述)。
// 默认并发度,同时允许多少个线程访问 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 扩容时每个线程扩容时至少要迁移的桶的数量,最低不能少于 16 private static final int MIN_TRANSFER_STRIDE = 16; // 辅助变量,没啥用 private static int RESIZE_STAMP_BITS = 16; // 可用于扩容的最大线程数,但一般肯定到不了这个数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 会用来计算一个标志位,实际上也没什么用 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 下面三个常量是几个特殊的哈希值 // MOVED:表示桶正在经被迁移 // TREEBIN:表示桶正在进行树化 // RESERVED:表示节点运行 computeIfAbsent 等方法 static final int MOVED = -1; static final int TREEBIN = -2; static final int RESERVED = -3; // 用于计算 key 的 hash 值 static final int HASH_BITS = 0x7fffffff; // CPU 的数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); 复制代码
这些变量基本都使用了 volatile 关键字,那是因为这些变量的再并发环境中必须都保持可见性。
// 桶数组,与 HashMap 基本一致,也是延迟加载,不过这里使用了 volatile 关键字 transient volatile Node<K,V>[] table; // 桶数组,用于扩容 private transient volatile Node<K,V>[] nextTable; // 记录所有的元素的个数,类似于 HashMap 的 size private transient volatile long baseCount; // 初始化和扩容的标志位 // 默认值:0 // 初始化前:初始化容量大小 // 正在初始化:-1 // 扩容前:触发扩容操作的元素个数,相当于 HashMap 的 threshold // 正在扩容:-(1 + 参与扩容的线程数量) private transient volatile int sizeCtl; // 扩容的时候需要对桶内的元素进行迁移,这个变量用来记录桶的下标,表示迁移的进度,下面会详细介绍这个变量 private transient volatile int transferIndex; // 更新 counterCells 时使用的自旋锁 private transient volatile int cellsBusy; // 计数用,用于计算还没来的及更新到 baseCount 中的变化 private transient volatile CounterCell[] counterCells; 复制代码
与 HashMap 相比,ConcurrentHashMap 是线程安全的。允许多个线程并发的访问容器的不同部分来减少线程间的竞争。这个容器设计出来不是为了替代 HashMap,而是为了在满足多线程环境下的需求,它有两个设计目标:
总的来说就是 ConcurrentHashMap 既要能支持高并发,也要有高性能。具体实现也经过了多次变化,特别是在 JDK1.8,几乎进行了重写,底层的存储机制也完全不一致。JDK 1.7 和 JDK1.8 底层存储的差异:
// JDK1.7 final Segment<K,V>[] segments; transient volatile HashEntry<K,V>[] table; // 每一个分段锁都会有一个 table 复制代码
// JDK1.8 transient volatile Node<K,V>[] table; 复制代码
在 JDK1.8 中,并发的粒度更细一些,可以认为 table 的长度就是并发数,而之前的版本中,Segment 的数量是并发度。
因为使用了 CAS,所以在 ConcurrentHashMap 中存在大量的自旋操作,自旋操作其实就是一个死循环,等到完成操作时就会通过 break 跳出循环。
ConcurrentHashMap 的 hash 函数与 HashMap 的相差不大,不过除了与自身进行 XOR(异或) 操作,还会与 HASH_BITES
进行与运算:
// ConcurrentHashMap.spread() static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 复制代码
HASH_BITS
的二进制表示是:
01111111 11111111 11111111 11111111 复制代码
在 JDK1.8 以前,ConcurrentHashMap 主要使用分段锁的机制来实现,在 JDK 1.8 及以后,主要使用了 CAS(sun.misc.Unsafe) + synchronized 来实现。CAS 是一种无锁的并发技术,以高效率著称,CAS 需要硬件的支持,如今的 CPU 都支持这一特性。
但 ConcurrentHashMap 并没有实现自己的 CAS,而是直接使用了 sun.misc.Unsafe
(最新的 JDK 中已经换成 jdk.internal.misc.Unsafe)。
ConcurrentHashMap 利用 CAS 实现了了以下三个原子方法来访问桶的 第一个元素 :
// 获取桶的某个位置,任何情况下可以使用 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE); } // 插入桶的第一个键值对,可以在并发环境下,任何情况下可以使用 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) { return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } // 把键值对插入到桶中,只在有锁的的区域使用 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v); } 复制代码
桶的第一个元素有特殊的意义,在 ConcurrentHashMap 中通常被用作桶的锁
CAS 除了用来访问桶之外,在用在其他需要并发更新变量的地方。比如更新 sizeCtl 变量:
// ConcurrentHashMap.initTable() // 将容器的状态设置为正在扩容 U.compareAndSetInt(this, SIZECTL, sc, -1) 复制代码
synchronized 给人的印象是很慢,很臃肿,其实这是一个误解,synchronized 底层经过不断的优化,目前性能已经与可重入锁相当。而且 synchronized 使用简单,也不会造成是死锁的情况,所以一般情况下能用 synchronized 就别用锁了,除非满足不了需求再考虑用锁。
在 ConcurrentHashMap 中 synchronized 使用时粒度都比较小,被 synchronized 包裹的代码不是很多,所以还是可以保持高性能。这也是 ConcurrentHashMap 与 Hashtable 的最大区别。Hashtable 也是使用 synchronized 来保证线程安全,但是 synchronized 都是在方法级别使用,这样就会让整个容器的并发级别很低。
扩容是一个很慢的操作,可以事先预估好大小,可以减少扩容的次数。扩容机制与 HashMap 有些不同,因为 ConcurrentHashMap 可以并发访问,所以在扩容时 写操作 的线程都不能继续,但是这些线程也可以被利用起来,参与到扩容操作中。
对容器的扩容分为两种情况:
初始化和扩容这两个过程不是独立存在的,通过下面这个图来看清整体流程时如何进行的:
实例化时会确定table大小,但是不初始化table,以及确定下一次扩容的临界点,如果构造函数传入的是另一个 Map,调用 tryPresize 来扩容。
在首次插入元素时,会初始化 table(延迟加载),调用 initTable() 进行初始化。
如果不是首次插入元素,判断是否正在扩容,如果是,则停止操作(除了 get() 操作),参与扩容流程。扩容完成后,通过自旋再次进行操作(插入或者更新),插入元素时需要检查是否达到树化的条件,如果满足,将链表转成树。插入完成后调用 addCount() 检查容器状态,如果元素大于等于扩容临界点的值,则开始扩容
初始化通过 initTable()
方法来完成。
// ConcurrentHashMap.initTable() private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 检查当前桶是否为空,为空则开始初始化 while ((tab = table) == null || tab.length == 0) { // 发现正在初始化或者在扩容,则什么也不做,进入自旋状态等待表被初始化(或扩容)完成 if ((sc = sizeCtl) < 0) Thread.yield(); // 放弃 CPU 资源 // 线程开始扩容时会把 sizeCtl 的值置为 -1,让其他线程发现正在进行初始化 else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // 确定初始化桶的数量,如果 sizeCtl 大于 0 则使用 sizeCtl 的值,否则使用默认容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 设置扩容阀值,ConcurrentHashMap 中的装载因子仅仅在构造函数中使用 sc = n - (n >>> 2); // 相当于 n * 0.75 } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码
扩容操作通过下面两个方法来发起:
addcount() 在改变容器元素的方法中被调用,主要就是检查容器当前的状态,判断是否需要扩容,如果需要,就会进行扩容。
// ConcurrentHashMap.addcount() // 这个方法主要用来给当前容器的数量进行计数顺便检查一下是否需要扩容 private final void addCount(long x, int check) { CounterCell[] cs; long b, s; // 给容器中的元素进行增或者减 // 如果 cs 不为 null(说明有并发情况)或者 baseCount 增减运算失败, if ((cs = counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell c; long v; int m; boolean uncontended = true; // 那么就会通过 cs 来进行计数, // 如果 cs 是空(还不是并发)或者 (cs 中随机取余一个数组位置为空 或者 cs 这个位置的变量失败) // 说明通过 cs 来计数也失败了,最后才会调用 fullAddCount 来进行计数 if (cs == null || (m = cs.length - 1) < 0 || (c = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { // 与 LongAdder 实现一致,可以理解为并发情况下的一个计数器 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 统计当前节点的数量 s = sumCount(); } // 在增加元素的操作中 check 都会满足这个条件 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 检查扩容条件: // 1. 是否达到阀值: s >= sizeCtl (上文已经解释了 sizeCtl,sizeCtl 大于 0 时表示下次扩容的临界点) // 2. 是否可以扩容: tab != null && tab 当前的长度小于 1 << 30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 根据当前桶的数量生成一个标志位 int rs = resizeStamp(n); // 如果正在扩容 if (sc < 0) { // 检查当前扩容的进展: // 1. 如果 sc 的低 16 位不等于标识位( sizeCtl 变化了,说明容器状态已经变化),退出 // 2. 如果 sc == 标识位 + 1 (通过下面代码可知,刚开始扩容时, sc = rs + 2,如果 sc = rs + 1,说明已经没有线程在扩容),退出 // 3. 如果 sc == 标识符 + 65535,参与扩容的线程已经达到最大数量,当前线程不再参与,退出 // 4. 如果 nextTable == null 说明扩容结束(nextTable 在扩容中起中转作用,所有的元素会被限移到 nextTable 中,最后让 tab = nextTable,nextTable == null 来完成扩容),退出 // 5. transferIndex <= 0 说明没有桶还需要迁移了(transferIndex 用于标识当前迁移到哪个桶了,小于等于 0 说明已经迁移到最后一个桶或者已经迁移完成,迁移的顺序是从最后一个桶开始),退出。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 如果迁移还是进行,当前线程尝试参与扩容 if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 如果当前不在扩容中,则发起一个新的扩容 else if (U.compareAndSetInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); // 统计当前节点的数量 s = sumCount(); } } } 复制代码
tryPresize 相比于 addcount 方法相对简单,就是尝试进行扩容:
// ConcurrentHashMap.tryPresize() private final void tryPresize(int size) { // 根据 size 计算扩容的容量 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; // 判断是否可以进行扩容,如果 sizeCtl <= 0,说明已经在扩容中,那么久不会再进行扩容 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 如果当前容器还没有初始化,则进行初始化,与 initTable 相同 if (tab == null || (n = tab.length) == 0) { // 当前的扩容阀值与传入的值之间选大的作为这次初始化的大小 n = (sc > c) ? sc : c; // 进入初始化状态 if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); // 相当于 n * 0.75 } } finally { sizeCtl = sc; } } } // 如果还每达到扩容的阀值或者超过了最大容量,则停止扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { // 开始进行扩容 int rs = resizeStamp(n); if (U.compareAndSetInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } 复制代码
扩容的具体操作是通过 transfer()
方法来完成。
// ConcurrentHashMap.transfer() 该方法用于将元素都迁移到 nextTable 中 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 在迁移元素时,会将桶分段,stride 表示每段的长度,最小值为 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 初始化 nextTable if (nextTab == null) { try { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; // 这个变量用于记录当前迁移的进度,需要注意的是迁移元素从最后一个桶开始 transferIndex = n; } int nextn = nextTab.length; // fwd 是一个特殊的 Node,没有 key,也没有 val,hash 值为 MOVED,用来标识一个桶已经迁移完毕 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 用来控制迁移的进展,如果为 true 说明当前这次循环要干的事情已经完成,可以开始下一个循环 boolean advance = true; // 标示当前线程所有桶的迁移是否完成 boolean finishing = false; // 当前线程需要处理的桶的范围 [nextBound, nextindex) for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; // transferIndex <= 0 表示已经迁移完成 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSetInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 为当前线程分配桶的区间,当前线程需要将负责这个区间内的桶元素迁移到 nextTable 中 bound = nextBound; i = nextIndex - 1; advance = false; } } // 判断当前线程是否完成所有桶的迁移 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 如果为 true,说明所有的迁移任务已经完成 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); // 相当于 n * 0.75 return; } // 将参与扩容的线程数量减 1 if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 如果不相等说明还有其他的线程在参与扩容,当前线程直接退出就行,这行代码与 tryPresize() 中传入的参数有关,第一个进行扩容的线程传入的 sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,所以如果这是最后一个线程,那么 sc - 2 == resizeStamp(n) << RESIZE_STAMP_SHIFT if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 最后退出的线程需要再检查一遍容器的状态 finishing = advance = true; i = n; } } // 如果桶中的元素都迁移完成了,则在桶的节点置为 MOVED,表示桶中的元素都迁移完成了 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // 当前桶已经被处理 else { // 如果上面条件都不满足说明要开始迁移桶中的元素 synchronized (f) { // 省略搬运元素的代码... } } } } 复制代码
树化的方式与时机和 HashMap 基本一致。在单个桶的链表元素个数大于 8 时尝试进行树化操作,但是如果此时整个容器的容量少于 64 时,会进行扩容操作,而不是进行树化操作,树化后同样也维护元素的 next 指针来保持连接关系。
树化操作只需要对当前线程所访问的桶进行操作,所以整个过程比扩容要简单很多,是通过 CAS + synchronized 来完成。
// ConcurrentHashMap.treeifyBin() private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n; if (tab != null) { // 如果容器的容量小于 64,则会进行扩容操作,而不是进行树化操作 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); // 利用 CAS + synchronized 来把链表转成红黑树 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 把转换好的树放到桶上 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } } 复制代码
在每个桶上插入第一个元素的时候使用 CAS 就够了。如果插入的不是桶上的第一个元素,或者是删除或者更新操作,就还是要用到 synchronized。但不会为每一个元素都创建一个锁对象,而是使用桶上的第一个元素作为锁对象。但是仅仅将第一个元素上锁还不够,在更新之前,还需要验证它依然是这个桶的第一个节点,如果不是,就要进行重试。
除了 get() 操作之外,其他的 put()、clear() 等操作,都需要使用 CAS + synchronized 来进行并发访问。get 操作相对简单,直接通过 tabAt
方法获取就行。其他的操作逻辑整体就是一样的。这里主要介绍 putVal()
方法,put()、add()等向容器中增加或者更新元素的方法都是通过 putVal() 方法来完成的。
// ConcurrentHashMap.putVal() final V putVal(K key, V value, boolean onlyIfAbsent) { // key 和 value 都不允许为 null if (key == null || value == null) throw new NullPointerException(); // 做 hash 运算 int hash = spread(key.hashCode()); int binCount = 0; // 进入自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; // 如果桶还没有被初始化,则进入初始化(延迟加载) if (tab == null || (n = tab.length) == 0) tab = initTable(); // 如果这个桶为空,直接使用 CAS 方式来插入元素 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; } // 如果发现正在扩容,则参与进扩容,扩容完成之后,通过自旋的方式再次执行插入操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 执行 computeOnlyAbsent 之类的方法 else if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; else { V oldVal = null; // 使用 CAS + synchronized 机制插入元素 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 对现有的键值对进行更新 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 使用尾插法插入新的元素 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value); break; } } } // 如果桶上挂的是树,那就按照树的方法来插入节点 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } // 如果发现这个节点正在进行 computeIfAbsent 之类的操作,则抛出异常 else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } if (binCount != 0) { // 检查桶上节点的数量,如果超过 8 了,则尝试进行树化操作 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果是更新节点操作,那么节点数量就没有增加,直接返回即可 if (oldVal != null) return oldVal; break; } } } // 用这个方法来检查是否满足扩容的条件,与上面的 helpTransfer 方法不同,addCount 是在键值对插入之后再去检查是否需要扩容 addCount(1L, binCount); return null; } 复制代码
其他操作如 clear、comput、remove 等会改变容器元素的方法原理都类似,都是通过 CAS + synchronized 来更新元素,最后调用 addcount 方法来更新计数以及判断是否需要扩容。
因为是支持并发的,所以 size 方法的实现也会有点不一样,size 实际调用的是 sumCount 方法:
//ConcurrentHashMap.sumCount() final long sumCount() { // 统计 cs 和 baseCount 的和 CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) { for (CounterCell c : cs) if (c != null) sum += c.value; } return sum; } 复制代码
在扩容代码中我们看到了 cs 和 baseCount 其实都是用来的统计容器个数,在并发情况下,会先记录到 cs 最后但是需要注意的是,因为 sumCount 没有加锁,所以最后返回的值也不是完全准确的。
另外 ConcurrentHashMap 使用的是 fail-safe 的机制,也就是说在迭代的过程中如果容器中的元素变化,也不会抛出 ConcurrentModificationException 异常。
最后说一下迭代器的问题,KeySetView,ValuesView,EntrySetView 这三个类分别可以迭代键、值、和键值对。具体的实现相对比较简单,而且对于迭代的过程也没有加上并发的控制,所以最后遍历的结果也不一定是准确的。
原文