ServerBootStrap示例
了解netty服务器的启动,从官方提供的4.1版本echo服务器端代码开始。
1 | public final class EchoServer { |
思路
这篇文章关心的是netty服务器是如何启动的。因此先跳过bossGroup
、workerGroup
的创建细节和ServerBootstrap
参数初始化细节,直接看bind
接口,这里是真正服务器启动的地方。然后顺着netty启动的顺序,遇到新的知识点再展开描述与记录。
整体启动脉络
netty启动涉及到多线程,为了便于以后快速回顾,先将整体启动的大致过程记录下来。绿色部分是给子线程分配的关键初始化任务。
netty启动相关类介绍
一开始虽然不会详细介绍bossGroup
、workerGroup
和ServerBootstrap
的细节,但是还是先要对启动相关类有个基本认识。
ServerBootstrap
:为了简化服务器启动过程,所有启动相关的接口和参数都封装到这里,相当于一个门面(Facade)模式。Channel
:是netty的核心接口类,除了提供IO操作的接口之外,还提供对于核心组件的访问接口。如果把netty比作一台机器可以说是Channel
将机器上所有的基本零件连接在了一起。ChannelFuture
:通过它可以读取Channel
中的异步操作的状态和结果,简单描述可以分为uncompleted和completed两种情况,而completed又包括Completed successfully、Completed with failure 、Completed by cancellation三种状态。另外它也可以添加ChannelFutureListener监听器,来监听异步I/O操作结果并执行后续操作。ChannelPromise
:可以设置异步操作的状态和结果的一种特殊ChannelFuture
。在启动过程中会用到它的默认实现DefaultChannelPromise
。DefaultChannelPromise
:在设置异步IO操作成功或者失败之后会通知相应的listeners的默认ChannelPromise
。
服务器的启动入口
直接看bind
操作:
1 | //ServerBootstrap.java |
bind
方法调用基类AbstractBootstrap
的bind
方法:
1 | //AbstractBootstrap.java |
bind方法继续调用doBind方法:
1 | //AbstractBootstrap.java |
doBind
方法最终调用doBind0
方法。
这里regFuture
其实是记录注册异步操作结果的ChannelPromise
实例,通过regFuture.isDone()
判断注册异步操作是否完成,存在两种情况:
- 注册操作没有完成:先给
regFuture
增加一个监听器,如果注册成功没有错,就会在设置regFuture
结果的地方调用doBind0
方法 - 注册操作已经完成,则立即调用
doBind0
方法
两者的区别在于调用此方法的线程不同。注册完成就直接在主线程调用,没有完成其实是在workGroup
子线程中通过触发linster回调doBind0方法。但是殊途同归的是最终真正执行bind操作的地方都是在workGroup
子线程中。
initAndRegister方法
调用doBind0
方法之前,regFuture
和channel
参数都是通过initAndRegister
方法创建的。
1 | //AbstractBootstrap.java |
initAndRegister
方法内部主要完成了三件事
- 创建channel
- 初始化channel
- 返回注册的异步结果regFuture
我们先看如何创建channel。
确定创建的channel类型
channel是通过channelFactory.newChannel()
方法完成创建的,那channelFactory
是如何创建的,对象的类型是什么?
其实通过代码可以确定channelFactory
对象实际的类型是ReflectiveChannelFactory
。它通过ServerBootstrap
的channel
接口完成创建和赋值:
1 | .channel(NioSocketChannel.class) |
1 | //AbstractBootstrap.java |
ReflectiveChannelFactory
的newChannel
方法通过反射的方式创建对象,创建的对象类型为通过构造函数传入的类类型,也就是NioSocketChannel.class
类型。因此只要通过ServerBootstrap
的channel
接口传入具体的类,就决定Netty服务器创建的Channel
类型。
1 |
|
NioServerSocketChannel扩展理解
Channel
作为netty的核心组件之一,为了加深理解,我们先做简单的介绍:
- NioSocketChannel, 代表异步的客户端 TCP Socket 连接
- NioServerSocketChannel, 异步的服务器端 TCP Socket 连接
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接
- NioSctpServerChannel, 异步的 Sctp 服务器端连接
- OioSocketChannel, 同步的客户端 TCP Socket 连接
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接
Channel
层级结构:
1 | AbstractChannel |
以上channel可以理解为netty对传输层的具体实现,而AbstractChannel
是netty对传输层的抽象,但是也通过采用Facade模式聚合了很多内部组件,包括了Unsafe
、ChannelPipeline
、EventLoop
、ChannelPromise
等,对外提供了统一接口。
而在AbstractChannel
层次之上又分为AbstractOioChannel
(阻塞)和AbstractNioChannel
(非阻塞)。他们的最大的区别在于AbstractNioChannel
类内部存在一个SelectableChannel
类型的成员变量。SelectableChannel
是一个可以通过Selector
来进行多路复用的通道。
AbstractChannel核心类介绍
在继续NioServerSocketChannel
的创建之前,我们再看看AbstractChannel
核心组件:
ChannelId
:每个Channel的唯一标识,这个唯一标识通过DefaultChannelId
产生。Unsafe
:它是Channel
的辅助接口,不应该被用户代码直接使用,而实际的IO操作应该都由Unsafe
接口负责完成。DefaultChannelPipeline
:维护一个AbstractChannelHandlerContext
的链表,通过这个链表中的handler处理器来处理Channel上的数据。AbstractChannelHandlerContext
放到pipeline部分再详细介绍。EventLoop
:主要负责执行Channel生命周期内的事件轮询和各种任务。
AbstractNioChannel核心类介绍
SelectableChannel
:是java nio中的抽象类,本身可以通过Selector
来支持多路复用通道,且提供两种模式Blocking mode和Non-blocking mode,SelectableChannel
的Non-blocking mode是实现Netty的异步IO事件机制的基础。而要使用Non-blocking mode,需要先设置为Non-blocking mode模式,再通过register(Selector,int,Object)
接口绑定Selector
并拿到返回的SelectionKey
对象。SelectionKey
:SelectableChannel
与Selector
注册的标识。
NioServerSocketChannel核心类介绍
ChannelConfig
:操作Channel的配置属性集的接口,包括ChannelOption
配置和传输相关的属性。ChannelOption
:提供一种类型安全的方式来表现配置信息。比如配置超时时间CONNECT_TIMEOUT_MILLIS
。1
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
DefaultChannelConfig
:对于ChannelConfig
的默认实现,内部实现了对各种ChannelOption
类型的统一访问,实现了对ByteBufAllocator
的访问,以及Channel
中的各种默认配置的访问,如读取数据长度、连接超时时间等。DefaultServerSocketChannelConfig
:DefaultChannelConfig
的子类,实现了通过ServerSocketChannelConfig
提供的扩展接口,内部存储了ServerSocket
的实例。NioServerSocketChannelConfig
:DefaultServerSocketChannelConfig
的子类,实现了autoReadCleared
接口。
NioServerSocketChannel的类图
创建NioServerSocketChannel
通过工厂模式创建的NioServerSocketChannel
调用的是默认的构造函数,而默认构造函数内部调用了带参数的构造函数:
1 | //NioServerSocketChannel.java |
DEFAULT_SELECTOR_PROVIDER
是通过SelectorProvider.provider()
创建的类静态变量:
1 | private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); |
1 | //SelectorProvider.java |
1 | //DefaultSelectorProvider.java |
channel的创建和平台相关,都是通过对应平台的provider来创建平添相关的channel,以windows平台为例:DEFAULT_SELECTOR_PROVIDER
对象的实际类型是WindowsSelectorProvider
。
因此newSocket
方法调用的provider.openServerSocketChannel
方法,在WindowsSelectorProvider
父类SelectorProviderImpl
中被实现:
1 | //NioServerSocketChannel.java |
1 | //WindowsSelectorImpl.java |
ServerSocketChannelImpl
类内部通过Net
组件来实现对本地socket的操作。
所以回到this()
构造函数的调用,实际传入的实参类型为ServerSocketChannelImpl
。
1 | //NioServerSocketChannel.java |
到这里直接参考NioServerSocketChannel
实例化流程图可以基本理解NioServerSocketChannel
实例化过程。
过程中有几点还是记录下来:
ServerSocketChannelImpl
保存到this.ch
中,转换成了SelectableChannel
类型。- 初始化过程中通过
ch.configureBlocking(false);
将AbstractSelectableChannel
的blocking
字段设为false,也就是将ch
设置为非阻塞模式。后面只要调用register
接口就可以完成channel和selector的注册。 - 创建的
NioServerSocketChannelConfig
对象的成员变量channel
和javaSocket
对应的实例对象为NioServerSocketChannel
和经过ServerSocketAdaptor
类型转换后的ServerSocket
,实际上还是ServerSocketChannelImpl
。 unsafe
实际对应的是NioMessageUnsafe
。NioMessageUnsafe
重写了read
接口,read
方法通过doReadMessages()
处理NioServerSocketChannel
的accept操作。如果此时没有客户端连接,则退出for循环进行后续的处理,如果有客户端连接,则将客户端NioSocketChannel
保存到readBuf中(默认不超过16个),如果超过16个,则也退出for循环进行后续的处理。最后将readBuf传到pipeline
去解析。
pipeline的创建
其中pipeline是很重要的组件,所以再详细看一下pipeline的创建。
先介绍pipeline的核心类:
AttributeKey
:用于从AttributeMap中访问Attribute
的键Attribute
:存储与操作泛型数据的接口AttributeMap
:提供通过AttributeKey
来获取Attribute
类型的value的接口。DefaultAttributeMap
:AttributeMap
的的默认实现,用来存取AttributeMap
的数据。ChannelInboundInvoker
:定义了ChannelHandlerContext
传递inbound事件的方法。ChannelOutboundInvoker
:定义了ChannelHandlerContext
传递outbound事件的接口。ChannelHandlerContext
:提供了访问各种资源如Channel
、ChannelHandler
、EventExecutor
、ChannelPipeline
、ByteBufAllocator
、Attribute
的方法,并且实现了ChannelInboundInvoker
和ChannelOutboundInvoker
来规范inbound和outbound事件的处理接口。AbstractChannelHandlerContext
:它继承了DefaultAttributeMap
,因此它有存取AttributeMap
数据的能力,同时它也实现了ChannelHandlerContext
接口,因此它也拥有访问pipeline的各种资源的能力。
这里还要补充一下,Channel
和AbstractChannelHandlerContext
都实现了AttributeMap
接口,因此每一个Channel
和ChannelHandlerContext
实例都可以像Map一样来存取key和value,唯一的区别是ChannelHandlerContext
的AttributeMap
仅用于当前具体的ChannelHandler
子类实例,而Channel
的AttributeMap
可以被用于所有的ChannelHandlerContext
链表中的ChannelHandler
子类实例。
1 | //AbstractChannel.java |
1 | //DefaultChannelPipeline.java |
创建过程比较简单:
DefaultChannelPipeline
绑定的channel就是NioServerSocketChannel
,并创建了succeededFuture
、voidPromise
用于异步操作支持。- 创建
TailContext
和HeadContext
来填充AbstractChannelHandlerContext
链表结构,channel的handler都会放到这个链表。
NioServerSocketChannel的初始化
完成了NioServerSocketChannel
的创建之后,接着执行NioServerSocketChannel
的初始化:
1 | //ServerBootstrap.java |
初始化完成了几个工作
- 如果用户代码有调用
AbstractBootstrap.option
接口保存ChannelOption
,那么这里将ChannelOption
用来配置NioServerSocketChannel
的config对象。 - 如果用户代码有调用
AbstractBootstrap.attr
接口保存AttributeKey
和Attribute
,那么这里将保存的AttributeKey
和Attribute
键值对复制到channel。 ChannelInitializer<Channel>()
对象的基类是ChannelHandler
,最终会封装到一个AbstractChannelHandlerContext
对象内部,再加入到pipeline
双向链表中。- 调用
addLast
接口时,由于NioServerSocketChannel
还没有注册完成,所以会额外创建一个封装了ChannelInitializer<Channel>()
的AbstractChannelHandlerContext
对象,并加入到一个新的PendingHandlerAddedTask
对象中,再将此对象加入到pipeline的pendingHandlerCallbackHead
单项链表上。
1 | //DefaultChannelPipeline.java |
提前说明这个PendingHandlerAddedTask
的作用:
存入pendingHandlerCallbackHead
链表的PendingHandlerAddedTask
等到channel注册完成之后会取出并执行。执行时调用PendingHandlerAddedTask
的execute
方法,此方法先调用callHandlerAdded0
,callHandlerAdded0
再调用ctx.handler().handlerAdded(ctx);
。
1 | //DefaultChannelPipeline.java |
其中ctx.handler()
是ChannelInitializer
,而ctx.handler().handlerAdded(ctx);
实际上调用的是initChannel
接口。而在channel注册时创建的ChannelInitializer<Channel>()
刚好重写了此接口。
1 | //ChannelInitializer.java |
所以等到channel注册完成之后,会调用ChannelInitializer<Channel>()
重写的initChannel
方法执行channel的额外初始化。这部分初始化代码等到注册完成时再说。
NioServerSocketChannel注册入口
完成了NioServerSocketChannel
的初始化之后,返回到initAndRegister
方法继续往下看注册流程:1
ChannelFuture regFuture = config().group().register(channel);
这里的config()
是ServerBootstrap
的内部成员变量:1
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
config().group()
对应的是AbstractBootstrap
的group成员变量:
1 | //AbstractBootstrapConfig.java |
1 | //AbstractBootstrap.java |
而group成员变量是通过ServerBootstrap
的group
方法初始化的:
1 | //ServerBootstrap.java |
在示例代码中对应的是bossGroup
,而其childGroup
成员变量保存的是workerGroup
:1
b.group(bossGroup, workerGroup)
它们是通过NioEventLoopGroup
创建的:
1 |
|
到这里我们又进入了另一大Netty的核心:EventLoop
和EventLoopGroup
,待会会详细介绍。
最终config().group().register(channel)
函数实际上调用的是NioEventLoopGroup
的register
方法,但实现是在父类MultithreadEventLoopGroup
中:
1 | //MultithreadEventLoopGroup.java |
而MultithreadEventLoopGroup
实际上也不是直接注册,而是通过父类的next
接口获取到一个EventExecutor
并转换为EventLoop
类型:
1 | //MultithreadEventExecutorGroup.java |
看到这里出现了chooser
、EventExecutor
以及EventLoop
这些新的组件,以及一开始创建的两个NioEventLoopGroup
对象bossGroup
和workerGroup
,而为了便于理解这部分内容,在接着看注册流程之前,必须先对netty的线程模型有个基本的认识。
如果对NioEventLoopGroup
和NioEventLoop
有了基本的认识,则可以跳过直接看继续Channel的注册流程
的章节。
Reactor模型的理解
对NioEventLoopGroup
的理解参考了此篇文章:https://segmentfault.com/a/1190000007403873
要理解NioEventLoopGroup
,首先还是要知道Netty线程模型是Reactor设计模式的一个实现。
Reactor设计模式中有两个重要角色:
- Reactor的职责是检测网络IO事件并分发给合适的handler处理。
- Handlers的职责是执行非阻塞的行为。
而Reactor设计模式根据线程的数量和作用又分为三种Reactor线程模型
- 单线程模型
- 多线程模型
- 主从多线程模型
单线程模型所有工作都在一个线程执行,会由于其中某个handler的阻塞而导致整个服务的阻塞。因此单线程Reactor 模型用的比较少。
多线程模型与单线程模型的区别在于由单独的线程处理accept事件,其它IO操作事件和处理工作放到一组特定的NIO线程来监听处理。Reactor多线程模型如下:
多线程模型改善了handler阻塞对系统的影响,但是对于同时处理大量连接或者在连接时需要进行一些权限检测等工作时,那么单线程处理这些工作仍然负荷较大,可能会影响其它客户端的连接,那么主从多线程则将异步连接事件后续的处理工作也划分到从Reactor线程池来处理,主Reactor线程只负责分配任务。主从Reactor多线程模型如下:
那么回头看我们的示例,netty采用的其实就是Reactor多线程模型,bossGroup单线程处理accept事件,workerGroup线程池处理其它工作:
1 |
|
有了netty模型的基本概念之后,再来细看NioEventLoopGroup
和NioEventLoop
:
NioEventLoopGroup类层次结构
NioEventLoopGroup实例化过程
NioEventLoopGroup
实例化的序列图中,省略了NioEventLoop
的实例化过程,NioEventLoop
的实例化后面再单独描述。
实例化流程比较简单,只说图中没有明确提到的部分:
NioEventLoopGroup
内部维护了一份类型为EventExecutor
的children数组,children数组实际通过newChild()
方法创建的实际对象类型是NioEventLoop
。- executor存储的是一个
ThreadPerTaskExecutor
类型的实例,,它实现了execute(Runnable command)
接口,只要调用此接口,就会将传入的command作为线程的执行体,通过DefaultThreadFactory.newThread(Runnable r)
接口创建线程,并启动线程。而最终此executor会作为参数传递到NioEventLoop
中,也就是传递给children数组中的每一个元素。
1 | //ThreadPerTaskExecutor.java |
1 | //DefaultThreadFactory.java |
- 如果调用构造函数没有设置nThreads数量,则nThreads的数量为
DEFAULT_EVENT_LOOP_THREADS
,也就是处理器核心数*2 - 如果nThreads数量是2的次方,则实际通过
chooserFactory.newChooser(children)
创建的chooser为PowerOfTwoEventExecutorChooser
,否则为GenericEventExecutorChooser
,这两者的区别在于实现了不同的next
方法策略来选择children中的某一个EventExecutor
。
NioEventLoop类的层次结构
NioEventLoop
类的层次结构比较多,重点关注SingleThreadEventExecutor
中的thread和executor,它们为NioEventLoop
提供了执行线程任务的基础。
NioEventLoop实例化过程
根据上面的图,再补充解释整个NioEventLoopGroup
和NioEventLoop
的实例化过程:
NioEventLoopGroup
实例化时,根据传入的需要创建的线程数量,创建了对应数量的NioEventLoop
实例,并且最终的executor实例是保存在每一个NioEventLoop
实例中。- 同样的selectorProvider最终也是传递给
NioEventLoop
用于创建selector实例,不同的在于selectorProvider只有一份实例。 - 图中忽略了selector实例的创建,简单描述是selectorProvider会根据操作系统来返回不同平台的selectorProvider,而不同平台的selectorProvider通过实现
openSelector()
来实例化平台相关的selector。 - 整个过程
SingleThreadEventExecutor
的thread属性还没有被赋值,这个属性后面会用到。
继续Channel的注册流程
通过前面五节的介绍,注册函数链式调用group().next()
实际通过chooser返回EventExecutor
对象也就很容易理解了,其真实意图是在NioEventLoopGroup
的chlidren数组中选择一个EventExecutor
。也可以认为是返回一个NioEventLoop
,它们都在一个继承链上。而NioEventLoop
的父类SingleThreadEventLoop
实现了register(channel)
接口,所以group().next().register(channel)
实际最后调用的是SingleThreadEventLoop.register(channel)
。调用完成实际的返回值是DefaultChannelPromise
,用于保存注册的状态。
1 | //SingleThreadEventLoop.java |
而SingleThreadEventLoop.register(channel)
内部的promise.channel().unsafe().register(this, promise)
调用层次很深,我们直接跳过。最终调用到AbstractUnsafe
的register
方法:
1 | //AbstractUnsafe.java |
AbstractUnsafe
的register
方法首先将eventLoop绑定到NioServerSocketChannel
上,然后通过eventLoop.inEventLoop
方法判断当前线程是否已经处于该事件循环器线程中:
1 | //AbstractUnsafe.java |
1 | //SingleThreadEventExecutor.java |
从代码看,最终判断的依据是SingleThreadEventExecutor
的thread属性,也就是我们提到的在NioEventLoop
初始化完成后仍然为空的thread。所以此时判断肯定不在同一个线程中。所以最终会执行eventLoop.execute(new Runnable())
的分支。
启动第一个新事件监听器线程
到这里终于进入了一个新的阶段,因为第一个事件监听器线程被创建了。
通过NioEventLoop的
类的层次结构知道NioEventLoop的
是一个Executor
,而在SingleThreadEventExecutor
中实现了execute
接口。eventLoop.execute(new Runnable())
调用的实际就是SingleThreadEventExecutor.execute(Runnable task)
方法。
1 | //SingleThreadEventExecutor.java |
SingleThreadEventExecutor.execute(Runnable task)
首先将task存入taskQueue,也就是将register0
操作放入任务队列。接着因为目前还是在主线程调用的execute方法,所以会执行startThread
分支。startThread
方法又调用了doStartThread
分支。
事件监听与处理线程
继续看doStartThread
方法。在doStartThread
内部通过executor创建线程并启动线程。前面在NioEventLoopGroup
实例化过程中描述了executor如何创建和启动线程。这里再介绍一次:
executor存储的是一个ThreadPerTaskExecutor
类型的实例。它实现了execute(Runnable command)
接口,只要调用此接口,就会将传入的command作为线程的执行体。其内部通过DefaultThreadFactory.newThread(Runnable r)
接口创建线程,并启动线程。而executor是会在创建NioEventLoopGroup
的children数组元素时,作为参数传递给children数组中的每一个NioEventLoop
保存。
因此这里再调用executor.execute(new Runnable())
方法时,就会启动一个新的线程,并执行run
方法。而run
方法内部终于将NioEventLoop
的thread
字段更新为当前创建的子线程。并最终调用了SingleThreadEventExecutor.this.run()
方法。
1 | //SingleThreadEventExecutor.java |
而SingleThreadEventExecutor
本身并没有实现run
方法,而是在子类NioEventLoop
实现了run
方法:
1 | //NioEventLoop.java |
先简单描述当前for循环中完成的供:
- 查询是否有IO事件和任务到来
- 进行事件处理
- 进行任务处理
下面我们一项项描述。
事件与任务监听
简单描述selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
这行代码的含义:如果hasTasks() == true, 调用selector.selectNow()
,并清空selectionKeys。因为selector.selectNow()
的结果肯定>=0,所以会跳出switch块,否则进入case SelectStrategy.SELECT:
分支执行select(wakenUp.getAndSet(false))
。这样的好处在于当有task来到的时候,只查询就绪的channel,不阻塞线程;当没有task时,则可以执行NioEventLoop.select()
函数,此函数如果没有外部中断或者新任务、定时/周期任务等待处理,会循环执行selector.select(timeoutMillis)
去检测网络IO事件。
所以当跳出switch块,则表示有IO事件或者任务到达。
事件处理
IO事件到达后,先通过processSelectedKeys()
处理IO事件,然后通过runAllTasks
处理任务。
1 | //NioEventLoop.java |
selectedKeys
在前面创建NioEventLoop
已经初始化过了,所以会调用processSelectedKeysOptimized()
函数,但此时第一次进入此函数时selectedKeys
是空的,因此直接返回,不处理任何的网络IO事件。
虽然目前selectedKeys
为空,但还是看看当selectedKeys
存储了对应的IO事件时,做了些什么工作:
首先通过
selectedKeys.keys[i] = null;
清除网络IO事件,然后通过processSelectedKey()
处理网络事件和附加的数据a,数据a可以转换为一个AbstractNioChannel
或者NioTask<SelectableChannel>
。如果附加的是
AbstractNioChannel
类型的数据,那么最终processSelectedKey
会根据具体的IO事件,通过channel的Unsafe接口去执行具体的读、写、处理连接等工作。
1 | //NioEventLoop.java |
- 而如果附加的是
NioTask<SelectableChannel>
类型的数据,那么processSelectedKey
则通过NioTask<SelectableChannel>
的channelReady
接口完成具体的后续操作。
1 | //NioEventLoop.java |
任务处理
完成IO事件的处理后,继续往下执行runAllTasks
。在处理任务时,需要根据this.ioRatio
成员变量控制处理任务的时间百分比。this.ioRatio
的默认值为50。也就是说,在事件循环中默认情况下用于处理I/O操作的时间和用于处理任务的时间百分比都为50%。
这里我们先看当ioRatio为100的时候是如何执行的:
1 | //SingleThreadEventExecutor.java |
runAllTasks()
方法通过fetchFromScheduledTaskQueue()
方法将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue
取出,放到taskQueue
中,然后通过runAllTasksFrom(Queue<Runnable> taskQueue)
将所有任务一个个取出并执行。这里的循环不会被打断,也就意味着直到所有任务执行完成才会退出。
下面看一下runAllTasks(long timeoutNanos)
方法的实现,这也是当前流程中执行的方法:
1 | //SingleThreadEventExecutor.java |
runAllTasks(long timeoutNanos)
首先将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue
取出,放到taskQueue
中,目前还没有定时和周期任务。然后也会从taskQueue
取出每一个task去执行,但是每取出64个(0x100)之后,会进行一次超时判断,如果超时则退出任务的执行。目前taskQueue
中存放了一个执行register0
方法的任务,所以此任务会出去来准备执行。
切换到子线程继续完成注册
当runAllTasks
方法内部执行到safeExecute(task)
方法时,会调用task.run()
方法执行任务的实际工作:
1 | //AbstractEventExecutor.java |
所以task.run
方法此时执行的就是register0(promise)
,它是在channel注册时提交的task。
1 | //AbstractChannel.java |
因此register0
方法实际是在子线程中执行:
1 | //AbstractChannel.java |
这个方法完成了几个动作:
promise.setUncancellable()
设置注册这个异步操作的promise状态为不可取消的(UNCANCELLABLE)。- 通过
doRegister
方法完成Selector
和Channel
的绑定。
1 | //AbstractNioChannel.java |
javaChannel()
接口获取的ch,在NioServerSocketChannel
初始化时,创建的实例是ServerSocketChannelImpl
,并且已经设置为非阻塞模式。只要调用SelectableChannel
的register
接口就可以完成注册并返回对应的SelectionKey
对象。而实际register
方法是在其父类AbstractSelectableChannel
中实现,传入的参数为当前NioEventLoop
的Selector
和Channel
实例。
1 | //AbstractSelectableChannel.java |
AbstractSelectableChannel.register(Selector sel, int ops, Object att)
方法会判断当前的Selector
是否已经注册过,如果注册过则将新的监听事件和新的SelectableChannel
实例附加到当前的Selector
实例中。但是当前我们的Selector
还没有注册过,因此执行的分支是调用((AbstractSelector)sel).register(this, ops, att)
方法,也就是调用Selector
实例的注册方法。而SelectorImpl
实现了此方法:
1 | //SelectorImpl.java |
此方法将Selector
和SelectableChannel
绑定,设置监听事件,并将this(也就是NioServerSocketChannel
)作为附加属性设置到SelectionKey
中。
- 将成员变量registered置为true,表示
Channel
已经在当前NioEventLoop
线程中完成注册。 - 为注册好的channel的pipeline增加预先定义好的handler,并将下一步要执行的任务添加到
NioEventLoop
的事件队列。下面开始分析这一系列动作:执行pipeline.invokeHandlerAddedIfNeeded()
将在channel初始化时存入链表的PendingHandlerAddedTask
取出,并执行其execute()
方法,方法会调用ctx.handler().handlerAdded(ctx);
,handler()
获取到的就是重写了initChannel
方法的ChannelInitializer<Channel>
的实例,而最终handlerAdded(ctx);
就会调用到重写的initChannel
方法。这些我们在前面NioServerSocketChannel
初始化章节的最后描述过。重写的initChannel
方法先将ServerBootstrap
中保存的handler加入到pipeline的双向链表中,然后再将一个新的task加入到eventLoop的任务队列中,并会在下一次查询任务队列时被执行。这个任务的作用就是将一个新的handler加入到pipeline的双向链表中。这个handler的类型是ServerBootstrapAcceptor
,具体的作用我们等到bind操作完成之后再说。等到将initChannel
方法执行完毕之后,这个ChannelInitializer
handler会将自己从pipeline中移除,并将ChannelInitializer关联的ChannelHandlerContext的状态置为REMOVE_COMPLETE。
1 | //ChannelInitializer.java |
1 | //ServerBootstrap.java |
标志
ChannelPromise
(也就是DefaultChannelPromise
对象)为成功,也就是将注册这个异步操作标志为成功。pipeline.fireChannelRegistered()
触发ChannelRegistered
事件,该事件会在ChannelPipeline
中传播。它会先被head处理,随后该事件通过ChannelHandlerContext
来实现传递给ChannelPipeline
中的下一个ChannelInboundHandler
处理器处理,直到最后被tail所处理。
到这里initAndRegister()
函数执行完毕,意味着channel的注册和初始化完成了。
回到主线程的dobind()
回到主线程的dobind()
函数,我们一开始就说过,注册这个异步操作根据结果存在两个分支,注册成功直接调用doBind0
,否则通过addListener
的方式异步执行dobind0
。其实只要看注册未完成情况下的分支就可以了。regFuture.addListener
创建了一个重写operationComplete
接口的ChannelFutureListener
。operationComplete()
会在regFuture异步操作完成时收到通知。到这里主线程bind()
操作完成后,执行sync()
阻塞等待。
1 | //AbstractBootstrap.java |
bossGroup线程继续bind0工作
回到bossGroup
子线程,往回看register0
函数内部的第5个步骤:
设置promise为注册成功状态safeSetSuccess(promise);
。
到这里主线程创建的ChannelFutureListener
在此时就会直接在bossGroup
子线程内执行operationComplete
接口,执行bind0
。
1 | //AbstractBootstrap.java |
bind0
将channel的bind操作加入到任务队列中。
NioEventLoop线程继续执行任务队列
到这里,我们的NioEventLoop任务队列又多了两个任务:
- 将
ServerBootstrapAcceptor
加入到pipeline双向链表中 - 执行
channel.bind
操作完成channel端口的绑定
任务一:ServerBootstrapAcceptor实现事件分派
先看第一个任务天下的handler。ServerBootstrapAcceptor
在channelRead
事件触发的时候,把childHandler
加到childChannel
的Pipeline,设置childChannel
的options
和attrs
,最后执行childChannel
的注册来绑定workerGroup
的NioEventLoop
,并启动workerGroup
的事件监听线程。通过这种方式把已经连接的channel的后续IO事件分派给workerGroup
进行处理。
1 | //ServerBootstrap.java |
任务二:服务器执行bind操作
取出的第二个任务执行channel.bind完成了端口绑定,并增加了ChannelFutureListener.CLOSE_ON_FAILURE
监听器监听关闭失败的结果。到这里,整个服务器的启动工作就全部完成了。