ServerBootStrap示例
了解netty服务器的启动,从官方提供的4.1版本echo服务器端代码开始。
1 | public final class EchoServer { |
思路
这篇文章关心的是netty服务器是如何启动的。因此先跳过bossGroup、workerGroup的创建细节和ServerBootstrap参数初始化细节,直接看bind接口,这里是真正服务器启动的地方。然后顺着netty启动的顺序,遇到新的知识点再展开描述与记录。
整体启动脉络
netty启动涉及到多线程,为了便于以后快速回顾,先将整体启动的大致过程记录下来。绿色部分是给子线程分配的关键初始化任务。
netty启动相关类介绍
一开始虽然不会详细介绍bossGroup、workerGroup和ServerBootstrap的细节,但是还是先要对启动相关类有个基本认识。
ServerBootstrap:为了简化服务器启动过程,所有启动相关的接口和参数都封装到这里,相当于一个门面(Facade)模式。Channel:是netty的核心接口类,除了提供IO操作的接口之外,还提供对于核心组件的访问接口。如果把netty比作一台机器可以说是Channel将机器上所有的基本零件连接在了一起。ChannelFuture:通过它可以读取Channel中的异步操作的状态和结果,简单描述可以分为uncompleted和completed两种情况,而completed又包括Completed successfully、Completed with failure 、Completed by cancellation三种状态。另外它也可以添加ChannelFutureListener监听器,来监听异步I/O操作结果并执行后续操作。ChannelPromise:可以设置异步操作的状态和结果的一种特殊ChannelFuture。在启动过程中会用到它的默认实现DefaultChannelPromise。DefaultChannelPromise:在设置异步IO操作成功或者失败之后会通知相应的listeners的默认ChannelPromise。
服务器的启动入口
直接看bind操作:
1 | //ServerBootstrap.java |
bind方法调用基类AbstractBootstrap的bind方法:
1 | //AbstractBootstrap.java |
bind方法继续调用doBind方法:
1 | //AbstractBootstrap.java |
doBind方法最终调用doBind0方法。
这里regFuture其实是记录注册异步操作结果的ChannelPromise实例,通过regFuture.isDone()判断注册异步操作是否完成,存在两种情况:
- 注册操作没有完成:先给
regFuture增加一个监听器,如果注册成功没有错,就会在设置regFuture结果的地方调用doBind0方法 - 注册操作已经完成,则立即调用
doBind0方法
两者的区别在于调用此方法的线程不同。注册完成就直接在主线程调用,没有完成其实是在workGroup子线程中通过触发linster回调doBind0方法。但是殊途同归的是最终真正执行bind操作的地方都是在workGroup子线程中。
initAndRegister方法
调用doBind0方法之前,regFuture和channel参数都是通过initAndRegister方法创建的。
1 | //AbstractBootstrap.java |
initAndRegister方法内部主要完成了三件事
- 创建channel
- 初始化channel
- 返回注册的异步结果regFuture
我们先看如何创建channel。
确定创建的channel类型
channel是通过channelFactory.newChannel()方法完成创建的,那channelFactory是如何创建的,对象的类型是什么?
其实通过代码可以确定channelFactory对象实际的类型是ReflectiveChannelFactory。它通过ServerBootstrap的channel接口完成创建和赋值:
1 | .channel(NioSocketChannel.class) |
1 | //AbstractBootstrap.java |
ReflectiveChannelFactory的newChannel方法通过反射的方式创建对象,创建的对象类型为通过构造函数传入的类类型,也就是NioSocketChannel.class类型。因此只要通过ServerBootstrap的channel接口传入具体的类,就决定Netty服务器创建的Channel类型。
1 |
|
NioServerSocketChannel扩展理解
Channel作为netty的核心组件之一,为了加深理解,我们先做简单的介绍:
- NioSocketChannel, 代表异步的客户端 TCP Socket 连接
- NioServerSocketChannel, 异步的服务器端 TCP Socket 连接
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接
- NioSctpServerChannel, 异步的 Sctp 服务器端连接
- OioSocketChannel, 同步的客户端 TCP Socket 连接
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接
Channel层级结构:
1 | AbstractChannel |
以上channel可以理解为netty对传输层的具体实现,而AbstractChannel是netty对传输层的抽象,但是也通过采用Facade模式聚合了很多内部组件,包括了Unsafe、ChannelPipeline、EventLoop、ChannelPromise等,对外提供了统一接口。
而在AbstractChannel层次之上又分为AbstractOioChannel(阻塞)和AbstractNioChannel(非阻塞)。他们的最大的区别在于AbstractNioChannel类内部存在一个SelectableChannel类型的成员变量。SelectableChannel是一个可以通过Selector来进行多路复用的通道。
AbstractChannel核心类介绍
在继续NioServerSocketChannel的创建之前,我们再看看AbstractChannel核心组件:
ChannelId:每个Channel的唯一标识,这个唯一标识通过DefaultChannelId产生。Unsafe:它是Channel的辅助接口,不应该被用户代码直接使用,而实际的IO操作应该都由Unsafe接口负责完成。DefaultChannelPipeline:维护一个AbstractChannelHandlerContext的链表,通过这个链表中的handler处理器来处理Channel上的数据。AbstractChannelHandlerContext放到pipeline部分再详细介绍。EventLoop:主要负责执行Channel生命周期内的事件轮询和各种任务。
AbstractNioChannel核心类介绍
SelectableChannel:是java nio中的抽象类,本身可以通过Selector来支持多路复用通道,且提供两种模式Blocking mode和Non-blocking mode,SelectableChannel的Non-blocking mode是实现Netty的异步IO事件机制的基础。而要使用Non-blocking mode,需要先设置为Non-blocking mode模式,再通过register(Selector,int,Object)接口绑定Selector并拿到返回的SelectionKey对象。SelectionKey:SelectableChannel与Selector注册的标识。
NioServerSocketChannel核心类介绍
ChannelConfig:操作Channel的配置属性集的接口,包括ChannelOption配置和传输相关的属性。ChannelOption:提供一种类型安全的方式来表现配置信息。比如配置超时时间CONNECT_TIMEOUT_MILLIS。1
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
DefaultChannelConfig:对于ChannelConfig的默认实现,内部实现了对各种ChannelOption类型的统一访问,实现了对ByteBufAllocator的访问,以及Channel中的各种默认配置的访问,如读取数据长度、连接超时时间等。DefaultServerSocketChannelConfig:DefaultChannelConfig的子类,实现了通过ServerSocketChannelConfig提供的扩展接口,内部存储了ServerSocket的实例。NioServerSocketChannelConfig:DefaultServerSocketChannelConfig的子类,实现了autoReadCleared接口。
NioServerSocketChannel的类图
创建NioServerSocketChannel
通过工厂模式创建的NioServerSocketChannel调用的是默认的构造函数,而默认构造函数内部调用了带参数的构造函数:
1 | //NioServerSocketChannel.java |
DEFAULT_SELECTOR_PROVIDER是通过SelectorProvider.provider()创建的类静态变量:
1 | private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); |
1 | //SelectorProvider.java |
1 | //DefaultSelectorProvider.java |
channel的创建和平台相关,都是通过对应平台的provider来创建平添相关的channel,以windows平台为例:DEFAULT_SELECTOR_PROVIDER对象的实际类型是WindowsSelectorProvider。
因此newSocket方法调用的provider.openServerSocketChannel方法,在WindowsSelectorProvider父类SelectorProviderImpl中被实现:
1 | //NioServerSocketChannel.java |
1 | //WindowsSelectorImpl.java |
ServerSocketChannelImpl类内部通过Net组件来实现对本地socket的操作。
所以回到this()构造函数的调用,实际传入的实参类型为ServerSocketChannelImpl。
1 | //NioServerSocketChannel.java |
到这里直接参考NioServerSocketChannel实例化流程图可以基本理解NioServerSocketChannel实例化过程。
过程中有几点还是记录下来:
ServerSocketChannelImpl保存到this.ch中,转换成了SelectableChannel类型。- 初始化过程中通过
ch.configureBlocking(false);将AbstractSelectableChannel的blocking字段设为false,也就是将ch设置为非阻塞模式。后面只要调用register接口就可以完成channel和selector的注册。 - 创建的
NioServerSocketChannelConfig对象的成员变量channel和javaSocket对应的实例对象为NioServerSocketChannel和经过ServerSocketAdaptor类型转换后的ServerSocket,实际上还是ServerSocketChannelImpl。 unsafe实际对应的是NioMessageUnsafe。NioMessageUnsafe重写了read接口,read方法通过doReadMessages()处理NioServerSocketChannel的accept操作。如果此时没有客户端连接,则退出for循环进行后续的处理,如果有客户端连接,则将客户端NioSocketChannel保存到readBuf中(默认不超过16个),如果超过16个,则也退出for循环进行后续的处理。最后将readBuf传到pipeline去解析。
pipeline的创建
其中pipeline是很重要的组件,所以再详细看一下pipeline的创建。
先介绍pipeline的核心类:
AttributeKey:用于从AttributeMap中访问Attribute的键Attribute:存储与操作泛型数据的接口AttributeMap:提供通过AttributeKey来获取Attribute类型的value的接口。DefaultAttributeMap:AttributeMap的的默认实现,用来存取AttributeMap的数据。ChannelInboundInvoker:定义了ChannelHandlerContext传递inbound事件的方法。ChannelOutboundInvoker:定义了ChannelHandlerContext传递outbound事件的接口。ChannelHandlerContext:提供了访问各种资源如Channel、ChannelHandler、EventExecutor、ChannelPipeline、ByteBufAllocator、Attribute的方法,并且实现了ChannelInboundInvoker和ChannelOutboundInvoker来规范inbound和outbound事件的处理接口。AbstractChannelHandlerContext:它继承了DefaultAttributeMap,因此它有存取AttributeMap数据的能力,同时它也实现了ChannelHandlerContext接口,因此它也拥有访问pipeline的各种资源的能力。
这里还要补充一下,Channel和AbstractChannelHandlerContext都实现了AttributeMap接口,因此每一个Channel和ChannelHandlerContext实例都可以像Map一样来存取key和value,唯一的区别是ChannelHandlerContext的AttributeMap仅用于当前具体的ChannelHandler子类实例,而Channel的AttributeMap可以被用于所有的ChannelHandlerContext链表中的ChannelHandler子类实例。
1 | //AbstractChannel.java |
1 | //DefaultChannelPipeline.java |
创建过程比较简单:
DefaultChannelPipeline绑定的channel就是NioServerSocketChannel,并创建了succeededFuture、voidPromise用于异步操作支持。- 创建
TailContext和HeadContext来填充AbstractChannelHandlerContext链表结构,channel的handler都会放到这个链表。
NioServerSocketChannel的初始化
完成了NioServerSocketChannel的创建之后,接着执行NioServerSocketChannel的初始化:
1 | //ServerBootstrap.java |
初始化完成了几个工作
- 如果用户代码有调用
AbstractBootstrap.option接口保存ChannelOption,那么这里将ChannelOption用来配置NioServerSocketChannel的config对象。 - 如果用户代码有调用
AbstractBootstrap.attr接口保存AttributeKey和Attribute,那么这里将保存的AttributeKey和Attribute键值对复制到channel。 ChannelInitializer<Channel>()对象的基类是ChannelHandler,最终会封装到一个AbstractChannelHandlerContext对象内部,再加入到pipeline双向链表中。- 调用
addLast接口时,由于NioServerSocketChannel还没有注册完成,所以会额外创建一个封装了ChannelInitializer<Channel>()的AbstractChannelHandlerContext对象,并加入到一个新的PendingHandlerAddedTask对象中,再将此对象加入到pipeline的pendingHandlerCallbackHead单项链表上。
1 | //DefaultChannelPipeline.java |
提前说明这个PendingHandlerAddedTask的作用:
存入pendingHandlerCallbackHead链表的PendingHandlerAddedTask
等到channel注册完成之后会取出并执行。执行时调用PendingHandlerAddedTask的execute方法,此方法先调用callHandlerAdded0,callHandlerAdded0再调用ctx.handler().handlerAdded(ctx);。
1 | //DefaultChannelPipeline.java |
其中ctx.handler()是ChannelInitializer,而ctx.handler().handlerAdded(ctx);实际上调用的是initChannel接口。而在channel注册时创建的ChannelInitializer<Channel>()刚好重写了此接口。
1 | //ChannelInitializer.java |
所以等到channel注册完成之后,会调用ChannelInitializer<Channel>()重写的initChannel方法执行channel的额外初始化。这部分初始化代码等到注册完成时再说。
NioServerSocketChannel注册入口
完成了NioServerSocketChannel的初始化之后,返回到initAndRegister方法继续往下看注册流程:1
ChannelFuture regFuture = config().group().register(channel);
这里的config()是ServerBootstrap的内部成员变量:1
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
config().group()对应的是AbstractBootstrap的group成员变量:
1 | //AbstractBootstrapConfig.java |
1 | //AbstractBootstrap.java |
而group成员变量是通过ServerBootstrap的group方法初始化的:
1 | //ServerBootstrap.java |
在示例代码中对应的是bossGroup,而其childGroup成员变量保存的是workerGroup:1
b.group(bossGroup, workerGroup)
它们是通过NioEventLoopGroup创建的:
1 |
|
到这里我们又进入了另一大Netty的核心:EventLoop和EventLoopGroup,待会会详细介绍。
最终config().group().register(channel)函数实际上调用的是NioEventLoopGroup的register方法,但实现是在父类MultithreadEventLoopGroup中:
1 | //MultithreadEventLoopGroup.java |
而MultithreadEventLoopGroup实际上也不是直接注册,而是通过父类的next接口获取到一个EventExecutor并转换为EventLoop类型:
1 | //MultithreadEventExecutorGroup.java |
看到这里出现了chooser、EventExecutor以及EventLoop这些新的组件,以及一开始创建的两个NioEventLoopGroup对象bossGroup和workerGroup,而为了便于理解这部分内容,在接着看注册流程之前,必须先对netty的线程模型有个基本的认识。
如果对NioEventLoopGroup和NioEventLoop有了基本的认识,则可以跳过直接看继续Channel的注册流程的章节。
Reactor模型的理解
对NioEventLoopGroup的理解参考了此篇文章:https://segmentfault.com/a/1190000007403873
要理解NioEventLoopGroup,首先还是要知道Netty线程模型是Reactor设计模式的一个实现。
Reactor设计模式中有两个重要角色:
- Reactor的职责是检测网络IO事件并分发给合适的handler处理。
- Handlers的职责是执行非阻塞的行为。
而Reactor设计模式根据线程的数量和作用又分为三种Reactor线程模型
- 单线程模型
- 多线程模型
- 主从多线程模型
单线程模型所有工作都在一个线程执行,会由于其中某个handler的阻塞而导致整个服务的阻塞。因此单线程Reactor 模型用的比较少。
多线程模型与单线程模型的区别在于由单独的线程处理accept事件,其它IO操作事件和处理工作放到一组特定的NIO线程来监听处理。Reactor多线程模型如下:
多线程模型改善了handler阻塞对系统的影响,但是对于同时处理大量连接或者在连接时需要进行一些权限检测等工作时,那么单线程处理这些工作仍然负荷较大,可能会影响其它客户端的连接,那么主从多线程则将异步连接事件后续的处理工作也划分到从Reactor线程池来处理,主Reactor线程只负责分配任务。主从Reactor多线程模型如下:
那么回头看我们的示例,netty采用的其实就是Reactor多线程模型,bossGroup单线程处理accept事件,workerGroup线程池处理其它工作:
1 |
|
有了netty模型的基本概念之后,再来细看NioEventLoopGroup和NioEventLoop:
NioEventLoopGroup类层次结构
NioEventLoopGroup实例化过程
NioEventLoopGroup实例化的序列图中,省略了NioEventLoop的实例化过程,NioEventLoop的实例化后面再单独描述。
实例化流程比较简单,只说图中没有明确提到的部分:
NioEventLoopGroup内部维护了一份类型为EventExecutor的children数组,children数组实际通过newChild()方法创建的实际对象类型是NioEventLoop。- executor存储的是一个
ThreadPerTaskExecutor类型的实例,,它实现了execute(Runnable command)接口,只要调用此接口,就会将传入的command作为线程的执行体,通过DefaultThreadFactory.newThread(Runnable r)接口创建线程,并启动线程。而最终此executor会作为参数传递到NioEventLoop中,也就是传递给children数组中的每一个元素。
1 | //ThreadPerTaskExecutor.java |
1 | //DefaultThreadFactory.java |
- 如果调用构造函数没有设置nThreads数量,则nThreads的数量为
DEFAULT_EVENT_LOOP_THREADS,也就是处理器核心数*2 - 如果nThreads数量是2的次方,则实际通过
chooserFactory.newChooser(children)创建的chooser为PowerOfTwoEventExecutorChooser,否则为GenericEventExecutorChooser,这两者的区别在于实现了不同的next方法策略来选择children中的某一个EventExecutor。
NioEventLoop类的层次结构
NioEventLoop类的层次结构比较多,重点关注SingleThreadEventExecutor中的thread和executor,它们为NioEventLoop提供了执行线程任务的基础。
NioEventLoop实例化过程
根据上面的图,再补充解释整个NioEventLoopGroup和NioEventLoop的实例化过程:
NioEventLoopGroup实例化时,根据传入的需要创建的线程数量,创建了对应数量的NioEventLoop实例,并且最终的executor实例是保存在每一个NioEventLoop实例中。- 同样的selectorProvider最终也是传递给
NioEventLoop用于创建selector实例,不同的在于selectorProvider只有一份实例。 - 图中忽略了selector实例的创建,简单描述是selectorProvider会根据操作系统来返回不同平台的selectorProvider,而不同平台的selectorProvider通过实现
openSelector()来实例化平台相关的selector。 - 整个过程
SingleThreadEventExecutor的thread属性还没有被赋值,这个属性后面会用到。
继续Channel的注册流程
通过前面五节的介绍,注册函数链式调用group().next()实际通过chooser返回EventExecutor对象也就很容易理解了,其真实意图是在NioEventLoopGroup的chlidren数组中选择一个EventExecutor。也可以认为是返回一个NioEventLoop,它们都在一个继承链上。而NioEventLoop的父类SingleThreadEventLoop实现了register(channel)接口,所以group().next().register(channel)实际最后调用的是SingleThreadEventLoop.register(channel)。调用完成实际的返回值是DefaultChannelPromise,用于保存注册的状态。
1 | //SingleThreadEventLoop.java |
而SingleThreadEventLoop.register(channel)内部的promise.channel().unsafe().register(this, promise)调用层次很深,我们直接跳过。最终调用到AbstractUnsafe的register方法:
1 | //AbstractUnsafe.java |
AbstractUnsafe的register方法首先将eventLoop绑定到NioServerSocketChannel上,然后通过eventLoop.inEventLoop方法判断当前线程是否已经处于该事件循环器线程中:
1 | //AbstractUnsafe.java |
1 | //SingleThreadEventExecutor.java |
从代码看,最终判断的依据是SingleThreadEventExecutor的thread属性,也就是我们提到的在NioEventLoop初始化完成后仍然为空的thread。所以此时判断肯定不在同一个线程中。所以最终会执行eventLoop.execute(new Runnable())的分支。
启动第一个新事件监听器线程
到这里终于进入了一个新的阶段,因为第一个事件监听器线程被创建了。
通过NioEventLoop的类的层次结构知道NioEventLoop的是一个Executor,而在SingleThreadEventExecutor中实现了execute接口。eventLoop.execute(new Runnable())调用的实际就是SingleThreadEventExecutor.execute(Runnable task)方法。
1 | //SingleThreadEventExecutor.java |
SingleThreadEventExecutor.execute(Runnable task)首先将task存入taskQueue,也就是将register0操作放入任务队列。接着因为目前还是在主线程调用的execute方法,所以会执行startThread分支。startThread方法又调用了doStartThread分支。
事件监听与处理线程
继续看doStartThread方法。在doStartThread内部通过executor创建线程并启动线程。前面在NioEventLoopGroup实例化过程中描述了executor如何创建和启动线程。这里再介绍一次:
executor存储的是一个ThreadPerTaskExecutor类型的实例。它实现了execute(Runnable command)接口,只要调用此接口,就会将传入的command作为线程的执行体。其内部通过DefaultThreadFactory.newThread(Runnable r)接口创建线程,并启动线程。而executor是会在创建NioEventLoopGroup的children数组元素时,作为参数传递给children数组中的每一个NioEventLoop保存。
因此这里再调用executor.execute(new Runnable())方法时,就会启动一个新的线程,并执行run方法。而run方法内部终于将NioEventLoop的thread字段更新为当前创建的子线程。并最终调用了SingleThreadEventExecutor.this.run()方法。
1 | //SingleThreadEventExecutor.java |
而SingleThreadEventExecutor本身并没有实现run方法,而是在子类NioEventLoop实现了run方法:
1 | //NioEventLoop.java |
先简单描述当前for循环中完成的供:
- 查询是否有IO事件和任务到来
- 进行事件处理
- 进行任务处理
下面我们一项项描述。
事件与任务监听
简单描述selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())这行代码的含义:如果hasTasks() == true, 调用selector.selectNow(),并清空selectionKeys。因为selector.selectNow()的结果肯定>=0,所以会跳出switch块,否则进入case SelectStrategy.SELECT:分支执行select(wakenUp.getAndSet(false))。这样的好处在于当有task来到的时候,只查询就绪的channel,不阻塞线程;当没有task时,则可以执行NioEventLoop.select()函数,此函数如果没有外部中断或者新任务、定时/周期任务等待处理,会循环执行selector.select(timeoutMillis)去检测网络IO事件。
所以当跳出switch块,则表示有IO事件或者任务到达。
事件处理
IO事件到达后,先通过processSelectedKeys()处理IO事件,然后通过runAllTasks处理任务。
1 | //NioEventLoop.java |
selectedKeys在前面创建NioEventLoop已经初始化过了,所以会调用processSelectedKeysOptimized()函数,但此时第一次进入此函数时selectedKeys是空的,因此直接返回,不处理任何的网络IO事件。
虽然目前selectedKeys为空,但还是看看当selectedKeys存储了对应的IO事件时,做了些什么工作:
首先通过
selectedKeys.keys[i] = null;清除网络IO事件,然后通过processSelectedKey()处理网络事件和附加的数据a,数据a可以转换为一个AbstractNioChannel或者NioTask<SelectableChannel>。如果附加的是
AbstractNioChannel类型的数据,那么最终processSelectedKey会根据具体的IO事件,通过channel的Unsafe接口去执行具体的读、写、处理连接等工作。
1 | //NioEventLoop.java |
- 而如果附加的是
NioTask<SelectableChannel>类型的数据,那么processSelectedKey则通过NioTask<SelectableChannel>的channelReady接口完成具体的后续操作。
1 | //NioEventLoop.java |
任务处理
完成IO事件的处理后,继续往下执行runAllTasks。在处理任务时,需要根据this.ioRatio成员变量控制处理任务的时间百分比。this.ioRatio的默认值为50。也就是说,在事件循环中默认情况下用于处理I/O操作的时间和用于处理任务的时间百分比都为50%。
这里我们先看当ioRatio为100的时候是如何执行的:
1 | //SingleThreadEventExecutor.java |
runAllTasks()方法通过fetchFromScheduledTaskQueue()方法将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue取出,放到taskQueue中,然后通过runAllTasksFrom(Queue<Runnable> taskQueue)将所有任务一个个取出并执行。这里的循环不会被打断,也就意味着直到所有任务执行完成才会退出。
下面看一下runAllTasks(long timeoutNanos)方法的实现,这也是当前流程中执行的方法:
1 | //SingleThreadEventExecutor.java |
runAllTasks(long timeoutNanos)首先将所有即将到达执行时间点的周期定时任务从scheduledTaskQueue取出,放到taskQueue中,目前还没有定时和周期任务。然后也会从taskQueue取出每一个task去执行,但是每取出64个(0x100)之后,会进行一次超时判断,如果超时则退出任务的执行。目前taskQueue中存放了一个执行register0方法的任务,所以此任务会出去来准备执行。
切换到子线程继续完成注册
当runAllTasks方法内部执行到safeExecute(task)方法时,会调用task.run()方法执行任务的实际工作:
1 | //AbstractEventExecutor.java |
所以task.run方法此时执行的就是register0(promise),它是在channel注册时提交的task。
1 | //AbstractChannel.java |
因此register0方法实际是在子线程中执行:
1 | //AbstractChannel.java |
这个方法完成了几个动作:
promise.setUncancellable()设置注册这个异步操作的promise状态为不可取消的(UNCANCELLABLE)。- 通过
doRegister方法完成Selector和Channel的绑定。
1 | //AbstractNioChannel.java |
javaChannel()接口获取的ch,在NioServerSocketChannel初始化时,创建的实例是ServerSocketChannelImpl,并且已经设置为非阻塞模式。只要调用SelectableChannel的register接口就可以完成注册并返回对应的SelectionKey对象。而实际register方法是在其父类AbstractSelectableChannel中实现,传入的参数为当前NioEventLoop的Selector和Channel实例。
1 | //AbstractSelectableChannel.java |
AbstractSelectableChannel.register(Selector sel, int ops, Object att)方法会判断当前的Selector是否已经注册过,如果注册过则将新的监听事件和新的SelectableChannel实例附加到当前的Selector实例中。但是当前我们的Selector还没有注册过,因此执行的分支是调用((AbstractSelector)sel).register(this, ops, att)方法,也就是调用Selector实例的注册方法。而SelectorImpl实现了此方法:
1 | //SelectorImpl.java |
此方法将Selector和SelectableChannel绑定,设置监听事件,并将this(也就是NioServerSocketChannel)作为附加属性设置到SelectionKey中。
- 将成员变量registered置为true,表示
Channel已经在当前NioEventLoop线程中完成注册。 - 为注册好的channel的pipeline增加预先定义好的handler,并将下一步要执行的任务添加到
NioEventLoop的事件队列。下面开始分析这一系列动作:执行pipeline.invokeHandlerAddedIfNeeded()将在channel初始化时存入链表的PendingHandlerAddedTask取出,并执行其execute()方法,方法会调用ctx.handler().handlerAdded(ctx);,handler()获取到的就是重写了initChannel方法的ChannelInitializer<Channel>的实例,而最终handlerAdded(ctx);就会调用到重写的initChannel方法。这些我们在前面NioServerSocketChannel初始化章节的最后描述过。重写的initChannel方法先将ServerBootstrap中保存的handler加入到pipeline的双向链表中,然后再将一个新的task加入到eventLoop的任务队列中,并会在下一次查询任务队列时被执行。这个任务的作用就是将一个新的handler加入到pipeline的双向链表中。这个handler的类型是ServerBootstrapAcceptor,具体的作用我们等到bind操作完成之后再说。等到将initChannel方法执行完毕之后,这个ChannelInitializerhandler会将自己从pipeline中移除,并将ChannelInitializer关联的ChannelHandlerContext的状态置为REMOVE_COMPLETE。
1 | //ChannelInitializer.java |
1 | //ServerBootstrap.java |
标志
ChannelPromise(也就是DefaultChannelPromise对象)为成功,也就是将注册这个异步操作标志为成功。pipeline.fireChannelRegistered()触发ChannelRegistered事件,该事件会在ChannelPipeline中传播。它会先被head处理,随后该事件通过ChannelHandlerContext来实现传递给ChannelPipeline中的下一个ChannelInboundHandler处理器处理,直到最后被tail所处理。
到这里initAndRegister()函数执行完毕,意味着channel的注册和初始化完成了。
回到主线程的dobind()
回到主线程的dobind()函数,我们一开始就说过,注册这个异步操作根据结果存在两个分支,注册成功直接调用doBind0,否则通过addListener的方式异步执行dobind0。其实只要看注册未完成情况下的分支就可以了。regFuture.addListener创建了一个重写operationComplete接口的ChannelFutureListener。operationComplete()会在regFuture异步操作完成时收到通知。到这里主线程bind()操作完成后,执行sync()阻塞等待。
1 | //AbstractBootstrap.java |
bossGroup线程继续bind0工作
回到bossGroup子线程,往回看register0函数内部的第5个步骤:
设置promise为注册成功状态safeSetSuccess(promise);。
到这里主线程创建的ChannelFutureListener在此时就会直接在bossGroup子线程内执行operationComplete接口,执行bind0。
1 | //AbstractBootstrap.java |
bind0将channel的bind操作加入到任务队列中。
NioEventLoop线程继续执行任务队列
到这里,我们的NioEventLoop任务队列又多了两个任务:
- 将
ServerBootstrapAcceptor加入到pipeline双向链表中 - 执行
channel.bind操作完成channel端口的绑定
任务一:ServerBootstrapAcceptor实现事件分派
先看第一个任务天下的handler。ServerBootstrapAcceptor在channelRead事件触发的时候,把childHandler加到childChannel的Pipeline,设置childChannel的options和attrs,最后执行childChannel的注册来绑定workerGroup的NioEventLoop,并启动workerGroup的事件监听线程。通过这种方式把已经连接的channel的后续IO事件分派给workerGroup进行处理。
1 | //ServerBootstrap.java |
任务二:服务器执行bind操作
取出的第二个任务执行channel.bind完成了端口绑定,并增加了ChannelFutureListener.CLOSE_ON_FAILURE监听器监听关闭失败的结果。到这里,整个服务器的启动工作就全部完成了。