上一篇文章中说明了ThreadLocal的使用,部分源码实现以及Thread,ThreadLocal,ThreadLocalMap三者之间的关联关系,其中核心实现ThreadLocalMap将在本篇文章中进行讲解,让我们一起来探究下jdk中的ThreadLocalMap是如何实现的
JDK版本号:1.8.0_171
在源码正式解读之前有些知识需要提前了解的,以便更好的理解源码实现。之前文章中的关联关系是理解ThreadLocal内部实现结构的重点,这里回顾下:
在图里我们可以看到 threadLocals变量 就是 ThreadLocalMap 实现的一个实例对象,每个线程对应一个ThreadLocalMap,其实现是kv结构,在第一次看到这个名字大家就会想到Map结构吧,而其底层实现与Map是类似的,如果你曾经看过HashMap源码,可以回想下HashMap的实现原理,当然也可以参考我以前对HashMap源码文章的分析。在这里大家需要提前去了解下 散列表 这种数据结构,否则下面的知识理解起来比较困难
在注释上我们可以看到作者对其进行了一些解释:ThreadLocalMap是一个定制的hashmap,仅用于维护线程的本地变量值,只有ThreadLocal有操作权限,是Thread的私有属性。哈希表中的key是弱引用WeakReference实现,当GC时会进行清理未被引用的entry。有些人可能会比较疑惑,请大家继续往下阅读就好,我们一步一步来理解其内部实现
这里只以jdk8版本源码进行比较,最重要的区别在于解决hash冲突的方式,在 HashMap中使用链表法解决冲突 ,使得其底层数据结构的实现使用了链表,当然,为了提升效率,在达到阈值时转化为了红黑树。然而, 在ThreadLocalMap中并未使用这种方式,而采用了开放寻址法解决冲突 ,因而并不需要链表这种数据结构
那么这两种解决冲突的方式有什么不同呢?
这里借用王争老师的《数据结构和算法之美》中的结论,大家可以思考下理解理解:
使用开放寻址法解决冲突的散列表,装载因子的上限不能太大。导致这种方法比链表法更浪费内存空间,所以当数据量比较小、装载因子小的时候,适合采用开放寻址法。
再次提醒下,在源码的学习中需要先去了解散列表(也就是哈希表)这种数据结构,散列冲突的原因以及其解决散列冲突的方式,如果未了解过,建议先去学习下,否则下面的讲解理解起来相对比较困难。这里我只说明下开放寻址法中使用 线性探测 解决冲突的方式,因为这种方式也就是ThreadLocalMap中解决散列冲突的方式,下面通过插入,查找,删除操作了解其使用,便于后续ThreadLocalMap的源码实现理解
在插入数据时如果出现了散列冲突,就重新探测一个空闲位置,将其插入。当我们往散列表中插入数据时,如果某个数据经过散列函数散列之后,存储位置已经被占用了,那么就从当前位置开始,依次往后查找,直至查找到空闲位置为止。举个例子,如下图,我们散列表大小为14,我们输入一个数x在经过散列函数hash()散列后应该存放到数组中散列槽(即数组索引)为2的位置上,但是2已经被占用了,这样就造成了散列冲突,那么我们就需要依次向后查找空闲位置,最终查找到5的空闲位置,将数x保存在5的位置上
散列表元素查找和插入过程类似。当我们查找时散列到对应的散列槽上,比较对应散列槽上的数据与我们查找的数据是否相等,相等则表示查找成功了,不等则说明冲突了,我们需要与插入元素类似的方式依次向后进行查找比较,找到正确的元素即可。 如果遍历到数组中的空闲位置,还没有找到,就说明要查找的元素并没有在散列表中 。通常比较的是键值对中的键,就如同map结构
散列表中的元素删除操作就有些不太一样了,你可以想想,如果我们仅仅将散列表中对应元素删除后什么都不做会有什么问题?看下查找操作,你就应该明白,如果不处理的话查找操作会被破坏,还是继续元素插入的例子,在5的位置已经放了对应的数据x,此时执行删除操作,将3位置上保存的数据删除,如果不做任何处理,我们再次查找散列表中是否存在x,在查找操作进行到3位置上时发现空闲,就会认为查找的元素不在散列表中,这样就造成了查找错误,因为x明明在5的位置上
所以你应该明白的是, 删除操作需要保证查找操作的正确性 ,那么如何解决呢?其中一种就是在删除元素之后,将之后不为null的数据rehash操作,这样就能保证查找的正确性(参考下图),当然,这也就是ThreadLocalMap删除操作实现的方式,然而,实际上并不止这一种方式,比如还可以逻辑删除,即不是真正删除,只是打个标识,说明这个数据被删除了,查询时发现这个标识就跳过
由于WeakReference在ThreadLocalMap中的使用不得不先来了解下内存泄漏这个知识点,经常有人说ThreadLocal使用时要注意不要内存泄漏,那么,什么是内存泄漏?
创建的对象不再被其他应用程序使用,但因为被其他对象所引用着(即通过可达性分析,从GC Roots具有到该对象的链路),因此垃圾回收器没办法回收它们,此时将在内存中一直被占用,造成浪费
简单来说,就是我们其实已经使用完毕了,但是由于还具有有效的引用导致GC无法回收,对象在内存中一直占用导致浪费,其实这是一个相对的概念,也就是说这些创建的对象是不是我们想要保留还要使用的,如果不是,那就算内存泄漏,如果是,那就不算。内存泄漏和WeakReference本身是没有关系的,基本上都是使用不当导致的
ThreadLocalMap中很明显的部分在于key是弱引用,在key的生命周期完成后发生GC必然会被回收,而value本身就是被Entry强引用,但是并不是说一定会造成内存泄漏,如果value只在线程中被定义,初始化,使用,那么在线程生命周期结束之后,通过可达性算法分析,value同样会被回收,如果value在线程外被定义,初始化,也就是在线程外通过可达性算法还可以找到链路,那么即使线程结束,value依旧不会被回收,但是这里请注意,如果你这里的value还有用,那么也不算内存泄漏,只有当这个value无用了没被回收才能算内存泄漏。也就是说内存泄漏并不是ThreadLocalMap的专属,ThreadLocalMap不背这个锅,就像使用集合类也有可能造成内存泄漏一样,最终还是需要理解并正确的使用才能避免这种状况
那么什么情况下使用ThreadLocalMap会造成内存泄漏呢?最常见的就是在 线程池 中了,线程池中的线程存活时间基本都是与程序同生共死的,这样就导致Thread持有的ThreadLocalMap一直都不会被回收,再加上ThreadLocalMap中的Entry对ThreadLocal是弱引用(WeakReference),所以只要 ThreadLocal 结束了自己的生命周期是可以被回收掉的。但是Entry中的Value却是被Entry强引用的,所以即便Value的生命周期结束了,Value 也是无法被回收的,从而导致内存泄露。所以ThreadLocal使用时推荐手动remove操作避免可能出现的内存泄漏风险
Entry继承WeakReference,ThreadLocal作为key使用弱引用,在ThreadLocalMap的数组中存放的也就是这个Entry,这里使用WeakReference也就是不想让ThreadLocal在生命周期结束后由于这里的引用而导致其使用的内存无法被回收
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
与HashMap实现类似,通过数组实现散列表,只不过HashMap使用的是Node链表节点,而这里Entry仅仅保存kv值,不需要next指针,当然,造成这种不同结果的原因就是因为解决hash冲突的方式是不同的
/** * 初始化容量,必须为2的幂,默认16 */ private static final int INITIAL_CAPACITY = 16; /** * table,Entry数组 */ private Entry[] table; /** * table中元素实际个数 */ private int size = 0; /** * 扩容阈值大小,默认为0 */ private int threshold; // Default to 0
ThreadLocalMap只有当被使用到时才会进行初始化操作,也就是懒加载模式,其中散列槽位的计算通过ThreadLocal中生成的hashcode和(数组长度-1)进行与操作进行定位,非常方便,如果需要继承父线程中的ThreadLocalMap变量,则需要使用复制父线程的ThreadLocalMap构造方法,通过ThreadLocal.createInheritedMap方法进行操作
/* 懒加载,只有需要保存键值时才初始化 */ ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { // 初始化table数组 table = new Entry[INITIAL_CAPACITY]; // hashcode和(容量-1)与操作作为该数据存放到数组中的索引位置 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); // 在对应的数组索引位置创建Entry table[i] = new Entry(firstKey, firstValue); // 数组中元素数量加1 size = 1; // 设置阈值 setThreshold(INITIAL_CAPACITY); } /* 参数传入父线程相关的ThreadLocalMap */ private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); // 把父线程的ThreadLocalMap中的数据复制到当前线程的ThreadLocalMap table = new Entry[len]; // 循环遍历复制 for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); // key非空的部分entry if (key != null) { Object value = key.childValue(e.value); // 创建Entry Entry c = new Entry(key, value); // 确定数组中的索引位置 int h = key.threadLocalHashCode & (len - 1); // 若索引位置已经被使用,则使用开放寻址法解决hash冲突,重新计算索引位置 while (table[h] != null) h = nextIndex(h, len); // 将创建的Entry保存到对应的table数组索引中 table[h] = c; // 数组中实际元素个数加1 size++; } } } }
首先为了方便讲解做个说明,key(也就是ThreadLocal)为null的,但是value非空的entry这里称为无用entry,并不是null值,只是key为null,也就是被ThreadLocal被回收了的entry
调整阈值时使用数组容量的2/3作为新的阈值,同时在rehash方法中可以看到在数组中元素个数达到阈值的3/4,也就是容量的1/2时进行扩容操作,可以看出这种解决hash冲突的方式还是比较浪费内存空间的
private void setThreshold(int len) { threshold = len * 2 / 3; }
prevIndex查找索引i的前一个值,如果当前索引为0,则直接取数组最大索引值len - 1。nextIndex查找索引i的后一个值,如果为最大索引值len - 1,则直接取0。相当于在一个环形数组中查找i的前后索引,这里封装好便于使用
private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); }
获取ThreadLocal对应的数组table中的Entry对象,由于其使用开放寻址法解决hash冲突,故在未直接命中时需要进一步的查找处理
private Entry getEntry(ThreadLocal<?> key) { // 获取散列槽也就是数组的索引位置 int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) // 命中则直接返回 return e; else // 未命中则通过getEntryAfterMiss继续查找 return getEntryAfterMiss(key, i, e); }
在首次通过threadLocalHashCode计算索引槽位时,未直接命中,即上面的getEntry方法,则通过此方法进一步继续查找
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; // table容量 int len = tab.length; // e为null说明数组中无对应的entry,直接返回null结束查找 while (e != null) { // 获取对应entry的ThreadLocal ThreadLocal<?> k = e.get(); // 数组槽位上的key相等,则命中返回 if (k == key) return e; if (k == null) // 键为null说明ThreadLocal已经被回收 // 这里进行清除无用entry操作并对其之后的entry进行rehash操作以保证查找的正确性 // 直到遍历到null的数组槽位停止 expungeStaleEntry(i); else // 非空且key不等则继续查找下一个索引位置的entry // 也就是继续向后查找匹配 i = nextIndex(i, len); e = tab[i]; } return null; }
清除ThreadLocal已经被回收的无用entry,同时进行rehash操作,由于删除了数组中的entry,为了保证开放寻址法查找匹配entry的正确性,必然要对删除元素这个操作进行后续的处理,而这里是通过对其之后到第一个null元素之间的所有元素进行rehash操作,可以参考我上面讲解的开放寻址法删除操作的处理。可以看到expungeStaleEntry是对哈希槽中删除entry所处的这一段数据进行处理以保证查找的正确性
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot // 删除无用entry,置空操作,标识gc可回收 tab[staleSlot].value = null; tab[staleSlot] = null; // 记录减1 size--; // Rehash until we encounter null // 遍历删除entry之后的entry // 清除数组中无用的entry // 同时非null时rehash操作,如果冲突继续使用开放寻址法解决 // 直到遇见null的数组位置才停止表明这一段数据处理完毕 Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { // 无用entry处理 e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); // 非i位置表示需要重新设置该entry所处的槽位 // 也就是进行rehash操作 if (h != i) { // 先将i处 置空释放 tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. // 开放寻址法计算正确的槽位 while (tab[h] != null) h = nextIndex(h, len); // entry赋值 tab[h] = e; } } } return i; }
以ThreadLocal为key构建Entry添加到table对应的散列槽上,在ThreadLocal.set和ThreadLocal.setInitialValue中被调用
private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; // 定位散列槽位 int i = key.threadLocalHashCode & (len-1); // hash冲突使用开放寻址法解决 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); // 对应key相同,更新value if (k == key) { e.value = value; return; } // ThreadLocal被回收,但是这个槽位上的entry还在 // 可以理解为使用当前的新的key和value取代无用的entry // 但是需要注意replaceStaleEntry内部处理并没有这么简单,可参考下面的方法说明 if (k == null) { replaceStaleEntry(key, value, i); return; } } // 执行到这里说明循环中未处理,i处为null,没有entry占用,可被使用 tab[i] = new Entry(key, value); int sz = ++size; // 尝试清理部分无效entry,可参考下面的方法解释 if (!cleanSomeSlots(i, sz) && sz >= threshold) // 如果未清理任何数据同时使用长度已经达到阈值限制则执行rehash方法 rehash(); }
使用入参取代无用的hash槽位上的entry,即staleSlot上的key已经被回收了,但是entry还在占用,使用key和value来进行替换。同时会通过expungeStaleEntry清理这一段数组数据,cleanSomeSlots来尝试检查清理整个数组中一些无用的entry
这里有个非常关键的地方需要理解,这个方法是在set中被调用处理遇见无用entry的操作,我们需要保存新的键值对,同时对无用entry进行处理,清理无用entry有个重要的地方在于,从哪个点开始进行处理,回想上面的expungeStaleEntry方法,我们在清除无用的entry时还会对其后的部分entry进行rehash迁移操作,那么我们就需要找到当前无用entry这一段数组中第一个无用entry所在的位置,有点绕人,也不是很难理解,首先,这一段数组也就是在这个staleSlot位置的无用entry所在的,向前查找第一个非null的entry和向后查找第一个非null的entry,我们需要处理这段数组中无用的entry,而expungeStaleEntry是从无用entry向后进行处理的,所以我们需要找到这段数组中第一个无用entry所在的位置,参考下图理解下
图中3,5,8的位置为无用的entry,此时执行replaceStaleEntry操作的是5的位置,那么最终会从3的位置执行expungeStaleEntry对这一段数组数据进行处理,其他情况类似,你可以自己画图理解,这里就不一一进行说明了
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; // 待清除的槽位 // 这里其实是找到这段非null数组中第一个无用entry的位置 int slotToExpunge = staleSlot; // 遍历staleSlot前的槽位上的entry,找到其之前一个槽位非null,key为null的entry // 当然是最靠近null的那个无用entry的位置 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // 遍历staleSlot之后的非null的entry // 这里和向前遍历处理不一样 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // 对应槽位上的entry的key相等,则替换value if (k == key) { e.value = value; // 与staleSlot位置交换 // 因为staleSlot位置是无用的entry,直接替换掉,这样再次散列时其可以直接从staleSlot位置获取了 tab[i] = tab[staleSlot]; tab[staleSlot] = e; // 如果上面的向前遍历未找到无用entry则根据这里的i处已经被更换为key为null的entry,从i处开始清理 // 如果向前遍历已经找到无用entry则不更新slotToExpunge,从向前遍历得到的slotToExpunge开始处理 if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // 当前entry为无用entry同时向前遍历未找到无用entry则更新这里向后遍历中获得的第一个无用entry的位置 // 最后的代码会从这个位置开始清理 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // 未找到匹配的key则直接替换掉staleSlot位置的entry即可 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // 不相等则说明其他位置有无用的entry 需要进行清理操作 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
扫描数组中部分槽位,清理无用entry,通过 n >>>= 1
控制扫描次数,如果找到无用entry则重置扫描次数,如果未扫描到无用entry则每次调用扫描次数为 log(n)
。cleanSomeSlots是尝试查找部分无用entry并进行清理操作,为了降低扫描次数,并不一一进行检查操作
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; // 扫描到无用entry if (e != null && e.get() == null) { // 重置n n = len; removed = true; // 调用expungeStaleEntry处理 i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
移除对应key的entry操作,同时通过expungeStaleEntry进行删除操作的后续处理
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); // 开放寻址法定位 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { // 找到匹配entry则进行清理 // clear清理的是key,先变为无用entry再使用expungeStaleEntry清理 e.clear(); expungeStaleEntry(i); return; } } }
对整个table哈希槽进行rehash操作,同时判断是否需要进行扩容操作
private void rehash() { // 清理table中所有的无用entry expungeStaleEntries(); // 判断是否进行扩容操作 if (size >= threshold - threshold / 4) resize(); }
通过expungeStaleEntry清理table中的所有无用entry
private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } }
扩容操作,固定扩容为原数组容量的2倍,这里迁移entry也比较暴力,直接计算对应槽位,然后使用开放寻址法解决冲突,将entry放入对应的新数组中的槽位上
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { // ThreadLocal为空则value也置空便于GC回收 e.value = null; // Help the GC } else { // 在新table中设置到对应的槽位上 int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } // 设置阈值 setThreshold(newLen); size = count; table = newTab; }
借用在《Java并发编程实战》中的话可以解释下这个原因:
在Java的实现方案里面,ThreadLocal仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在Thread里面,这样的设计容易理解。而从数据的亲缘性上来讲,ThreadLocalMap属于Thread也更加合理。当然还有一个更加深层次的原因,那就是不容易产生内存泄露。假如ThreadLocal持有的Map会持有Thread对象的引用,这就意味着,只要ThreadLocal对象存在,那么Map中的 Thread对象就永远不会被回收。ThreadLocal的生命周期往往都比线程要长,所以这种设计方案很容易导致内存泄露。而Java的实现中Thread持有ThreadLocalMap,而且ThreadLocalMap里对ThreadLocal 的引用还是弱引用(WeakReference),所以只要 Thread 对象可以被回收,那么ThreadLocalMap就能被回收。Java的这种实现方案虽然看上去复杂一些,但是更加安全。Java 的ThreadLocal实现应该称得上深思熟虑了,不过即便如此深思熟虑,还是不能百分百地让程序员避免内存泄露,例如在线程池中使用 ThreadLocal,如果不谨慎就可能导致内存泄露。
到此关于ThreadLocal的源码基本讲解完毕,总体来说有些地方还是比较难理解的,你可以多思考思考,相信有不一样的收获
本文讲解了Hash冲突的解决方式之一开放寻址法是为了帮助大家更好的理解源码实现,同时说明了内存泄漏的风险,然后对源码部分进行了详细的说明,正确理解其内部数据结构是理解源码实现的关键,同时结合上篇文章,理清Thread,ThreadLocal,ThreadLocalMap三者之间的关联关系,相信对于ThreadLocal的使用便会了然于心。对于实现中最重要的在于通过expungeStaleEntry,cleanSomeSlots,expungeStaleEntries完成无用entry的清理,同时进行数据的rehash操作便于开放寻址法查找数据的正确性,相信对大家而言不是过于复杂
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