netty-fastThreadLocal-源码分析

前言

netty学习系列笔记总结,性能优化工具类之FastThreadLocal源码浅析,错误之处欢迎指正, 共同学习

如何使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void test() {
FastThreadLocal<Object> fastThreadLocal = new FastThreadLocal<Object>() {
@Override
protected Object initialValue() {
return new Object();
}

@Override
protected void onRemoval(Object value) throws Exception {
System.out.println("onRemoval");
}
};

System.out.println(fastThreadLocal.get());
fastThreadLocal.set("lishq");
System.out.println(fastThreadLocal.get());
fastThreadLocal.remove();
System.out.println(fastThreadLocal.get());
}
1
2
3
4
5
6
java.lang.Object@b6cbcc
lishq
onRemoval
java.lang.Object@a7e666

Process finished with exit code 0

构造方法解析

Netty重新设计了更快的FastThreadLocal,主要实现涉及

  • FastThreadLocalThread
  • FastThreadLocal
  • InternalThreadLocalMap

FastThreadLocalThread是Thread类的简单扩展,主要是为了扩展threadLocalMap属性

FastThreadLocal提供的接口和传统的ThreadLocal一致,主要是set和get方法,用法也一致

不同地方在于FastThreadLocal的值是存储在InternalThreadLocalMap这个结构里面的,传统的ThreadLocal性能槽点主要是在读写的时候hash计算和当hash没有命中的时候发生的遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public FastThreadLocal() {
// 初始化时分配一个全局唯一的index
index = InternalThreadLocalMap.nextVariableIndex();
}

Object[] indexedVariables;

//获取数组对应的下标
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}

static final AtomicInteger nextIndex = new AtomicInteger();

nextIndex是InternalThreadLocalMap父类的一个全局静态的AtomicInteger类型的对象,这意味着所有的FastThreadLocal实例将共同依赖这个指针来生成唯一的索引,而且是线程安全的;

InternalThreadLocalMap实例和Thread对象一一对应;

该index也是绑定的FastThreadLocal对象的value在Object[]数组中的索引位置

get方法解析

1.FastThreadLocal的获取

1
2
3
4
5
6
7
8
9
10
11
//获取当前线程的InternalThreadLocalMap中的当前ftl的value
public final V get(InternalThreadLocalMap threadLocalMap) {
// 直接采用index下标访问threadLocalMap中数组的指定位置元素,如果该索引处的value是有效值,不是占位值,则直接返回
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}

// 没有设置有效值,执行初始化操作,获取初始值
return initialize(threadLocalMap);
}

2.获取InternalThreadLocalMap

1
2
3
4
5
6
7
8
9
10
11
12
13
public final V get() {
//获取到与当前线程关联的InternalThreadLocalMap, 通过该map来查询具体数据
return get(InternalThreadLocalMap.get());
}

public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}

如果当前线程是ftlt线程,则使用fastGet进行获取;否则使用slowGet进行获取。

fastGet:

1
2
3
4
5
6
7
8
//底层自己维护了一个ThreadLocalMap对象
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}

如果该threadLocalMap已经实例化过,则直接返回,否则,先创建一个InternalThreadLocalMap实例,然后将该实例设置到ftlt的threadLocalMap属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 无效的value值(占位符),不使用null做无效值的原因是因为netty认为null也是一个有效值,
* 例如:假设没有重写FastThreadLocal的initialValue()方法,则该方法返回为null,netty会将null作为有效值直接存储起来
*/
public static final Object UNSET = new Object();

private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}

//初始化32size的数组,并将每一个元素初始化为UNSET
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}

slowGet:

1
2
3
4
5
6
7
8
9
10
11
12
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();

private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
//这个过程比较慢?
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}

之所以成为slowGet的原因是因为:

fastGet可以直接从当前线程的属性获取;而slowGet需要根据slowThreadLocalMap的索引值与数组长度进行计算之后进行获取,如果没有直接根据索引命中的话,还可能需要进行线性探测的向后循环查找操作,当然还可能有一些清理和整理逻辑。

fastGet设置InternalThreadLocalMap,直接给当前线程的属性赋值,而slowGet的set操作需要使用线性探测法进行设置,并会至少执行一次log级别的资源回收整理操作

如上两点也是ftl比tl快的原因。但是可以看出tl在不断的回收无效的Entry使得新的Entry可以插入而不需要额外空间,但是ftl只能不断的增加index,不断向后增加,而index前边被remove掉的位置不能被重用,所以Object[]数组的size会越来越大,算是一种空间换时间的做法。

3.从InternalThreadLocalMap获取值

1
2
3
4
5
6
7
Object[] indexedVariables;

//获取指定位置的元素
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}

4.初始化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
//获取初始值
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
// 初始化后再设置,下次就不用再初始化
threadLocalMap.setIndexedVariable(index, v);
//添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
addToVariablesToRemove(threadLocalMap, this);
return v;
}

