掘金 后端 ( ) • 2023-05-26 15:42

写在前面

我在分析Redisson的底层实现的过程中,发现时间轮算法工具类出现的非常频繁。这个类的源码不多,只有不到一千行,但是涉及的知识点比较密集,后面分为三个章节介绍一下时间轮工具类的起源,介绍、使用和具体的源码拆分。

分析

几种定时器算法的比较

时间轮工具是一个定时任务调度器,作者专门写了个ppt来介绍为什么要开发这么个工具类。
image.png
这是下面几种时间轮算法比较的几个维度
客户端调用:
START_TIMER - 开始任务,客户端将任务放入时间轮
STOP_TIMER - 停止任务,客户端查找任务,并发出中断指令
计时器调用:
PER_TICK_BOOKKEEPING - 未过期任务保持
EXPIRY_PROCESSING - 已过期任务处理
性能指标
空间:数据结构使用的内存
延迟:处理上述功能所需的时间

列表型

可以维持绝对的到期时间或者时间间隔
image.png
作者在这里有一点疏漏,其实还可以区分一下有序列表和无序列表,有序列表的START_TIMER为O(N),因为每次放入任务时,需要做一个排序操作;无序列表的PER_TICK_BOOKKEEPING为O(N),因为要找到过期时间最短的一个元素,需要遍历整个列表。

树型

可以维持绝对到期时间,如果间隔时间相等,则会退化为列表型定时器。
image.png
从图中可以看出这是一个有序树,放入任务时,需要遍历,判断应该处于哪一层,所以时间复杂度为O(log(n)),删除操作,我觉得应该也是O(log(n)),因为有遍历查找的过程。PER_TICK_BOOKKEEPING操作是O(1),因为只需要处理叶子节点即可,所有叶子节点都是临近过期的任务。

简单时间轮

image.png

  • 维持一个大的时间轮;
  • 指针每过一个时间单位,会移动一个位置,就像时钟的秒针一样;
  • 如果计时器间隔在当前光标位置的旋转范围内,则将计时器放置在相应的位置;
  • 需要指数级的内存;

优势: 客户端和执行器的相关操作时间复杂度都为O(1);
弊端: 需要消耗大量内存,因为每个时间bucket都需要保存,比如所有1秒后过期的任务都放到bucket[1]的链表里面,所有2秒后过期的任务都放到bucket[2]的链表里面,执行的时候只需要找到对应的bucket就行,无须遍历,设计也简单。但是如果有一个2^32秒后过期的任务,那么要放到bucket[2^32]里面,需要在内存中维护一个超级大的hash表,造成空间上的极大浪费。

哈希时间轮

这是netty最终选取的方案,后续会着重介绍。
image.png

假设轮盘有8个bucket,进来一个任务,其哈希值为17,把任务放到bucket[1]的链表中,元素里保存其轮数【2】。当轮盘转过2圈,又过了一个bucket的时候,任务到期,将之取出执行。如果链表中后续的任务轮数大于零【还需要继续等待】,则更新轮数,并重新保存到链表中。
image.png
时间复杂度需要根据具体的实现来分析:

  1. 有序链表
    a. 弊端:客户端放入任务,如果链表长度小于轮盘bucket数量,则平局时间复杂度为O(1);否则极端情况下,会出现时间复杂度为O(n)的情况。
    b. 优势:执行器获取任务的时间复杂度为O(1)
  2. 无序链表
    a. 弊端:执行器获取任务,如果链表长度小于轮盘bucket数量,则平局时间复杂度为O(1);否则极端情况下,会出现时间复杂度为O(n)的情况。
    b. 优势:客户端放入任务的时间复杂度为O(1)

层次时间轮

按照任务延迟时间等级分为多个时间轮
image.png
这是简单时间轮的变体,将数据多层存放,避免空间浪费。比如设计一个以天为单位的时间轮,将之分为三层,类似于钟表,小时级【hour轮,24个bucket】 -> 分钟级【minute轮,60个bucket】 -> 秒级【second轮,60个bucket】。

  • 新增任务
    新增任务时,计算其相对过期时间,比如一个deadline为2h39m15s的任务,会存放到bucket[2]里面的链表中。
  • 迁移任务
    每隔一秒轮询一次这个多层时间轮,当轮盘时间过去2h之后,将这个任务迁移到下级轮盘【minute轮】的bucket【39】后的链表中。轮盘时间再过去39m的时候,将之迁移到【second轮盘】的bucket【15】中。
  • 执行任务
    当遍历最下一层轮盘时,遍历到有到期的任务,发现其无法继续往下迁移,则取出后执行。

Netty时间轮简介

netty的hash时间轮工具类是HashedWheelTimer,实现了Timer接口【定时任务执行器】。Timer接口比较简单,只有两个方法:newTimeout() - 向时间轮中添加一个任务,并返回Timeout任务实例;stop() - 停止时间轮,返回还未执行的定时任务。里面有三个内部类:Worker、HashedWheelTimeout、HashedWheelBucket。

  • Worker
    Worker是一个Runnable实现类,是Timer的工作引擎,主要有两个功能:执行任务和轮询。Worker类中有一个final Set集合【unprocessedTimeouts】,用于存放未处理的定时任务,在调用Timer.stop()方法停止时,返回给调用者。
  • HashedWheelTimeout
    实现的是Netty中的Timeout接口,接口中有三类信息:Timer实例【创建Timeout的Timer】、TimerTask实例、任务的状态【取消任务,是否取消,是否过期】
  • HashedWheelBucket
    bucket概念在上一章中已经做过介绍,是时间轮的一个刻度,在bucket里面会维护一个链表,用来保存产生hash冲突的任务。新建时间轮时,如果初始化的bucket数量比较少,hash冲突是不可避免的。
  • TimeTask
    此接口没有显式实现类,每次调用newTimeout时,隐式创建一个实例。其中只有一个run方法,在执行时调用。

结构示意图
image.png

功能介绍

  • 新增任务
    在Timer实例中,调用newTimeout方法添加任务,新增的任务第一时间会添加到等待列表【timeouts】,上图中黄色的列表。
  • 执行任务
    每过一个时钟周期,就遍历一次bucket数组,获取特定的bucket,判断是否有过期的任务,如果有,则执行,否则Worker睡眠等待进入下个时钟周期;
  • 取消任务
    Timeout实例有cancel方法,调用之后,如果定时任务还是初始化状态,则将之从bucket中移除,并加入【cancelledTimeouts】列表;

如何使用

新建实例时,有六个参数:

  • threadFactory
    这个参数主要是给worker线程来命名,需要的话,可以自定义,一般默认即可。
  • tickDuration
    时钟周期,遍历时间轮的时间间隔,默认是100毫秒
  • unit
    上一个参数的时间单位
  • ticksPerWheel
    时间轮中有多少个bucket,默认是512,linux中的哈希时间轮默认也是512个bucket。bucket越少,越容易发生hash冲突。
  • leakDetection
    是否检测内存泄露
  • maxPendingTimeouts
    最大等待执行的任务数量,在Timer中有一个pendingTimeouts原子变量,新增任务时递增,移除任务时递减,当pendingTimeouts大于maxPendingTimeouts时,就会报错。
// 新建一个哈希时间轮实例
HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(),
                100,TimeUnit.MILLISECONDS,10,false,1000);
// 添加一个定时任务
timer.newTimeout(timeout -> {
    log.info("--------------- time task run,111111");
    // 这个操作会报错,因为一直循环递归自己,会栈溢出
    // timeout.task().run(timeout);
}, 10, TimeUnit.SECONDS);
// 添加第二个定时任务
timer.newTimeout(timeout -> {
    log.info("--------------- time task run,22222");
    Thread.sleep(10000);
}, 5, TimeUnit.SECONDS);
log.info("=== 等待的任务:{}",timer.pendingTimeouts());

上面这个例子中,我们设定了maxPendingTimeouts为1000,如果设置为1,就会报错,因为代码里添加了两个任务,而且都是需要等待一段时间的。

java.util.concurrent.RejectedExecutionException: Number of pending timeouts (2) is greater than or equal to maximum allowed pending timeouts (1)

深入源码

前面两个章节介绍了哈希时间轮的来源、功能介绍和使用方式,这里最后详细研究一下netty中时间轮算法的源码【HashedWheelTimer】,源码行数不多,只有800多行,对比HashMap的2300多行,已经算是想当精简了。后面主要从这几个方面:新建Timer、新建任务、遍历任务、任务过期执行。
注意这几个变量

  • startTime:工作引擎【Worker】的启动时间,只赋值一次,不做更新。后续所有的相对时间,都是以此为参照;
  • deadline:任务的到期时间,是一个相对时间,用于计算执行倒计时:remainingRounds。
  • remainingRounds:timeout中的一个参数,任务在时间轮中的倒计时。表示剩余的轮数,每次轮到当前bucket执行的时候,此数值减一;
  • tick:时钟周期的次数,tickDuration是每次时钟周期的长度,也是时间轮的精度,比如手表的精度是秒。如果tickDuration是100ms,则表示精度为100ms,如果两个任务,一个是延迟50ms执行,一个延迟80毫秒执行,在100ms精度的设定里,没有区别,会在同一批次被执行。

在此介绍的Timer,假设其参数为

  • tickDuration = 100ms
  • ticksPerWheel = 16
  • mask = 15

新建Timer

新建Timer的主要流程,工作引擎是单线程,所有的任务都在worker单线程中遍历执行。在同一个JVM中,不能创建超管64个Timer实例,否则会报错。INSTANCE_COUNTER是原子变量,统计Timer实例,当实例数量超过INSTANCE_COUNT_LIMIT【final类型,64】,会输出错误日志:You are creating too many HashedWheelTimer.class instances。

        // 将ticksPerWheel转化为2的幂,然后初始化转轮。
        wheel = createWheel(ticksPerWheel);
        // 注意这个参数mask,记录了轮子的长度,在后续添加任务和遍历任务都需要用到此参数
        mask = wheel.length - 1;
        // 新建工作引擎线程,worker实现了runnable接口
        workerThread = threadFactory.newThread(worker);
        // 判断Timer实例数量是否超过限制,如果超过64个,则输出错误日志
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
// 输出错误日志
            reportTooManyInstances();
        }

新建哈希轮

wheel的长度必须是2的N次方,假设我们创建Timer时,输入的ticksPerWheel为9,内部方法会将其转化为大于9的最相近的一个2^n数字,也就是16。

        int normalizedTicksPerWheel = 1;
// 通过计算,找到大于ticksPerWheel的,最相近的一个2^n数字
        while (normalizedTicksPerWheel < ticksPerWheel) {
// 左移运算
            normalizedTicksPerWheel <<= 1;
        }
        return normalizedTicksPerWheel;

根据上一步算出来的数字,创建wheel

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }

添加任务

startTime 的赋值节点是在Wroker启动过程中,如果Worker不启动,则任务都不能添加,因为任务的deadline是根据Worker的启动时间计算出来的一个相对值。
任务的deadline是一个相对时间,假设添加的定时任务是10秒之后执行,添加时Worker已经运行了30秒,那么deadline计算方式如下:
deadline = (System.nanoTime() - startTime) + unit.toNanos(delay)
= (30 + 10) * 10^9
= 40 * 10^9 纳秒

    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        // 计算等待执行任务的数量
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // 等待中任务数量有限制,在创建Timer时设定
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
        // 启动工作线程,会对startTime变量赋值,下一步会用到;后续在工作引擎的整个生命周期,都会用到这个变量,是工作引擎【Worke】的生日。
        start();
        // 根据Worker的生日,来计算一个相对时间,deadline通俗的解释:在worker几岁的时候去处理这个任务
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // 防止溢出,如果delay足够大的情况下,会造成long溢出,计算出来的deadline是负数
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 将定时任务封装到Timeout,添加到timeouts集合中,此集合在上一章的图示中有说明,收集等待进入wheel的任务,在worker轮询时,加入指定的bucket
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

启动工作引擎

对的没错,工作引擎是在添加任务的时候启动的,而不是在创建Timer的时候,可以避免线程空转,浪费CPU时钟周期。
这里还有一个知识点,就是为什么要用while循环,而不使用if,因为按照JVM的定义,await之后,线程会被阻塞,直到调用notify/notifyAll方法被唤醒之后,才能继续争抢CPU时间片。问题就在这里,如果用if,可能会被其他线程调用notifyAll之后意外唤醒,导致在Worker没有启动的情况下,去计算了新增任务的deadline,从而出现异常 。

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            // 只有工作线程是初始化状态时,才能启动
            case WORKER_STATE_INIT:
                // CAS运算,避免线程不安全
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
        // 等待startTime被赋值,前面说过,这是Worker的生日,在生日不确定的情况下,不能给其安排的任务。
        while (startTime == 0) {
            try {
// startTimeInitialized是一个CountDownLaunch类型变量,在worker的run方法里,会调用countDown()方法,通知这里跳出循环。
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

遍历任务

worker启动以后,在关闭之前的全部活动,都在下面这段循环中。计算任务的执行时间线的方法是:deadline = tickDuration * (tick + 1),注意这里计算时,跳了一个时钟周期。是为了后续遍历bucket时,如果剩余轮数为零,则表示当前任务timeout的deadline必定是小于这个数据的,因为它在区间末尾【一个区间是100毫秒】,所有在此区间内的timeout,都会被执行。

do {
// 计算任务的执行时间线
    final long deadline = waitForNextTick();
    if (deadline > 0) {
// mask是2^n -1,和tick做按位与操作。假设mask是7,那么idx的值始终是0-7之间循环
        int idx = (int) (tick & mask);
// 从cancelledTimeouts中弹出被取消的任务,然后从bucket中删除
        processCancelledTasks();
        HashedWheelBucket bucket =
                wheel[idx];
// 将timeouts集合【等待放入bucket的任务】中的任务,放入到对应的bucket
        transferTimeoutsToBuckets();
// bucket中,所有小于deadline的任务,都会被取出执行
        bucket.expireTimeouts(deadline);
        tick++;
    }
// 只要worker是启动状态,则一直循环
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

任务放到对应bucket - transferTimeoutsToBuckets

在do - while循环中,有这么一步操作,将timeouts集合【等待放入bucket的任务】中的任务,放入到对应的bucket。其中怎么找到bucket,是一个相当关键的步骤,这里着重介绍一下。
继续沿用上面添加任务段落所使用的那个例子

  • deadline = 40 * 10^9 纳秒
  • tickDuration = 100毫秒 = 0.1 * 10^9
  • 所以calculated = 40 / 0.1 = 400
  • 添加任务时,worker已经执行了30秒,表示当时tick = 30 / 0.1 = 300,如果在当时while循环没有将之加入到bucket,就需要在下一个tick中处理了。这里假设就是这种情况,所以tick = 301
  • remainingRounds = (400 - 301) / 16 = 6
  • ticks这里有点不明白,ticks = 400
  • stopIndex = 400 & 15 = ‭000110010000‬ & 1111 = 0
  • 最后找到相应的bucket,wheel[0],将这个timeout放到双向链表【HashedWheelBucket】的末尾

假设有两个任务,A【calculated=400】,B【calculated=410】,具有相同的remainingRounds【16】,stopIndex不同,A【stopIndex=0】,B【stopIndex=10】

long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 这一句没有看懂。原注释:Ensure we don't schedule for past.
final long ticks = Math.max(calculated, tick); 
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);

任务过期(执行)

在worker的while循环中,有这么一行:bucket.expireTimeouts(deadline),这里就会执行过期的定时任务。

public void expireTimeouts(long deadline) {
// 从头到尾进行遍历,因为数据是无序存放的,所以需要判断每个节点
    HashedWheelTimeout timeout = head;
    // 处理bucket中所有的定时任务
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // 如果剩余轮数为零,则表示当前任务已经到了执行时间
        if (timeout.remainingRounds <= 0) {
// 从bucket中移除此任务
            next = remove(timeout);
// 执行任务
            timeout.expire();
        } else if (timeout.isCancelled()) {
// 如果任务已经取消,则移除
            next = remove(timeout);
        } else {
// 如果还没到过期时间,则剩余轮数减一,继续等待
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

timeout.expire()是具体的执行方法,比较简单。

// 用CAS来设定任务的状态
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
    return;
}
// 如果状态设定成功,则直接执行,用worker线程执行。
task.run(this);