netty启动详细记录

ServerBootStrap示例

了解netty服务器的启动,从官方提供的4.1版本echo服务器端代码开始。

Netty官方echo示例EchoServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final class EchoServer {

static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}

// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

思路

这篇文章关心的是netty服务器是如何启动的。因此先跳过bossGroupworkerGroup的创建细节和ServerBootstrap参数初始化细节,直接看bind接口,这里是真正服务器启动的地方。然后顺着netty启动的顺序,遇到新的知识点再展开描述与记录。

整体启动脉络

netty启动涉及到多线程,为了便于以后快速回顾,先将整体启动的大致过程记录下来。绿色部分是给子线程分配的关键初始化任务。

netty启动相关类介绍

一开始虽然不会详细介绍bossGroupworkerGroupServerBootstrap的细节,但是还是先要对启动相关类有个基本认识。

  • ServerBootstrap:为了简化服务器启动过程,所有启动相关的接口和参数都封装到这里,相当于一个门面(Facade)模式。
  • Channel:是netty的核心接口类,除了提供IO操作的接口之外,还提供对于核心组件的访问接口。如果把netty比作一台机器可以说是Channel将机器上所有的基本零件连接在了一起。
  • ChannelFuture:通过它可以读取Channel中的异步操作的状态和结果,简单描述可以分为uncompleted和completed两种情况,而completed又包括Completed successfully、Completed with failure 、Completed by cancellation三种状态。另外它也可以添加ChannelFutureListener监听器,来监听异步I/O操作结果并执行后续操作。
  • ChannelPromise:可以设置异步操作的状态和结果的一种特殊ChannelFuture。在启动过程中会用到它的默认实现DefaultChannelPromise
  • DefaultChannelPromise:在设置异步IO操作成功或者失败之后会通知相应的listeners的默认ChannelPromise

服务器的启动入口

直接看bind操作:

1
2
3
4
5
//ServerBootstrap.java

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

bind方法调用基类AbstractBootstrapbind方法:

1
2
3
4
5
6
7
8
9
10
//AbstractBootstrap.java

public ChannelFuture bind(SocketAddress localAddress) {
//验证服务启动的必要参数
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}

bind方法继续调用doBind方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//AbstractBootstrap.java

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

doBind方法最终调用doBind0方法。
这里regFuture其实是记录注册异步操作结果的ChannelPromise实例,通过regFuture.isDone()判断注册异步操作是否完成,存在两种情况:

  • 注册操作没有完成:先给regFuture增加一个监听器,如果注册成功没有错,就会在设置regFuture结果的地方调用doBind0方法
  • 注册操作已经完成,则立即调用doBind0方法

两者的区别在于调用此方法的线程不同。注册完成就直接在主线程调用,没有完成其实是在workGroup子线程中通过触发linster回调doBind0方法。但是殊途同归的是最终真正执行bind操作的地方都是在workGroup子线程中。

initAndRegister方法

调用doBind0方法之前,regFuturechannel参数都是通过initAndRegister方法创建的。

1
2
3
4
5
6
7
8
9
10
11
12
//AbstractBootstrap.java

final ChannelFuture initAndRegister() {
Channel channel = null;
//...
channel = channelFactory.newChannel();
init(channel);
//...
ChannelFuture regFuture = config().group().register(channel);
//...
return regFuture;
}

initAndRegister方法内部主要完成了三件事

  1. 创建channel
  2. 初始化channel
  3. 返回注册的异步结果regFuture

我们先看如何创建channel。

确定创建的channel类型

channel是通过channelFactory.newChannel()方法完成创建的,那channelFactory是如何创建的,对象的类型是什么?
其实通过代码可以确定channelFactory对象实际的类型是ReflectiveChannelFactory。它通过
ServerBootstrapchannel接口完成创建和赋值:

1
.channel(NioSocketChannel.class)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//AbstractBootstrap.java

public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}

this.channelFactory = channelFactory;
return self();
}

ReflectiveChannelFactorynewChannel方法通过反射的方式创建对象,创建的对象类型为通过构造函数传入的类类型,也就是NioSocketChannel.class类型。因此只要通过ServerBootstrapchannel接口传入具体的类,就决定Netty服务器创建的Channel类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
...
}

