netty内存管理-bytebuf总结

前言

netty学习系列笔记总结,bytebuf总结,错误之处欢迎指正, 共同学习

ByteBuf介绍

内存分配负责把数据从底层 IO 读到 ByteBuf 传递应用程序,应用程序处理完之后再把数据封装成 ByteBuf 写回到 IO,ByteBuf 是直接与底层 IO 打交道的抽象

Demo

1
2
3
4
5
6
7
8
9
10
public class Scratch {
public static void main(String[] args) {
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
//int page = 1024 * 8;
//allocator.directBuffer(2 * page);
//allocator.directBuffer(16);
ByteBuf byteBuf = allocator.directBuffer(16);
byteBuf.release();
}
}

ByteBuf结构以及重要api

ByteBuf结构

1
2
3
4
5
6
7
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
[读数据指针] [写数据指针]

read,write,set方法

read 方法读数据在当前 readerIndex 指针开始往后读, readByte()/readShort()/readInt()/readLong() 把 readerIndex 指针往前移一格,读 readerIndex 指针后面 1/2/4/8 个字节数据,write 方法把数据写到 ByteBuf, read/write 方法 readerIndex/writerIndex 指针往后移动,set 方法在当前指针设置成指定值,不会移动指针

mark,reset方法

mark 方法标记当前 readerIndex/writerIndex 指针,reset 方法还原之前读/写数据 readerIndex/writerIndex 指针,不会改变 readerIndex/writerIndex 指针。确保指针读/写完数据能够保持原样

1
2
3
4
5
6
7
8
9
10
11
public int readableBytes() {
return writerIndex - readerIndex;
}

public int writableBytes() {
return capacity() - writerIndex;
}

public int maxWritableBytes() {
return maxCapacity() - writerIndex;
}

ByteBuf分类

AbstractByteBuf 抽象实现 ByteBuf。保存和标记读写指针,记录 ByteBuf 最多分配容量及抽象方法子类实现

1
2
3
4
5
6
7
8
9
10
11
1.Pooled 和 Unpooled
Pooled 池化内存分配每次都是从预先分配好的一块内存取一段连续内存封装成 ByteBuf 提供给应用程序。
Unpooled 非池化每次进行内存分配的时候直接调用系统 API 向操作系统申请一块内存,直接分配。

2.Unsafe 和非Unsafe
Unsafe 直接获取 ByteBuf 在 JVM 的内存地址,基于内存地址调用 JDK 的Unsafe 进行读写操作,使用 UnsafeByteBufUtil.getByte(memory, idx(index)) 通过 ByteBuf 底层分配内存首地址和当前指针基于内存偏移地址获取对应的值。
非Unsafe 不依赖 JDK 底层 Unsafe 对象,使用 HeapByteBufUtil.getByte(array, index) 通过内存数组和索引获取对应的值。

3.Heap 和 Direct
Heap 在堆上进行内存分配,分配内存需要被 GC 管理无需手动释放内存,依赖底层byte 数组。
Direct 调用 JDK 的 API 进行内存分配,分配内存不受 JVM 控制。最终不会参与 GC 过程,需要手动释放内存避免造成内存无法释放,依赖 DirectByteBuffer 对象内存,分配工具: Unpooled$directBuffer()方法

ByteBufAllocator分析

ByteBuf 通过 ByteBufAllocator 内存分配管理器分配内存,内存分配管理器最顶层抽象负责分配所有类型的内存

1
2
3
4
5
6
7
8
9
10
11
12
13
1.ByteBufAllocator功能
ByteBufAllocator 重载 buffer() 方法分配一块内存。buffer()方法分配内存是否为 Direct/Heap内存依赖具体实现。
ioBuffer() 方法分配内存更希望是适合 IO 的 Direct Buffer,directBuffer()/headBuffer()方法堆内/堆外进行内存分配。
compositeBuffer() 方法分配将两个 ByteBuf 合并变成CompositeByteBuf

2.AbstractByteBufAllocator
AbstractByteBufAllocator 抽象实现 ByteBufAllocator。
buffer() 方法分配 Buffer 依赖实现分配内存,调用 directBuffer()/heapBuffer() 方法分配默认 Buffer 容量和最大扩充容量的ByteBuf。
newDirectBuffer()/newHeapBuffer() 方法分配 Pooled/Unpooled 依赖底层实现

3.ByteBufAllocator两个子类
PooledByteBufAllocator 从预先分配好的内存取一段内存,UnpooledByteBufAllocator 调用系
统API 分配内存,调用 hasUnsafe() 方法获取 Unsafe 决定分配 Unsafe/非Unsafe

UnPooledByteBufAllocator分析

1
2
3
4
5
6
7
8
9
10
11
12
13
1.heap内存的分配
newHeapBuffer() 方法通过 hasUnsafe() 方法判断是否有 Unsafe。传递【initialCapacity容
量,Byte数组】参数 setArray() 方法设置 array 以及 setIndex() 方法设置读/写指针创建
UnpooledUnsafeHeapByteBuf/UnpooledHeapByteBuf,_get***() 方法通过 Unsafe 方式返回数
组对象偏移量对应的数组索引方式返回 array 数组 index位置byte

2.direct内存的分配
newDirectBuffer() 方法通过 hasUnsafe() 方法判断是否有 Unsafe。
调用 allocateDirect(initialCapacity)创建Direct。ByteBuffer 使用 setByteBuffer() 方
法设置 buffer[UnpooledUnsafeDirectByteBuf 使用 directBufferAddress() 方法获取
buffer 内存地址设置memoryAddress ]创建 UnpooledUnsafeDirectByteBuf/
UnpooledDirectByteBuf,_get***()方法通过 addr() 方法 memoryAdress+index 计算内存地址
Unsafe 获取对应这块内存的 byte/ByteBuffer获取 buffer index位置对应的byte

PooledByteBufAllocator概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1.拿到线程局部缓存PoolThreadCache->threadCache.get()
通过 PoolThreadLocalCache 类型成员变量 threadCache 的 get() 方法获取当前线程的
PoolThreadCache 局部缓存 cache,不同线程在对象内存堆上进行分配,PoolThreadLocalCache
[继承FastThreadLocal]的 initialValue() 方法通过 heapArenas/directArenas 调用
leastUsedArena() 方法获取 heapArena/directArena 数组,构造PoolThreadLocalCache

2.在线程局部缓存的Arena上进行内存分配
线程局部缓存 PoolThreadCache 维护两块内存:heapArena/directArena堆内/堆外内存,初始化
PoolThreadLocalCache 通过 heapArenas/directArenas 调用 leastUsedArena() 方法获取用
到最少的 heapArena/directArena 竞技场,heapArenas/directArenas 通过构造PooledByteBufAllocator 调用newArenaArray() 方法根据 DEFAULT_NUM_HEAP/DIRECT_ARENA[max(io.netty.allocator.numHeap/DirectArenas,min(runtime.availableProcessors()*2
[默认使用2倍CPU核数减少同步不用加锁分配],runtime.maxMemory()/io.netty.allocator.pageSize << io.netty.allocator.maxOrder/2/3))]容量创建PoolArena数组遍历设置PoolArena的HeapArena/DirectArena

3.PooledByteBufAllocator结构
-------- -------- -------- --------
|Thread| |Thread| |Thread| |Thread|
-------- -------- -------- --------
-----|-------------|-------------|-------------|-----
| ------- ------- ------- ------- |
| |Arena| |Arena| |Arena| |Arena| |
| ------- ------- ------- ------- |
| ----------------- tinyCacheSize |
| |PoolThreadCache| smallCacheSize |
| ----------------- normalCacheSize |
-----------------------------------------------------
创建 ByteBuffer 通过 PoolThreadCache 获取 Arena 对象,PoolThreadCache 通过ThreadLocal 方式把内存分配器 Arena 塞到成员变量,每个线程调用 get()方法获取到对应
的Arena,即线程与Arena绑定。或者通过底层维护的 ByteBuffer 缓存列表譬如创建1024字节的
ByteBuffer用完释放其他地方继续分配1024字节内存通过ByteBuffer缓存列表返回,PooledByteBufAllocator 维护 tinyCacheSize、smallCacheSize 以及 normalCacheSize 缓存 ByteBuffer 的大小用来构造 PooledByteBufAllocator 使用 createSubPageCaches() 方法创建 MemoryRegionCache 数组缓存列表

directArena分配direct内存的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
1.从对象池里面拿到PooledByteBuf进行复用
调用 newByteBuf() 方法使用 PooledUnsafeDirectByteBuf/PooledDirectByteBuf 的
newInstance()方法通过对象池 RECYCLER 的 get() 方法获取 PooledDirectByteBuf 对象。
调用reuse() 方法复用,按照指定maxCapacity扩容,设置引用数量为1以及设置readerIndex/
writerIndex读/写指针为0,重置markedReaderIndex/markedWriterIndex,标记读/写指针为0

2.从缓存上进行内存分配
ByteBuf 之前使用过并且被 release。分配差不多规格大小 ByteBuf 当 capacity<pageSize
或者capacity<=chunkSize 调用 cache的allocateTiny()/allocateSmall()/allocateNormal() 方法在缓存上进行内存分配

3.从内存堆里面进行内存分配
未命中缓存当 capacity<pageSize 或者 capacity<=chunkSize 调用allocateNormal() 方法。
当capacity>chunkSize 调用 allocateHuge() 方法在内存堆里面进行内存分配

内存规格的介绍

1
2
3
4
5
  0 <-tiny->512B<-small->8K<-normal->16M<-huge->
|_____________________| |
SubPage Page Chunk
16M 作为分界点对应的 Chunk,所有的内存申请以 Chunk 为单位向操作系统申请,内存分配在 Chunk 里面执行相应操作。
16M Chunk 按照 Page 进行切分为 2048 个 Page,8K Page 按照 SubPage 切分命中缓存的分配逻辑

缓存数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  --------------------------MemoryRegionCache----------------------------
| queue: Chunk&Handler Chunk&Handler ..... Chunk&Handler |
| sizeClass: tiny(0~512B) small(512B~8K) normal(8K~16M) |
| size: N*16B 512B、1K、2K、4K 8K、16K、32K |
-----------------------------------------------------------------------

queue 由 Entry[Chunk&Handler,Handler指向唯一一段连续内存,Chunk+指向Chunk的一段连续内存确定 Entry 的内存大小和内存位置]组成Cache链。
在缓存里查找有无对应的链定位到 queue 里面的 Entry,sizeClass即内存规格包括 tiny(0~512B)、small(512B~8K) 以及 normal(8K~16M),size 即 MemoryRegionCache 缓存
ByteBuf 的大小,同一个 MemoryRegionCache的queue 里面所有元素都是固定的大小,包括tiny
(N*16B)、small(512B、1K、2K、4K) 以及 normal(8K、16K、32K)

--------------MemoryRegionCache-------------
| tiny[32] 0 16B 32B 48B ... 480B 496B |
| small[4] 512B 1K 2K 4K |
| normal[3] 8K 16K 32K |
--------------------------------------------

queue 存储每种大小的 ByteBuf,sizeClass包括 Tiny、Small以及 Normal,同一个size 的
ByteBuf有哪些可以直接利用,每个线程维护 PoolThreadCache 涵盖 tinySubPageHeap/DirectCaches、smallSubPageHeap/DirectCaches、normalHeap/DirectCaches 三种内存规格大小缓存 Cache,调用createSubPageCaches()/createNormalCaches() 方法创建MemoryRegionCache 数组

命中缓存的分配流程

1
2
3
4
5
6
7
申请内存调用 normalizeCapacity() 方法 reqCapacity 大于Tiny找2的幂次方数值确保数值大于等于reqCapacity,Tiny内存规格以16的倍数自增分段规格化,目的是为了缓存分配后续 release 放到缓存里面而不需要释放,调用cache 的 allocateTiny()/allocateSmall()/allocateNormal()方法分配缓存

1.找到对应 size 的 MemoryRegionCache->cacheForTiny()/cacheForSmall()/cacheForNormal()调用cacheForTiny()/cacheForSmall()/cacheForNormal()方法使用PoolArena的tinyIdx()/smallIdx()/log2(normCapacity>>numShiftsNormalDirect/numShiftsNormalHeap)方法计算索引通过数组下标方式获取缓存节点MemoryRegionCache

2.从queue中弹出一个entry[chunk连续内存]给ByteBuf初始化调用queue的poll()方法弹出个entry使用initBuf()方法根据entry的chunk和handle通过initBuf()/initBufWithSubpage()方法调用PooledByteBuf的init()方法设置ByteBuf的chunk和handle给ByteBuf初始化

3.将弹出的entry扔到对象池进行复用->entry.recycle()调用entry的recycle()方法设置chunk为null&handle为-1使用recyclerHandle的recycle()方法压到栈里扔到对象池后续ByteBuf回收从对象池获取entry将entry的chunk和handle指向被回收的ByteBuf进行复用减少GC以及减少对象重复创建和销毁

arena、chunk、page、subpage概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  PoolThreadCache
-------------
| --------- |
| | Cache | |
| --------- |
| --------- |
| | Arena | |
| --------- |
-------------
Arena划分开辟连续内存进行分配,Cache直接缓存连续内存进行分配
Arena
-------------------------------------------------------------
| ------------- ------------- ------------- |
| | --------- | | --------- | | --------- | |
| | | Chunk | | | | Chunk | | | | Chunk | | |
| | --------- | | -------- | | --------- | |
| | | | |-------->| | | |-------->| | | | |
| | --------- | | -------- | | --------- | |
| | | Chunk | | | | Chunk | | | | Chunk | | |
| | --------- | | -------- | | --------- | |
| | | | |<--------| | | |<--------| | | | |
| | --------- | | -------- | | --------- | |
| | | Chunk | | | | Chunk | | | | Chunk | | |
| | --------- | | -------- | | --------- | |
| ------------- ------------- ------------- |
| ChunkList ChunkList ChunkList |
-------------------------------------------------------------
Arena的ChunkList[每个节点都是Chunk]通过链表方式连接并且每个ChunkList里面有对应的Chunk进行双向链表连接是因为实时计算每个Chunk的分配情况按照内存使用率分别归为ChunkList
PoolArena维护不同使用率的PoolChunkList即Chunk集合q100/q075/q050/q025/q000/qInit调用prevList()方法双向链表连接

--------------------- ---------------------
| ------ ------ | | ------ ------ |
| | 8K | ... | 8K | | | | 2K | ... | 2K | |
| ------ ------ | | ------ ------ |
| ------ ------ | | ------ ------ |
| | 8K | ... | 8K | | | | 2K | ... | 2K | |
| ------ ------ | | ------ ------ |
--------------------- ---------------------
Chunk SubPage[]

Chunk以8K大小划分为Page,Page以2K大小划分为SubPage
PoolArena维护PoolSubpage tinySubpagePools/smallSubpagePools,PoolSubpage的chunk表示子页SubPage从属Chunk,elemSize表示子页SubPage划分数值,bitmap记录子页SubPage内存分配情况[0:未分配/1:已分配],prev/next表示子页SubPage以双向链表方式连接

内存分配从线程的PoolThreadCache里面获取对应的Arena,Arena通过ChunkList获取Chunk进行内存分配,Chunk内存分配判断分配内存大小超过1个Page以Page为单位分配,远远小于1个Page获取Page切分成若干SubPage进行内存划分

page 级别内存分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1.尝试在现有的chunk上分配
调用PoolChunkList q050/q025/q000/qInit/q075的allocate()方法尝试在现有的chunk上分配,首次PoolChunkList为空无法在现有的chunk上分配,从head节点开始往下遍历每个chunk尝试分配获取handle,handle小于0表示未分配指向next节点继续分配,handle大于0调用chunk的initBuf()方法初始化PooledByteBuf并且判断chunk的使用率是否超过最大使用率从当前PoolChunk移除添加到nextList下一个链表

