netty-轻量级对象池-recycler

前言

netty学习系列笔记总结,轻量级对象池-Recycler源码浅析,错误之处欢迎指正, 共同学习

介绍

由于Java 创建一个实例的消耗不小,所以现在很多框架都使用对象池。创建对象的时候不需要每次都通过new方式创建,如果Recycler有对象直接获取二次利用,不需要对象的时候放入Recycler对象池。通过重用对象,能够避免频繁创建对象和销毁对象带来的损耗。

Recycler的使用

  1. 定义基于 FastThreadLocal 的轻量级对象池 Recycler 负责对象的回收和二次利用,不需要每次分配内存减少内存使用,减少 new 对象频率即减少 Young GC 频率

  2. 创建对象通过 Recycler 的 get() 获取对象池里面的对象,不需要对象时可以显式调用 recycle() 方法回收对象放到对象池

  3. 通过自定义 newObject() 方法定义对象创建,参数handle负责对象回收到对象池Recycler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static final Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};

private static class User {
private final Recycler.Handle<User> handle;

public User(Recycler.Handle<User> handle) {
this.handle = handle;
}

public void recycle() {
handle.recycle(this);
}
}

public static void main(String[] args) {
User user = RECYCLER.get();

user.recycle();
RECYCLER.get().recycle();

User user1 = RECYCLER.get();

System.out.println(user1 == user);
}

执行结果如下

1
true

Recycler的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected Recycler() {
this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}

private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this,
Thread.currentThread(),
maxCapacityPerThread,// Use 32k instances as default.
maxSharedCapacityFactor,//2
ratioMask, //7
maxDelayedQueuesPerThread);//2*cpu
}
};

Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));//own 32k, shared 16k
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;//控制对象回收频率
this.maxDelayedQueues = maxDelayedQueues;//当前线程创建的对象可以在多少线程缓存
}

Recycler成员变量FastThreadLocal<Stack>类型的threadLocal,Stack成员变量包括DefaultHandle类型数组elements[实际存放对象池,Handle包装对象并且被外部对象引用从而回收对象],thread[当前Stack归属线程],ratioMask[控制对象回收频率],maxCapacity[承载元素最大容量],maxDelayedQueues[当前线程创建的对象释放的最大线程数量],head/prev/cursor,availableSharedCapacity[当前线程创建的对象在其他线程缓存的最大数量]

Recycler中获取对象

  • 获取当前线程的Stack
  • 从Stack里面弹出DefaultHandle对象
  • 创建对象并绑定到Stack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);// 跳转到用户自定义代码
}
return (T) handle.value;
}

private static final Handle NOOP_HANDLE = new Handle() {
@Override
public void recycle(Object object) {
// NOOP
}
};

// 一个handle绑定一个stack
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}

DefaultHandle(Stack<?> stack) {
this.stack = stack;
}

DefaultHandle<T> pop() {
// 当前stack里有多少对象
int size = this.size;
if (size == 0) {
if (!scavenge()) {
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}

回收对象到Recycler

  • 同线程回收对象
  • 异线程回收对象

同线程回收对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}

void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (thread == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}

private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}

elements[size] = item;
this.size = size + 1;
}

回收对象到Recycler:
调用Handle的recycle()方法回收对象,使用Stack的push()方法把当前对象压入栈里面,判断当前线程是否为创建Stack的thread执行同/异线程回收对象
1.同线程回收对象->stack.pushNow()
设置recycleId/lastRecycledId为OWN_THREAD_ID,当前Stack对象数量超过承载最大容量或者Handle没有被回收过并且回收对象数量+1&对象回收频率不等于0即回收1/8对象则丢弃对象,当前Stack对象数量达到数组容量则重新创建2倍Stack对象数量的数组,赋值数组当前Stack对象数量位置为回收对象

异线程回收对象

  • 获取WeakOrderQueue
  • 创建WeakOrderQueue
  • 将对象追加到WeakOrderQueue

回收对象到Recycler:
2.异线程回收对象->stack.pushLater()
(1)获取WeakOrderQueue->DELAYED_RECYCLED.get()
通过当前Stack对象获取delayedRecycled的WeakOrderQueue
(2)创建WeakOrderQueue->WeakOrderQueue.allocate()
把WeakOrderQueue插入到Stack对象头部实现当前线程分配Stack对象
(3)将对象追加到WeakOrderQueue->queue.add()
将DefaultHandle对象追加到尾指针tail的elements数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private void pushLater(DefaultHandle<?> item, Thread thread) {
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}

queue.add(item);
}

private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};

static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? new WeakOrderQueue(stack, thread) : null;
}

private static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}

private WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
owner = new WeakReference<Thread>(thread);
synchronized (stack) {
next = stack.head;
stack.head = this;
}

// Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
// the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
// Stack itself GCed.
availableSharedCapacity = stack.availableSharedCapacity;
}

异线程收割对象

  • 获取当前线程的Stack
  • 从Stack里面弹出对象
  • 创建对象并绑定到Stack

异线程收割对象->stack.scavenge()从其他线程回收对象
使用scavengeSome()方法获取当前需要回收WeakOrderQueue cursor,while循环向当前Stack关联的WeakOrderQueue回收对象,使用WeakOrderQueue cursor的transfer()方法把WeakOrderQueue的Link传输到Stack,循环将当前Link数组元素传输到当前Stack底层数组,cursor的关联线程owner是否为空释放cursor节点,如果没有回收到对象则重置prev指针置为空/cursor指针置为head头节点即下次从头部开始回收

------本文结束感谢阅读------
显示评论
0%