NioServerSocketChannel扩展理解

Channel作为netty的核心组件之一,为了加深理解,我们先做简单的介绍:

  • NioSocketChannel, 代表异步的客户端 TCP Socket 连接
  • NioServerSocketChannel, 异步的服务器端 TCP Socket 连接
  • NioDatagramChannel, 异步的 UDP 连接
  • NioSctpChannel, 异步的客户端 Sctp 连接
  • NioSctpServerChannel, 异步的 Sctp 服务器端连接
  • OioSocketChannel, 同步的客户端 TCP Socket 连接
  • OioServerSocketChannel, 同步的服务器端 TCP Socket 连接
  • OioDatagramChannel, 同步的 UDP 连接
  • OioSctpChannel, 同步的 Sctp 服务器端连接
  • OioSctpServerChannel, 同步的客户端 TCP Socket 连接

Channel层级结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
AbstractChannel
|
|--AbstractNioChannel
| |--AbstractNioByteChannel
| |--NioSocketChannel
|
|--AbstractNioMessageChannel
| |--NioServerSocketChannel
| |--NioDatagramChannel
| |--NioSctpServerChannel
| |--NioSctpChannel
|
|--AbstractOioChannel
| |--AbstractOioByteChannel
| |--OioByteStreamChannel
| |--OioSocketChannel
|
|--AbstractOioMessageChannel
| |--OioServerSocketChannel
| |--OioDatagramChannel
| |--OioSctpServerChannel
| |--OioSctpChannel

以上channel可以理解为netty对传输层的具体实现,而AbstractChannel是netty对传输层的抽象,但是也通过采用Facade模式聚合了很多内部组件,包括了UnsafeChannelPipelineEventLoopChannelPromise等,对外提供了统一接口。

而在AbstractChannel层次之上又分为AbstractOioChannel(阻塞)和AbstractNioChannel(非阻塞)。他们的最大的区别在于AbstractNioChannel类内部存在一个SelectableChannel类型的成员变量。SelectableChannel是一个可以通过Selector来进行多路复用的通道。

AbstractChannel核心类介绍

在继续NioServerSocketChannel的创建之前,我们再看看AbstractChannel核心组件:

  • ChannelId:每个Channel的唯一标识,这个唯一标识通过DefaultChannelId产生。
  • Unsafe:它是Channel的辅助接口,不应该被用户代码直接使用,而实际的IO操作应该都由Unsafe接口负责完成。
  • DefaultChannelPipeline:维护一个AbstractChannelHandlerContext的链表,通过这个链表中的handler处理器来处理Channel上的数据。AbstractChannelHandlerContext放到pipeline部分再详细介绍。
  • EventLoop:主要负责执行Channel生命周期内的事件轮询和各种任务。

AbstractNioChannel核心类介绍

  • SelectableChannel:是java nio中的抽象类,本身可以通过Selector来支持多路复用通道,且提供两种模式Blocking mode和Non-blocking mode,SelectableChannel的Non-blocking mode是实现Netty的异步IO事件机制的基础。而要使用Non-blocking mode,需要先设置为Non-blocking mode模式,再通过register(Selector,int,Object)接口绑定Selector并拿到返回的SelectionKey对象。
  • SelectionKeySelectableChannelSelector注册的标识。

NioServerSocketChannel核心类介绍

  • ChannelConfig:操作Channel的配置属性集的接口,包括ChannelOption配置和传输相关的属性。
  • ChannelOption:提供一种类型安全的方式来表现配置信息。比如配置超时时间CONNECT_TIMEOUT_MILLIS

    1
    public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
  • DefaultChannelConfig:对于ChannelConfig的默认实现,内部实现了对各种ChannelOption类型的统一访问,实现了对ByteBufAllocator的访问,以及Channel中的各种默认配置的访问,如读取数据长度、连接超时时间等。

  • DefaultServerSocketChannelConfigDefaultChannelConfig的子类,实现了通过ServerSocketChannelConfig提供的扩展接口,内部存储了ServerSocket的实例。
  • NioServerSocketChannelConfigDefaultServerSocketChannelConfig的子类,实现了autoReadCleared接口。

NioServerSocketChannel的类图

创建NioServerSocketChannel

