EventLoop 的初始化流程

基于 4.1.53.Final-SNAPSHOT 版本,以 NioEventLoopGroup 为目标对象分析。


[TOC]


概述

Netty 作为一个事件驱动的高性能网络框架,EventLoop 绝对是其中最最核心的角色。

EventLoop 就是一个事件的轮询处理器,以 NioEventLoop 为例,一个 NioEventLoop 包含了一个 Thread 对象,以及一个 Selector 对象,以及几个任务队列。

Thread 对象就是一个 Java 的线程对象,但是 Netty 进行了进一步的封装,变成了 FastThreadLocal。

Selector 对象就是 Java Nio 中的概念,是对 Socket 的网络轮询器,负责监听网络事件,是 IO 多路复用中的主要组件。

Select 负责绑定 Socket 对象,Thread 对象用来轮询时间和时间,并监听对应的 FD,任务队列则是为了满足用户自定义的事件。

EventLoopGroup 就是 EventLoop 的集合,至少包含一个 EventLoop。

就 NioEventLoopGroup 来说,它本身并不实现任何的业务逻辑,所以的逻辑都是通过 next() 方法选择一个 EventLoop 委派执行的。

在不同的系统中,EventLoop 可以有不同的选择,例如 NioEventLoop 或者 EpollEventLoop。

NioEventLoopGroup初始化流程

以下是 NioEventLoopGroup 的构造函数的逐层调用:

NioEventLoopGroup

以 NioEventLoopGroup 为例,逐步分析,以下是 NioEventLoopGroup 中参数最全的构造方法了。

image-20201019222615970

参数部分的含义如下:

参数名 参数作用
nThreads 线程组中线程的数量,在NioEventLoopGroup中默认为CPU * 2
executor 事件执行器,默认为ThreadPerTaskExecutor,最终会传到SingleThreadEventExecutor
chooserFactory 生成EventExecutorChooser的工厂,默认为DefaultEventExecutorChooserFactory
selectorProvider Selector的生成器,默认为原生的SPI方法返回
selectStrategyFactory Select选择策略的工厂,默认为DefaultSelectStrategyFactory,生成DefaultSelectStrategy
rejectedExecutionHandler 线程池拒绝策略,默认为RejectedExecutionHandler
taskQueueFactory 任务队列工厂,用于生成EventLoop中的两个任务队列

EventLoopGroup 本身并不会有业务逻辑的处理,基本上都是选择一个 EventLoop,然后调用它的方法处理。

MultithreadEventLoopGroup

NioEventLoopGroup 往上就是 MultithreadEventLoopGroup 的构造函数。

image-20201019222746272

其中 DEFAULT_EVENT_LOOP_TRHEADS 就是默认的线程数,Netty 中默认是当前CPU数的两倍

image-20201019222919297

两倍CPU数的线程非常适合执行IO密集型的任务,在 workGroup 中尤为合适。

MultithreadEventExecutorGroup

再往上就是 MultithreadEventExecutorGroup 的构造函数,这个构造函数中包含了初始化的主流程:

// MultithreadEventExecutorGroup的构造函数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
         // 执行器为空的话需要新建一个
        if (executor == null) {
            // 这个执行器很特别,任何任务都会直接开一个新的线程执行
            // 这里创建的Thread其实经过包装,真实对象是FastThreadLocalThread,肯定是继承了Thread的
            // newDefaultThraedFactory就是默认的线程工厂
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 初始化Group中的执行器数组
        // EventExecutor的数量和线程数量一致
        children = new EventExecutor[nThreads];
        // 遍历创建执行器
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                /**
                 * 最终创建的就是NioEventLoop
                 *
                 * @see NioEventLoopGroup#newChild
                 */
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // 异常处理暂时忽略
            }
        }

        // 用创建的NioEventLoop,构造一个选择器
        chooser = chooserFactory.newChooser(children);
        // 声明一个监听器
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                // 操作是否完成
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        // 为每一个事件处理器添加监听
        // 每个EventLoop在关闭之后都会触发上面的监听器
        for (EventExecutor e : children) {
            e.terminationFuture().addListener(terminationListener);
        }

        // 为Group的所有处理器保存一份只读版本
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

参数中用可变长参数适配不同参数数目的调用,鉴于Netty的牛逼我就不予置评了。

整个构造函数就做了如下几件事情:

  1. 创建默认的执行器。
  2. 创建 nThread 个 EventLoop,通过调用 newChild 模板方法实现不同的 EventLoop 的创建。
  3. 创建对应的 EventExecutorChooser,通过 EventExecutorChooserFactory 。
  4. 为每个 EventLoop 添加关闭时的监听器。

整个 NioEventLoopGroup 的构造函数调用链下来,主要的还是最后 NioEventLoop 的创建,另外还有一些选择器等相关工具类的创建。

选择器就是 NioEventLoopGroup 用来挑选工作线程的工具类,由其持有所有的NioEventLoop成员。

上文说的选择一个 NioEventLoop 的实现就是通过和这个工具类。

默认的执行器 - ThreadPerTaskExecutor

该执行器的作用就是在创建 EventLoop 所需要的工作线程的时候,作为执行器提供服务。

一定意义上我觉得作为线程工厂更合适,直接创建线程并分发给 EventLoop。

将工厂进一步分装为执行器的原因暂时未知,该执行器会在

以下是ThreadPerTaskExecutor的全部代码。

image-20201019223907388

这个执行器的功能就是每来一个任务就通过 ThreadFatcory 开启一个新的线程去执行,简单而又粗暴。

执行器中的所有线程通过 threadFactory#newThread 方法创建出来的,threadFactory 默认就是 DefaultThreadFactory。

以下是 DefaultThreadFactory 创建新线程的方法。

image-20201019223644097

