netty解码器抽象父类-ByteToMessageDecoder解析

前言

netty学习系列笔记总结,解码器抽象父类-ByteToMessageDecoder源码浅析,错误之处欢迎指正, 共同学习

累加字节流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 从对象池中取出一个List
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 第一次解码
cumulation = data;// 累计
} else {
// 第二次解码,就将 data 向 cumulation 追加,并释放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加后的 cumulation 后,调用 decode 方法进行解码
// 解码过程中,调用 fireChannelRead 方法,主要目的是将累积区的内容 decode 到 数组中
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// ......
}
} else {
// 非ByteBuf,直接将当前对象向下进行传播
ctx.fireChannelRead(msg);
}
}

1. 从对象池中取出一个空的数组。
2. 判断成员变量是否是第一次使用,将 unsafe 中传递来的数据写入到这个 cumulation 累积区中。
3. 写到累积区后,调用子类的 decode 方法,尝试将累积区的内容解码,每成功解码一个,就调用后面节点的 channelRead 方法。若没有解码成功,什么都不做。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private Cumulator cumulator = MERGE_CUMULATOR;

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
// 判断当前cumulation是否可写
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
// 扩容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 把当前数据写入到累加器
buffer.writeBytes(in);
// 把读进来的数据进行释放
in.release();
return buffer;
}
};

将 unsafe.read 传递过来的 ByteBuf 的内容写入到 cumulation 累积区中,然后释放掉旧的内容,由于这个变量是成员变量,因此可以多次调用 channelRead 方法写入。

同时这个方法也考虑到了扩容的问题。

调用子类的decode方法进行解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 如果累计区还有可读字节
while (in.isReadable()) {
// mark 解析前数组大小
int outSize = out.size();
// 判断list中是否有对象
if (outSize > 0) {
// 调用后面的业务 handler 的 ChannelRead 方法向下传播
fireChannelRead(ctx, out, outSize);
// 清空 list
out.clear();
// 解码过程中如果当前 context 被 remove 掉,直接 break
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// mark 当前可读字节数
int oldInputLength = in.readableBytes();
// 调用 decode 方法,将成功解码后的数据放入 out 数组中,
decode(ctx, in, out);
// 解码过程中如果当前 context 被 remove 掉,直接 break
if (ctx.isRemoved()) {
break;
}
// 判断 decode 方法是否解析出对象
if (outSize == out.size()) {
// 当前累加器数据是否可以拼装成一个完整的数据包
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;//当前到数据,但没有解析到对象,continue
}
}
// 没有从累加器中读取数据
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}

if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
该方法主要逻辑:只要累积区还有未读数据,就循环进行读取。
1. 调用 decode 方法,内部调用了子类重写的 decode 方法。decode 方法的逻辑就是将累积区的内容按照约定进行解码,如果成功解码,就添加到数组中。同时该方法也会检查该 handler 的状态,如果被移除出 pipeline 了,就将累积区的内容直接刷新到后面的 handler 中。

2. 如果 Context 节点被移除了,直接结束循环。如果解码前的数组大小和解码后的数组大小相等,且累积区的可读字节数没有变化,说明此次读取什么都没做,就直接结束。如果字节数变化了,说明虽然数组没有增加,但确实在读取字节,就再继续读取。

3. 如果上面的判断过了,说明数组读到数据了,但如果累积区的 readIndex 没有变化,则抛出异常,说明没有读取数据,但数组却增加了,子类的操作是不对的。

4. 如果是个单次解码器,解码一次就直接结束了。

所以,这段代码的关键就是子类需要重写 decode 方法,将累积区的数据正确的解码并添加到数组中。每添加一次成功,就会调用 fireChannelRead 方法,将数组中的数据传递给后面的 handler。完成之后将数组的 size 设置为 0.

所以,如果你的业务 handler 在这个地方可能会被多次调用。也可能一次也不调用。取决于数组中的值。当然,如果解码 handler 被移除了,就会将累积区的所有数据刷到后面的 handler。

将解析到的ByteBuf向下传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// ......
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// 如果累计区没有可读字节了
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;// 将次数归零
cumulation.release();// 释放累计区
cumulation = null;// Help GC
} // 如果超过了 16 次,就压缩累计区,主要是将已经读过的数据丢弃,将 readIndex 归零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}

int size = out.size();
// 如果没有向数组插入过任何数据
decodeWasNull = !out.insertSinceRecycled();
// 循环数组,向后面的 handler 发送数据,如果数组是空,那不会调用
fireChannelRead(ctx, out, size);
// 将数组回收
out.recycle();
}
} else {
// 非ByteBuf,直接将当前对象向下进行传播
ctx.fireChannelRead(msg);
}
}
1. 如果累积区没有可读数据了,将计数器归零,并释放累积区。
2. 如果不满足上面的条件,且计数器超过了 16 次,就压缩累积区的内容,压缩手段是删除已读的数据。将 readIndex 置为 0

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
// 将解析出来的每一个对象向下传播
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}

将 read 方法的数据读取到累积区,使用解码器解码累积区的数据,解码成功一个就放入到一个数组中,并将数组中的数据一次次的传递到后面的handler。

记录 decodeWasNull 属性,这个值的决定来自于你有没有成功的向数组中插入数据,如果插入了,它就是 fasle,没有插入,他就是 true。这个值的作用在于,当 channelRead 方法结束的时候,执行该 decoder 的 channelReadComplete 方法(如果你没有重写的话),会判断这个值

总结

解码的主要逻辑就是将所有的数据全部放入累积区,子类从累积区取出数据进行解码后放入到一个 数组中,ByteToMessageDecoder 会循环数组调用后面的 handler 方法,将数据一帧帧的发送到业务 handler 。完成这个的解码逻辑。

可以说,ByteToMessageDecoder 是解码器的核心,Netty 所有的解码器,都可以在此类上扩展,一切取决于 decode 的实现。只要遵守 ByteToMessageDecoder 的约定即可。

------本文结束感谢阅读------
显示评论
0%