转载

追踪解析 Netty 的 FastThreadLocal 源码

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

Netty 版本 : netty-all 4.1.34.Final

2 FastThreadLocal 简介

FastThreadLocal 是 Netty 中实现的高性能 ThreadLocal 工具,功能上和 ThreadLocal 差不多,但是性能上远高于 jdk 自带的 ThreadLocal。

3 Demo

import io.netty.util.concurrent.FastThreadLocal;

public class FastThreadLocalDemo {

    public static void main(String[] args) {

        //创建 FastThreadLocal 对象
        FastThreadLocal<String> tl = new FastThreadLocal<>();

        //FastThreadLocal 的存入功能
        long setBegin = System.nanoTime();
        tl.set("test");
        long setAfter = System.nanoTime();
        System.out.println("get : " + (setAfter - setBegin));

        //FastThreadLocal 的获取功能
        long getBegin = System.nanoTime();
        String fastGet = tl.get();
        long getAfter = System.nanoTime();
        System.out.println("get : " + (getAfter - getBegin));

        //FastThreadLocal 的移除功能
        long removeBegin = System.nanoTime();
        tl.remove();
        long removeAfter = System.nanoTime();
        System.out.println("remove : " + (removeAfter - removeBegin));
    }
}

一 FastThreadLocal 的创建

回到 Demo 中的创建代码:

FastThreadLocal<String> tl = new FastThreadLocal<>();

追踪 FastThreadLocal 的构造器:

//step 1
//FastThreadLocal.class
public FastThreadLocal() {
    //index 是一个 int 类型的变量
    index = InternalThreadLocalMap.nextVariableIndex();
}

//step 2
//InternalThreadLocalMap.class
public static int nextVariableIndex() {
    //nextIndex 是一个定义在 UnpaddedInternalThreadLocalMap 类中的静态 AtomicInteger 类型对象
    //UnpaddedInternalThreadLocalMap 是 InternalThreadLocalMap 的父类
    //这里使用自增操作获取一个 int 值,并返回
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}

其实 FastThreadLocal 的创建就只是获取一个唯一的 int 值作为标识,没有其它操作了。

二 InternalThreadLocalMap

InternalThreadLocalMap 是 FastThreadLocal 底层真正起作用的 ThreadLocal 类,并且提供了大量的静态方法。

1 获取对象

最为核心的是 InternalThreadLocalMap 的 get() 方法:

//InternalThreadLocalMap.class
public static InternalThreadLocalMap get() {
    //获取当前线程的线程对象
    Thread thread = Thread.currentThread();

    //判断线程对象的类型
    if (thread instanceof FastThreadLocalThread) {
        //在 Netty 中使用的 FastThreadLocal 的时候会用到该类型的方法
        return fastGet((FastThreadLocalThread) thread);
    } else {
        //正常情况下单独使用 FastThreadLocal,线程对象不会是 FastThreadLocalThread
        return slowGet();
    }
}

先来看 fastGet():

//InternalThreadLocalMap.class
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    //直接获取 FastThreadLocalThread 中的 threadLocalMap 对象并返回即可
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    
    //为空的话新建一个
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

再来看 slowGet():

//InternalThreadLocalMap.class
private static InternalThreadLocalMap slowGet() {
    //获取 UnpaddedInternalThreadLocalMap 中的 slowThreadLocalMap 对象
    //slowThreadLocalMap 是一个静态的 ThreadLocal 类型对象,储存的数据类型是 InternalThreadLocalMap
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;

    //获取该对象中的 InternalThreadLocalMap 对象实例并返回
    InternalThreadLocalMap ret = slowThreadLocalMap.get();

    //为空的情况下新建一个
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

可以看到实际上对于 FastThreadLocal 来说,真正起作用的是 InternalThreadLocalMap 对象。

在普通的线程中,这个对象由于本身没有 jdk 的原生支持,所以只能附着在 ThreadLocal 对象当中。

但是由于 UnpaddedInternalThreadLocalMap 中的 slowThreadLocalMap 本身是一个静态的 ThreadLocal 对象,所以不同的线程实际上调用到的都是同一个对象,但是获取到的 InternalThreadLocalMap 却不是同一个。同一个线程中如果创建多个 FastThreadLocal 对象,获取到的是同一个 InternalThreadLocalMap。

2 set

InternalThreadLocalMap 的底层储存是一个 Object 数组,通过 setIndexedVariable(...) 方法储存进去:

//InternalThreadLocalMap.class
public boolean setIndexedVariable(int index, Object value) {
    //indexedVariables 是一个定义在 UnpaddedInternalThreadLocalMap 中的 Object 数组
    Object[] lookup = indexedVariables;

    //在这里需要判断数组的长度问题
    //index 是每个 FastThreadLocal 创建的时候都会获取的唯一标识码,同时也是数组上的位置
    if (index < lookup.length) {
        //获取原值
        Object oldValue = lookup[index];
        //赋值
        lookup[index] = value;
        //UNSET 是一个静态的 Object 对象,用于默认填充 lookup 数组
        //此处 oldValue 如果等于 UNSET,则证明该位置上原来不存在对象储存
        //如果是已经储存过对象,又调用该方法替换了一次,会返回 false
        return oldValue == UNSET;
    } else {
        //数组扩容
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

3 get

InternalThreadLocalMap 中获取值的方法是通过 indexedVariable(...) 方法:

//InternalThreadLocalMap.class
public Object indexedVariable(int index) {
    //根据 index 从数组中获取到想要的位置的值
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}

4 remove

InternalThreadLocalMap 中删除值的方法是通过 indexedVariable(...) 方法:

//InternalThreadLocalMap.class
public Object removeIndexedVariable(int index) {
    //获取数组
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object v = lookup[index];
        //将指定位置的值替换成 UNSET 对象
        lookup[index] = UNSET;
        return v;
    } else {
        return UNSET;
    }
}

InternalThreadLocalMap 还有一个静态的 remove() 方法用于清除自身:

//InternalThreadLocalMap.class
public static void remove() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        ((FastThreadLocalThread) thread).setThreadLocalMap(null);
    } else {
        slowThreadLocalMap.remove();
    }
}

