ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。
相比于同样线程安全的HashTable来说,效率等各方面都有极大地提高。
这里先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。
synchronized
java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。
synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式: 偏向锁,轻量级锁,重量级锁
。
偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。
重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。
CAS
volatile(非锁)
java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i++操作,不保证每次结果都正确,因为i++操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况 volatile是无法保证的。
自旋锁
分段锁
ReentrantLock
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面,官方给出的解释如下:
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value都不能为null if (key == null || value == null) throw new NullPointerException(); // 计算hash值 int hash = spread(key.hashCode()); // 要插入的元素所在桶的元素个数 int binCount = 0; // 死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程) for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 如果桶未初始化或者桶个数为0,则初始化桶 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作 // 如果使用CAS插入元素成功,则break跳出循环,流程结束 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 如果要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素 tab = helpTransfer(tab, f); else { // 如果这个桶不为空且不在迁移元素,则锁住这个桶(分段锁) // 并查找要插入的元素是否在这个桶中 // 存在,则替换值(onlyIfAbsent=false) // 不存在,则插入到链表结尾或插入树中 V oldVal = null; synchronized (f) { // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过 if (tabAt(tab, i) == f) { // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树) // 那就是桶中的元素使用的是链表方式存储 if (fh >= 0) { // 桶中元素个数赋值为1 binCount = 1; // 遍历整个桶,每次结束binCount加1 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false) // 并退出循环 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 如果到链表尾部还没有找到元素 // 就把它插入到链表结尾并退出循环 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 如果第一个元素是树节点 Node<K,V> p; // 桶中元素个数赋值为2 binCount = 2; // 调用红黑树的插入方法插入元素 // 如果成功插入则返回null // 否则返回寻找到的节点 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false) // 并退出循环 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 如果binCount不为0,说明成功插入了元素或者寻找到了元素 if (binCount != 0) { // 如果链表元素个数达到了8,则尝试树化 // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数 // 所以不会重复树化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果要插入的元素已经存在,则返回旧值 if (oldVal != null) return oldVal; // 退出外层大循环,流程结束 break; } } } // 成功插入元素,元素个数加1(是否要扩容在这个里面) addCount(1L, binCount); // 成功插入元素返回null return null; }
整体流程跟HashMap比较类似,大致是以下几步:
添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。
为什么使用synchronized而不是ReentrantLock?
因为synchronized已经得到了极大地优化,在特定情况下并不比ReentrantLock差。
第一次放元素时,初始化桶数组。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化 // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环 // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU // 如果下一次循环更新完毕了,则table.length!=0,退出循环 try { // 再次检查table是否为空,防止ABA问题 if ((tab = table) == null || tab.length == 0) { // 如果sc为0则使用默认值16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 新建数组 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 赋值给table桶数组 table = tab = nt; // 设置sc为数组长度的0.75倍 // n - (n >>> 2) = n - n/4 = 0.75n // 可见这里装载因子和扩容门槛都是写死了的 // 这也正是没有threshold和loadFactor属性的原因 sc = n - (n >>> 2); } } finally { // 把sc赋值给sizeCtl,这时存储的是扩容门槛 sizeCtl = sc; } break; } } return tab; }
每次添加元素后,元素数量加1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 这里使用的思想跟LongAdder类是一模一样的(后面会讲) // 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想) // 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段 // 这样可以保证尽量小的减少冲突 // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 如果as为空 // 或者长度为0 // 或者当前线程所在的段为null // 或者在当前线程的段上加数量失败 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋) // 不同线程对应不同的段都更新失败了 // 说明已经发生冲突了,那么就对counterCells进行扩容 // 以减少多个线程hash到同一个段的概率 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 计算元素个数 s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果元素个数达到了扩容门槛,则进行扩容 // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // rs是扩容时的一个邮戳标识 int rs = resizeStamp(n); if (sc < 0) { // sc<0说明正在扩容中 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 扩容已经完成了,退出循环 // 正常应该只会触发nextTable==null这个条件,其它条件没看出来何时触发 break; // 扩容未完成,则当前线程加入迁移元素中 // 并把扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 这里是触发扩容的那个线程进入的地方 // sizeCtl的高16位存储着rs这个扩容邮戳 // sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads) // 所以官方说的扩容时sizeCtl的值为 -(1+nThreads)是错误的 // 进入迁移元素 transfer(tab, null); // 重新计算元素个数 s = sumCount(); } } }
线程添加元素时发现正在扩容且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空 // 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素 // 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0,说明正在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 当前线程帮忙迁移元素 transfer(tab, nextTab); break; } } return nextTab; } return table; }
当前桶元素迁移完成了才去协助迁移其它桶元素;
扩容时容量变为两倍,并把部分元素迁移到其它桶中。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating // 如果nextTab为空,说明还没开始迁移 // 就新建一个新桶数组 try { // 新桶数组是原桶的两倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } // 新桶数组大小 int nextn = nextTab.length; // 新建一个ForwardingNode类型的节点,并把新桶数组存储在里面 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 整个while循环就是在算i的值,过程太复杂,不用太关心 // i的值会从n-1依次递减,感兴趣的可以打下断点就知道了 // 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { // 如果一次遍历完成了 // 也就是整个map所有桶中的元素都迁移完成了 int sc; if (finishing) { // 如果全部迁移完成了,则替换旧桶数组 // 并设置下一次扩容门槛为新桶数组容量的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 当前线程扩容完成,把扩容线程数-1 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 扩容完成两边肯定相等 return; // 把finishing设置为true // finishing为true才会走到上面的if条件 finishing = advance = true; // i重新赋值为n // 这样会再重新遍历一次桶数组,看看是不是都迁移完成了 // 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) // 如果桶中无数据,直接放入ForwardingNode标记该桶已迁移 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // 如果桶中第一个元素的hash值为MOVED // 说明它是ForwardingNode节点 // 也就是该桶已迁移 advance = true; // already processed else { // 锁定该桶并迁移元素 synchronized (f) { // 再次判断当前桶第一个元素是否有修改 // 也就是可能其它线程先一步迁移了元素 if (tabAt(tab, i) == f) { // 把一个链表分化成两个链表 // 规则是桶中各元素的hash与桶大小n进行与操作 // 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中 // 其中低位链表迁移到新桶中的位置相对旧桶不变 // 高位链表迁移到新桶中位置正好是其在旧桶的位置加n // 这也正是为什么扩容时容量在变成两倍的原因 Node<K,V> ln, hn; if (fh >= 0) { // 第一个元素的hash值大于等于0 // 说明该桶中元素是以链表形式存储的 // 这里与HashMap迁移算法基本类似 // 唯一不同的是多了一步寻找lastRun // 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表 // 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0 // 则最后后面三个0对应的元素肯定还是在同一个桶中 // 这时lastRun对应的就是倒数第三个节点 // 至于为啥要这样处理,我也没太搞明白 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 看看最后这几个元素归属于低位链表还是高位链表 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 遍历链表,把hash&n为0的放在低位链表中 // 不为0的放在高位链表中 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 低位链表的位置不变 setTabAt(nextTab, i, ln); // 高位链表的位置是原位置加n setTabAt(nextTab, i + n, hn); // 标记当前桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } else if (f instanceof TreeBin) { // 如果第一个元素是树节点 // 也是一样,分化成两颗树 // 也是根据hash&n为0放在低位树中 // 不为0放在高位树中 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍历整颗树,根据hash&n是否为0分化成两颗树 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果分化的树中元素个数小于等于6,则退化成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位树的位置不变 setTabAt(nextTab, i, ln); // 高位树的位置是原位置加n setTabAt(nextTab, i + n, hn); // 标记该桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } } } } } }