netty核心组件-pipeline解析(二)

前言

netty学习系列笔记总结,Pipeline源码浅析,错误之处欢迎指正, 共同学习

背景

netty核心组件-pipeline解析(一) 中我们了解了pipeline 的基本概念和初始化及节点添加与删除逻辑,知道了 Netty 是如何处理网络数据的,这篇分析 pipeline 的事件和异常的传播。

pipeline中的inBound事件传播

netty-接受请求过程源码分析 我们已经分析了新连接的建立过程。接着我们在 NioEventLoop 类的 processSelectedKey 方法中,监听 accpet 事件和 read 事件。

启动服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});

ChannelFuture f = b.bind(8880).sync();

f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
}

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
ctx.fireChannelRead(msg);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().pipeline().fireChannelRead("hello world");
}
}

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}

通过 telnet 来连接上面启动好的netty服务,断点打在 NioEventLoop 类的 processSelectedKey 方法中,监听 accpet 事件和 read 事件。

1
2
3
4
5
6
7
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// AbstractNioMessageChannel$NioMessageUnsafe
unsafe.read();
if (!ch.isOpen()) {
return;
}
}

进入的是 NioSocketChannelUnsafe 的抽象父类 AbstractNioMessageChannel 的 read 方法。精简过的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

// 读取数据到容器
// ......
// 让 handler 处理容器中的数据
pipeline.fireChannelRead(readBuf.get(i));

// 告诉容器处理完毕了,触发完成事件
pipeline.fireChannelReadComplete();

}

首先从 unsafe 中读取数据,然后,将读好的数据交给 pipeline,pipeline 调用 inbound 的 channelRead 方法,读取成功后,调用 inbound 的 handler 的 ChannelReadComplete 方法。

我们首先进入 pipeline 的 fireChannelRead 方法,这个方法是实现了 invoker 的方法。

1
2
3
4
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

内部调用的是 AbstractChannelHandlerContext.invokeChannelRead(head, msg) 静态方法,并传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}

调用这个 Context (也就是 head) 的 invokeChannelRead 方法,并传入数据。

1
2
3
4
5
6
7
8
9
10
11
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

我们知道, 在Handler处理完数据后, 要想将数据传递到后一个Handler中, 要调用Context的相关方法, 假如这里一个InBoundHandler处理完了一个数据, 调用了

1
2
3
4
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

这个方法将msg传递到下一个handler中, 而前面已经知道, 下一个handler可能运行在另一个executor中, 那么解答不同exexutor中handler间数据的传递就在这个方法中了. 下面看这个方法做了什么.

1
2
3
4
5
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}

先调用的findContextInbound()

该方法很简单,找到当前 Context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。

重复上面的逻辑,最终到达我们自定义的 handler。

再看invokeChannelRead()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {

EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

现在真相大白了, 先返回下一个handler的executor, 然后利用

executor.inEventLoop()
判断当前handler的executort和下一个Handler的executor是不是相同, 如果是, 就直接在当前executor中执行, 如果不是, 则打包成一个Task, 加入到下一个executor的TaskQueue中执行.

而next.invokeChannelRead(msg)是

1
2
3
private void invokeChannelRead(Object msg) {
((ChannelInboundHandler) handler()).channelRead(this, msg);
}

即, 调用handler的channelRead方法, 至此, msg传入到了下一个handler中

所以如果两个handler在不同executor中执行, 那么将msg传递到下一个handler是通过TaskQueue来进行了.

现在, 可以总结ChannelHandlerContext的作用了.

它将与处理数据无关的职能从handler中剥离了出去, 用来管理数据在pipeline中的传递.

TailContext节点
如果数据在 handler 传递过程中没有进行处理,最后传递到 TailContext 节点进行释放

1
2
3
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}

对未处理inbound消息做最后的处理

1
2
3
4
5
6
7
8
9
10
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
// 对msg对象的引用数减1,当msg对象的引用数为0时,释放该对象的内存
ReferenceCountUtil.release(msg);
}
}

控制台打印输出:

1
2
3
InBoundHandlerA: hello world
InBoundHandlerB: hello world
InBoundHandlerC: hello world

SimpleChannelInboundHandler

在前面的例子中,假如中间有一个ChannelHandler未对channelRead事件进行传播,就会导致消息对象无法得到释放,最终导致内存泄露。

我们还可以继承 SimpleChannelInboundHandler 来自定义ChannelHandler,它的channelRead方法,对消息对象做了msg处理,防止内存泄露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
}

pipeline中的outBound事件传播

以后再补充吧(TODO)

pipeline 中异常的传播

以后再补充吧(TODO)

总结

以后再补充吧(TODO)

------本文结束感谢阅读------
显示评论
0%