该方法在创建线程的时候,进一步包装 Thread 对象会 FastThreadLocalThread,也使用 FastThreadLocalRunnable 进一步包装了 Runnable 对象。

包装 FastThreadLocalThread 的原因是因为 Netty 内部大量使用了 FastThreadLocal,需要除了原来 ThreadLocalMap 意外的数据结构保存线程局部变量。

包装 FastThreadLocalRunnable 的作用是在执行完成后清除 FastThreadLocal 中遗留的数据,避免内存泄露。

选择器 - EventExecutorChooser

以下是 DefaultEventExecutorChooserFactory#newChooser 方法源码。

image-20201019232145341

通过事务执行器的线程个数是否为二次方判断使用哪种Chooser。

以下分别是两种 Chooser 的方法,PowerOfTwoEventExecutorChooser 的如下:

image-20201019232346486

GenericEventExecutorChooser 的如下:

image-20201019232445792

两种方法并没有本质上的不同,都是轮询的方法,Group中每个EventLoop都会均匀的得到任务或者Channel。

就是在选则方法上的不同,PowerOfTwoEventExecutorChooser 采用了类似 HashMap 中的取下标实现。

另外还有意思的是这个方法,判断一个数是否是二次方:

image-20201019232710692

以上方法就创建完成了一个 EventGroup,但是除了 选择器和执行器之外,最主要的还是 EventLoop 的数组。

NioEventLoop 的初始化

NioEventLoop 的初始化就是从 MultithreadEventExecutorGroup#newChild 的模板方法开始的。

以下是 NioEventLoopGroup#newChild 的方法逻辑。

image-20201019224315626

newChild 是模板方法,所以 NioEventLoopGroup 的实现就是创建 NioEventLoop。

忽略乱七八糟的可变长函数,直接看 NioEventLoop 的构造函数,如下:

image-20201019230708750

整个 NioEventLoop 的构造函数都是围绕着 Selector 来的,主要就是创建一个 Selector 对象。

使用原生的 NIO 的时候可能更加习惯先创建 ServerSocket,在创建 Selector 绑定上之后再监听事件。

但是在 Netty 中,Selector 其实早早就被创建出来了。

SingleThreadEventLoop

该类是 NioEventLoop 的父类,主要的作用就是提供了一个低优先级的任务队列。

以下是SingleThreadEventLoop的构造函数,通过传递过来的参数初始化tailTasks。

image-20201126235626665

tailTasks就是所谓的低优先级队列,因为它实在轮询周期最后才执行的,但是里面的任务都会被执行完毕。

SingleThreadEventExecutor

SingleThreadEventExecutor 是 SingleThreadEventLoop 的父类,该类中实现了具体的执行器逻辑,也包含了正常的任务队列。

以下是SingleThreadEventExecutor的构造函数:

image-20201126235713997

此处完成了 taskQueue 的初始化,它就是常规的任务队列,在轮询周期中会有一定的时间来执行其内的任务。

addTaskWakesUp默认为false,暂时不知道具体作用,此时线程并没有启动。

另外值得关注的就是ThreadExecutorMap类的调用。

image-20201127000235931

对executor的调用进行了一次包装,包装中替换Runnable方法.

在Runnable方法执行前后会分别调用 setCurrentEventExecutor 方法,以下是调用的方法源码:

image-20201127000602305

该方法会将 EventExecutor 和 线程的映射关系保存到 ThreadExecutorMap#mapping 对象中,以后就可以通过

 ThreadExecutorMap.currentExecutor()

来获取当前线程正在使用给的执行器。

暂时不是很清楚这么做的意义,可能是为了执行器公用,但是如果我在业务里面调用了这个执行器,不是会直接创建出一个新线程执行吗,这样就有点奇怪了。

AbstractScheduledEventExecutor

从 SingleThreadEventExecutor 构造函数再往上调用的就是 AbstractScheduledEventExecutor 的构造函数了,该类的构造函数比较简单。

重要的是该类包含了另外一个关键队列 - 延时/定时任务队列。

image-20201127002758835

如上图所示就是优先级队列,里面保存的是定时任务,轮询周期中会将到期的任务添加到taskQueue的末尾。

通过以上的构造函数调用流程可以勉强分析出 NioEventLoop 的整体结构:

首先是三个任务队列:

  1. scheduledTaskQueue - DefaultPriorityQueue类型,用于定时或者延时任务
  2. taskQueue - mpsc型队列,用于常规任务
  3. tailTasks - mpsc型队列,用于低优先任务保存

总结

从初始化流程中可以明确的之后以下几点内容:

Q: EventLoopGroup 和 EventLoop 的关系。

首先明确,EventLoopGroup 基本不直接处理任何业务逻辑。

准确来说,EventLoopGroup 更像是一个调度中心,所有的逻辑通过 Chooser 选择一个 EventLoop 然后委托执行。

Q: EventLoop 的结构

EventLoop 持有一个 Selector 对象,一个 Thread 对象,以及三个任务队列,分别对应的网络事件,定时事件以及用户自定义事件。

Q: 在NioEventLoopGroup中任务的分派规则

因为是通过Chooser的分派,所以看两种Chooser的源码很容易能看出来总体来说还是通过轮询的形式来的。

每一个EventLoop都会获得均匀的任务。

Q: Runnable,Thread 在 EventLoop 中的进一步包装

Thread 在 EventLoop 中包装为 FastThreadLocalThread 提供 FastThreadLocal 的数据存储结构。

Runnable 在 EventLoop 中包装为 FastThreadLocalThreadRunnable,额外提供在 FastThreadLocal 中清除残留数据的功能

以上两者和 FastThreadLocal 类结合,成体系成套的出现会比 ThreadLocal 有明显的性能增长。

results matching ""

    No results matching ""