通过工厂模式创建的NioServerSocketChannel调用的是默认的构造函数,而默认构造函数内部调用了带参数的构造函数:

1
2
3
4
5
6
7
8
9
10
11
//NioServerSocketChannel.java

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

DEFAULT_SELECTOR_PROVIDER是通过SelectorProvider.provider()创建的类静态变量:

1
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
1
2
3
4
5
6
7
8
//SelectorProvider.java

public static SelectorProvider provider() {
//...
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}

1
2
3
4
5
6
//DefaultSelectorProvider.java

public static SelectorProvider create() {
return new WindowsSelectorProvider();
}

channel的创建和平台相关,都是通过对应平台的provider来创建平添相关的channel,以windows平台为例:
DEFAULT_SELECTOR_PROVIDER对象的实际类型是WindowsSelectorProvider
因此newSocket方法调用的provider.openServerSocketChannel方法,在WindowsSelectorProvider父类SelectorProviderImpl中被实现:

1
2
3
4
5
6
7
8
9
10
11
//NioServerSocketChannel.java

private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

1
2
3
4
5
6
//WindowsSelectorImpl.java

public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl类内部通过Net组件来实现对本地socket的操作。

所以回到this()构造函数的调用,实际传入的实参类型为ServerSocketChannelImpl

1
2
3
4
5
6
7
//NioServerSocketChannel.java

public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

到这里直接参考NioServerSocketChannel实例化流程图可以基本理解NioServerSocketChannel实例化过程。

过程中有几点还是记录下来:

  • ServerSocketChannelImpl保存到this.ch中,转换成了SelectableChannel类型。
  • 初始化过程中通过ch.configureBlocking(false);AbstractSelectableChannelblocking字段设为false,也就是将ch设置为非阻塞模式。后面只要调用register接口就可以完成channel和selector的注册。
  • 创建的NioServerSocketChannelConfig对象的成员变量channeljavaSocket对应的实例对象为NioServerSocketChannel和经过ServerSocketAdaptor类型转换后的ServerSocket,实际上还是ServerSocketChannelImpl
  • unsafe实际对应的是NioMessageUnsafeNioMessageUnsafe重写了read接口,read方法通过doReadMessages()处理NioServerSocketChannel的accept操作。如果此时没有客户端连接,则退出for循环进行后续的处理,如果有客户端连接,则将客户端NioSocketChannel保存到readBuf中(默认不超过16个),如果超过16个,则也退出for循环进行后续的处理。最后将readBuf传到pipeline去解析。

pipeline的创建

其中pipeline是很重要的组件,所以再详细看一下pipeline的创建。

先介绍pipeline的核心类:

  • AttributeKey:用于从AttributeMap中访问Attribute的键
  • Attribute:存储与操作泛型数据的接口
  • AttributeMap:提供通过AttributeKey来获取Attribute类型的value的接口。
  • DefaultAttributeMapAttributeMap的的默认实现,用来存取AttributeMap的数据。
  • ChannelInboundInvoker:定义了ChannelHandlerContext传递inbound事件的方法。
  • ChannelOutboundInvoker:定义了ChannelHandlerContext传递outbound事件的接口。
  • ChannelHandlerContext:提供了访问各种资源如ChannelChannelHandlerEventExecutorChannelPipelineByteBufAllocatorAttribute的方法,并且实现了ChannelInboundInvokerChannelOutboundInvoker来规范inbound和outbound事件的处理接口。
  • AbstractChannelHandlerContext:它继承了DefaultAttributeMap,因此它有存取AttributeMap数据的能力,同时它也实现了ChannelHandlerContext接口,因此它也拥有访问pipeline的各种资源的能力。

这里还要补充一下,ChannelAbstractChannelHandlerContext都实现了AttributeMap接口,因此每一个ChannelChannelHandlerContext实例都可以像Map一样来存取key和value,唯一的区别是ChannelHandlerContextAttributeMap仅用于当前具体的ChannelHandler子类实例,而ChannelAttributeMap可以被用于所有的ChannelHandlerContext链表中的ChannelHandler子类实例。

1
2
3
4
5
6
//AbstractChannel.java

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//DefaultChannelPipeline.java

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

