掘金 后端 ( ) • 2024-03-07 10:25

HashedWheelTimer初识

什么是时间轮

一些基本术语:

时间轮是一种实现延迟功能的算法

时间轮的定义是指一种用于高效调度任务的模型,它通过将时间分割成一系列固定大小的槽(称为"时间格"),每个槽存放着相应时间点到期的任务。随着真实时间的流逝,时间轮的指针会按顺序移动到下一个时间格,从而触发并执行到期的任务。这种设计可以高效地利用线程资源进行批量化调度,尤其适用于处理大规模的延时或周期性定时任务。

构成时间轮的基本要素主要包括:

  1. 时间格:时间轮上分割出的固定时间间隔的槽位,用于存放将要执行的任务。
  2. 轮询线程:负责根据时间轮当前所指的时间格来选取并执行相应的任务。
  3. 时间跨度:单个时间格代表的实际时间长度,例如在Kafka中可能设置为1毫秒。
  4. 环形数组:时间轮底层的数据结构,用于表示时间轮的各个时间格。

image-20240306180835312转存失败,建议直接上传图片文件

使用示例

public class HasedWheelTimerDemo {
    public static void main(String[] args) throws InterruptedException {
        // HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 8, false, -1L);
        // 完整示例
        // 1. 创建时间轮,可以使用默认构造方法创建时间轮,默认时间轮以100ms为单位,共512个槽位。
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        System.out.println("程序启动时间==" + new Date());
        // 2. 新建定时任务1 ,延时两秒后执行
        Timeout timeout1 = hashedWheelTimer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("任务运行时间==" + new Date());
            }
        }, 2000, TimeUnit.MILLISECONDS);
        // 2. 新建定时任务2 ,延时两秒后执行
        Timeout timeout2 = hashedWheelTimer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("任务运行时间2==" + new Date());
            }
        }, 2000, TimeUnit.MILLISECONDS);
        // 3. 取消定时任务2
        timeout2.cancel();
        // 4. 查看定时任务是否过期
        System.out.println("timeout1.isCancelled() = " + timeout1.isCancelled());
        System.out.println("timeout2.isCancelled() = " + timeout2.isCancelled());
        Thread.sleep(Integer.MAX_VALUE);
        // 5. 停止时间轮。(一般也不会停,只有在服务停止的时候才会停止时间轮)
        hashedWheelTimer.stop();
    }
}

上面的示例中,

  1. 创建了一个时间轮,(基本时间跨度默认100ms,时间单位个数默认512)。
  2. 给时间轮中添加了两个定时任务timeout1timeout2,定时任务内容都是两秒后控制台输出文字。添加定时任务的逻辑中会默认启动时间轮
  3. 取消定时任务timeout2
  4. 查看定时任务状态
  5. 关闭时间轮

输出结果

程序启动时间==Tue Mar 05 10:10:46 CST 2024
timeout1.isCancelled() = false
timeout2.isCancelled() = true
任务运行时间==Tue Mar 05 10:10:48 CST 2024

适用场景

  • 适用于不需要精准时间的定时任务。
  • 适用于短平快的定时任务

常见场景:

  1. 连接超时监控
  2. 心跳检测
  3. Redis 锁续期

也就是说,假如我有一个定时任务执行时间很长,(怎么定义这个很长呢?只要超过了时间轮的 Tick 时间,肯定就算长的了。)那他就不适合使用 HashedWheelTimer。否则会影响时间轮中其他任务的执行。只适合那些业务逻辑简单,执行速度快的“轻量级”的定时任务。

和JDK的Timer以及ShecduleThreadPoolExecutor比较

HashedWheelTimer Timer ShecduleThreadPoolExecutor 线程 单线程 单线程 多线程 任务异常处理 不影响时间轮线程,会打印出异常信息 中断Timer线程 和线程池处理一致 添加/删除任务速度 O(1),采用时间换空间 O(logn) O(logn) 任务追赶 会做任务追赶 会做任务追赶 会做任务追赶

时间轮还有一些优点:

  1. 时间粒度可控
  2. 性能很好

时间轮也有一些缺点:

  1. 执行时间不精确。时间格的时间跨度越大,任务执行就越不精确。
  2. 定时任务只能执行一次,并没有固定频率执行定时任务的API,需要自己手动实现。

HashedWheelTimer源码分析

创建时间轮

创建时间轮原理

时间轮有很多构造函数,最后都调用重载的构造函数鸡其参数解释如下:

/**
 * threadFactory : 创建线程池的工厂,默认是Executors
 * tickDuration : 时间轮的时间格大小,默认100ms
 * unit : 时间轮的时间格时间单位,默认100ms
 * ticksPerWheel : 时间轮环形数组的大小,默认512
 * leakDetection : 内存泄露检测
 * maxPendingTimeouts : 每个时间格称作Bucket,每个Bucket能够存储的任务数量上限,默认值为-1代表无上限
 * taskExecutor : 任务执行的线程池,默认是netty重写的一个单线程。也可以自己指定线程池。
 */
public HashedWheelTimer(ThreadFactory threadFactory,
  long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
  long maxPendingTimeouts, Executor taskExecutor) {}

构造函数中都做了什么呢?源码如下

public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
    long maxPendingTimeouts, Executor taskExecutor) {
    // 省略非核心代码
// 1. 校验参数
    checkNotNull(....)
    // 2. 创建时间轮,分配时间单位跨度,时间轮的时间单位个数
    wheel = createWheel(ticksPerWheel);
// 3. 为时间轮单独创建一个线程。(只是创建线程,并未启动)
    workerThread = threadFactory.newThread(worker);
}

构造函数中一共做了这么几件事

  1. 校验7个参数的合法性
  2. 创建时间轮。在HashedWheelTimer中,时间轮的底层数据结构就是这个wheel字段。它的定义是一个HashedWheelBucket类型的数组。
  3. 创建启动时间轮的线程,但但并没有调用Thread的start方法。

着重看下第二步创建时间轮第三步时间轮的线程

创建时间轮的数据结构

时间轮的数据结构在HashedWheelTimer定义如下

private final HashedWheelBucket[] wheel;

本质上就是一个HashedWheelBucket类型的数组。。我们来看下createWheel方法如何为其赋值的。

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    // 检查时间格环形数组的长度,最大2^30
    checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
    // 长度调整为2的倍数
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    // 时间格环形数组赋初值
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
  1. 每个bucket最大容量是2的30次方
  2. 时间轮的长度改为2的整数倍,不够的补足。
  3. 给wheel数组赋初值,也就是new操作。

创建运行时间轮的线程

在HashedWheelTimer中运行时间轮的线程是workerThread字段它的定义如下

private final Thread workerThread;

在构造函数中给它赋初值

workerThread = threadFactory.newThread(worker);

这里的threadFactory默认是JUC中Executors的默认线程工厂,重点是这个worker是什么呢?worker是HashedWheelTimer中的一个属性。worker字段定义如下

private final Worker worker = new Worker();

原来它是一个类,并且是HashedWheelTimer的内部类。这个类实现了Runnable接口。所以本质上workerThread就是一个Thread类型的变量,并且任务是由Worker类重写的run方法定义的。来看一下Worker类中是怎么重写run方法的

private final class Worker implements Runnable {
    @Override
    public void run() {

        do {
            // 一直到下一次任务的执行时间
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 获取下一次环形数组的索引下标
                int idx = (int) (tick & mask);
                processCancelledTasks();
                // 得到bucket
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                // 执行任务
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } 
        // 只要workThread还在运行,就一直循环读取任务执行。这里的WORKER_STATE_UPDATER是HashedWheelTimer内部维护的一个状态,可以通过HashedWheelTimer的stop方法来停止时间轮改变这个变量,这样这个循环就会满足退出条件了。
        while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    }
}

时间轮何时启动

