前言
netty学习系列笔记总结,netty如何实现异步回调机制及对耗时业务的处理,错误之处欢迎指正, 共同学习
耗时业务的处理
- handler 种加入线程池
- context 中添加线程池
handler 种加入线程池
1 | .Sharable |
channelRead0 方法,我们模拟了一个耗时 1 秒的操作,于是,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。
源码分析
判定当 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务
1 | private void write(Object msg, boolean flush, ChannelPromise promise) { |
context 中添加线程池
在添加 pipeline 中的 handler 时候,添加一个线程池
当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。
1 | public static void main(String[] args) { |
源码分析
以下代码跟踪可参考 netty启动过程源码分析 中 register0 方法
1 | public EventExecutor executor() { |
如果 this.executor 为 null,就返回 channel().eventLoop(),这个是 io 读写线程,肯定是不能执行耗时任务的。而如果在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池
异步回调机制
假设有小王和小李2个同学。小王不断的从Task队列中取出一个Task, 如果队列为空, 那么小王就什么也不做, 如果该Task是一个耗时任务, 而小王执行该任务的话, 后面的Task会得不到执行, 于是, 小王可以将Task交给小李执行, 这样, 小王就可以继续执行下一个Task了, 而小李执行完毕后, 将执行结果作为了Task放入到小王的任务队列中去, 这样, 当小王执行到该任务时, 也就得到了结果
简单实现
1 | public class Person extends Thread { |
执行结果:
1 | 小王 1,这是一道简单的题 |
Netty中的实现
1 | public class NettyPerson extends Thread { |
执行结果:
1 | defaultEventExecutor-1-1 1,这是一道简单的题 |
源码分析
DefaultPromise.setSuccess(V result)
1 | public Promise<V> setSuccess(V result) { |
DefaultPromise.notifyListeners()
1 | /** |
DefaultEventExecutor.newPromise();
1 | // 利用DefaultEventExecutor生成promise时, 将该executor赋值给promise.executor. |
总结
handler 中加入线程池更加的自由,比如访问数据库等操作。
Context 中添加线程池,会将整个 handler 都交给业务线程池,不够灵活。