创建过程比较简单:

  • DefaultChannelPipeline绑定的channel就是NioServerSocketChannel,并创建了succeededFuturevoidPromise用于异步操作支持。
  • 创建TailContextHeadContext来填充AbstractChannelHandlerContext链表结构,channel的handler都会放到这个链表。

NioServerSocketChannel的初始化

完成了NioServerSocketChannel的创建之后,接着执行NioServerSocketChannel的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//ServerBootstrap.java

void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

初始化完成了几个工作

  1. 如果用户代码有调用AbstractBootstrap.option接口保存ChannelOption,那么这里将ChannelOption用来配置NioServerSocketChannel的config对象。
  2. 如果用户代码有调用AbstractBootstrap.attr接口保存AttributeKeyAttribute,那么这里将保存的AttributeKeyAttribute键值对复制到channel。
  3. ChannelInitializer<Channel>()对象的基类是ChannelHandler,最终会封装到一个AbstractChannelHandlerContext对象内部,再加入到pipeline双向链表中。
  4. 调用addLast接口时,由于NioServerSocketChannel还没有注册完成,所以会额外创建一个封装了ChannelInitializer<Channel>()AbstractChannelHandlerContext对象,并加入到一个新的PendingHandlerAddedTask对象中,再将此对象加入到pipeline的pendingHandlerCallbackHead单项链表上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//DefaultChannelPipeline.java

public final ChannelPipeline addLast(ChannelHandler handler) {
return addLast(null, handler);
}

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

// registered 为 false 标识channel还没有注册到eventloop中。
// 这种情况下处理会在pipeline中加入一个context外,还添加了一个task,当channel注册完成的时候,这个task会执行
ChannelHandler.handlerAdded(...)。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

提前说明这个PendingHandlerAddedTask的作用:
存入pendingHandlerCallbackHead链表的PendingHandlerAddedTask
等到channel注册完成之后会取出并执行。执行时调用PendingHandlerAddedTaskexecute方法,此方法先调用callHandlerAdded0callHandlerAdded0再调用ctx.handler().handlerAdded(ctx);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//DefaultChannelPipeline.java

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

@Override
public void run() {
callHandlerAdded0(ctx);
}

@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
}
}

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
//...
}
}

其中ctx.handler()ChannelInitializer,而ctx.handler().handlerAdded(ctx);实际上调用的是initChannel接口。而在channel注册时创建的ChannelInitializer<Channel>()刚好重写了此接口。

1
2
3
4
5
6
7
8
//ChannelInitializer.java

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}

所以等到channel注册完成之后,会调用ChannelInitializer<Channel>()重写的initChannel方法执行channel的额外初始化。这部分初始化代码等到注册完成时再说。

NioServerSocketChannel注册入口

完成了NioServerSocketChannel的初始化之后,返回到initAndRegister方法继续往下看注册流程:

1
ChannelFuture regFuture = config().group().register(channel);

这里的config()ServerBootstrap的内部成员变量:

1
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

config().group()对应的是AbstractBootstrap的group成员变量:

1
2
3
4
5
6
//AbstractBootstrapConfig.java

public final EventLoopGroup group() {
return bootstrap.group();
}

1
2
3
4
5
6
//AbstractBootstrap.java

public final EventLoopGroup group() {
return group;
}

而group成员变量是通过ServerBootstrapgroup方法初始化的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//ServerBootstrap.java

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}

在示例代码中对应的是bossGroup,而其childGroup成员变量保存的是workerGroup

1
b.group(bossGroup, workerGroup)

它们是通过NioEventLoopGroup创建的:

1
2
3
4

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

到这里我们又进入了另一大Netty的核心:EventLoopEventLoopGroup,待会会详细介绍。

最终config().group().register(channel)函数实际上调用的是NioEventLoopGroupregister方法,但实现是在父类MultithreadEventLoopGroup中:

1
2
3
4
5
6
7
8
9
10
//MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
return next().register(channel);
}

public EventLoop next() {
return (EventLoop) super.next();
}

MultithreadEventLoopGroup实际上也不是直接注册,而是通过父类的next接口获取到一个EventExecutor并转换为EventLoop类型:

1
2
3
4
5
6
//MultithreadEventExecutorGroup.java

public EventExecutor next() {
return chooser.next();
}

