前言
netty学习系列笔记总结,Pipeline源码浅析,错误之处欢迎指正, 共同学习
背景
在 netty核心组件-pipeline解析(一) 中我们了解了pipeline 的基本概念和初始化及节点添加与删除逻辑,知道了 Netty 是如何处理网络数据的,这篇分析 pipeline 的事件和异常的传播。
pipeline中的inBound事件传播
在 netty-接受请求过程源码分析 我们已经分析了新连接的建立过程。接着我们在 NioEventLoop 类的 processSelectedKey 方法中,监听 accpet 事件和 read 事件。
启动服务端
1 | public static void main(String[] args) throws Exception { |
通过 telnet 来连接上面启动好的netty服务,断点打在 NioEventLoop 类的 processSelectedKey 方法中,监听 accpet 事件和 read 事件。
1 | if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { |
进入的是 NioSocketChannelUnsafe 的抽象父类 AbstractNioMessageChannel 的 read 方法。精简过的代码如下:
1 | public final void read() { |
首先从 unsafe 中读取数据,然后,将读好的数据交给 pipeline,pipeline 调用 inbound 的 channelRead 方法,读取成功后,调用 inbound 的 handler 的 ChannelReadComplete 方法。
我们首先进入 pipeline 的 fireChannelRead 方法,这个方法是实现了 invoker 的方法。
1 | public final ChannelPipeline fireChannelRead(Object msg) { |
内部调用的是 AbstractChannelHandlerContext.invokeChannelRead(head, msg) 静态方法,并传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。
1 | static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { |
调用这个 Context (也就是 head) 的 invokeChannelRead 方法,并传入数据。
1 | private void invokeChannelRead(Object msg) { |
我们知道, 在Handler处理完数据后, 要想将数据传递到后一个Handler中, 要调用Context的相关方法, 假如这里一个InBoundHandler处理完了一个数据, 调用了
1 |
|
这个方法将msg传递到下一个handler中, 而前面已经知道, 下一个handler可能运行在另一个executor中, 那么解答不同exexutor中handler间数据的传递就在这个方法中了. 下面看这个方法做了什么.
1 |
|
先调用的findContextInbound()
该方法很简单,找到当前 Context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。
重复上面的逻辑,最终到达我们自定义的 handler。
再看invokeChannelRead()
1 | static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { |
现在真相大白了, 先返回下一个handler的executor, 然后利用
executor.inEventLoop()
判断当前handler的executort和下一个Handler的executor是不是相同, 如果是, 就直接在当前executor中执行, 如果不是, 则打包成一个Task, 加入到下一个executor的TaskQueue中执行.
而next.invokeChannelRead(msg)是
1 | private void invokeChannelRead(Object msg) { |
即, 调用handler的channelRead方法, 至此, msg传入到了下一个handler中
所以如果两个handler在不同executor中执行, 那么将msg传递到下一个handler是通过TaskQueue来进行了.
现在, 可以总结ChannelHandlerContext的作用了.
它将与处理数据无关的职能从handler中剥离了出去, 用来管理数据在pipeline中的传递.
TailContext节点
如果数据在 handler 传递过程中没有进行处理,最后传递到 TailContext 节点进行释放
1 | public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
对未处理inbound消息做最后的处理
1 | protected void onUnhandledInboundMessage(Object msg) { |
控制台打印输出:
1 | InBoundHandlerA: hello world |
SimpleChannelInboundHandler
在前面的例子中,假如中间有一个ChannelHandler未对channelRead事件进行传播,就会导致消息对象无法得到释放,最终导致内存泄露。
我们还可以继承 SimpleChannelInboundHandler 来自定义ChannelHandler,它的channelRead方法,对消息对象做了msg处理,防止内存泄露。
1 | public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter { |
pipeline中的outBound事件传播
以后再补充吧(TODO)
pipeline 中异常的传播
以后再补充吧(TODO)
总结
以后再补充吧(TODO)