两种途径可以启动时间轮

  1. 通过HashedWheelTimer的**start()**方法手动启动时间轮线程(不常用)
  2. 通过HashedWheelTimer的newTimeout方法添加任务时,底层会自动调用**start()**方法启动时间轮线程
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 检查参数
    // 启动时间轮线程
    start();
    // ......
    // 把任务包装秤一个 HashedWheelTimeout 添加到 timeouts 中。
    // timeouts是netty自定义的队列,它的定义是:private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();    
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 把Timeout添加到队列后,workerThread线程就会轮询这个队列,执行定时任务。
    timeouts.add(timeout);
    return timeout;
}·    

时间轮的线程都做了什么

前面说到创建运行时间轮的线程,简单介绍了下,Worker实现了Runnable接口

private final class Worker implements Runnable {
    @Override
    public void run() {

        do {
            // 一直到下一次任务的执行时间
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 获取下一次环形数组的索引下标
                int idx = (int) (tick & mask);
                processCancelledTasks();
                // 得到bucket
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                // 执行任务
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } 
        while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    }
}

run方法中做的几件事:

  1. 计算下一次定时任务执行时间
  2. 处理已经被取消的任务
  3. 从环形数组中获取Bucket
  4. 从timeouts队列中把满足条件的timeout放到这个Bucket中。但是每次只会循环队列中的前100000个
  5. 执行Bucket中的所有任务

下面逐步介绍这5步内容

获取下一下定时任务的执行时间

waitForNextTick方法的返回值:当前时间的相对时间,这个时间之前的时间格的定时任务都会被执行。具体的详细步骤写在

private long waitForNextTick() {
    // 获取下一次执行任务所在的时间格的右区间的时间,比如,第一次执行,tick是0,tickDuration 是100ms,则第一个时间格的过期时间就是当前时间100ms后
    long deadline = tickDuration * (tick + 1);
    // 不断循环
    for (;;) {
        // 时间轮采用的是相对时间,时间轮线程启动的那一刻是startTime
        final long currentTime = System.nanoTime() - startTime;
        // 需要睡眠的时间=(下一个时间格右区间时间-当前时间+999999)/1000000
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
         if (sleepTimeMs <= 0) {
            // 走这里,说明当前时间指针已经越过了当前时间格。达到了触发当前时间格的定时任务的条件。

            if (currentTime == Long.MIN_VALUE) {
                // 当前时间是负数则返回(一般不会是这种情况
                return -Long.MAX_VALUE;
            } else {
                // 返回当前时刻时间(注意这是相对时间)
                return currentTime;
            }
        }

        try {
            // 走到这里,说明时间格所在的最大时间还没到,不满足触发定时任务的条件,所以需要线程睡一会。
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
          if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

从队列中删除所有已经被取消的任务

private void processCancelledTasks() {
    // 循环cancelledTimeouts队列,这个队列中全部是被取消的任务。
    for (;;) {
        // 获取队列中的任务
        HashedWheelTimeout timeout = cancelledTimeouts.poll();
        if (timeout == null) {           
            break;
        }
        // 从队列中删除任务
        timeout.remove();

    }
}

从环形数组中获取Bucket

每循环一个时间格,tick就会+1,比如时间轮一共有8个时间格,每个时间格1s,那么环形数组就是8个,mask = 8-1=7个,假如一个定时任务是9s后执行,tick=(9-1)/1s=8,tick&mask=1,所以时间格应该取第二个

把定时任务timeout添加到时间格的Bucket中

private void transferTimeoutsToBuckets() {
    // 每次循环10w个
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        // ...

        // 计算执行轮次,是第几轮执行
        long calculated = timeout.deadline / tickDuration;
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // 把定时任务timeout添加到时间格的Bucket中
        bucket.addTimeout(timeout);
    }
}

执行已经满足条件的定时任务

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            // 定时任务已到期
            if (timeout.deadline <= deadline) {
                // 把定时任务标记为已执行,并执行定时任务
                timeout.expire();
            }
        } else if (timeout.isCancelled()) {
            // 如果任务已取消,删除任务
            next = remove(timeout);
        } else {
            // 如果剩余轮次大于0,则-1
            timeout.remainingRounds --;
        }
        // 下一个
        timeout = next;
    }
}