【Go进阶—基础特性】定时器

在实际的应用中,我们经常会需要在特定的延迟后,或者定时去做某件事情。这时就需要用到定时器了,Go 语言提供了一次性定时器 time.Timer 和周期型定时器 time.Ticker。
如何使用
Timer 是一次性的定时器,经过指定的时间后触发一个事件,这个事件通过其本身提供的 channel 进行通知。与之相关的主要方法如下:

// 创建 Timer func NewTimer(d Duration) *Timer // 停止 Timer func (t *Timer) Stop() bool // 重置 Timer func (t *Timer) Reset(d Duration) bool // 创建 Timer,返回它的 channel func After(d Duration) <-chan Time // 创建一个延迟执行 f 函数的 Timer func AfterFunc(d Duration, f func()) *Timer

Timer 的主要使用场景有:
  1. 设定超时时间;
  2. 延迟执行某个方法。
Ticker 是周期型定时器,即周期性的触发一个事件,通过 Ticker 提供的管道将事件传递出去。主要方法有:
// 创建 Ticker func NewTicker(d Duration) *Ticker // 停止 Ticker,Ticker 在使用完后务必要释放,否则会产生资源泄露 func (t *Ticker) Stop() // 启动一个匿名的 Ticker(无法停止) func Tick(d Duration) <-chan Time

Ticker 的使用场景都和定时任务有关,例如定时进行聚合等批量处理。
实现原理
我们先来看看 Timer 和 Ticker 的数据结构:
type Timer struct { C <-chan Time r runtimeTimer }type Ticker struct { C <-chan Time r runtimeTimer }

发现二者一模一样,而且都包含有 runtimeTimer 字段,这个 runtimeTimer 才是定时器底层真正的数据结构。翻看 Timer 和 Ticker 的相关实现代码,与这两个结构本身相关的逻辑都很简单,真正复杂的是底层 runtimeTimer 的设计与维护,这也是我们要重点介绍的内容。
演进历史 Go 语言的计时器实现经历过很多个版本的迭代,到最新的版本为止,计时器的实现分别经历了以下几段历史:
  1. Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护;
  2. Go 1.10 ~ 1.13 版本,全局使用 64 个四叉堆维护所有的计时器,每个处理器(P)创建的计时器会由对应的四叉堆维护;
  3. Go 1.14 版本之后,每个处理器单独管理计时器并通过网络轮询器触发。
在最开始的实现中,运行时创建的所有计时器都会加入到全局唯一的四叉堆中,然后有一个专门的协程 timerproc 来管理这些计时器,运行时会在计时器到期或者加入了更早的计时器时唤醒 timerproc 来处理。那这样一来,就会产生两个性能上的问题,第一个就是全局唯一的四叉堆带来的锁争用问题,第二个就是唤醒 timerproc 带来的上下文切换问题。
Go 1.10 版本中将全局的四叉堆分割成了 64 个更小的四叉堆,这种分片的方式,降低了锁的粒度,解决了上面提到的第一个问题,但是第二个问题还是悬而未决。
在最新版本的实现中,所有的计时器都以最小四叉堆的形式存储在处理器 runtime.p 中,这样的设计方式让两个性能问题都迎刃而解。
数据结构 runtime.timer 是 Go 语言计时器的内部表示,每一个计时器都存储在对应处理器的最小四叉堆中,下面是运行时计时器对应的结构体:
type timer struct { pp puintptrwhenint64 periodint64 ffunc(interface{}, uintptr) arginterface{} sequintptr nextwhen int64 statusuint32 }

  • pp:计时器所在的处理器 P 的指针地址。
  • when:计时器被唤醒的时间。
  • period:计时器再次被唤醒的时间间隔,只有 Ticker 会用到。
  • f:回调函数,每次在计时器被唤醒时都会调用。
  • arg:回调函数 f 的参数。
  • seq:回调函数 f 的参数,该参数仅在 netpoll 的应用场景下使用。
  • nextwhen:当计时器状态为 timerModifiedXX 时,将会使用 nextwhen 的值设置到 when 字段上。
  • status:计时器的当前状态值。