2.创建一个chunk进行内存分配
调用newChunk()方法创建PoolChunk对象,通过PoolChunk的allocate()方法分配normCapacity内存获取handle指向chunk里面一块连续内存,通过allocateDirect()方法获取直接内存使用PoolChunk构造方法创建1<<maxOrder[11]容量memoryMap和depthMap一次一级地向下移动在每个级别遍历左到右并将值设置为子深度创建PoolChunk对象,调用PoolChunk对象的allocate()方法使用allocateRun()方法计算分配深度通过allocateNode()方法分配节点[从0层开始往下查询空闲节点即未使用且大小满足normCapacity的节点,调用setValue()方法标记节点为unusable[12]即已被使用,使用updateParentsAlloc()方法逐层往上查询父节点标记已被使用在内存中分配索引
0<----------------------------0~16M
1<------------------------0~8M 8~16M
2<----------------0~4M 4~8M 8~12M 12~16M
...
10<-----------------0~16K 16K~32K 32K~48K ...
11<---------------0~8K 8K~16K 16K~24K 24K~32K ...

3.初始化PooledByteBuf
调用PoolChunk的initBuf()方法初始化PooledByteBuf即获取chunk的一块连续内存过后把对应的标记打到PooledByteBuf,调用memoryMapIdx()方法计算chunk连续内存在memoryMap的索引,使用bitmapIdx()方法计算chunk连续内存在bitMap的索引,通过runOffset(memoryMapIdx)计算偏移量以及runLength()方法计算最大长度调用PooledByteBuf的init()方法设置初始化PooledByteBuf

subpage 级别的内存分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
调用tinyIdx()方法计算normCapacity>>4获取tinySubpagePools的索引tableIdx,根据tableIdx获取tinySubpagePools下标位置的PoolSubpage头节点head,默认情况头节点head无任何内存信息指向它自己表示当前头节点head为空,头节点head的后置节点next非头节点head本身调用initBufWithSubpage()方法初始化PooledByteBuf,反之调用allocateNormal()方法进行subpage级别的内存分配

-------------tinySubpagePools--------------
| tiny[32] 0 16B 32B 48B ... 480B 496B |
-------------------------------------------

1.定位一个Subpage对象
调用allocateNormal()方法内存分配通过PoolChunk对象的allocate()方法使用allocateSubpage()方法创建/初始化normCapacity容量新PoolSubpage添加到拥有此PoolChunk的PoolArena的子页面池里,调用arena的findSubpagePoolHead()方法获取PoolArena拥有的PoolSubPage池的头部并对头节点进行同步,子页面只能从页面分配根据maxOrder分配深度调用allocateNode()方法分配节点获取节点index,使用subpageIdx()方法获取SubPage的索引subpageIdx

---------------------
| ------ ------ |
| | 8K | ... | 8K | |
| ------ ------ |
| ------ ------ |
| | 8K | ... | 8K | |
| ------ ------ |
---------------------
Chunk中的SubPages

2.初始化Subpage
以SubPage的索引获取subpages数组subpageIdx位置的subpage,subpage为空调用PoolSubpage的构造方法创建PoolSubpage,使用init()方法pageSize/normCapacity计算最大SubPage划分数量初始化位图bitmap标识Subpage是否被分配初始化PoolSubpage,调用addToPool()方法把PoolSubpage添加到头节点head所在的链表子页面池,使用allocate()方法获取位图bitmap未被使用的Subpage,可用Subpage为0从子页面池移除Subpage,调用toHandle()方法将bitmapIdx转成为handle[对应Chunk里面第几个节点第几个Subpage即一块内存里面的哪一块连续内存]把memoryMapIdx作为低32位和bitmapIdx作为高32
-------------tinySubpagePools--------------
| tiny[32] 0 16B 32B 48B ... 480B 496B |
| | | |
| 16B |
-------------------------------------------

3.初始化PooledByteBuf
调用PoolChunk的initBuf()方法初始化PooledByteBuf即获取chunk的一块连续内存过后把对应的标记打到PooledByteBuf,调用memoryMapIdx()方法计算chunk连续内存在memoryMap的索引,使用bitmapIdx()方法计算chunk连续内存在bitMap的索引,调用initBufWithSubpage()方法通过runOffset(memoryMapIdx)+(bitmapIdx & 0x3FFFFFFF)* subpage.elemSize计算偏移量以及Subpage划分数量调用PooledByteBuf的init()方法设置初始化PooledByteBuf

ByteBuf的回收

调用ByteBuf的release()方法使用AbstractReferenceCountedByteBuf的release0()方法判断引用数量是否等于decrement相等调用deallocate()方法设置handle为-1表示不指向任何一块内存以及memory设置为空

1
2
3
4
5
6
7
8
1.连续的内存区段加到缓存
调用chunk.arena.free()方法通过PoolThreadCache的add()方法把连续的内存区段[chunk&handle唯一标识]添加到缓存,使用PoolThreadCache的cache()方法获取MemoryRegionCache节点,调用MemoryRegionCache的add()方法把chunk和handle封装成Entry加到queue,通过newEntry()方法获取对象池RECYCLER的Entry调用queue的offer()方法添加到queue

2.标记连续的内存区段为未使用
调用freeChunk()方法使用chunk.parent.free()方法通过Chunk释放连续内存,memoryMapIdx()/bitmapIdx()方法获取连续内存的memoryMapIdx/bitmapIdx,bitmapIdx不等于0表示释放SubPage子页面内存通过arena的findSubpagePoolHead()方法获取PoolSubPage头节点head调用subpage的free()方法释放把连续内存对应的位图标识为0,非SubPage通过分配内存反向标记将连续内存标记为未使用,Page级别完全二叉树,SubPage级别位图

3.ByteBuf加到对象池
调用recycle()方法通过recyclerHandle的recycle()方法将ByteBuf加到对象池即PooledByteBuf被销毁之后在对象池里面

总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1.ByteBuf的api和分类
read**()/write**() 方法,AbstractByteBuf 实现ByteBuf 的数据结构抽象出一系列和数据读写相关的api给子类实现。
ByteBuf分类按照三个维度区分:堆、Unsafe、UnPooled

2.分配Pooled内存的总步骤:
首先现成私有变量 PoolThreadCache 维护的缓存空间查找之前使用过被释放的内存,有的话基于连续内存进行分配,没有的话用一定算法在预先分配好的Chunk进行内存分配。

3.不同规格的pooled内存分配与释放:
Page级别的内存分配和释放通过完全二叉树的标记查找某一段连续内存,Page级别以下的内存分配首先查找到 Page 然后把此 Page 按照 SubPage 大小进行划分。
最后通过位图的方式进行内存分配和释放,内存被释放的时候可能被加入到不同级别的缓存队列供下次分配使用

4.内存的类别有哪些?
* 堆内[基于byte字节内存数组分配]/堆外[基于JDK的DirectByteBuffer内存分配]
* Unsafe[通过JDK的Unsafe对象基于物理内存地址进行数据读写]/非Unsafe[调用JDK的API进行读写]
* UnPooled[每次分配内存申请内存]/Pooled[预先分配好一整块内存,分配的时候用一定算法从一整块内存取出一块连续内存]

5.如何减少多线程内存分配之间的竞争?
PooledByteBufAllocator内存分配器结构维护Arena数组,所有的内存分配都在Arena上进行,通过PoolThreadCache对象将线程和Arena进行一一绑定。
默认情况一个Nio线程管理一个Arena实现多线程内存分配相互不受影响减少多线程内存分配之间的竞争

6.不同大小的内存是如何进行分配的?
Page 级别的内存分配和释放通过完全二叉树的标记查找某一段连续内存,Page级别以下的内存分配首先
查找到Page然后把此Page按照SubPage大小进行划分最后通过位图的方式进行内存分配和释放,内存被释放的时候可能被加入到不同级别的缓存队列供下次分配使用
------本文结束感谢阅读------
显示评论
0%