HashMap 是非线程安全的,put操作可能导致死循环。其解决方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。这两种方案都是对读写加锁,独占式,效率比较低下。
HashMap 在并发执行put操作时会引起死循环,因为多线程导致 HashMap 的 Entry 链表形成环形数据结构,则 Entry 的 next 节点永远不为空,会死循环获取 Entry。
HashTable 使用 synchronized 来保证线程安全,但是在线程竞争激烈的情况下,效率非常低。其原因是所有访问该容器的线程都必须竞争一把锁。
针对上述问题,ConcurrentHashMap 使用锁分段技术,容器里有多把锁,每一把锁用于其中一部分数据,当多线程访问不同数据段的数据时,线程间就不会存在锁的竞争。
在JDK 1.7中,ConcurrentHashMap 采用了 Segment 数组和 HashEntry 数组的方式进行实现。其中 Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。而 HashEntry 则是用于存储键值对的数据。结构如下图所示:
一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素。每个 Segment 守护一个 HashEntry 数组的元素。
初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数组的大小 cap,并初始化 Segment 数组的第一个元素。其中 ssize 为2的幂次方,默认为16,cap 大小也是2的幂次方,最小值为2。最终结果根据初始化容量 initialCapacity 计算。
//计算segment数组长度 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //初始化segmentShift和SegmentMask this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; //计算每个Segment中HashEntry数组大小cap if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 初始化segment数组和segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; 复制代码
首先,初始化了 segments 数组,其长度 ssize 是通过 concurrencyLevel 计算得出的。需要保证ssize的长度是2的N次方,segments 数组的长度最大是65536。
然后初始化了 segmentShift 和 segmentMask 这两个全局变量,用于定位 segment 的散列算法。segmentShift 是用于散列运算的位数, segmentMask 是散列运算的掩码。
之后根据 initialCapaicity 和 loadfactor 这两个参数来计算每个 Segment 中 HashEntry 数组的大小 cap。
最后根据以上确定的参数,初始化了 segment 数组以及 segment[0]。
整个 get 操作过程都不需要加锁,因此非常高效。首先将 key 经过 Hash 之后定位到 Segment,然后再通过一个 Hash 定位到具体元素。不需要加锁是因为 get 方法将需要使用的共享变量都定义成 volatile 类型,因此能在线程之间保持可见性,在多线程同时读时能保证不会读到过期的值。
put 方法需要对共享变量进行写入操作,为了线程安全,必须加锁。 put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。插入操作首先要判断是否需要对 Segment 里的 HashEntry 数组进行扩容,然后定位添加元素的位置,将其放入到 HashEntry 数组。
Segment 的扩容比 HashMap 更恰当,因为后者是插入元素后判断是否已经到达容量,如果到达了就扩容,但是可能扩容后没有插入,进行了无效的扩容。Segment 在扩容时,首先创建一个原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组。同时为了高效, ConcurrentHashMap 不会对整个容器进行扩容,而是只对某个 segment 进行扩容。
每个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将所有的 count 累加即可。但是 volatile 修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。但是如果在调用 size 方法时锁住其余的操作,效率也很低。
ConcurrentHashMap 的做法是先尝试两次通过不加锁的方式进行计算,如果两次结果相同,说明结果正确。如果计算结果不同,则给每个 Segment 加锁,进行统计。
在JDK 1.8中,改变了分段锁的思路,采用了 Node数组 + CAS + Synchronized 来保证并发安全。底层仍然采用了数组+链表+红黑树的存储结构。
在JDK 1.8中,使用 Node 替换 HashEntry,两者作用相同。在 Node 中, val 和 next 两个变量都是 volatile 修饰的,保证了可见性。
使用 table 变量存放 Node 节点数组,默认为 null, 默认大小为16,且每次扩容时大小总是2的幂次方。在扩容时,使用 nextTable 存放新生成的数据,数组为 table 的两倍。
ForwardingNode 是一个特殊的 Node 节点,hash 值为-1,存储了 nextTable 的引用。只有table 发生扩容时,其发生作用,作为占位符放在 table 中表示当前节点为null或者已经被移动。
在 HashMap 中,其核心的数据结构是链表。而在 ConcurrentHashMap 中,如果链表的数据过长会转换为红黑树来处理。通过将链表的节点包装成 TreeNode 放在 TreeBin 中,然后经由 TreeBin 完成红黑树的转换。
TreeBin 不负责键值对的包装,用于在链表转换为红黑树时包装 TreeNode 节点,用来构建红黑树。
在构造函数中,ConcurrentHashMap 仅仅设置了一些参数。当首次插入元素时,才通过 initTable() 方法进行了初始化。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //有其他线程在初始化,挂起当前线程 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //获得了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //进行初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //下次扩容的大小,相当于0.75*n sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码
该方法的关键为sizeCtl。
sizeCtl:控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:
sizeCtl 默认为0。如果该值小于0,表示有其他线程在初始化,需要暂停该线程。如果该线程获取了初始化的权利,先将其设置为-1。最后将 sizeCtl 设置为 0.75*n,表示扩容的阈值。
put操作的核心思想依然是根据 hash 计算节点插入在 table 的位置,如果为空,直接插入,否则插入到链表或树中。
首先计算hash值,然后进入循环中遍历table,尝试插入。
int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //详细代码接下来分别讲述 } 复制代码
首先判断 table 是否为空,如果为空或者是 null,则先进行初始化操作。
if (tab == null || (n = tab.length) == 0) tab = initTable(); 复制代码
如果已经初始化过,且插入的位置没有节点,直接插入该节点。使用CAS尝试插入该节点。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } 复制代码
如果有线程在扩容,先帮助扩容。
//当前位置的hashcode等于-1,需要扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); 复制代码
如果都不满足,使用 synchronized 锁写入数据。根据数据结构的不同,如果是链表则插入尾部;如果是树节点,使用树的插入操作。
else { V oldVal = null; //对该节点进行加锁处理(hash值相同的链表的头节点) synchronized (f) { if (tabAt(tab, i) == f) { //fh > 0 表示为链表,将该节点插入到链表尾部 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //hash 和 key 都一样,替换value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; //putIfAbsent() if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //链表尾部 直接插入 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //树节点,按照树的插入操作进行插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } 复制代码
在for循环的最后,判断链表的长度是否需要链表转换为树结构。
if (binCount != 0) { // 如果链表长度已经达到临界值8,把链表转换为树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } 复制代码
最后,如果是更新节点,前边已经返回了 oldVal,否则就是插入新的节点。还需要使用 addCount() 方法,为 size 加一。
总结步骤如下: