Disruptor(高性能内存队列
Disruptor 框架
以下源码基于 3.4.4 版本
概述
以下是 Disruptor 官网的介绍图:
Disruptor 是 LMAX 基于 Java 语言实现的高性能队列,相比于 Java 的 BlockedQueue,它有以下几个特点:
- 无锁化(后续讲到的等待策略,所选择上所等待
- 内存的预分配(队列会实现创建指定数量的对象
- 事件广播(通过 Disruptor 发布的事件会被各个消费者分别消费
Disruptor 对生产者/消费者的各个模块都进行了抽象(上图中也标注了各个角色),各个角色的作用如下:
RingBuffer 是最重要的中间队列,保存事件对象,也协调生产者和消费者之间的依赖关系。
Producer 是生产者,任何持有 RingBuffer 的对象都可以作为生产者。
Consumer 是消费者,消费者需要实现 EventHandler,并且需要向 Disruptor 注册信息。
Sequence 表示的是生产/消费的序号(或者说偏移量?)可以类比于 AtomicLong,但是 Sequence 通过内存填充避免了伪共享
生产者的 Sequence 由 RingBuffer 统一管理,消费者的 Sequence 则由各个消费者各自管理。
因此各个消费者会分别消费事件,不会互相影响,类似 Kafka 的消费者组,所以消息会被每个消费者都处理一次。
某些层面上 Disruptor 和 Guava 的 EventBus 有点像(后续可以对比一下两者的实现。
EventBus 是监听器模型,而 Disruptor 则是生产者/消费者模型,相对来说 Disruptor 的实现更加复杂也更加灵活高效。
源码分析
Disruptor
Disruptor 是整个框架的核心,负责协调生产者和队列、队列和消费者之间的关系,并对外提供基础 API。
Disruptor 主要持有 RingBuffer 的对象引用,以及所有的消费者信息(生产者的信息并不需要保存,谁持有该对象都可以成为生产者。
创建流程
(先通过创建过程来了解 Disruptor 整个对象的构造。
Disruptor 的创建方法如下:
1 |
|
Disruptor 的创建流程主要就是创建了对应的 RingBuffer 对象,并且指定消费者所用的线程工厂。
消费者的线程模型非常重要,这是非常容易出问题的一个点,并且在源码中也建议不要使用线程池实现。
整体的参数含义如下:
参数名称 | 含义 |
---|---|
eventFactory | 事件工厂(RingBuffer 会调用该接口方法,创建 RingBufferSize 个对象重复使用 |
ringBufferSize | RingBuffer 的大小,必须为2次幂 |
threadFactory | 线程工厂(用于创建消费者所需要的线程,也可以指定线程池 |
producerType | 生产者类型(根据单生产者还是多生产者会使用不同的并发策略 |
waitStrategy | 等待策略(生产者的等待策略,而消费者的等待策略在指定消费者的时候决定 |
(以上参数基本就是 Disruptor 的所有控制参数了,接下来在看 RingBuffer 的创建流程。
RingBuffer 创建流程
以下是 3.4.4 版本中 RingBuffer 的定义注释以及继承图:
(基于环形数组实现的可重复使用实例对象的存储组件,保存的数据的在生产者和消费者之间交换。
RingBufferFields 、RingBufferPad 是 RingBuffer 做数据填充,避免缓存行的伪共享的实现。
伪共享是指相关性较差的数据使用同一缓存行保存,而各自的修改会导致缓存行更频繁的失效,从何导致的性能降低。
Disruptor 的处理方法很优雅,直接扩充当前重点数据大小到大于等于缓存行大小为止。
EventSink 和 DataProvider 是 RingBuffer 的两个角色,对于生产者来说是事件的接受者,对于消费者来说又是数据的提供者。
Sequenced 等接口表示这 RingBuffer 对 Sequence 的操作角色。
RingBuffer 对外提供的创建方法如下:
1 |
|
RingBuffer 的大小是固定的,并且创建的时候就需要传入 EventFactory,Buffer 中所有的事件对象都是通过该工厂创建的。
(因为大小是固定的,所以 RingBuffer 更可以类比 ArrayBlockedQueue 实现的生消模型。
另外为了更好地做并发控制, RingBuffer 也区分了生产者类型,以 SINGLE 生产者类型为例:
1 |
|
SINGLE 对应的 Sequencer 类型为 SingleProducerSequencer,而 MULTI 对应的则是 MultiProducerSequencer。
(两种 Sequencer 的并发控制是完全不一样的,毕竟单一生产者咩有并发
最后就是 RingBuffer 的构造函数的调用:
1 |
|
RingBuffer 在创建的过程中间就会调用 EventFactory#newInstance 方法创建所需要的所有对象,为了后续的重复使用。
RingBuffer 的大小必须为2次幂,为了使用 k & (n-1) 求对应数组下标。
RingBuffer 中为了避免伪共享,做了很多的填充,例如整个的数组会多创建一些填充对象。
总结来说,RingBuffer 的创建流程主要完成以下几件事情:
- 创建环形数组并且创建所有 Event 对象
- 根据生产者类型创建对应的 Sequencer
消费者
Disruptor 在启动前就需要指定消费者,同时也可以指定各消费者之间的依赖关系(也就是层级消费。
消费者的依赖关系也就是层级消费,以 EventHandlerGroup 作为基本单位进行依赖关系的编排,GroupA 可以根据 GroupB 的消费进度进行事件消费。
即使在 EventHandlerGroup 中的 EventHandler 对象也不会共享一个 Sequence,会各自消费完整的事件列表。
注册和启动流程
Disruptor 中的消费者需要提前注册(上文中提到的 Disruptor 会保存所有的消费者信息),然后随着框架的启动而开始执行。
Disruptor 提供了多种方式来进行注册:(消费者是向 Disruptor 对象注册的
- EventHandler(最终会被包装为 EventProcessor 进行注册,当前 Disruptor 所持有的 RingBuffer 会作为 DataProvider 传入。
- EventProcessorFactory(会使用工厂类直接创建 EventProcessor 进行消费者的注册
- EventProcessor(继承了 Runnable,在启动时执行
- WorkHandler
以下是通过 EventHandler 创建消费者的过程:
(源码中主要需要注意 Disruptor 对于 Sequence 的处理,因为期间需要相互依赖。
1 |
|
注册消费主要流程如下:
- 检查 Disruptor 是否已经开启
- 创建对应的 EventProcessor (具体对象为 BatchEventProcessor,包含了ExceptionHandler 和 RingBuffer。
- 向 ConsumerRepository 注册当前的消费者信息(消费者并未启动,所以此时需要集中管理
- 处理 Sequence(非常重要,依赖关系都靠这个协调
- 向 RingBuffer 添加当前的消费者的 Sequence(保证生产者的 Sequence 不超过消费者
- 移除 RingBuffer 中当前消费者依赖的 Sequence(当前消费者的序号肯定小于依赖,所以只需要关注当前消费者的序号就好
- 处理具有依赖关系的消费者之间的 Sequence(当前消费者不能超过依赖目标的序号。
- 返回 EventHandlerGroup(EventHandlerGroup 对象包含 after 等方法可以作为顺序处理逻辑的编排方法
消费者最终的实例对象为 BatchEventProcessor(后续的消费逻辑,通过 RingBuffer 获取事件以及调用对应处理方法的逻辑都在该类中实现,EventProcessor 继承了 Runnable 所以可以直接执行。
需要注意的是,Disruptor 不允许在运行过程中添加消费者,所以在
Disruptor#start()
前就需要注册全部的消费者。
启动流程
启动流程对应的是 Disruptor#start
方法,在启动之前所有的消费者都以 EventProcessor 的形式保存在 ConsumerRepository 中。
方法的源码如下:
1 |
|
EventProcessor 继承了 Runnable 方法可以直接使用 Executor 启动该类,在 Disruotor 创建的时候传入的 ThreadFactory 参数会被包装为 Executor,此时就用到了。
消费流程
启动过程中 EvnetProcessor 就作为 Runnable 被放入线程池执行,所以消费的主题流程也实现在继承的 run() 方法中。
(以下的实现 BatchEventProcessor 为主,WorkProcessor 还没看呢
以下是 BatchEventProcessor 的处理逻辑:
1 |
|
在消费者启动和关闭的时候都有对应的回调方法(notifyStart / notifyStart),对应的就是 LifecycleAware 接口:
EventHandler 可以通过继承该接口实现前后的回调,在整个生命周期各执行一次。
消费的正常逻辑就是以下几步:
- CAS 修改状态(IDLE -> RUNNING
- 前置回调(LifecycleAware#onStart
- 事件处理(processEvents
- 后置回调(LifecycleAware#onShutdown
- 状态修改 (任何状态 -> IDLE
事件轮询和阻塞逻辑
以下是 processEvents 的处理逻辑:
1 |
|
事件的轮训通过一个死循环包括,不是 AlertEcveption 就无法退出。
一个消费者是一个无限执行的任务,所以最好不要用线程池去执行消费的 Runnable,或者说线程数和消费者数量最好是 1:1
BatchEventProcessor 并不会直接访问 RingBuffer 获取可用事件,而是通过 SequenceBarrier 实现(此前是通过 RingBuffer#newBarrier 创建的。
消费者通过 SequenceBarrier 来实现对生产者和上层消费者的依赖。
在获取到可用序号后,会先执行批量处理的前置回调 BatchStartAware#onBatchStart。
BatchStartAware 也是通过 EventHandler 实现的。
(Disruptor 这个实现我喜欢,所有的 Aware 都需要富集到 EventHandler 中统一注册。
完成回调之后,遍历可用序号逐个从 RingBuffer 中获取事件(DataProvider 就是 RingBuffer。
然后调用 EventHandler#onEvent 完成实际的事件处理。
对于执行过程中的 TimeoutException(等待的超时,处理过程中的超时),都会触发 TimeoutHandler#onTimeout。
对于 AlterException 则会判断状态,在非运行中状态时跳出循环。
对于其他未知异常则会调用 ExceptionHandler#handleEventException 方法处理。
阻塞逻辑
在消费速度大于生产速度的时候,就需要消费者阻塞等待生产。
消费者并不会直接访问 RingBuffer,而是通过 SequenceBarrier,以下是对应的 waitFor 方法:
1 |
|
而 SequenceBarrier 也是通过 WaitStrategy 抽象出等待逻辑,在 Disruptor 官方实现中提供了以下几种:
实现类 | 作用 |
---|---|
BlockingWaitStrategy | 使用 ReentrantLock$Condition#await 实现的阻塞等待 |
BusySpinWaitStrategy | 调用 Thread#onSpinWait 实现等待(可能没有,那就是空轮训 |
LiteBlockingWaitStrategy | |
LiteTimeoutBlockingWaitStrategy | |
PhasedBackoffWaitStrategy | |
SleepingWaitStrategy | |
TimeoutBlockingWaitStrategy | |
YieldingWaitStrategy |
以 BlockingWaitStrategy 为例子:
1 |
|
消费者对于生产者的依赖是直接使用 ReentreLock 上锁,并使用 Condition 阻塞的,但是对于上层消费者,只有使用空轮询等待
如果上层消费者有多个,dependentSequence 就是被包装的 FixedSequenceGroup,获取对应的序号就是获取一组 Sequence 中最小的序号。
参考了其他的实现,对于上层消费者的等待实现基本都是空轮询,所以对于同类消费者分层的时候需要保证消费的高效,如果上层消费者阻塞会直接拉爆下层消费者所在工作线程。
层级消费的实现
上面消费的流程已经说明了大部分的实现了,下层的消费者必须要持有上层消费者的 Sequence。
总结
状态流转
stateDiagram-v2
state "IDLE(空闲)" as I
state "HALTED(停止)" as H
state "RUNNING(运行中)" as R
[*] --> I
I --> R: Disruptor#start(EventProcessor 被送入 Executor 执行
R --> H: Disruptor#halt(修改状态并设置告警
R --> H: Disruptor#shutdown(等待所有事件都被消费完,再调用 halt
H --> I: 感知到告警(checkAlert,跳出循环后修改
Disruptor#halt 方法除了修改当前 EventProcessor 的状态,还会在依赖的 SequenceBarrier 中记录一个告警状态,并且唤醒所有等待中消费者。
重新执行的过程中感知到告警状态就会抛出 AlertException,从而跳出整个 BatchEventProcessor#processEvents 的处理循环,而后在外层 BatchEventProcessor#run 中修改为 IDLE 状态。
整体流程如下:
graph TD
A("Runnable#run(整个流程的起点") --> B[/"更新当前状态(IDEL -> RUNNING"/]
B --更新成功--> C["清空告警(clearAlert"]
C --> D["执行 LifecycleAware#onStart"]
D --> E[/"判断当前状态(RUNNING"/]
E --> F
subgraph 事件处理
F["获取可用序号(sequenceBarrier#waitFor"]
F --有可用事件,返回可用的最大序号--> G["执行 batchStartAware#onBatchStart"]
G --> H["获取 nextSequence 对应事件"]
H --> I["处理事件(处理完 nextSequence++"]
I --> H
I --> J["设置当前消费序号(nextSequence"]
J --> F
F --状态改变,抛出 AlertExceotion--> K["break(跳出循环"]
F --等待超时--> M["执行 TimeoutHandler#onTimeout"]
M --> F
end
K --> L["执行 LifecycleAware#onShutdown"]
L --> N("更新状态到 IDEL(可以重新启动")
(在 Disruptor#shutdown 之后,是可以重新直接 Disruptor#start 的,生产者/消费者的序号没有清空。
消费者的线程模型
Disruptor 的构造函数中已经表明,作者不建议使用 Executor 来执行消费者的任务。
因为从上文可知,消费者的线程需要循环去获取事件,Runnable 主流程在 Disruptor 关闭前就不会退出,也就是说他会独占一个线程。
另外在生产者端,发布事件的时候,生产速度是受限于所有消费者组中的最慢消费速度的。
因此在使用 Disruptor 的实现,都需要尽可能使用单个线程处理消费者逻辑。
例如在一个【用户注册】的场景,需要在注册后进行【发送欢迎短信】、【赠送注册积分】等逻辑,就可以由单个线程去接收用户注册事件,然后外接线程池去完成对应业务。
生产者
生产者不在 Disruptor 的控制范围之内,任何持有 Disruptor 对象的都可以作为生产者,调用 Disruptor#pushlishEvent 发布事件。
上文提到过,Disruptor 使用的环形队列保存待消费的事务,并且 RingBuffer 在一开始就会创建所有的 Event 对象。
所以生产者的流程简单来说就是如下流程:
- 获取可用的序号,并获取序号对应的 Event(该序号表示的 Event 可以使用
- 重新赋值 Event(不需要重新创建对象
- 更新生产者序号
然后在看一下这些流程在 Disruptor 的实现。
生产的形式可以分为以下几种(Disruptor 的方法声明:
EventTranslator 就是对应的事件赋值接口,相关定义如下:
接口参数【event】表示当前需要赋值的事件对象,而【sequence】表示事件对应的序号。
方法接收一个 EventTranslator 的 Lambda 实现,对于获取的事件会通过【translateTo】方法重新赋值并且重新发布。
最终都是调用 RingBuffer 的对应方法,以第一个 EventTranslator 为例,其方法实现如下:
具体的使用场景(RingBuffer 的具体发布流程)如下:
1 |
|
根据最开始生产者类型的区别,sequencer 会有不同的实现(这里又是一种策略模式的表现。
单生产者并不需要并发控制,而多生产者需要在并发的情况下保证生产者的 Sequnce 正确,并且如果出现消费不及时的情况,生产者还需要等待。
(等待的逻辑也保存在 【sequencer.next()】中。
SingleProducerSequencer
对于单生产者模式对应的类型为【SingleProducerSequencer】,不需要对生产的序号作并发控制,但是需要与消费者的序号协调:生产者的序号不能超过消费者的序号。
因为是环形队列,所以生产的速度不能赶上消费者的速度(覆盖了未消费的事件。
在序号中的表示就是:生产者的序号不能超过消费者的最低序号。
以下是单生产者的下个可用序号获取流程:(感觉整个脑回路有点怪
1 |
|
gatingSequences 就是各个消费者的序号,在注册消费者的时候添加(通过 AtomicReferenceFieldUpdater 添加的。
(我一直以为是没有更新的空数组,日。
MultiProducerSequencer
1 |
|
相关实现
Disruptor 中对象间引用关系
Disruptor 如何实现依赖关系
Disruptor 中的依赖关系根据角色划分可以简单理解为以下几种:
- 生产者对于消费者的依赖(生产者不能覆盖掉未被消费的事件
- 消费者对于生产者的依赖(消费者不能消费旧事件
- 下层消费者对于上层消费者的依赖(下层消费者只能消费上层消费过的事件
总结
Disruptor 实现高性能的基础。
- RingBuffer 对于对象的复用
RingBuffer 就是 Disruptor 实现的对象池。
复用的对象数组可以降低了 GC 频率,提高 CPU 的利用率,相对于 ArrayBlockedQueue 来说,RingBuffer 创建的事件对象数目是固定的。
- 避免了伪共享(缓存行
(伪共享的影响可以参考 Java 中横向和纵向访问二维数组的时间消耗,存在几倍的延迟。
Sequence 中通过填充 long 对象的形式来避免伪共享。
- 无锁化实现
在 RingBuffer 生产者的实现中,区分了单生产者和多生产者,多生产者以及消费者层面都是通过 CAS 来保证并发安全。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!