前言
netty学习系列笔记总结,NioEventLoop源码浅析,错误之处欢迎指正, 共同学习
NioEventLoop创建
NioEventLoop 的继承图
new NioEventLoopGroup()[线程组,默认2*cpu]
new ThreadPerTaskExecutor()[线程创建器]:线程执行器的作用是负责创建NioEventLoopGroup对应底层线程
创建NioEventLoop对象数组,for循环创建每个NioEventLoop,调用newChild()配置NioEventLoop核心参数
chooserFactory.newChooser()[线程选择器]:给每个新连接分配NioEventLoop线程
1.IO线程组的创建:NioEventLoopGroup
nThreads: Group内产生nTreads个NioEventLoop对象,每个EventLoop都绑定一个线程, 默认值为cpu cores * 2
executor: 每个EventLoop在一次run方法调用的生命周期内都是绑定在一个Thread身上(EventLoop父类SingleThreadEventExecutor中的thread实例变量)
每个EventLoop都绑定了一个Thread
selectorProvider: group内每一个EventLoop都要持有一个selector
1 | public NioEventLoopGroup(int nThreads, Executor executor) { |
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, |
2.ThreadPerTaskThread
通过构造ThreadFactory,每次执行任务创建线程然后运行线程
每次执行任务都会创建一个线程实体FastThreadLocalThread;
1 | protected Thread newThread(Runnable r, String name) { |
3.创建NioEventLoop线程
newChild():创建NioEventLoop线程
保持线程执行器ThreadPerTaskExecutor
创建一个MpscQueue:taskQueue用于外部线程执行Netty任务的时候,如果判断不是在NioEventLoop对应线程里面执行,而直接塞到任务队列里面,由NioEventLoop对应线程执行,PlatformDependent.newMpscQueue(maxPendingTasks)创建MpscQueue保存异步任务队列;
创建一个selector:provider.openSelector()创建selector轮询初始化连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
4.创建线程选择器
1 | //调用chooser.next()方法给新连接绑定对应的NioEventLoop |
NioEventLoop启动
1.NioEventLoop启动触发器:
服务端启动绑定端口
新连接接入通过chooser绑定一个NioEventLoop
bind->execute(task)[入口]:调用bind()方法把具体绑定端口操作封装成Task,通过eventLoop()方法获取channelRegistered()注册绑定NioEventLoop执行NioEventLoop的execute()方法
1 | private static void doBind0( |
2.创建线程
ThreadPerTaskExecutor.execute():通过线程执行器ThreadPerTaskExecutor执行任务创建并启动FastThreadLocalThread线程
thread = Thread.currentThread():NioEventLoop保存当前创建FastThreadLocalThread线程,保存的目的是为了判断后续对NioEventLoop相关执行的线程是否为本身,如果不是则封装成Task扔到TaskQueue串行执行实现线程安全
NioEventLoop.run()[启动]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70private void startThread() {
// 这一个 if 判断就是为了在线程刚创建的时候起作用,也就是线程刚创建才能进到这里
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
delayedTaskQueue.add(new ScheduledFutureTask<Void>(this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
// 线程启动了
thread.start();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
NioEventLoop执行逻辑
1.检测IO事件
首先执行delayNanos(currentTimeNanos), 这个方法是做什么的呢?
1
2
3
4
5a)每个EventLoop都有一个延迟执行任务的队列
b)delayNanos就是去这个延迟队列里面瞄一眼是否有非IO任务未执行, 如果没有则返回1秒钟
c)如果很不幸延迟队列里面有任务, delayNanos的计算结果就等于这个task的deadlineNanos到来之前的这段时间, 也即是说select在这个task按预约到期执行的时候就返回了, 不会耽误这个task.
d)如果最终计算出来的可以无忧无虑select的时间(selectDeadLineNanos - currentTimeNanos)小于500000L纳秒, 就认为这点时间是干不出啥大事业的, 还是selectNow一下直接返回吧, 以免耽误了延迟队列里预约好的task.
e)如果大于500000L纳秒, 表示很乐观, 就以1000000L纳秒为时间片, 放肆的去执行阻塞的select了, 阻塞时间就是timeoutMillis(n * 1000000L纳秒时间片).阻塞的select返回后,如果遇到以下几种情况则立即返回
1
2
3
4a)如果select到了就绪连接(selectedKeys > 0)
b)被用户waken up了(wakeUp标识当前select操作是否为唤醒状态,每次select操作把wakeUp设置为false标识此次需要进行select操作并且是未唤醒状态;)
c)任务队列(上面介绍的那个MPSC)来了一个任务
d)延迟队列里面有个预约任务到期需要执行了如果上面情况都不满足, 代表select返回0了, 并且还有时间继续愉快的玩耍
这其中有一个统计select次数的计数器selectCnt, select过多并且都返回0, 默认512就代表过多了, 这表示需要调用rebuildSelector()重建selector了, 达到512可能是触发了nio的epoll cpu 100%的bug, 避免下次JDK空轮询继续发生
rebuildSelector的实际工作就是:
重新打开一个selector, 将原来的那个selector中已注册的所有channel重新注册到新的selector中, 并将老的selectionKey全部cancel掉, 最后将旧的selector关闭重建selector后, 不死心的再selectNow一下
1 | //select()[检查是否有io事件]:轮询注册到selector上面的io事件 |
2.处理外部线程扔到TaskQueue里面的任务
EventLoop的大致数据结构是:一个任务队列,一个延迟任务队列(schedule)。分别存放在普通任务队列MpscQueue和定时任务队列ScheduledTaskQueue
普通任务队列MpscQueue在创建NioEventLoop构造的,外部线程调用NioEventLoop的execute()方法使用addTask()方法向TaskQueue添加task;
定时任务队列ScheduledTaskQueue在调用NioEventLoop的schedule()方法将Callable任务封装成ScheduledFutureTask,判断是否为当前NioEventLoop发起的schedule还是外部线程发起的schedule,当前NioEventLoop发起的schedule直接添加定时任务,外部线程发起的schedule为了保证线程安全(ScheduledTaskQueue是PriorityQueue非线程安全)添加定时任务操作当做普通任务Task保证对于定时任务队列操作都在NioEventLoop实现
1 | protected boolean runAllTasks(long timeoutNanos) { |
优化
select过后, 对读写等事件的处理优化
1 | private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { |
使用数组替换HashSet
1 | a.避免了HashSet的频繁自动扩容。 |
1 | final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { |
总结
用户代码创建Boss/Worker Group NioEventLoop创建,默认创建2倍cpu核数个NioEventLoop,每个NioEventLoop都有线程选择器chooser线程分配并且优化NioEventLoop个数,构造NioEventLoop创建Selector和定时任务队列,创建Selector通过反射使用数组实现替换Selector HashSet数据结构;NioEventLoop调用execute()方法启动FastThreadLocalThread线程,创建线程保存到成员变量;NioEventLoop执行逻辑在run()方法包括检测io事件、处理io事件、执行任务队列
问:默认情况下,Netty服务端起多少线程?何时启动?
答:默认2cpu即Runtime.getRuntime().availableProcessors()2]线程,调用execute()方法判断当前是否在本线程,如果是在本线程说明线程已经启动,如果是在外部线程调用execute()方法,首先调用startThread()方法判断当前线程是否启动,未启动就启动此线程
问:Netty是如何解决JDK空轮询Bug?
答:判断阻塞select操作是否阻塞timeoutMillis时间,未阻塞timeoutMillis时间表示可能触发JDK空轮询;判断触发JDK空轮询的次数是否超过阈值(默认512),超过阈值调用rebuildSelector()方法重建Selector把之前的Selector上面所有的Key重新移到新的Selector避免JDK空轮询的Bug
问:Netty如何保证异步串行无锁化?
答:外部线程调用EventLoop或者Channel方法通过inEventLoop()方法判断得出是外部线程,所有操作封装成Task丢到普通任务队列MpscQueue,异步执行普通任务队列MpscQueue待执行任务