netty核心组件-nioEventLoop解析

前言

netty学习系列笔记总结,NioEventLoop源码浅析,错误之处欢迎指正, 共同学习

NioEventLoop创建

NioEventLoop 的继承图

  • new NioEventLoopGroup()[线程组,默认2*cpu]

  • new ThreadPerTaskExecutor()[线程创建器]:线程执行器的作用是负责创建NioEventLoopGroup对应底层线程

  • 创建NioEventLoop对象数组,for循环创建每个NioEventLoop,调用newChild()配置NioEventLoop核心参数

  • chooserFactory.newChooser()[线程选择器]:给每个新连接分配NioEventLoop线程

1.IO线程组的创建:NioEventLoopGroup

  • nThreads: Group内产生nTreads个NioEventLoop对象,每个EventLoop都绑定一个线程, 默认值为cpu cores * 2

  • executor: 每个EventLoop在一次run方法调用的生命周期内都是绑定在一个Thread身上(EventLoop父类SingleThreadEventExecutor中的thread实例变量)

  • 每个EventLoop都绑定了一个Thread

  • selectorProvider: group内每一个EventLoop都要持有一个selector

1
2
3
4
5
6
7
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// ......

if (executor == null) {
//创建线程执行器,线程执行器的作用是负责创建NioEventLoopGroup对应底层线程,// 默认使用线程工厂是 DefaultThreadFactory
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

//getClass获取到当前类,当前类是一个NIOEventloop
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

children = new EventExecutor[nThreads];
//产生nTreads个NioEventLoop对象保存在children数组中
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//创建NioEventLoop对象数组,配置NioEventLoop核心参数
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
} finally {
// ......
}
}

//线程选择器,给每个新连接分配NioEventLoop线程
chooser = chooserFactory.newChooser(children);

// ......
}

2.ThreadPerTaskThread

  • 通过构造ThreadFactory,每次执行任务创建线程然后运行线程

  • 每次执行任务都会创建一个线程实体FastThreadLocalThread;

1
2
3
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

3.创建NioEventLoop线程

  • newChild():创建NioEventLoop线程

  • 保持线程执行器ThreadPerTaskExecutor

  • 创建一个MpscQueue:taskQueue用于外部线程执行Netty任务的时候,如果判断不是在NioEventLoop对应线程里面执行,而直接塞到任务队列里面,由NioEventLoop对应线程执行,PlatformDependent.newMpscQueue(maxPendingTasks)创建MpscQueue保存异步任务队列;

  • 创建一个selector:provider.openSelector()创建selector轮询初始化连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
    throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
    throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
    }

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
    boolean addTaskWakesUp, int maxPendingTasks,
    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return PlatformDependent.newMpscQueue(maxPendingTasks);
    }

4.创建线程选择器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//调用chooser.next()方法给新连接绑定对应的NioEventLoop
public EventExecutor next() {
return chooser.next();
}

public EventExecutorChooser newChooser(EventExecutor[] executors) {
//判断是否是2的幂
if (isPowerOfTwo(executors.length)) {
//优化:NioEventLoop索引下标=index++&(length-1)
return new PowerOfTowEventExecutorChooser(executors);
} else {
//普通:NioEventLoop索引下标=abs(index++%length)
return new GenericEventExecutorChooser(executors);
}
}

NioEventLoop启动

1.NioEventLoop启动触发器:

  • 服务端启动绑定端口

  • 新连接接入通过chooser绑定一个NioEventLoop

bind->execute(task)[入口]:调用bind()方法把具体绑定端口操作封装成Task,通过eventLoop()方法获取channelRegistered()注册绑定NioEventLoop执行NioEventLoop的execute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// 提交了一个绑定任务到 eventLoop
// 在eventLoop 中对于新的任务的策略是:
// 判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中
// 如果不是,则先启动线程,然后添加任务到任务队列中去
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// bind 的实现是在 AbstractChannel 里面
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

//判断当前调用execute()方法线程是否为NioEventLoop线程,通过startThread()方法创建启动线程
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

2.创建线程

  • ThreadPerTaskExecutor.execute():通过线程执行器ThreadPerTaskExecutor执行任务创建并启动FastThreadLocalThread线程

  • thread = Thread.currentThread():NioEventLoop保存当前创建FastThreadLocalThread线程,保存的目的是为了判断后续对NioEventLoop相关执行的线程是否为本身,如果不是则封装成Task扔到TaskQueue串行执行实现线程安全

  • NioEventLoop.run()[启动]

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    private void startThread() {
    // 这一个 if 判断就是为了在线程刚创建的时候起作用,也就是线程刚创建才能进到这里
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
    if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
    delayedTaskQueue.add(new ScheduledFutureTask<Void>(this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
    // 线程启动了
    thread.start();
    }
    }
    }

    private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
    @Override
    public void run() {
    thread = Thread.currentThread();
    if (interrupted) {
    thread.interrupt();
    }

    boolean success = false;
    updateLastExecutionTime();
    try {
    SingleThreadEventExecutor.this.run();
    success = true;
    } catch (Throwable t) {
    logger.warn("Unexpected exception from an event executor: ", t);
    } finally {
    for (;;) {
    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
    break;
    }
    }

    // Check if confirmShutdown() was called at the end of the loop.
    if (success && gracefulShutdownStartTime == 0) {
    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
    "before run() implementation terminates.");
    }

    try {
    // Run all remaining tasks and shutdown hooks.
    for (;;) {
    if (confirmShutdown()) {
    break;
    }
    }
    } finally {
    try {
    cleanup();
    } finally {
    STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
    threadLock.release();
    if (!taskQueue.isEmpty()) {
    logger.warn(
    "An event executor terminated with " +
    "non-empty task queue (" + taskQueue.size() + ')');
    }

    terminationFuture.setSuccess(null);
    }
    }
    }
    }
    });
    }

NioEventLoop执行逻辑

1.检测IO事件

  • 首先执行delayNanos(currentTimeNanos), 这个方法是做什么的呢?

    1
    2
    3
    4
    5
    a)每个EventLoop都有一个延迟执行任务的队列
    b)delayNanos就是去这个延迟队列里面瞄一眼是否有非IO任务未执行, 如果没有则返回1秒钟
    c)如果很不幸延迟队列里面有任务, delayNanos的计算结果就等于这个task的deadlineNanos到来之前的这段时间, 也即是说select在这个task按预约到期执行的时候就返回了, 不会耽误这个task.
    d)如果最终计算出来的可以无忧无虑select的时间(selectDeadLineNanos - currentTimeNanos)小于500000L纳秒, 就认为这点时间是干不出啥大事业的, 还是selectNow一下直接返回吧, 以免耽误了延迟队列里预约好的task.
    e)如果大于500000L纳秒, 表示很乐观, 就以1000000L纳秒为时间片, 放肆的去执行阻塞的select了, 阻塞时间就是timeoutMillis(n * 1000000L纳秒时间片).
  • 阻塞的select返回后,如果遇到以下几种情况则立即返回

    1
    2
    3
    4
    a)如果select到了就绪连接(selectedKeys > 0)
    b)被用户waken up了(wakeUp标识当前select操作是否为唤醒状态,每次select操作把wakeUp设置为false标识此次需要进行select操作并且是未唤醒状态;)
    c)任务队列(上面介绍的那个MPSC)来了一个任务
    d)延迟队列里面有个预约任务到期需要执行了
  • 如果上面情况都不满足, 代表select返回0了, 并且还有时间继续愉快的玩耍

  • 这其中有一个统计select次数的计数器selectCnt, select过多并且都返回0, 默认512就代表过多了, 这表示需要调用rebuildSelector()重建selector了, 达到512可能是触发了nio的epoll cpu 100%的bug, 避免下次JDK空轮询继续发生

  • rebuildSelector的实际工作就是:
    重新打开一个selector, 将原来的那个selector中已注册的所有channel重新注册到新的selector中, 并将老的selectionKey全部cancel掉, 最后将旧的selector关闭

  • 重建selector后, 不死心的再selectNow一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//select()[检查是否有io事件]:轮询注册到selector上面的io事件
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;

// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
// ...
} catch (CancelledKeyException ignored) {}
}

2.处理外部线程扔到TaskQueue里面的任务

  • EventLoop的大致数据结构是:一个任务队列,一个延迟任务队列(schedule)。分别存放在普通任务队列MpscQueue和定时任务队列ScheduledTaskQueue

  • 普通任务队列MpscQueue在创建NioEventLoop构造的,外部线程调用NioEventLoop的execute()方法使用addTask()方法向TaskQueue添加task;

  • 定时任务队列ScheduledTaskQueue在调用NioEventLoop的schedule()方法将Callable任务封装成ScheduledFutureTask,判断是否为当前NioEventLoop发起的schedule还是外部线程发起的schedule,当前NioEventLoop发起的schedule直接添加定时任务,外部线程发起的schedule为了保证线程安全(ScheduledTaskQueue是PriorityQueue非线程安全)添加定时任务操作当做普通任务Task保证对于定时任务队列操作都在NioEventLoop实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
protected boolean runAllTasks(long timeoutNanos) {
//先是fetchFromScheduledTaskQueue, 将延迟任务队列中已到期的task拿到非IO任务的队列中,此队列即为上文中提到的MPSC队列.
fetchFromScheduledTaskQueue();
//task即是从MPSC queue中弹出的任务
Runnable task = pollTask();
if (task == null) {
return false;
}
//又是计算一个deadline
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
//每执行64个任务就检查下时间, 如果到了deadline, 就退出, 没办法, IO任务是亲生的, 非IO任务是后妈生的, 资源肯定要先紧IO任务用.
我们使用netty时也要注意, 不要产生大量耗时的非IO任务, 以免影响了IO任务
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

this.lastExecutionTime = lastExecutionTime;
return true;
}


private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
//while循环获取定时任务队列(按照截止时间&添加时间排序)截止时间为nanoTime的定时任务(截止时间最小)添加到普通任务队列,如果添加失败则重新将定时任务添加到定时任
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}

优化

select过后, 对读写等事件的处理优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// 每次拿到一个之后SelectionKey立即释放array对这个key的强引用,help gc
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
for (;;) {
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
i++;
}

selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}

使用数组替换HashSet

1
2
3
a.避免了HashSet的频繁自动扩容。
b.屏蔽了remove、contains、iterator这些不需要的功能。
c.对于selectedKeys, 最重要的操作是遍历全部元素,遍历数组效率高于遍历Hashset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}

@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}
}

总结

用户代码创建Boss/Worker Group NioEventLoop创建,默认创建2倍cpu核数个NioEventLoop,每个NioEventLoop都有线程选择器chooser线程分配并且优化NioEventLoop个数,构造NioEventLoop创建Selector和定时任务队列,创建Selector通过反射使用数组实现替换Selector HashSet数据结构;NioEventLoop调用execute()方法启动FastThreadLocalThread线程,创建线程保存到成员变量;NioEventLoop执行逻辑在run()方法包括检测io事件、处理io事件、执行任务队列

问:默认情况下,Netty服务端起多少线程?何时启动?
答:默认2cpu即Runtime.getRuntime().availableProcessors()2]线程,调用execute()方法判断当前是否在本线程,如果是在本线程说明线程已经启动,如果是在外部线程调用execute()方法,首先调用startThread()方法判断当前线程是否启动,未启动就启动此线程

问:Netty是如何解决JDK空轮询Bug?
答:判断阻塞select操作是否阻塞timeoutMillis时间,未阻塞timeoutMillis时间表示可能触发JDK空轮询;判断触发JDK空轮询的次数是否超过阈值(默认512),超过阈值调用rebuildSelector()方法重建Selector把之前的Selector上面所有的Key重新移到新的Selector避免JDK空轮询的Bug

问:Netty如何保证异步串行无锁化?
答:外部线程调用EventLoop或者Channel方法通过inEventLoop()方法判断得出是外部线程,所有操作封装成Task丢到普通任务队列MpscQueue,异步执行普通任务队列MpscQueue待执行任务

------本文结束感谢阅读------
显示评论
0%