netty启动过程源码分析

前言

netty学习系列笔记总结,服务端启动流程源码浅析,错误之处欢迎指正, 共同学习

服务端启动代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer;SocketChannel() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

服务端Channel的创建

  • 创建底层JDK Channel
  • 封装JDK Channel
  • 创建基本组件绑定Channel;

1.bind(port)[用户代码入口]:serverBootstrap.bind(port)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

2.initAndRegister()[初始化并注册]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

return regFuture;
}

3.newChannel()[创建服务端Channel]

通过serverBootstrap.channel()方法传入NioServerSocketChannel类,构造ReflectiveChannelFactory实例将NioServerSocketChannel类设置为反射类; channelFactory.newChannel()通过clazz.newInstance()调用反射类构造方法反射创建服务端Channel

4.channelFactory在哪里初始化?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//首先channelFactory在开篇示例代码b.channel(NioServerSocketChannel.class)中被设置成new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)

public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

//factory通过反射创建一个NioServerSocketChannel对象
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

5.通过JDK底层创建socketChannel(服务端Channel创建过程)

  • 反射创建服务端Channel:NioServerSocketChannel默认构造方法调用newSocket()使用provider.openServerSocketChannel()创建服务端Socket
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    //在newSocket()中创建了开篇提到的监听套接字ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
    return provider.openServerSocketChannel();
    } catch (IOException e) {
    throw new ChannelException("Failed to open a server socket.", e);
    }
    }

6.NioServerSocketChannelConfig()[TCP参数配置类]:设置底层JDK Channel TCP参数配置

1
2
3
4
5
//SelectionKey.OP_ACCEPT标志就是监听套接字所感兴趣的事件了(但是还没注册进去)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

7.configureBlocking(false)[阻塞模式]:设置非阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//NioServerSocketChannel父类构造函数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//将ServerSocketChannel设置为非阻塞模式, NIO开始
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

8.AbstractChannel()[创建id,unsafe,pipeline]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//继续父类构造方法
1)构造一个unsafe绑定在serverChanel上,newUnsafe()由子类AbstractNioMessageChannel实现, unsafe的类型为NioMessageUnsafe,NioMessageUnsafe类型专为serverChanel服务, 专门处理accept连接
2)创建用于NioServerSocketChannel的管道 boss pipeline
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

3)head和tail是pipeline的两头, head是outbound event的末尾, tail是inbound event的末尾.
按照上行事件(inbound)顺序来看, 现在pipeline中的顺序是head-->tail
DefaultChannelPipeline(AbstractChannel channel) {
this.channel = channel;

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

服务端Channel的初始化

  • init()[初始化服务端channel,初始化入口]
  • set ChildOptions,ChildAttrs:提供给通过服务端Channel创建的新连接Channel,每次accept新连接都配置用户自定义的两个属性配置
  • config handler[配置服务端Pipeline]
  • add ServerBootstrapAcceptor[添加连接器]:提供给accept接入的新连接分配NIO线程
  • 保存用户自定义基本属性,通过配置属性创建连接接入器,连接接入器每次accept新连接使用自定义属性配置新连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//1.设置NioServerSocketChannel的options和attrs.
//2.预先复制好将来要设置给NioSocketChannel的options和attrs.
//3.init做的第二件事就是在boss pipeline添加一个ChannelInitializer,
//4.那么现在pipeline中的顺序变成了head-->ChannelInitializer-->tail(注意head和tail永远在两头, addLast方法对他俩不起作用)

void init(Channel channel) throws Exception {
// ......
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

注册Selector

将底层JDK Channel注册到事件轮询器Selector上面,并把服务端Channel作为Attachment绑定在对应底层JDK Channel

  • AbstractChannel.register(channel)[注册Selector入口]

  • doRegister()

  • 调用JDK底层注册:JDK Channel注册Selector调用javaChannel().register(eventLoop().selector, 0, this),将服务端Channel通过Attachment绑定到Selector

  • invokeHandlerAddedIfNeeded():事件回调,触发Handler

  • fireChannelRegistered()[传播事件]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ......

//赋值操作,后续所有的IO操作交给eventLoop处理
AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
//实际的注册
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ......
}
}
}

register0()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void register0(ChannelPromise promise) {
try {
// ......
1.在doRegister()之后还调用了pipeline.fireChannelRegistered(), 触发ChannelInitializer#channelRegistered()方法.
doRegister();
// ......
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// ......
}
}

doRegister()

1
2
3
4
5
6
7
8
9
10
11
12
//javaChannel().register(), 这里先把interestOps注册为0
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
} catch (CancelledKeyException e) {
// ......
}
}
}

服务端口的绑定

  • AbstractUnsafe.bind()[端口绑定]

  • doBind():javaChannel().bind()[JDK动态绑定]

  • pipeline.fireChannelActive()[传播事件]:HeadContext.readIfIsAutoRead()将注册Selector的事件重新绑定为OP_ACCEPT事件,有新连接接入Selector轮询到OP_ACCEPT事件最终将连接交给Netty处理

  • 绑定OP_ACCEPT事件:当端口完成绑定触发active事件,active事件最终调用channel的read事件,read对于服务器来说可以读新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

AbstractChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ......

boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//JDK底层绑定成功,调用fireChannelActive传播
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

//上面的doBind()调用的是NioServerSocketChannel的实现:
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
第二个参数backlog的重要性:
在linux内核中TCP握手过程总共会有两个队列:
1)一个俗称半连接队列, 装着那些握手一半的连接(syn queue)
2)另一个是装着那些握手成功但是还没有被应用层accept的连接的队列(accept queue)
backlog的大小跟这两个队列的容量之和息息相关, backlog的值也不是你设置多少它就是多少的

扩展:对于TCP连接的ESTABLISHED状态, 并不需要应用层accept, 只要在accept queue里就已经变成状态ESTABLISHED

NioServerSocketChannel

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

AbstractNioChannel端口绑定成功,告诉selector需要关心accept事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

总结

Netty服务端启动:

  • 创建服务端Channel:创建底层JDK Channel,封装JDK Channel,创建基本组件绑定Channel;

  • 初始化服务端Channel:设置Channel基本属性,添加逻辑处理器;

  • 注册Selector:将底层JDK Channel注册到事件轮询器Selector上面,并把服务端Channel作为Attachment绑定在对应底层JDK Channel;

  • 端口绑定:实现本地端口监听,绑定成功重新向Selector注册OP_ACCEPT事件接收新连接

服务端Socket在哪里初始化?
反射创建服务端Channel:NioServerSocketChannel默认构造方法调用newSocket()使用provider.openServerSocketChannel()创建服务端Socket

在哪里accept连接?
端口绑定:Pipeline调用fireChannelActive()传播active事件,HeadContext使用readIfIsAutoRead()重新绑定OP_ACCEPT事件,新连接接入Selector轮询到OP_ACCEPT事件处理

------本文结束感谢阅读------
显示评论
0%