netty异步回调机制及对耗时业务的处理

前言

netty学习系列笔记总结,netty如何实现异步回调机制及对耗时业务的处理,错误之处欢迎指正, 共同学习

耗时业务的处理

  • handler 种加入线程池
  • context 中添加线程池
handler 种加入线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ChannelHandler.Sharable
public class ServerBusinessThreadPoolHandler extends SimpleChannelInboundHandler<ByteBuf> {

public static final ChannelHandler INSTANCE = new ServerBusinessThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(1000);

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf data = Unpooled.directBuffer();
data.writeBytes(msg);
threadPool.submit(() -> {
try {
//耗时的操作
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = getResult(data);
ctx.channel().writeAndFlush(result);
});
}

}

channelRead0 方法,我们模拟了一个耗时 1 秒的操作,于是,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。

源码分析

判定当 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task ,然后放入 mpsc 队列中,等待 IO 任务执行完毕后执行队列中的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ......
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// ......
safeExecute(executor, task, promise, m);
}
}
context 中添加线程池

在添加 pipeline 中的 handler 时候,添加一个线程池
当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup businessGroup = new NioEventLoopGroup(1000);

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);


bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(businessGroup, ServerBusinessHandler.INSTANCE);
}
});

bootstrap.bind(Constant.PORT).addListener((ChannelFutureListener) future -> System.out.println("bind success in port: " + Constant.PORT));
}

源码分析

以下代码跟踪可参考 netty启动过程源码分析 中 register0 方法

1
2
3
4
5
6
7
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}

如果 this.executor 为 null,就返回 channel().eventLoop(),这个是 io 读写线程,肯定是不能执行耗时任务的。而如果在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池

异步回调机制

假设有小王和小李2个同学。小王不断的从Task队列中取出一个Task, 如果队列为空, 那么小王就什么也不做, 如果该Task是一个耗时任务, 而小王执行该任务的话, 后面的Task会得不到执行, 于是, 小王可以将Task交给小李执行, 这样, 小王就可以继续执行下一个Task了, 而小李执行完毕后, 将执行结果作为了Task放入到小王的任务队列中去, 这样, 当小王执行到该任务时, 也就得到了结果

简单实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Person extends Thread {

private static final String strFormat = " %s,这是一道%s的题";

//任务队列
BlockingQueue<Runnable> taskQueue;

private Person(String name) {
super(name);
taskQueue = new LinkedBlockingQueue<>();
}

@Override
public void run() {
for(;;) {
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//将任务提交到任务队列中去
private void submit(Runnable task) {
taskQueue.offer(task);
}

public static void main(String[] args) {
final Person wang = new Person("小王");
final Person li = new Person("小李");
//启动小李
li.start();
//启动小王
wang.start();

//提交一个简单的题
wang.submit(() -> {
System.out.println(
Thread.currentThread().getName() + String.format(strFormat,1,"简单"));
});

//将复杂的题交给li来做
wang.submit(() -> li.submit(() -> {
System.out.println(
Thread.currentThread().getName() + String.format(strFormat,2,"复杂"));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//做完之后将结果作为Task返回给wang
wang.submit(() -> System.out.println(
Thread.currentThread().getName() + " 得到复杂题执行结果"));
}));

//提交一个简单的题
wang.submit(() -> System.out.println(
Thread.currentThread().getName() + String.format(strFormat,3,"简单")));
}
}

执行结果:

1
2
3
4
小王 1,这是一道简单的题
小王 3,这是一道简单的题
小李 2,这是一道复杂的题
小王 得到复杂题执行结果
Netty中的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NettyPerson extends Thread {

private static final String strFormat = " %s,这是一道%s的题";

public static void main(String[] args) {
final DefaultEventExecutor wang = new DefaultEventExecutor();
final DefaultEventExecutor li = new DefaultEventExecutor();

wang.execute(() -> System.out.println(
Thread.currentThread().getName() + String.format(strFormat,1,"简单")));

wang.execute(() -> {
// 生成一个promise
final Promise<Integer> promise = wang.newPromise();
// 为该promise注册一个listener, 当任务执行完后回调该listener.该listener在异步任务提交者线程中执行
promise.addListener(future -> System.out.println(
Thread.currentThread().getName() + " 复杂题执行结果"));
// 在另一个线程中执行一个异步任务, 执行完后, 将promise设置为成功
li.execute(() -> {
System.out.println("执行计算任务的线程 " + Thread.currentThread());
promise.setSuccess(10);
});
});

wang.execute(() -> System.out.println(
Thread.currentThread().getName() + String.format(strFormat,3,"简单")));
}
}

执行结果:

1
2
3
4
defaultEventExecutor-1-1 1,这是一道简单的题
defaultEventExecutor-1-1 3,这是一道简单的题
执行计算任务的线程 Thread[defaultEventExecutor-3-1,5,main]
defaultEventExecutor-1-1 复杂题执行结果

源码分析
DefaultPromise.setSuccess(V result)

1
2
3
4
5
6
7
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}

DefaultPromise.notifyListeners()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 如果当前线程不是executor, 就将notifyListener包装成一个Task添加了executor的
* taskqueue中执行, 如果是executor则直接在当前线程中执行.
*
* 从上面的行为可以猜得到, executor成员的含义应该是异步任务的提交者
*/
protected EventExecutor executor() {
return executor;
}

private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
// ......
notifyListenersNow();
}

safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

DefaultEventExecutor.newPromise();

1
2
3
4
5
6
7
8
// 利用DefaultEventExecutor生成promise时, 将该executor赋值给promise.executor.
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}

public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}

总结

handler 中加入线程池更加的自由,比如访问数据库等操作。

Context 中添加线程池,会将整个 handler 都交给业务线程池,不够灵活。

------本文结束感谢阅读------
显示评论
0%