代码比较简单,不做过多分析。

二 存入元素

回到 Demo 中的存入元素的代码:

tl.set("test");

追踪 get(...) 方法:

//step 1
//FastThreadLocal.class
public final void set(V value) {
    //先判断 value 不是 UNSET 对象
    if (value != InternalThreadLocalMap.UNSET) {
        //获取 InternalThreadLocalMap 对象
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();

        //setKnownNotUnset(...) 方法会将 value 存入 threadLocalMap 中
        if (setKnownNotUnset(threadLocalMap, value)) {
            //此处会清理已经被 gc 回收的线程对象所储存的值
            registerCleaner(threadLocalMap);
        }
    } else {
        remove();
    }
}

//step 2
//FastThreadLocal.class
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    //调用 setIndexedVariable(...) 方法去存储 value,具体见上方 InternalThreadLocalMap 的详细解读
    if (threadLocalMap.setIndexedVariable(index, value)) {
        //addToVariablesToRemove(...) 方法会将 FastThreadLocal 对象存放到 threadLocalMap 中的一个集合中
        //这个集合用于在需要的时候集中销毁 FastThreadLocal
        addToVariablesToRemove(threadLocalMap, this);
        return true;
    }
    return false;
}

三 获取元素

回到 Demo 中获取元素的代码:

String fastGet = tl.get();

追踪 set(...) 方法:

//FastThreadLocal.class
public final V get() {
    //获取 InternalThreadLocalMap 对象
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    //从 InternalThreadLocalMap 中获取值
    Object v = threadLocalMap.indexedVariable(index);

    //如果存在值,则直接返回该值即可
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }

    //不存在的情况下,initialize(...) 会返回一个 null 值
    V value = initialize(threadLocalMap);

    //gc 处理
    registerCleaner(threadLocalMap);

    //返回 null
    return value;
}

三 移除元素

回到 Demo 中移除元素的代码:

String fastGet = tl.get();

追踪 remove(...) 方法:

//step 1
//FastThreadLocal.class
public final void remove() {
    //InternalThreadLocalMap 的 getIfSet() 方法会获取 InternalThreadLocalMap 对象
    remove(InternalThreadLocalMap.getIfSet());
}

//step 2
//FastThreadLocal.class
public final void remove(InternalThreadLocalMap threadLocalMap) {

    //有效性验证
    if (threadLocalMap == null) {
        return;
    }

    //清除值
    Object v = threadLocalMap.removeIndexedVariable(index);

    //到之前 threadLocalMap 中保存 FastThreadLocal 对象的集合里去删除对象
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            //此方法为空,是预留的一个处理方法,使用者也可以自己做实现
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

四 内存管理

在真实的开发环境中,可能会存在一个线程使用了此 FastThreadLocal,然后线程完成之后被 gc 回收了,但是该 FastThreadLocal 的值没有被回收的情况。

所以在 FastThreadLocal 中就由一个防止内存泄漏的方法 registerCleaner(...):

//FastThreadLocal.class
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {

    Thread current = Thread.currentThread();

    //如果 FastThreadLocalThread 被标记为要被清理,或者 index 这个位置的元素并不被收录于清理目录下,则直接返回
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlagSet(index)) {
        return;
    }

    //将 index 收录到清理目录中
    threadLocalMap.setCleanerFlag(index);

    //下方代码是防止内存泄漏的核心代码,但是已经被注释掉了
    //值得一提的是,在以前的 Netty 版本中是存在的,但是在笔者追踪的 4.1.34 版本中被注释掉了
    //根据解释,是官方觉得这种处理方式不够优雅,所以暂时将此段代码注释掉了,并且打上了 TODO 字样

//    ObjectCleaner.register(current, new Runnable() {
//        @Override
//        public void run() {
//            remove(threadLocalMap);
//        }
//   });
}

五 一点唠叨

FastThreadLocal 对比 jdk 的原生 ThreadLocal,性能优势主要表现在以下几个方面:

1、Netty 基于自己的业务需求,对线程对象进行了封装,并在此过程中内嵌了对 FastThreadLocal 的支持
2、FastThreadLocal 中省略了 ThreadLocal 中的节点对象的组装和 Hash 值的计算过程,结构更加简单,存、拿过程的效率更高
3、ThreadLocal 对于内存的控制比 FastThreadLocal 更加严谨,消耗更多的精力去进行内存检查和清理
4、FastThreadLocal 中静态(static)方法的使用更加频繁,是典型的以空间换时间的做法

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

原文  https://segmentfault.com/a/1190000018567351
正文到此结束
Loading...