netty核心组件-pipeline解析(一)

前言

netty学习系列笔记总结,Pipeline源码浅析,错误之处欢迎指正, 共同学习

背景

netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,在使用Pipeline中自己也有如下不解。

1.ChannelHandlerContext的作用是什么,为什么每一个Handler需要包装一个ChannelHandlerContext

2.数据是如何在不同Handler中传递的

3.Handler在哪一个executor中执行呢? 可以为Handler指派不同的executor吗, 如果handler在不同的executor中执行, 那么数据又是怎么在handler中传递的呢?

4.HeadContext和TailContext的作用是什么?

5.InBound Event来OutBound Event到底是什么? 有什么不同呢? 在Handler中传递有什么区别呢?

下面我们通过源码分析回答这些问题。

pipeline介绍

  • ChannelPipeline
  • ChannelHandler
  • ChannelHandlerContext

我们在之前的文章中知道,每当 ServerSocket 创建一个新的连接,就会创建一个 Socket,对应的就是目标客户端。而每一个新创建的 Socket 都将会分配一个全新的 ChannelPipeline(以下简称 pipeline),他们的关系是永久不变的;而每一个 ChannelPipeline 内部都含有多个 ChannelHandlerContext(以下简称 Context),他们一起组成了双向链表,这些 Context 用于包装我们调用 addLast 方法时添加的 ChannelHandler(以下简称 handler)。

所以说,他们的关系是这样的:

在运行过程中, 每一个NioSocketChannel对应的Pipeline实际是如下这样子

仔细观察可以发现

Pipeline内部实际上是一个双向链表, 每个元素实际上是一个ChannelHandlerContext
每一个Handler被一个ChannelHandlerContext所包装
该Pipeline中隐含着两个Context, 一个是HeadContext, 另一个是TailContext, 通过源代码可以看出, 这两个Context也是Handler.

上图中:ChannelSocket 和 ChannelPipeline 是一对一的关联关系,而 pipeline 内部的多个 Context 形成了链表,Context 只是对 Handler 的封装。

pipeline里面有多个handler, 每个handler节点过滤在pipeline中流转的event, 如果判定需要自己处理这个event,则处理(用户可以在pipeline中添加自己的handler)

总的来说,当一个请求进来的时候,会进入 Socket 对应的 pipeline,并流经 pipeline 所有的 handler

知道了他们的概念,我们继续深入看看他们的设计。

1.ChannelPipeline 作用及设计

首先看 pipeline 的接口设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

ChannelPipeline addFirst(String name, ChannelHandler handler);

ChannelPipeline addLast(String name, ChannelHandler handler);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

ChannelPipeline remove(ChannelHandler handler);

ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
}

通过 UML 图,可以看到该接口继承了 inBound,outBound,Iterable 接口,表示他可以调用当数据出站的方法和入站的方法,同时也能遍历内部的链表。

再看看他的几个具有代表性的方法,基本上都是针对 handler 链表的插入,追加,删除,替换操作,甚至,我们可以想象他就是一个 LinkedList。同时,他也能返回 channel(也就是 socket)。

handler 在 pipeline 中处理 I/O 事件的方式:

--------------------------------------------------------------------
                                            I/O Request
                                            via Channel or
                                        ChannelHandlerContext
                                                    |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
                |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

注意:

你的业务程序不能将线程阻塞,他将会影响 IO 的速度,进而影响整个 Netty 程序的性能。如果你的业务程序很快,就可以放在 IO 线程中,反之,你需要异步执行。或者在添加 handler 的时候添加一个线程池,例如:

1
2
// 下面这个任务执行的时候,将不会阻塞 IO 线程,执行的线程来自 group 线程池
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
2.ChannelHandler 作用及设计

首先看 ChannelHandler 的接口设计:

ChannelHandler 是一个顶级接口,没有继承任何接口:

1
public interface ChannelHandler {}

定义了 3 个方法:

1
2
3
4
5
6
7
8
9
10
11
12
public interface ChannelHandler {

// 当把 ChannelHandler 添加到 pipeline 时被调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;

// 当从 pipeline 中移除时调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

// 当处理过程中在 pipeline 发生异常时调用
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

总的来说,ChannelHandler 的作用就是处理 IO 事件或拦截 IO 事件,并将其转发给下一个处理程序 ChannelHandler。

从上面的代码中,可以看到,ChannelHandler 并没有提供很多的方法,因为 Handler 处理事件时分入站和出站的,两个方向的操作都是不同的,因此,Netty 定义了两个子接口继承 ChannelHandler。

2.1. ChannelInboundHandler 入站事件接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ChannelInboundHandler extends ChannelHandler {

void channelRegistered(ChannelHandlerContext ctx) throws Exception;

void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

void channelActive(ChannelHandlerContext ctx) throws Exception;

void channelInactive(ChannelHandlerContext ctx) throws Exception;

void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

如果你经常使用 Netty 程序,你会非常的熟悉这些方法,比如 channelActive 用于当 Channel 处于活动状态时被调用;channelRead —— 当从Channel 读取数据时被调用等等方法。通常我们需要重写一些方法,当发生关注的事件,我们需要在方法中实现我们的业务逻辑,因为当事件发生时,Netty 会回调对应的方法。

注意:当你重写了上面的 channelRead 方法时,你需要显示的释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了了一个使用方法 ReferenceCountUtil.release().

2.2. ChannelOutboundHandler 出站事件接口

ChannelOutboundHandler 负责出站操作和处理出站数据。接口方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ChannelOutboundHandler extends ChannelHandler {

void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;

void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

void read(ChannelHandlerContext ctx) throws Exception;

void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

void flush(ChannelHandlerContext ctx) throws Exception;
}

大家可以熟悉熟悉这个接口,比如 bind 方法,当请求将 Channel 绑定到本地地址时调用,close 方法,当请求关闭 Channel 时调用等等,总的来说,出站操作都是一些连接和写出数据类似的方法。和入站操作有很大的不同。

总之,我们要区别入站方法和出站方法,这在 pipeline 中将会起很大的作用。

2.3. ChannelDuplexHandler 处理出站和入站事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

从上面的代码中可以看出 ChannelDuplexHandler 间接实现了入站接口并直接实现了出站接口。是一个通用的能够同时处理入站事件和出站事件的类。

介绍了完了 ChannelHandler 的设计,我们再来看看 ChannelHandlerContext 。

3.ChannelHandlerContext 作用及设计

实际上,从上面的代码中,我们已经看到了 Context 的用处,在 ChannelDuplexHandler 中,cxt 无处不在。事实上,以read 方法为例:调用 handler 的 read 方法,如果你不处理,就会调用 context 的 read 方法,context 再调用下一个 context 的 handler 的 read 方法。

我们看看 ChannelHandlerContext 的接口 UML :

ChannelHandlerContext 继承了出站方法调用接口和入站方法调用接口。那么, ChannelInboundInvoker 和 ChannelOutboundInvoker 又有哪些方法呢?

ChannelInboundInvoker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ChannelInboundInvoker {

ChannelInboundInvoker fireChannelRegistered();

ChannelInboundInvoker fireChannelUnregistered();

ChannelInboundInvoker fireChannelActive();

ChannelInboundInvoker fireChannelInactive();

ChannelInboundInvoker fireExceptionCaught(Throwable cause);

ChannelInboundInvoker fireUserEventTriggered(Object event);

ChannelInboundInvoker fireChannelRead(Object msg);

ChannelInboundInvoker fireChannelReadComplete();

ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker.java

可以看到,这两个 invoker 就是针对入站或出站方法来的,就是再 入站或出站 handler 的外层再包装一层,达到在方法前后拦截并做一些特定操作的目的。

而 ChannelHandlerContext 不仅仅时继承了他们两个的方法,同时也定义了一些自己的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

Channel channel();

EventExecutor executor();

String name();

ChannelHandler handler();

boolean isRemoved();

ChannelPipeline pipeline();

ByteBufAllocator alloc();
}

这些方法能够获取 Context 上下文环境中对应的比如 channel,executor,handler ,pipeline,内存分配器,关联的 handler 是否被删除。

我们可以认为,Context 就是包装了 handler 相关的一切,以方便 Context 可以在 pipeline 方便的操作 handler 相关的资源和行为。

pipeline创建过程

介绍完了 pipeline 的接口设计和一些方法,那么我们就看看,netty中的pipeline是怎么玩转起来的

  • pipeline 初始化

  • pipeline 添加节点

  • pipeline 删除节点

1.pipeline 初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 构造AbstratChannel通过 newChannelPipeline()创建Channel对应的Pipeline,创建
* TailContext tail节点和HeadContext head节点通过prev/next组成双向链表数据结构
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// Pipeline在创建Channel的时候被创建
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
// pipeline中保存了channel的引用
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

1.Pipeline中的两大哨兵:head和tail

2.TailContext tail节点继承AbstractChannelHandlerContext即为Pipeline节点数据结构
ChannelHandlerContext,实现ChannelInboundHandler传播inBound事件即属于Inbound处理器
ChannelHandler,exceptionCaught()/channelRead()方法用于异常未处理警告/Msg未处理建议处理收尾

3.HeadContext head节点继承AbstractChannelHandlerContext即为Pipeline节点数据结构
ChannelHandlerContext,实现ChannelOutboundHandler传播outBound事件即属于Outbound处理器
ChannelHandler,使用pipeline.channel().unsafe()获取Channel的Unsafe实现底层数据读写,用于传播事件/读写事件委托Unsafe操作

pipeline中的每个节点是一个ChannelHandlerContext对象,每个context节点保存了它包裹的执行器 ChannelHandler 执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息

默认情况下,一条pipeline会有两个节点,head和tail

2.pipeline 添加节点

当将Handler添加到Pipeline中时, 最终调用的方法是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// 防止多线程并发操作pipeline底层的双向链表
synchronized (this) {
// 检查是否有重复 handler
checkMultiplicity(handler);
// 创建节点
newCtx = newContext(group, filterName(name, handler), handler);
// 添加节点
addLast0(newCtx);

if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

// ......
}
// 回调用户代码
callHandlerAdded0(newCtx);
return this;
}

2.1. 检查是否有重复 handler

1
2
3
4
5
6
7
8
9
10
11
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}

通过 checkMultiplicity() 方法判断 ChannelHandler 是否为 ChannelHandlerAdapter 实例,ChannelHandler 强制转换 ChannelHandlerAdapter 判断是否可共享[isSharable()] & 是否已经被添加过 [h.added] , ChannelHandlerAdapter 非共享并且已经被添加过抛出异常拒绝添加

2.2. 创建节点

1
2
3
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

该方法主要做了两件事

1.将handler包装一个ChannelHandlerContext
2.从group中取得一个childExecutor, 赋值给DefaultChannelHandlerContext的executor成员
3.根据该handler是InBoundHandler还是OutBoundHandler为该Context设置inbound或outbound

其中DefaultChannelHandlerContext的executor的含义是.

执行所包装的Handler的executor

所以可以看出.

Handler可以在不同的executor中执行, 如果不指定, 则该executor是NioEventLoop, 这就是默认情况下Handler在NioEventLoop中执行.

2.3. 添加节点

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

调用 addLast0() 方法获取 tail 节点的前置节点 prev ,将当前节点的前置节点 prev 置为 tail 节点的前置节点 prev ,当前节点的后置节点 next 置为 tail 节点, tail 节点的前置节点 prev 的后置节点 next 置为当前节点, tail 节点的前置节点 prev 置为当前节点,通过链表的方式添加到 Channel 的 Pipeline

2.4. 回调用户代码

1
2
3
4
5
6
7
8
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
// ......
}
}

handlerAdded执行用户代码示例

1
2
3
4
5
6
7
8
9
10
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {

@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
ctx.write("hello world");
}, 3, TimeUnit.SECONDS);
}
}

setAddComplete

1
2
3
4
5
6
7
8
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}

修改节点的状态至:REMOVE_COMPLETE(说明该节点已经被移除) 或者 ADD_COMPLETE

pipeline 删除节点

netty 有个最大的特性之一就是Handler可插拔,做到动态编织pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器

下面是权限认证Handler最简单的实现,第一个数据包传来的是认证信息,如果校验通过,就删除此Handler,否则,直接关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
if (paas(password)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}

private boolean paas(ByteBuf password) {
return false;
}
}

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}

// 找到节点
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}

/**
* 从head节点的后置节点next遍历循环判断节点的Handler是否为指定ChannelHandler获取封装
* ChannelHandler的ChannelHandlerContext节点,ChannelHandlerContext节点为空抛异常
*/
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}

AbstractChannelHandlerContext ctx = head.next;
for (;;) {

if (ctx == null) {
return null;
}

if (ctx.handler() == handler) {
return ctx;
}

ctx = ctx.next;
}
}

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;

synchronized (this) {
remove0(ctx);

if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}

// ......
}

// 获取当前节点的ChannelHandler使用handlerRemove()方法回调删除Handler事件
callHandlerRemoved0(ctx);
return ctx;
}

总结

1.了解了pipeline 的接口设计和一些方法。Context 包装 handler,多个 Context 在 pipeline 中形成了双向链表,添加和删除节点均只需要调整链表结构。

2.pipeline中的每个节点包着具体的处理器ChannelHandler,节点根据ChannelHandler的类型是ChannelInboundHandler还是ChannelOutboundHandler来判断该节点属于in还是out或者两者都是

下一篇,总结 pipeline 的事件传播机制。

参考

https://www.jianshu.com/p/6efa9c5fa702

------本文结束感谢阅读------
显示评论
0%