看到这里出现了chooserEventExecutor以及EventLoop这些新的组件,以及一开始创建的两个NioEventLoopGroup对象bossGroupworkerGroup,而为了便于理解这部分内容,在接着看注册流程之前,必须先对netty的线程模型有个基本的认识。
如果对NioEventLoopGroupNioEventLoop有了基本的认识,则可以跳过直接看继续Channel的注册流程的章节。

Reactor模型的理解

NioEventLoopGroup的理解参考了此篇文章:https://segmentfault.com/a/1190000007403873

要理解NioEventLoopGroup,首先还是要知道Netty线程模型是Reactor设计模式的一个实现。
Reactor设计模式中有两个重要角色:

  • Reactor的职责是检测网络IO事件并分发给合适的handler处理。
  • Handlers的职责是执行非阻塞的行为。

而Reactor设计模式根据线程的数量和作用又分为三种Reactor线程模型

  • 单线程模型
  • 多线程模型
  • 主从多线程模型

单线程模型所有工作都在一个线程执行,会由于其中某个handler的阻塞而导致整个服务的阻塞。因此单线程Reactor 模型用的比较少。

多线程模型与单线程模型的区别在于由单独的线程处理accept事件,其它IO操作事件和处理工作放到一组特定的NIO线程来监听处理。Reactor多线程模型如下:

多线程模型改善了handler阻塞对系统的影响,但是对于同时处理大量连接或者在连接时需要进行一些权限检测等工作时,那么单线程处理这些工作仍然负荷较大,可能会影响其它客户端的连接,那么主从多线程则将异步连接事件后续的处理工作也划分到从Reactor线程池来处理,主Reactor线程只负责分配任务。主从Reactor多线程模型如下:

那么回头看我们的示例,netty采用的其实就是Reactor多线程模型,bossGroup单线程处理accept事件,workerGroup线程池处理其它工作:

1
2
3
4
5
6
7

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//...
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

有了netty模型的基本概念之后,再来细看NioEventLoopGroupNioEventLoop

NioEventLoopGroup类层次结构

NioEventLoopGroup实例化过程

NioEventLoopGroup实例化的序列图中,省略了NioEventLoop的实例化过程,NioEventLoop的实例化后面再单独描述。

实例化流程比较简单,只说图中没有明确提到的部分:

  • NioEventLoopGroup内部维护了一份类型为EventExecutor的children数组,children数组实际通过newChild()方法创建的实际对象类型是NioEventLoop
  • executor存储的是一个ThreadPerTaskExecutor类型的实例,,它实现了execute(Runnable command)接口,只要调用此接口,就会将传入的command作为线程的执行体,通过DefaultThreadFactory.newThread(Runnable r)接口创建线程,并启动线程。而最终此executor会作为参数传递到NioEventLoop中,也就是传递给children数组中的每一个元素。
1
2
3
4
5
6
//ThreadPerTaskExecutor.java

public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//DefaultThreadFactory.java

public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}

if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}

  • 如果调用构造函数没有设置nThreads数量,则nThreads的数量为DEFAULT_EVENT_LOOP_THREADS,也就是处理器核心数*2
  • 如果nThreads数量是2的次方,则实际通过chooserFactory.newChooser(children)创建的chooser为PowerOfTwoEventExecutorChooser,否则为GenericEventExecutorChooser,这两者的区别在于实现了不同的next方法策略来选择children中的某一个EventExecutor

NioEventLoop类的层次结构

NioEventLoop类的层次结构比较多,重点关注SingleThreadEventExecutor中的thread和executor,它们为NioEventLoop提供了执行线程任务的基础。

NioEventLoop实例化过程

根据上面的图,再补充解释整个NioEventLoopGroupNioEventLoop的实例化过程:

  • NioEventLoopGroup实例化时,根据传入的需要创建的线程数量,创建了对应数量的NioEventLoop实例,并且最终的executor实例是保存在每一个NioEventLoop实例中。
  • 同样的selectorProvider最终也是传递给NioEventLoop用于创建selector实例,不同的在于selectorProvider只有一份实例。
  • 图中忽略了selector实例的创建,简单描述是selectorProvider会根据操作系统来返回不同平台的selectorProvider,而不同平台的selectorProvider通过实现openSelector()来实例化平台相关的selector。
  • 整个过程SingleThreadEventExecutor的thread属性还没有被赋值,这个属性后面会用到。

