一说到HashMap与Hashtable区别就会想到前者线程不安全,后者线程安全。但是当我们需要线程安全的时候,Hashtable并不是一个良好的选择,concurrentHashMap才是。
得到线程安全的HashMap有以下三种方式:
①.使用Hashtable
②.使用Collections.synchronizedMap()方法
③.使用concurrentHashMap
有这3中方式,为什么推荐使用第三种,而不是其余两个方法?答案:效率,我们来看看那两种方式的源码
public static Map synchronizedMap(Map m) { return new SynchronizedMap<>(m); }
private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } 复制代码
复制代码
复制代码
我们可以看到也使用synchronized关键字将HashMap包装成一个线程安全的map与Hashtable类似,每次只有一个线程能访问此对象。
而ConcurrentHashMap jdk1.7中采用了 分段锁 技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理。jdk1.8中放弃了 Segment 分段锁,采用了 CAS + synchronized来保证线程安全。
/** * 存放node的数组,大小是2的幂次方 */ transient volatile Node[] table; /** * 扩容时用于存放数据的变量,平时为null */ private transient volatile Node[] nextTable; /** * 通过CAS更新,记录容器的容量大小 */ private transient volatile long baseCount; /** * 控制标志符 * 负数: 代表正在进行初始化或扩容操作,其中-1表示正在初始化,-N 表示有N-1个线程正在进行扩容操作 * 正数或0: 代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,类似于扩容阈值 * 它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。 * 实际容量 >= sizeCtl,则扩容 */ private transient volatile int sizeCtl; /** * 下次transfer方法的起始下标index加上1之后的值 */ private transient volatile int transferIndex; /** * CAS自旋锁标志位 */ private transient volatile int cellsBusy; /** * counter cell表,长度总为2的幂次 */ private transient volatile CounterCell[] counterCells; 复制代码
与jdk1.8中HashMap中的定义很相似,不过value和next属性用volatile修饰保证了内存可见性,没有setValue方法直接改变Node的value属性
static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; ... } 复制代码
static final class TreeNode extends LinkedHashMap.Entry { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; ... } 复制代码
static final class TreeBin extends Node { TreeNode root; volatile TreeNode first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock ... } 复制代码
ForwardingNode是一种临时节点只有扩容时使用,表明当前桶已做过处理
static final class ForwardingNode extends Node { final Node[] nextTable; //ForwardingNode节点hash为-1,若操作中遇到此类型节点,表明有线程正在扩容 ForwardingNode(Node[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } ... } 复制代码
/** * 默认构造方法 */ public ConcurrentHashMap() {} /** * 指定容量 */ public ConcurrentHashMap(int initialCapacity) { //异常情况 if (initialCapacity < 0) throw new IllegalArgumentException(); //计算容量 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } /** * 指定map */ public ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } /** * 指定容量、负载因子 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } /** * 指定容量、负载因子、并发级别 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } 复制代码
concurrentHashMap调用Unsafe类中的方法实现CAS( 这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作 ),其内部方法大多为native方法即直接调用操作系统底层资源执行相应任务,提供了一些可以直接操控内存和线程的底层操作。
initTable方法判断sizeCtl值,若sizeCtl值为-1即有 其他线程正在初始化 ,调用Thread.yield()让出CPU时间片,而正在初始化的线程通过Unsafe.compareAndSwapInt方法修改sizeCtl为-1,初始化数组和扩容阈值。
private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //若sizeCtl<0,即存在其他线程正在初始化操作,确保只有一个线程进行初始化 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //利用CAS方法把sizectl的值置为-1,表明已有线程进行初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //获得桶容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //初始化node数组 Node[] nt = (Node[])new Node[n]; table = tab = nt; //计算扩容阈值0.75n sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } 复制代码
从源码我们可以看出最外层的流程控制采用while循环,而非if条件分支,因为Thread.yield()会让出cpu时间,目的在于确保数组初始化成功
ConcurrentHashMap的put操作 与HashMap很相似 ,但ConcurrentHashMap 不允许null作为key和value ,并且由于需要保证线程安全,有以下两个多线程情况:
①.如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法会将已经操作过扩容桶头结点置为ForwardingNode节点,如果检测到需要插入的位置被该节点占有,就帮助进行扩容。
②.如果检测到要插入的节点是非空且不是ForwardingNode节点,就对这个节点加锁,这样就保证了线程安全。
final V putVal(K key, V value, boolean onlyIfAbsent) { //与HashMap不同,ConcurrentHashMap不允许null作为key或value if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; //若table为空的话,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //若当前数组i位置上的节点为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //CAS插入节点(V:当前数组i位置上的节点; O:null; N:新Node对象) if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } //当前正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //锁住当前数组i位置上的节点 synchronized (f) { //判断是否节点f是否为当前数组i位置上的节点,防止被其它线程修改 if (tabAt(tab, i) == f) { //当前位置桶的结构为链表 if (fh >= 0) { binCount = 1; //遍历链表节点 for (Node e = f;; ++binCount) { K ek; //若hash值与key值相同,进行替换 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //若链表中找不到,尾插节点 if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } //当前位置桶结构为红黑树,TreeBin哈希值固定为-2 else if (f instanceof TreeBin) { Node p; binCount = 2; //遍历红黑树上节点,更新或增加节点 if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //若链表长度超过8,将链表转为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //节点数+1,若超过阈值则扩容 addCount(1L, binCount); return null; } /** * hash算法 */ static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 复制代码
注意最外层的for循环,因为可能线程A正在put桶C,线程B恰好也在转移桶c上节点到扩容新数组上,那么线程A会帮助线程B完成扩容操作后再次循环获得新的table
大致流程:
①.首先判断key和value是否为null,为null抛异常。
与HashMap不同,ConcurrentHashMap不允许null作为key或value,为什么这样设计?
因为ConcurrentHashmap是支持并发的,这样会有一个问题,当你通过get(k)获取对应的value时,如果获取到的是null时,你无法判断,它是put(k,v)的时候value为null,还是这个key从来没有做过映射。HashMap是非并发的,可以通过contains(key)来做这个判断。而支持并发的Map在调用m.contains(key)和m.get(key),m可能已经不同了。参考
②.重新计算hash值,因为1.8concurrentHashMap引入了红黑树来处理较多的哈希冲突,简化了hash算法,不过对比了1.8HashMap的hash算法,除了因为concurrentHashMap不允许null为key,没有把null的hash值算为0以外,其多了一步位运算,之所以这样是为了确保重哈希算出的一定不为负数,我认为是因为若算出负值,可能会影响后续的节点类型判断
③.判断当前table是否为空,空的话初始化table,初始化方法已阐述过不再多提
④.根据重哈希算出的值通过与运算得到桶索引,利用Unsafe类直接获取内存内存中对应位置上的节点,若没有碰撞即桶中无结点CAS直接添加
⑤.若发生碰撞,判断桶中第一个节点类型。是ForwardingNode节点的话,表明有其它线程正在扩容,则一起进行扩容操作
⑥.剩下场景,按节点相应结构链表或红黑树的方式插入或更新新节点
⑦.若新增节点后链表长度大于8,就把这个链表转换成红黑树
⑧.最后节点数量+1,校验是否超过阈值,若超过则扩容
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //多线程CAS发生失败的时候执行 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } ///如果check值大于等于0 则需要检验是否需要进行扩容操作 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } } 复制代码
我们可以看到在更新baseCount时用了2次CAS操作,一次直接更新basecount,若更新失败则更新CELLVALUE,若还更新失败会调用fullAddCount()方法,这个方法会死循环一直将由于多线程竞争导致CAS失败的容量变化值存到CounterCell数组中,为size()方法做准备
对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为 不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } 复制代码
为了统计元素个数,ConcurrentHashMap通过累加baseCount和CounterCell数组中的数量,元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,得到元素的总个数。
支持多线程扩容,没有加锁 ,其目的不仅为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。
以下两个场景可能会触发扩容机制(与1.8HashMap相同):
①.当桶中链表长度达到阈值8,但整个ConcurrentHashMap节点数量小于64
②.新增节点后,整个ConcurrentHashMap节点数量超过阈值
private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") // 创建node数组,容量为当前的两倍 Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 若扩容时出现OOM异常,则将阈值设为最大,表明不支持扩容 sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 创建ForwardingNode节点,作为标记位,表明当前位置桶已做过处理 ForwardingNode fwd = new ForwardingNode(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //通过CAS设置transferIndex属性值,并初始化i和bound值 //i指当前处理的槽位序号,bound指需要处理的槽位边界 //先处理最后一个桶的节点; else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 将原数组中节点复制到新数组中去 if (i < 0 || i >= n || i + n >= nextn) { int sc; //如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable if (finishing) { nextTable = null; table = nextTab; //设置新扩容阈值 sizeCtl = (n << 1) - (n >>> 1); return; } //利用CAS方法更新扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //锁住i位置上桶的节点 synchronized (f) { //确保f是i位置上桶的节点 if (tabAt(tab, i) == f) { Node ln, hn; //当前桶是链式结构 if (fh >= 0) { //构造两个链表 int runBit = fh & n; Node lastRun = f; //类似于1.8HashMap,只需要看新增的1bit是0还是1进行分类 for (Node p = f.next; p != null; p = p.next) { //n是就数组长度,不是长度-1 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就可以执行i--操作 advance = true; } //当前桶是红黑树结构,操作和上面的类似 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果扩容后已经不再需要tree的结构 反向转换为链表结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } 复制代码
如图所示(蓝色:hash值第X位为0,灰色:hash值第X位为1)
首次遍历链表分类使用fn&n可以快速把链表中的元素区分成两类只需要看新增的1bit是0还是1,runBit为0,lastRun为节点F,ln为节点F,hn为null
再次遍历链表后,ln链:B→A→F;hn链:E→D→C( 一个正向链表,一个反向链表 )
通过Unsafe类直接操作内存把ln链表设置到新数组的i位置,hn链表设置到i+n的位置
将原数组i位置节点改为ForwardingNode节点,表明此桶已做过处理构造树节点lo和hi,遍历红黑树中的节点,同样根据hash&n算法,把节点分为两类,分别插入到lo和hi为头的链表中,根据lo和hi链表中的元素个数分别生成ln和hn节点,其中ln节点的生成逻辑如下:
(1)如果lo链表的元素个数小于等于UNTREEIFY_THRESHOLD,默认为6,则通过untreeify方法把树节点链表转化成普通节点链表;
(2)否则判断hi链表中的元素个数是否等于0:如果等于0, 表明重哈希后原桶中所有节点仍这个桶中,即在lo链表中包含了所有原始节点 ,则设置原始红黑树给ln,否则根据lo链表重新构造红黑树。
看过JDK8 HashMap扩容方法应该会感觉有点像,ConcurrentHashMap对链表结构的处理类似HashMap的resize()方法最后部分,对红黑树结构处理类似HashMap里TreeNode内部类的split()方法,TreeBin的构造方法类似于TreeNode内部类的treeify()方法,但是由于ConcurrentHashMap需要保证线程安全其操作还是复杂得多
问题:为什么要反序构建链表?
参考了下1.7HashMap头插节点思想,因为后插入的数据被使用的频次更高,无法随机访问只能从头开始遍历查询,而ConcurrentHashMap利用内置锁synchronized锁住桶头节点,因为后插入的数据被使用的频次更高, 倒序使锁持有时间更短,等待时间也就短了 (仅本人看法)
给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。
public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); ////根据hash值确定节点位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //桶首节点的key与查找的key相同,则直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //树节点场景 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //链表场景 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } 复制代码
不考虑细节处理,JDK 7中ConcurrentHashMap最主要采用segment,多线程竞争会先锁住segment,在其put操作中会先定位segment位置,再定位segment中具体桶位置,而JDK 8直接操作Node数组中的桶,锁住桶首节点,锁的粒度变小了。另外segment继承ReentrantLock,而ReentrantLock相对于synchronized关键字多了 等待可中断、公平性、绑定多个条件 ,但针对于性能而言,由于synchronized内置锁不断在优化,其性能不会差。
www.jianshu.com/p/e694f1e86… blog.csdn.net/itachi85/ar…