Disruptor


[TOC]


Example

/**
 * 定义的事件类型
 */
static class DateEvent {
  private Date value;
  public void setValue(Date value) {
    this.value = value;
  }
  public static EventFactory<DateEvent> factory() {
    return DateEvent::new;
  }
  @Override
  public String toString() {
    return "DateEvent{" +
      "value=" + value +
      '}';
  }
}
/**
 * 事件处理器
 */
static class LongEventHandler implements EventHandler<DateEvent> {

  @Override
  public void onEvent(DateEvent event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println("Event:" + event + ",sequence:" + sequence + ",endOfBatch:" + endOfBatch);
  }

}

public static void main(String[] args) throws InterruptedException {
  Disruptor<DateEvent> disruptor = new Disruptor<>(DateEvent.factory(), 4, DaemonThreadFactory.INSTANCE);
  disruptor.handleEventsWith(new LongEventHandler());
  // 需要开启整个队列
  disruptor.start();
  for(int i= 0;i < 100;i++){
    TimeUnit.SECONDS.sleep(3);
    //    创建 Event 的时候也可以获取到 sequence
    disruptor.publishEvent((event, sequence) -> event.setValue(new Date()));
  }
}

( Disruptor 在创建的时候就需要指定时间类型以及队列大小,后续指定消费者之后,还需要手动开启。

Introduction

以下是 Disruptor 官网的介绍图,其中包含了所有的相关对象和概念:

models

(Disruptor 最开始听说的是一个高性能无锁队列,但是实际上它不仅仅是队列。

Disruptor 类似于一套本地的 MQ 系统(Message Queue,消息队列),也可以看做是一套生产者/消费者模型,它包含了 Producer,Consumer 以及 Queue(中间队列),在开始前(调用 start() 前)就需要指定创建消费者以及中间队列。

Disruptor 支持单生产者和多生产者两种模式,默认支持多消费者,并且消费者之间不共享消费进度(每个事件会被分发给所有的消费者


相关组件对象

RingBuffer

Disruptor 的存储组件,保存发布的事件,使用环形数组保存所有数据,RingBuffer 在 Disruptor 创建的时候就指定好大小,并且在之后的流程中保持不变。

所谓的环形数组底层就是一个普通数组,维护了读写两个游标以此形成一个环,从写游标开始写,从读游标开始读。

(游标就是下文的 Sequence,控制游标的就是 Sequencer。

Sequence

(相当于是一个并发安全的 Long 类型。

表示消费的序列,在 RingBuffer(生产者和消费者都是通过 RingBuffer 获取获取) 会保存该值。

对于生产者来说,Sequence 保存了写入的位置,对于消费者来说,Sequence 保存在自身的读取的位置(多个消费者不共享。

Sequencer

Sequence 的管理者,包含了生产者和消费者的相关 Sequence(就是持有了生产者的写入指针和消费者的读取指针。

根据生产者的个数分为 SingleProducerSequencer 和 MutilProducerSequencer。

如果消费者有多个,Single 就是保存单生产者和消费者的关系,而 Mutil 保证的就是多生产者和消费者的关系,以及多生产者之间的并发安全。

EventFactory

Event 就是 RingBuffer 中保存的数据类型,EventFactory 的作用就是创建这些实例对象。

在创建 RingBuffer 的时候传入, 初始化的时候预创建所有的对象。

// RingBufferFields 是 RingBuffer 的父类
// 单独定义了 RingBuffer 的内部属性
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer)
{
  this.sequencer = sequencer;
  this.bufferSize = sequencer.getBufferSize();
    // 相关属性校验
  // 创建待填充的固定大小的数组
  this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
  // 填充对应数组
  fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory){
  for (int i = 0; i < bufferSize; i++){
    // 最开始的两个对象不做赋值
    entries[BUFFER_PAD + i] = eventFactory.newInstance();
  }
}

WaitStrategy

等待策略,表示消费者在等待事件发布的时候采取的动作(生产者在等待消费的时候也可以使用吧。

消费者(监听器)注册流程

// Disruptor#handleEventsWith
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
  // 直接创建 EventProcessor
  // 初始化一个 Sequence 的作用
  return createEventProcessors(new Sequence[0], handlers);
}

// Disruptor#createEventProcessors
// 创建消费者实例
// 参数包含 barrierSequence,是他依赖的上层消费者,当前消费者只能消费上层已经全部消费过的数据
// 例如,当前依赖的三个上层消费者的 offset [1,10,10],那么此时只能消费 1 的数据
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){
  // 只能在未开始的时候添加消费者
  checkNotStarted();
    // 每个 Handler 对应一个 Sequence
  final Sequence[] processorSequences = new Sequence[eventHandlers.length];
  // 创建 Barrier
  final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    // 遍历创建 BatchEventProcessor
  for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){
    final EventHandler<? super T> eventHandler = eventHandlers[i];
    // !!important  创建的最终消费实例好似 BatchEventProcessor
    final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
    // 异常处理,这个是 Disruptor 确定的
    if (exceptionHandler != null){
      batchEventProcessor.setExceptionHandler(exceptionHandler);
    }
    // 消费者注册中心?
    consumerRepository.add(batchEventProcessor, eventHandler, barrier);
    // 消费者的 Processor
    processorSequences[i] = batchEventProcessor.getSequence();
  }
  // 更新 Disruptor 的 GatingSequences
  updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
  // 返回一个 EventhandlerGroup,调用当前方法的所有 EventHandler 会被包含在一个 Group 里面
  return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

// Disruptor#updateGatingSequencesForNextInChain
// 更新 GatingSequences
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){
  if (processorSequences.length > 0){
    // 将消费者的 Sequences 添加到 ringBuffer
    ringBuffer.addGatingSequences(processorSequences);
    // 因为当前消费者消费的都是 barrierSequences 中消费过的数据,所以当前消费者肯定是最低的 offset
    // 因此依赖的 barrier 就没必要保存了
    for (final Sequence barrierSequence : barrierSequences){
      ringBuffer.removeGatingSequence(barrierSequence);
    }
    consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
  }
}

消费者最终的实例对象为 BatchEventProcessor。

每个消费者都会将自身的 Sequence 保存到 ringBuffer 的 gatingSequences(参考 kafka 的 offset,ringBuffer 中只有所有的消费者都消费过才会将数据清除。

Disruptor 通过 ConsumerRepository 来保存所有的 BatchEventProcessor。

很关键的是,Disruptor 不允许在运行过程中添加消费者。


另外可以看一下 ConsumerRepository 的功能,ConsumerRepository 用于持有所有的 EventHandler 以及对应的 Sequence 信息(算是一个辅助类,用于完成一些相对独立的逻辑。

ConsumerRepository 中包含以下三个成员变量,分别从不同用维度保存映射关系。

  1. eventProcessorInfoBySequence - Map
  2. eventProcessorInfoByEventHandler - Map, EventProcessorInfo>
  3. consumerInfos - Collection
public void add(
  final EventProcessor eventprocessor, 
  final EventHandler<? super T> handler,
  final SequenceBarrier barrier){
    // 将 EventProcessor,EventHandler,Barrier 打包成一个 EventProcessorInfo
    final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<>(eventprocessor, handler, barrier);
    // 分别存了三份!!!应该有不同的获取需求吧
    eventProcessorInfoByEventHandler.put(handler, consumerInfo);
    eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
    consumerInfos.add(consumerInfo);
}

(暂时不是很清楚这个玩意儿的作用。

事件生产流程

RingBuffer 回预先创建所有的事件对象,所以后续的发布流程就是获取指定对象,填充属性并且发布。

过程中主要控制生产者的生产速度,不能把没消费完的事件覆盖了。

获取可写入位置

RingBuffer 是一个环形数组,所以就需要读写两个指针表示进度(emm,因为先学的 InnoDb 的 Redo Log 所以感觉 write point 和 checkpoint 两个还挺好理解的。

因为是个一对多或者多对多的场景,并且消费者各自保存自己的消费进度,所以写入的场景都需要考虑多个消费者的最低消费速度,而写入的速度保存在 RingBuffer

RingBuffer 中使用 Sequencer#next 确定下一个写入的位置,Sequencer 都继承了 AbstractSequencer,因此也保存了以下几个变量:

protected final int bufferSize;        //     RingBuffer 的大小
protected final WaitStrategy waitStrategy;    // 等待策略
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);   // 当前写入的位置
protected volatile Sequence[] gatingSequences = new Sequence[0];        // gatingSequences 就是所有消费者的 Sequence
// 在注册流程中会将消费者的 Sequence 全部添加进来


根据消费者的个数可以分为 SingleProducerSequencer 以及 MultiProducerSequencer。

(SingleProduceSequencer 中不存在争用,所以实现相对简单。


SingleProducerSequencer

@Override
public long next(int n)
{
  if (n < 1){
    throw new IllegalArgumentException("n must be > 0");
  }

  // 下次的可用槽位
  long nextValue = this.nextValue;
    // 一共需要n个槽位
  long nextSequence = nextValue + n;
  // 可能会超过,一般情况下都是负数,超过表示需要重头开始填充
  // 比如当前 bufferSize = 16,但是 nextSequence = 19,wrapPoint = 3 表示已经过了一轮了
  long wrapPoint = nextSequence - bufferSize;
  // cacheValue 是所有消费者的最小的下标(最慢的速度
  long cachedGatingSequence = this.cachedValue;

  // 主要是 cacheValue = 5,但是 wrapPoint = 6 的时候,此时说明生产的速度已经要大于消费的速度了
  // 所以需要进循环等待
  if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
    cursor.setVolatile(nextValue);  // StoreLoad fence
    long minSequence;
    // 等待直到消费赶上
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){
      LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }
      // 重新设置最小消费槽位
    this.cachedValue = minSequence;
  }
    // 下次的消费槽位
  this.nextValue = nextSequence;
  return nextSequence;
}

单线程并没有并发和伪共享问题,所以直接使用的一个 long 类型的 nextValue 保存写指针(读指针保存在各个消费者那,会在 gatingSequences 中保存。

MultiProducerSequencer

MultiProducerSequencer 表示多个生产者,所以会存在写入位置并发写入的问题,该类中使用 CAS 实现并发更新的安全性。

// MultiProducerSequencer#next
public long next(int n){
  if (n < 1){
    throw new IllegalArgumentException("n must be > 0");
  }
  long current;
  long next;
  do{
    // cursor 是 Sequence 实现
    // 不再和 Single 一样单纯的 long 了
    current = cursor.get();
    // 后续的判断都差不多
    next = current + n;
    long wrapPoint = next - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){
      long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
      if (wrapPoint > gatingSequence){
        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
        continue;
      }
      gatingSequenceCache.set(gatingSequence);
      // 更新的时候使用的 CSA,外层套了个自旋
    }else if (cursor.compareAndSet(current, next))]{
      break;
    }
  }
  while (true);
  return next;
}

如果是一个无限长的序列,可写入的位置必须小于多生产者的最小为止,因为是环形数组所以需要额外判断超出的情况。

ProducerSequencer 中保存了所有消费者的 Sequence,每次都会获取最小的消费下标,写入不能超过这个下标,在 Sequencer 中另外有缓存避免每次遍历消费者(感觉多此一举,消费者能有多少。

获取指定下标对象

// RingBuffer#get
public E get(long sequence){
  return elementAt(sequence);
}

// RingBufferFields#elementAt
protected final E elementAt(long sequence){
  return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

通过 Unsafe 获取数组对象对象指定下标的值。

因为 indexMask 是2此幂,所以 sequence & indexMask 相当于是 sequence % indexMask 了(真就大量优化。

发布事件

发布事件修改的是生产者的写入指针,对于 SingleProducerSequencer 来说就是简单的修改 cursor。

// SingleProducerSequencer#pushlish
public void publish(long sequence){
  // 修改 cursor
  cursor.set(sequence);
  // 唤醒所有阻塞的消费者
  waitStrategy.signalAllWhenBlocking();
}

主要是多线程生产的时候,因为整个发布顺序是先获取可写位置,赋值,发布,所以后获取的位置(比较靠后)可能先发布,直接替换就不成立了。

MultiProducerSequencer 使用的 availableBuffer 表示每个槽位是否可读。

// 长度和 RingBuffer 相同,表示每个位置的版本号,每次修改某个位置,该位置的 flag + 1
private final int[] availableBuffer;
// availableBuffer 保存的都是每个 sequence >>> indexShift
private final int indexShift;

MultiProducerSequencer 的发布就是修改每个位置上的 flag:

// MultiProducerSequencer#publish
public void publish(final long sequence){
  setAvailable(sequence);
  waitStrategy.signalAllWhenBlocking();
}
// MultiProducerSequencer#setAvailable
private void setAvailable(final long sequence){
  setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

// MultiProducerSequencer#setAvailableBufferValue
private void setAvailableBufferValue(int index, int flag){
  long bufferAddress = (index * SCALE) + BASE;
  UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

// MultiProducerSequencer#calculateIndex
// indexMask 可以参考 HashMap,以 & 来实现取余
private int calculateIndex(final long sequence){
  return ((int) sequence) & indexMask;
}

// MultiProducerSequencer#calculateAvailabilityFlag
// 根据 sequence 确定 flag,sequence 增加 bufferSize 的时候 flag 才会+1
// bufferSize = 16 的时候,indexShift = 4,懂?
private int calculateAvailabilityFlag(final long sequence){
  return (int) (sequence >>> indexShift);
}

简单的可以理解为 avaliableBuffer 中保存的是每个位置的版本号(从0开始的,因此可以根据 Flag 判断是否可读。

flag 也不需要单独保存,可以根据 sequence 计算。

事件消费流程

在消费者注册流程中可以看出来,Disruptor 的事件消费实例是 BatchEventProcessor,消费逻辑当然也在里面。

(BatchEventProcessor 继承了 Runnable,所以可以直接看 run 方法。

// BatchEventProcessor#run
public void run(){
  // CAS 更新运行状态
  if (running.compareAndSet(IDLE, RUNNING)){
    // 清除 Alter,这里应该是有依赖关系的消费者
    sequenceBarrier.clearAlert();
    // 调用 LifecycleAware#onStart 的钩子方法
    notifyStart();
    try
    {
      // 确定状态
      if (running.get() == RUNNING){
        // 处理事件(外层没 while 应该是包在里面了
        processEvents();
      }
    } finally {
      // 调用钩子方法
      notifyShutdown();
      // 关闭状态
      running.set(IDLE);
    }
  } else{
    // CAS 失败表示已经开启过了
    if (running.get() == RUNNING) {
      throw new IllegalStateException("Thread is already running");
    } else{
      // 提前退出
      earlyExit();
    }
  }
}

// 已经开启过了,在开启的时候如果发现不是 RUNNING 状态,先调用 Start 钩子,在调用 shutdown 钩子???????? 
private void earlyExit(){
  notifyStart();
  notifyShutdown();
}

开始消费的时候会在前后调用钩子方法,Handler 的状态比较简单就省略吧,直接看消费的核心 processEvent 方法:

// BatchEventProcessor#processEvent
private void processEvents(){
  T event = null;
  long nextSequence = sequence.get() + 1L;

  while (true){
    try{
      // 等待并获取可用的下标,返回的是可用的,传参表示期望的,返回的可能更大
      final long availableSequence = sequenceBarrier.waitFor(nextSequence);
      // 没有实现,看着好像是钩子方法
      if (batchStartAware != null){
        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
      }
      // 遍历 nextSequence -> availableSequence
      while (nextSequence <= availableSequence){
        // 获取对应事件
        event = dataProvider.get(nextSequence);
        // 处理事件
        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
        nextSequence++;
      }
      // 更新消费进度
      sequence.set(availableSequence);
    }catch (final TimeoutException e){
            // 。。。
    }
  }
}

// ProcessingSequenceBarrier#waitFor
// 参数就是需要等待的下标
public long waitFor(final long sequence)
  throws AlertException, InterruptedException, TimeoutException{
  checkAlert();
  // 直接调用等待策略的 waitFor 方法
  long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
  if (availableSequence < sequence){
    return availableSequence;
  }
  return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

事件的处理调用的事 EventHandler#onEvent

消费者等待策略(Consumer Wait Strategy

消费速度大于生产速度的时候,消费者就需要阻塞等待生产者的事件生产(按照我看的源码,其实消费太慢的时候,生产者也在考虑使用 WaitStrategy 阻塞。

WaitStrategy 是顶层的接口,定义了对应的等待策略:

public interface WaitStrategy
{
    /**
     * Wait for the given sequence to be available.  It is possible for this method to return a value
     * less than the sequence number supplied depending on the implementation of the WaitStrategy.  A common
     * use for this is to signal a timeout.  Any EventProcessor that is using a WaitStrategy to get notifications
     * about message becoming available should remember to handle this case.  The {@link BatchEventProcessor} explicitly
     * handles this case and will signal a timeout if required.
     *
     * @param sequence          to be waited on.
     * @param cursor            the main sequence from ringbuffer. Wait/notify strategies will
     *                          need this as it's the only sequence that is also notified upon update.
     * @param dependentSequence on which to wait.
     * @param barrier           the processor is waiting on.
     * @return the sequence that is available which may be greater than the requested sequence.
     * @throws AlertException       if the status of the Disruptor has changed.
     * @throws InterruptedException if the thread is interrupted.
     * @throws TimeoutException if a timeout occurs before waiting completes (not used by some strategies)
     * 等待直到可以消费
     */
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;

    /**
     * Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced.
     * 唤醒所有等待的线程
     */
    void signalAllWhenBlocking();
}

对应的子类实现有如下几种:

实现类 作用
BlockingWaitStrategy 使用 ReentrantLock$Condition#await 实现的阻塞等待
BusySpinWaitStrategy 调用 Thread#onSpinWait 实现等待(可能没有,那就是空轮训
LiteBlockingWaitStrategy
LiteTimeoutBlockingWaitStrategy
PhasedBackoffWaitStrategy
SleepingWaitStrategy
TimeoutBlockingWaitStrategy
YieldingWaitStrategy

(懒得看了,有空再写作用。

层级消费实现

如果消费者存在依赖关系,例如A只能消费B消费过的数据,这种时候需要做的就是 A 等待 B 的消费,A甚至不再需要等待生产者(不再直接等待。

类似的依赖关系是通过 SequenceBarrier 来实现的,Barrier 中保存了依赖的消费者的 Sequence,通过对 Sequence 的比较来判断自己的消费下标。

(最上层的消费者根据的是生产者的 Sequence 来判断自己的消费进度,如果存在依赖关系之后,下级的消费者只需要关注上级消费者的 Sequence 就好。

ProcessingSequenceBarrier 中保存了 WaitStrategy 和依赖的所有消费者的 Sequence。

// ProcessingSequenceBarrier 构造函数
ProcessingSequenceBarrier(
  final Sequencer sequencer,
  final WaitStrategy waitStrategy,
  final Sequence cursorSequence,
  final Sequence[] dependentSequences){            // 依赖的所有 Sequence
  this.sequencer = sequencer;
  this.waitStrategy = waitStrategy;
  this.cursorSequence = cursorSequence;
  if (0 == dependentSequences.length){
    dependentSequence = cursorSequence;
  } else{
    // 如果是多个会被包装成 Sequence
    dependentSequence = new FixedSequenceGroup(dependentSequences);
  }
}

之后看如何实现的依赖关系:

// ProcessingSequenceBarrier#waitFor
// 传入的参数是下次希望消费的位置
// 返回的是可以消费的位置,可能比传入的大
public long waitFor(final long sequence)
  throws AlertException, InterruptedException, TimeoutException{
  checkAlert();
  long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
  if (availableSequence < sequence){
    return availableSequence;
  }
  return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

总结

相对于依靠 BlockedQueue 实现的生产者消费者模型,Disruptor 有以下几种优化:

  1. 对象池(Disruptor 所有的事件都通过 RingBuffer 保存,RingBuffer 就是一个对象池,所有的数据都需要塞到对应的对象池中
  2. 填充缓存行,消除伪共享(伪共享是多个 CPU 的缓存行中包含同一段数据,双方各自的修改都会使缓存行失效而重新从主内存中读取

除了以上优化,就是 Disruptor 对生产者/消费者对控制,通过 sequence 来表示相对的速度。

Disruptor 使用 RingBuffer 作为事件队列,RingBuffer 以环形队列的形式实现,并且在初始化的时候就会预实例化所有的对象(以对象池的形式实现,事件发布的流程就是填充对象数据并且发布。

Disruptor 使用 Sequence 控制生产和消费进度,生产者需要等待消费者的消费进度,不能超过所有消费者最慢的那个,消费者也不会超过生产进度,不然都会使用 WaitStartegy(等待策略)拉住。

Disruptor 还支持带依赖的消费关系,消费者 A 只能消费被 B,C 都消费过的事件,此时消费者 A 就已经不依据生产者的进度消费了,而是依据 B,C 的消费进度。

results matching ""

    No results matching ""