继续Channel的注册流程

通过前面五节的介绍,注册函数链式调用group().next()实际通过chooser返回EventExecutor对象也就很容易理解了,其真实意图是在NioEventLoopGroup的chlidren数组中选择一个EventExecutor。也可以认为是返回一个NioEventLoop,它们都在一个继承链上。而NioEventLoop的父类SingleThreadEventLoop实现了register(channel)接口,所以group().next().register(channel)实际最后调用的是SingleThreadEventLoop.register(channel)。调用完成实际的返回值是DefaultChannelPromise,用于保存注册的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

SingleThreadEventLoop.register(channel)内部的promise.channel().unsafe().register(this, promise)调用层次很深,我们直接跳过。最终调用到AbstractUnsaferegister方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//AbstractUnsafe.java

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

AbstractUnsaferegister方法首先将eventLoop绑定到NioServerSocketChannel上,然后通过eventLoop.inEventLoop方法判断当前线程是否已经处于该事件循环器线程中:

1
2
3
4
5
6
//AbstractUnsafe.java

public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}

1
2
3
4
5
6
//SingleThreadEventExecutor.java

public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}

从代码看,最终判断的依据是SingleThreadEventExecutor的thread属性,也就是我们提到的在NioEventLoop初始化完成后仍然为空的thread。所以此时判断肯定不在同一个线程中。所以最终会执行eventLoop.execute(new Runnable())的分支。

启动第一个新事件监听器线程

到这里终于进入了一个新的阶段,因为第一个事件监听器线程被创建了。
通过NioEventLoop的类的层次结构知道NioEventLoop的是一个Executor,而在SingleThreadEventExecutor中实现了execute接口。eventLoop.execute(new Runnable())调用的实际就是SingleThreadEventExecutor.execute(Runnable task)方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//SingleThreadEventExecutor.java

public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}

final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
//...
SingleThreadEventExecutor.this.run();
//...
}
});
}

SingleThreadEventExecutor.execute(Runnable task)首先将task存入taskQueue,也就是将register0操作放入任务队列。接着因为目前还是在主线程调用的execute方法,所以会执行startThread分支。startThread方法又调用了doStartThread分支。

事件监听与处理线程

继续看doStartThread方法。在doStartThread内部通过executor创建线程并启动线程。前面在NioEventLoopGroup实例化过程中描述了executor如何创建和启动线程。这里再介绍一次:
executor存储的是一个ThreadPerTaskExecutor类型的实例。它实现了execute(Runnable command)接口,只要调用此接口,就会将传入的command作为线程的执行体。其内部通过DefaultThreadFactory.newThread(Runnable r)接口创建线程,并启动线程。而executor是会在创建NioEventLoopGroup的children数组元素时,作为参数传递给children数组中的每一个NioEventLoop保存。

因此这里再调用executor.execute(new Runnable())方法时,就会启动一个新的线程,并执行run方法。而run方法内部终于将NioEventLoopthread字段更新为当前创建的子线程。并最终调用了SingleThreadEventExecutor.this.run()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//SingleThreadEventExecutor.java

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}

terminationFuture.setSuccess(null);
}
}
}
}
});
}

SingleThreadEventExecutor本身并没有实现run方法,而是在子类NioEventLoop实现了run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//NioEventLoop.java

protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

先简单描述当前for循环中完成的供:

  1. 查询是否有IO事件和任务到来
  2. 进行事件处理
  3. 进行任务处理

下面我们一项项描述。

事件与任务监听

简单描述selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())这行代码的含义:如果hasTasks() == true, 调用selector.selectNow(),并清空selectionKeys。因为selector.selectNow()的结果肯定>=0,所以会跳出switch块,否则进入case SelectStrategy.SELECT:分支执行select(wakenUp.getAndSet(false))。这样的好处在于当有task来到的时候,只查询就绪的channel,不阻塞线程;当没有task时,则可以执行NioEventLoop.select()函数,此函数如果没有外部中断或者新任务、定时/周期任务等待处理,会循环执行selector.select(timeoutMillis)去检测网络IO事件。

所以当跳出switch块,则表示有IO事件或者任务到达。

事件处理