状态 现阶段计时器所包含的状态有下面几种:
状态 含义
timerNoStatus 计时器尚未设置状态
timerWaiting 等待计时器启动
timerRunning 运行计时器的回调方法
timerDeleted 计时器已经被删除,但仍然在某些 P 的堆中
timerRemoving 计时器正在被删除
timerRemoved 计时器已经停止,且不在任何 P 的堆中
timerModifying 计时器正在被修改
timerModifiedEarlier 计时器已被修改为更早的时间
timerModifiedLater 计时器已被修改为更晚的时间
timerMoving 计时器已经被修改,正在被移动
  • 处于 timerRunning、timerRemoving、timerModifying 和 timerMoving 状态的时间比较短。
  • 处于 timerWaiting、timerRunning、timerDeleted、timerRemoving、timerModifying、timerModifiedEarlier、timerModifiedLater 和 timerMoving 状态时计时器在处理器(P)的堆上。
  • 处于 timerNoStatus 和 timerRemoved 状态时计时器不在堆上。
  • 处于 timerModifiedEarlier 和 timerModifiedLater 状态时计时器虽然在堆上,但是可能位于错误的位置上,需要重新排序。
相关操作 添加计时器 当我们调用 time.NewTimer 或 time.NewTicker 时,会执行 runtime.addtimer 函数添加计时器:
func addtimer(t *timer) { if t.when < 0 { t.when = maxWhen } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaitingwhen := t.whenpp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) doaddtimer(pp, t) unlock(&pp.timersLock)wakeNetPoller(when) }

  1. 边界处理以及状态判断;
  2. 调用 cleantimers 清理处理器中的计时器;
  3. 调用 doaddtimer 初始化网络轮询器,并将当前计时器加入处理器的 timers 四叉堆中;
  4. 调用 wakeNetPoller 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
删除计时器 在计时器的使用中,一般会调用 timer.Stop() 方法来停止计时器,本质上就是让这个 timer 从轮询器中消失,也就是从处理器 P 的堆中移除 timer:
func deltimer(t *timer) bool { for { switch s := atomic.Load(&t.status); s { case timerWaiting, timerModifiedLater: // timerWaiting/timerModifiedLater -> timerDeleted ... case timerModifiedEarlier: // timerModifiedEarlier -> timerModifying -> timerDeleted ... case timerDeleted, timerRemoving, timerRemoved: // timerDeleted/timerRemoving/timerRemoved return false case timerRunning, timerMoving: // timerRunning/timerMoving osyield() case timerNoStatus: return false case timerModifying: osyield() default: badTimer() } } }

在 deltimer 中遵循了基本的规则处理:
  1. timerWaiting/timerModifiedLater -> timerDeleted。
  2. timerModifiedEarlier -> timerModifying -> timerDeleted。
  3. timerDeleted/timerRemoving/timerRemoved -> 无需变更,已经满足条件。
  4. timerRunning/timerMoving/timerModifying -> 正在执行、移动中,无法停止,等待下一次状态检查再处理。
  5. timerNoStatus -> 无法停止,不满足条件。
修改计时器 在我们调用 timer.Reset 方法来重新设置 Duration 值的时候,我们就是在对底层的计时器进行修改,对应的是 runtime.modtimer 方法。这个方法比较复杂,就不详细介绍了,有兴趣的可以自己研究一下。modtimer 遵循下述规则处理:
  1. timerWaiting -> timerModifying -> timerModifiedXX。
  2. timerModifiedXX -> timerModifying -> timerModifiedYY。
  3. timerNoStatus -> timerModifying -> timerWaiting。
  4. timerRemoved -> timerModifying -> timerWaiting。
  5. timerDeleted -> timerModifying -> timerModifiedXX。
  6. timerRunning -> 等待状态改变,才可以进行下一步。
  7. timerMoving -> 等待状态改变,才可以进行下一步。
  8. timerRemoving -> 等待状态改变,才可以进行下一步。
  9. timerModifying -> 等待状态改变,才可以进行下一步。
在完成了计时器的状态处理后,会分为两种情况处理:
  1. 待修改的计时器已经被删除:由于既有的计时器已经没有了,因此会调用 doaddtimer 方法创建一个新的计时器,并将原本的 timer 属性赋值过去,再调用 wakeNetPoller 方法在预定时间唤醒网络轮询器。
  2. 正常逻辑处理:如果修改后的计时器的触发时间小于原本的触发时间,则修改该计时器的状态为 timerModifiedEarlier,并且调用 wakeNetPoller 方法在预定时间唤醒网络轮询器。
触发计时器 Go 语言会在两种场景下触发计时器,运行计时器中保存的函数:
  • 调度器调度时会检查处理器中的计时器是否准备就绪;
  • 系统监控会检查是否有未执行的到期计时器。
调度器的触发一共分两种情况,一种是在调度循环 schedule 中,另一种是当前处理器 P 没有可执行的 G 和计时器,去其他 P 窃取计时器和 G 的 findrunnable 函数中。触发计时器时执行的是 checkTimers 函数,来剖析一下它的大致过程。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { if atomic.Load(&pp.adjustTimers) == 0 { next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { return now, 0, false } if now == 0 { now = nanotime() } if now < next { if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } }lock(&pp.timersLock)adjusttimers(pp)

这一段是调整堆中计时器的过程:
  • 起始先通过 pp.adjustTimers 检查当前处理器 P 中是否有需要调整的计时器,如果没有的话:
    • 当没有需要执行的计时器时,直接返回;
    • 当下一个计时器没有到期并且需要删除的计时器不多于总数的 1/4 时都会直接返回。
  • 如果处理器中存在需要调整的计时器,会调用 runtime.adjusttimers 根据时间将 timers 切片重新排列。
rnow = now if len(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) > 0 { if tw := runtimer(pp, rnow); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } }

执行完调整阶段的逻辑后,就是运行计时器的代码。这一段通过 runtime.runtimer 查找并执行堆中需要执行的计时器:
  • 如果成功执行,runtimer 返回 0;
  • 如果没有需要执行的计时器,runtimer 返回最近的计时器的触发时间,记录这个时间并返回。
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) }unlock(&pp.timersLock) return rnow, pollUntil, ran }