//初始化参数:由子类复写
protected V initialValue() throws Exception {
return null;
}

//设置值
/**
* @return {@code true} if and only if a new thread-local variable has been created
*/
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}

如果索引小于indexedVariables.length,直接获取indexedVariables[index];否则,进行扩容设置。

首先获取旧数组及其长度;然后进行新数组容量的计算(计算方式与1.8的HashMap一样:都是获取比给定值大的最小的2的n次方的数);然后创建新数组并拷贝旧数组元素到新数组,最后对扩容多出来的元素初始化为UNSET,然后设置value值,最后将新数组赋值给indexedVariables成员变量。

到此为止设置值的操作就结束了,最后:添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
//v搞成set集合,目的很简单,set里面不会放置重复的 threadLocal,放置同一个threadLocal多次 所有使用TheadLocal都会放到 variablesToRemoveIndex 数组中这个索引位置的
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
// 如果拿到的不是 UNSET ,说明这是第二次操作了,因此可以强转为 Set
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
//放到要清除set里面
variablesToRemove.add(variable);
}

这个方法的目的是将 FastThreadLocal 对象保存到一个 Set 中,因为 Netty 的 Map 只是一个数组,没有键,所以保存到一个 Set 中,这样就可以判断是否 set 过这个 map,例如 Netty 的 isSet 方法就是根据这个判断的。

5.注册资源清理器

1
2
3
4
5
6
7
8
9
10
//当该ftl所在的线程不强可达时,清理其上当前ftl的value和set<FastThreadLocal<?>>中当前的ftl
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
Thread current = Thread.currentThread();
//如果已经开启了自动清理功能 或者 已经对threadLocalMap中当前的FastThreadLocal开启了清理线程
if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlagSet(index)) {
return;
}
// 设置是否已经开启了对当前的FastThreadLocal清理线程的标志
threadLocalMap.setCleanerFlag(index);
}

获取当前线程,如果当前线程是 FastThreadLocalThread 类型 且 cleanupFastThreadLocals 是 true,则返回 true,直接return。也就是说,Netty 线程池里面创建的线程都符合这条件,只有用户自定义的线程池不符合。 当然还有一个条件:如果这个 ftl 的 index + 1 在 map 中的值不是空对象,则已经注册过了,也直接 return,不再重复注册。

set方法解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 如果value是UNSET,表示删除当前的ThreadLocal对应的value;
* 如果不是UNSET,则可能是修改,也可能是新增;
* 如果是修改,修改value结束后返回;
* 如果是新增,则先新增value,然后新增ThreadLocal到Set中,最后注册Cleaner清除线程
*/
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
if (setKnownNotUnset(threadLocalMap, value)) {
registerCleaner(threadLocalMap);
}
} else {
// 如果设置的值是UNSET,表示清除该FastThreadLocal的value
remove();
}
}

private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 新增value
if (threadLocalMap.setIndexedVariable(index, value)) {
//添加清除map的线程,针对使用Jdk的Thread,防止内存泄漏
addToVariablesToRemove(threadLocalMap, this);
return true;
}
// 修改value
return false;
}

remove方法解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//清除当前的FastThreadLocal
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}

public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}

// 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value
Object v = threadLocalMap.removeIndexedVariable(index);
// 从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
removeFromVariablesToRemove(threadLocalMap, this);
//如果删除的是有效值,则进行onRemove方法的回调
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}

private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {

Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}

@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}

//删除指定位置的对象
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}

removeAll方法解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}

try {
// 从indexedVariables[VARIABLES_TO_REMOVE_INDEX]获取目前InternalThreadLocalMap存储的有效的FastThreadLocal的值,之后遍历Set,进行remove操作
// 注意:这也是为什么我们会将有效的FastThreadLocal存储在一个Set中的原因(另外,如果没有Set<FastThreadLocal<?>>这个集合的话,我们需要直接去遍历整个indexedVariables数组,可能其中有效的并不多,影响效率)
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
//将set先转换为数组,set的for-remove模式会报并发修改异常,array不会
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
//删除当前线程的InternalThreadLocalMap
InternalThreadLocalMap.remove();
}
}

public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}

首先获取当前线程map,然后获取 Set,将 Set 转成数组,遍历数组,调用 ftl 的 remove 方法。最后,删除线程中 的 map 属性。

总结

ftl使用了单纯的数组操作来替代了tl的hash表操作,所以在高并发的情况下,ftl操作速度更快。

ftl直接根据index进行数组set,而tl需要先根据tl的hashcode计算数组下标(而ftl是直接获取),然后再根据线性探测法进行set操作,其间如果发生hash冲突且有无效的Entry时,还要进行Entry的清理和整理操作。最后不管是否冲突,都要进行一次log级别的Entry回收操作,所以慢了。

ftl相较于tl不好的地方就是内存占用大,不会重复利用已经被删除(用UNSET占位)的数组位置,只会一味增大,是典型的“空间换时间”的操作。

------本文结束感谢阅读------
显示评论
0%