IO事件到达后,先通过processSelectedKeys()处理IO事件,然后通过runAllTasks处理任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//NioEventLoop.java

private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i]

selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

selectedKeys在前面创建NioEventLoop已经初始化过了,所以会调用processSelectedKeysOptimized()函数,但此时第一次进入此函数时selectedKeys是空的,因此直接返回,不处理任何的网络IO事件。
虽然目前selectedKeys为空,但还是看看当selectedKeys存储了对应的IO事件时,做了些什么工作:

  • 首先通过selectedKeys.keys[i] = null;清除网络IO事件,然后通过processSelectedKey()处理网络事件和附加的数据a,数据a可以转换为一个AbstractNioChannel或者NioTask<SelectableChannel>

  • 如果附加的是AbstractNioChannel类型的数据,那么最终processSelectedKey会根据具体的IO事件,通过channel的Unsafe接口去执行具体的读、写、处理连接等工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//...

try {
int readyOps = k.readyOps();

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

  • 而如果附加的是NioTask<SelectableChannel>类型的数据,那么processSelectedKey则通过NioTask<SelectableChannel>channelReady接口完成具体的后续操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//NioEventLoop.java

private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}

任务处理

完成IO事件的处理后,继续往下执行runAllTasks。在处理任务时,需要根据this.ioRatio成员变量控制处理任务的时间百分比。this.ioRatio的默认值为50。也就是说,在事件循环中默认情况下用于处理I/O操作的时间和用于处理任务的时间百分比都为50%。
这里我们先看当ioRatio为100的时候是如何执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//SingleThreadEventExecutor.java

protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}

private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}

runAllTasks()方法通过fetchFromScheduledTaskQueue()方法将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue取出,放到taskQueue中,然后通过runAllTasksFrom(Queue<Runnable> taskQueue)将所有任务一个个取出并执行。这里的循环不会被打断,也就意味着直到所有任务执行完成才会退出。

下面看一下runAllTasks(long timeoutNanos)方法的实现,这也是当前流程中执行的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//SingleThreadEventExecutor.java

protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

runAllTasks(long timeoutNanos)首先将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue取出,放到taskQueue中,目前还没有定时和周期任务。然后也会从taskQueue取出每一个task去执行,但是每取出64个(0x100)之后,会进行一次超时判断,如果超时则退出任务的执行。目前taskQueue中存放了一个执行register0方法的任务,所以此任务会出去来准备执行。

切换到子线程继续完成注册

runAllTasks方法内部执行到safeExecute(task)方法时,会调用task.run()方法执行任务的实际工作:

1
2
3
4
5
6
7
8
9
10
//AbstractEventExecutor.java

protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}

所以task.run方法此时执行的就是register0(promise),它是在channel注册时提交的task。

1
2
3
4
5
6
7
8
9
10
//AbstractChannel.java

try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});

因此register0方法实际是在子线程中执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//AbstractChannel.java

private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

这个方法完成了几个动作:

  • promise.setUncancellable()设置注册这个异步操作的promise状态为不可取消的(UNCANCELLABLE)。
  • 通过doRegister方法完成SelectorChannel的绑定。
1
2
3
4
5
6
7
8
9
10
11
12
//AbstractNioChannel.java

protected void doRegister() throws Exception {
//...
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
//...
}

protected SelectableChannel javaChannel() {
return ch;
}

javaChannel()接口获取的ch,在NioServerSocketChannel初始化时,创建的实例是ServerSocketChannelImpl,并且已经设置为非阻塞模式。只要调用SelectableChannelregister接口就可以完成注册并返回对应的SelectionKey对象。而实际register方法是在其父类AbstractSelectableChannel中实现,传入的参数为当前NioEventLoopSelectorChannel实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//AbstractSelectableChannel.java

public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}

AbstractSelectableChannel.register(Selector sel, int ops, Object att)方法会判断当前的Selector是否已经注册过,如果注册过则将新的监听事件和新的SelectableChannel实例附加到当前的Selector实例中。但是当前我们的Selector还没有注册过,因此执行的分支是调用((AbstractSelector)sel).register(this, ops, att)方法,也就是调用Selector实例的注册方法。而SelectorImpl实现了此方法:

1
2
3
4
5
6
7
8
9
10
11
//SelectorImpl.java

protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
//...
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
var4.attach(var3);
//...
var4.interestOps(var2);
return var4;
}

此方法将SelectorSelectableChannel绑定,设置监听事件,并将this(也就是NioServerSocketChannel)作为附加属性设置到SelectionKey中。

  • 将成员变量registered置为true,表示Channel已经在当前NioEventLoop线程中完成注册。
  • 为注册好的channel的pipeline增加预先定义好的handler,并将下一步要执行的任务添加到NioEventLoop的事件队列。下面开始分析这一系列动作:执行pipeline.invokeHandlerAddedIfNeeded()将在channel初始化时存入链表的PendingHandlerAddedTask取出,并执行其execute()方法,方法会调用ctx.handler().handlerAdded(ctx);handler()获取到的就是重写了initChannel方法的ChannelInitializer<Channel>的实例,而最终handlerAdded(ctx);就会调用到重写的initChannel方法。这些我们在前面NioServerSocketChannel初始化章节的最后描述过。重写的initChannel方法先将ServerBootstrap中保存的handler加入到pipeline的双向链表中,然后再将一个新的task加入到eventLoop的任务队列中,并会在下一次查询任务队列时被执行。这个任务的作用就是将一个新的handler加入到pipeline的双向链表中。这个handler的类型是ServerBootstrapAcceptor,具体的作用我们等到bind操作完成之后再说。等到将initChannel方法执行完毕之后,这个ChannelInitializerhandler会将自己从pipeline中移除,并将ChannelInitializer关联的ChannelHandlerContext的状态置为REMOVE_COMPLETE。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//ChannelInitializer.java

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
//...
initChannel((C) ctx.channel());
//...
remove(ctx);
//...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//ServerBootstrap.java

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
}

  • 标志ChannelPromise(也就是DefaultChannelPromise对象)为成功,也就是将注册这个异步操作标志为成功。

  • pipeline.fireChannelRegistered()触发ChannelRegistered事件,该事件会在ChannelPipeline中传播。它会先被head处理,随后该事件通过ChannelHandlerContext来实现传递给ChannelPipeline中的下一个ChannelInboundHandler处理器处理,直到最后被tail所处理。
    到这里initAndRegister()函数执行完毕,意味着channel的注册和初始化完成了。

回到主线程的dobind()

回到主线程的dobind()函数,我们一开始就说过,注册这个异步操作根据结果存在两个分支,注册成功直接调用doBind0,否则通过addListener的方式异步执行dobind0。其实只要看注册未完成情况下的分支就可以了。regFuture.addListener创建了一个重写operationComplete接口的ChannelFutureListeneroperationComplete()会在regFuture异步操作完成时收到通知。到这里主线程bind()操作完成后,执行sync()阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//AbstractBootstrap.java

private ChannelFuture doBind(final SocketAddress localAddress) {
//...
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

bossGroup线程继续bind0工作

回到bossGroup子线程,往回看register0函数内部的第5个步骤:
设置promise为注册成功状态safeSetSuccess(promise);
到这里主线程创建的ChannelFutureListener在此时就会直接在bossGroup子线程内执行operationComplete接口,执行bind0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//AbstractBootstrap.java

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

bind0将channel的bind操作加入到任务队列中。

NioEventLoop线程继续执行任务队列

到这里,我们的NioEventLoop任务队列又多了两个任务:

  1. ServerBootstrapAcceptor加入到pipeline双向链表中
  2. 执行channel.bind操作完成channel端口的绑定

任务一:ServerBootstrapAcceptor实现事件分派

先看第一个任务天下的handler。ServerBootstrapAcceptorchannelRead事件触发的时候,把childHandler加到childChannel的Pipeline,设置childChanneloptionsattrs,最后执行childChannel的注册来绑定workerGroupNioEventLoop,并启动workerGroup的事件监听线程。通过这种方式把已经连接的channel的后续IO事件分派给workerGroup进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//ServerBootstrap.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;

ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;

// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}

任务二:服务器执行bind操作

取出的第二个任务执行channel.bind完成了端口绑定,并增加了ChannelFutureListener.CLOSE_ON_FAILURE监听器监听关闭失败的结果。到这里,整个服务器的启动工作就全部完成了。