在最后的删除阶段,如果当前 Goroutine 的处理器和传入的处理器相同,并且处理器中被删除(timerDeleted 状态)的计时器占堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimers 清理处理器中全部被标记为 timerDeleted 的计时器。
即使是通过每次调度器调度和窃取的时候触发,但毕竟还是具有一定的不确定性,因此 Go 中使用系统监控触发来做一个兜底:
func sysmon() { ... for { ... now := nanotime() next, _ := timeSleepUntil() ... lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) if !list.empty() { incidlelocked(-1) injectglist(&list) incidlelocked(1) } } if next < now { startm(nil, false) } ... }

  • 调用 runtime.timeSleepUntil 获取计时器的到期时间以及持有该计时器的堆;
  • 如果超过 10ms 的时间没有网络轮询,调用 runtime.netpoll 轮询;
  • 如果当前有应该运行的计时器没有执行,可能存在无法被抢占的处理器,则启动新的线程处理计时器。
运行计时器 runtime.runtimer 函数会检查处理器四叉堆上最顶上的计时器,该函数也会处理计时器的删除和更新:
func runtimer(pp *p, now int64) int64 { for { t := pp.timers[0] switch s := atomic.Load(&t.status); s { case timerWaiting: if t.when > now { return t.when }runOneTimer(pp, t, now) return 0case timerDeleted: // 删除堆中的计时器 case timerModifiedEarlier, timerModifiedLater: // 修改计时器的时间 case timerModifying: osyield() case timerNoStatus, timerRemoved: badTimer() case timerRunning, timerRemoving, timerMoving: badTimer() default: badTimer() } } }

它会遵循以下的规则处理计时器:
  • timerNoStatus -> 崩溃:未初始化的计时器
  • timerWaiting -> timerWaiting
  • timerWaiting -> timerRunning -> timerNoStatus
  • timerWaiting -> timerRunning -> timerWaiting
  • timerModifying -> 等待状态改变
  • timerModifiedXX -> timerMoving -> timerWaiting
  • timerDeleted -> timerRemoving -> timerRemoved
  • timerRunning -> 崩溃:并发调用该函数
  • timerRemoved、timerRemoving、timerMoving -> 崩溃:计时器堆不一致
如果处理器四叉堆顶部的计时器没有到触发时间会直接返回,否则调用 runtime.runOneTimer 运行堆顶的计时器:
func runOneTimer(pp *p, t *timer, now int64) { f := t.f arg := t.arg seq := t.seqif t.period > 0 { delta := t.when - now t.when += t.period * (1 + -delta/t.period) siftdownTimer(pp.timers, 0) if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) } else { dodeltimer0(pp) if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } }unlock(&pp.timersLock) f(arg, seq) lock(&pp.timersLock) }

根据计时器的 period 字段,上述函数会做出不同的处理:
  • 如果 period 字段大于 0,则代表它是一个 Ticker,需要周期性触发:
    • 修改计时器下一次触发的时间并更新其在堆中的位置;
    • 将计时器的状态更新为 timerWaiting;
    • 调用 runtime.updateTimer0When 函数设置处理器的 timer0When 字段。
  • 如果 period 字段小于或者等于 0,说明它是一个 Timer,只需触发一次即可:
    • 调用 runtime.dodeltimer0 函数删除计时器;
    • 将计时器的状态更新至 timerNoStatus。
【【Go进阶—基础特性】定时器】在完成更新计时器后,上互斥锁,调用计时器的回调方法 f,传入相应参数。完成整个流程。

    推荐阅读