时间轮(TimingWheel)高性能定时任务原理解密时间轮 是一种 实现延迟功能(定时器) 的 巧妙算法。时间轮可以高 - 掘金 (juejin.cn)
一、基本概念
时间轮 是一种 实现延迟功能(定时器) 的 巧妙算法。如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。把大批量的调度任务全部都绑定时间轮上,通过时间轮进行所有任务的管理,触发以及运行。能够高效地管理各种延时任务,周期任务,通知任务等。
相比于 JDK 自带的 Timer
、DelayQueue + ScheduledThreadPool
来说,时间轮算法是一种非常高效的调度模型。不过,时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合,因为时间轮算法的精度取决于时间段“指针”单元的最小粒度大小。比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。
时间轮(TimingWheel)算法应用范围非常广泛,各种操作系统的定时任务调度都有用到,我们熟悉的 Linux Crontab,以及 Java 开发过程中常用的 Dubbo、Netty、Akka、Quartz、ZooKeeper 、Kafka 等,几乎所有和 时间任务调度 都采用了时间轮的思想。
二、原理解析
定时的任务调度 分两种:
- 相对时间:一段时间后执行
- 绝对时间:指定某个确定的时间执行
当然,这两者之间是可以相互转换的,例如当前时间是12点,定时在5分钟之后执行,其实绝对时间就是:12:05;定时在12:05执行,相对时间就是5分钟之后执行。
时间轮(TimingWheel)是一个 存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务 TimerTask。
1、基本模型构成
- tickMs(基本时间跨度):时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。
- wheelSize(时间单位个数):时间轮的时间格个数是固定的,可用(wheelSize)来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize计算得出。
- currentTime(当前所处时间):时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 的所有任务。
2、处理流程分析
- 若时间轮的 tickMs=1ms,wheelSize=20,那么可以计算得出 interval 为 20ms;
- 初始情况下表盘指针 currentTime 指向时间格 0,此时有一个定时为 2ms 的任务插入进来会存放到时间格为 2 的 TimerTaskList 中;
- 随着时间的不断推移,指针 currentTime 不断向前推进,过了 2ms 之后,当到达时间格 2 时,就需要将时间格 2 所对应的 TimeTaskList 中的任务做相应的到期操作;
- 此时若又有一个定时为 8ms 的任务插入进来,则会存放到时间格 10 中,currentTime 再过 8ms 后会指向时间格 10。
当到达时间格 2 时,如果同时有一个定时为 19ms 的任务插入进来怎么办?
- 新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入到原本已经到期的时间格 1 中(一个显而易见的环形结构)。
总之,整个时间轮的总体跨度是不变的,随着指针 currentTime 的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在 currentTime 和 currentTime + interval 之间。
3、如何扩充?
如果此时有个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小么?
很多业务场景不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如 100 万毫秒,那么这个 wheelSize 为 100 万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。所以 层级时间轮(类似十进制/二进制的计数方式)的概念应运而生,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
复用之前的案例,第一层的时间轮 tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即为 20ms。每一层时间轮的 wheelSize 是固定的,都是 20,那么第二层的时间轮的总体时间跨度 interval 为 400ms。以此类推,这个 400ms 也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为 8000ms。
4、流程分析
当到达时间格 2 时,如果此时有个定时为 350ms 的任务,显然第一层时间轮不能满足条件,所以就 时间轮升级 到第二层时间轮中,最终被插入到第二层时间轮中时间格 17 所对应的 TimerTaskList 中;
如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格 1 的 TimerTaskList 中;
注意到在到期时间在 [400ms,800ms) 区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格 1 中,时间格 1 对应的TimerTaskList的超时时间为400ms;
随着时间的流逝,当次 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个 时间轮降级 的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中;
再经历了 40ms 之后,此时这个任务又被“察觉”到,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。
5、
除了第一层时间轮的起始时间(startMs),其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的 currentTime。 每一层的 currentTime 都必须是 tickMs 的整数倍,如果不满足则会将 currentTime 修剪为 tickMs 的整数倍,以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime = startMs - (startMs % tickMs)。currentTime 会随着时间推移而推荐,但是不会改变为 tickMs 的整数倍的既定事实。若某一时刻的时间为 timeMs,那么此时时间轮的 currentTime = timeMs - (timeMs % tickMs),时间每推进一次,每个层级的时间轮的 currentTime 都会依据此公式推进。
定时器只需持有TimingWheel的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但是每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用,以此层级调用而可以实现定时器间接持有各个层级时间轮的引用。
三、Netty工具类 - HashedWheelTimer
[【Netty】【XXL-JOB】时间轮的原理以及应用分析 - 酷酷- - 博客园](https://www.cnblogs.com/kukuxjx/p/18170762#:~:text=时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。,把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。 时间轮其实就是一种环形的数据结构%2C其设计参考了时钟转动的思维,可以想象成时钟,分成很多格子,一个格子代表一段时间。)
1、基本使用
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
hashedWheelTimer.newTimeout(new io.netty.util.TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("10秒后执行");
}
}, 10, TimeUnit.SECONDS);
2、成员变量
public class HashedWheelTimer implements Timer {
// ----------------------- 全局参数 -------------------------
// 当前java程序创建的时间轮实例数
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
// 当前java程序创建的时间轮是否达到上限
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
// 单个java程序最多创建64个时间轮实例
private static final int INSTANCE_COUNT_LIMIT = 64;
// ----------------------- 实例参数 -------------------------
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker<HashedWheelTimer> leak;
// 时间轮,轮询任务具体实现
private final Worker worker = new Worker();
// 执行上面 worker 的线程
private final Thread workerThread;
// 状态 - 初始化
public static final int WORKER_STATE_INIT = 0;
// 状态 - 已启动
public static final int WORKER_STATE_STARTED = 1;
// 状态 - 已停止
public static final int WORKER_STATE_SHUTDOWN = 2;
// ----------------------- 数据结构参数 -------------------------
// 时间轮 - 刻度大小
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
// 时间轮启动标志
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// 存放提交的任务,比如往时间轮中提交一个任务会先放置在该队列中
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 取消的任务
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// 当前 时间轮 中的定时任务数
private final AtomicLong pendingTimeouts = new AtomicLong(0);
// 最大定时任务数
private final long maxPendingTimeouts;
private final Executor taskExecutor;
// 启动时间
private volatile long startTime;
}
3、构造方法
时间轮:默认每个刻度100ms,每层512个刻度
public class HashedWheelTimer implements Timer {
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
maxPendingTimeouts, ImmediateExecutor.INSTANCE);
}
/*
* 初始化
* threadFactory:线程工厂,用于生产线程去执行 timerTask
* tickDuration:间隔时间,时间轮刻度
* unit:间隔时间单位
* ticksPerWheel:每层时间轮有多少刻度
* leakDetection:是否开启泄漏检测
* maxPendingTimeouts:最大执行任务数
* taskExecutor:用于执行提交的TimerTasks的Executor
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {
// --------------- 数据校验 ---------------------------
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
}
4、添加定时任务
public class HashedWheelTimer implements Timer {
//新增任务
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
checkNotNull(task, "task");
checkNotNull(unit, "unit");
// ---------------- 判断定时任务数量是否达到最大限制 -------------------------
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// ---------------- 启动后台线程(如果还没启动) -------------------------
start();
// 将超时添加到超时队列中,该队列将在下一个计时点进行处理。
// 在处理过程中,所有排队的hashhedwheeltimeouts将被添加到正确的hashhedwheelbucket中。
// deadline 应该在多少纳秒之后执行任务
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
// 启动后台线程(如果还没启动)
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// 如果后台线程未启动,使用 JUC 原子类 修改状态,比较并交换,更新状态为 已启动
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
// 等待时间轮后台线程启动
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
}
5、轮询线程
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// 时间轮当前轮询的刻度
private long tick;
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
// 等待时间达到下一个刻度
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
// 下一个刻度的时间
// 下一个刻度时间毫秒值-时间轮启动时间=每个刻度大小*到下一个刻度经历的刻度数
long deadline = tickDuration * (tick + 1);
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
if (sleepTimeMs == 0) {
sleepTimeMs = 1;
}
}
try {
// 进入到这里进行sleep,表示当前时间距离下一次tick时间还有一段距离,需要sleep
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
}
(1)转移任务到时间轮中
转移任务到时间轮中,前面我们讲过,任务添加进来时,是先放入到阻塞队列。而在现在这个方法中,就是把阻塞队列中的数据转移到时间轮的指定位置。
在这个转移方法中,写死了一个循环,每次都只转移10万个任务。然后根据 HashedWheelTimeout
的deadline延迟时间计算出时间轮需要运行多少次才能运行当前的任 务,如果当前的任务延迟时间大于时间轮跑一圈所需要的时间,那么就计算需要跑几圈才能到这个任务运行。最后计算出该任务在时间轮中的槽位,添加到时间轮的链表中。
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 循环100000次,也就是每次转移10w个任务
for (int i = 0; i < 100000; i++) {
// 从阻塞队列中获得具体的任务
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
// 计算tick次数,deadline表示当前任务的延迟时间, tickDuration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行
long calculated = timeout.deadline / tickDuration;
// 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行)
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 如果任务在 timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前 bucket, 此方法调用完后就会被执行
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// 算出任务应该插入的 wheel 的 slot, stopIndex = tick 次数 & mask, mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
// 把timeout任务插入到指定的bucket链中。
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
(2)运行时间轮中的任务
当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现,这个方法的主要作用是: 过期并执行格子中到期的任务。也就是当tick进入到指定格子时,worker线程 会调用这个方法。
HashedWheelBucket是一个链表,所以我们需要从head节点往下进行遍历。如果链表没有遍历到链表 尾部那么就继续往下遍历。
获取的timeout节点节点,如果剩余轮数remainingRounds大于0,那么就说明要到下一圈才能运行, 所以将剩余轮数减一;
如果当前剩余轮数小于等于零了,那么就将当前节点从bucket链表中移除,并判断一下当前的时间是否 大于timeout的延迟时间,如果是则调用timeout的expire执行任务。
因为要执行某个时间槽的任务,所以这里调用的是 bucket 的方法
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
// 遍历当前时间槽中的所有任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 轮数小于等于0 说明当前轮要执行
if (timeout.remainingRounds <= 0) {
// 取出当前的任务
next = remove(timeout);
// 小于当前的时间间隔了 执行
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// 按理不可能会走到这里的 The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// 如果已经取消了 移除当前返回下一个
next = remove(timeout);
} else {
// 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一
timeout.remainingRounds --;
}
timeout = next;
}
}