在实际的应用中,我们经常会需要在特定的延迟后,或者定时去做某件事情。这时就需要用到定时器了,Go 语言提供了一次性定时器 time.Timer 和周期型定时器 time.Ticker。
如何使用
Timer 是一次性的定时器,经过指定的时间后触发一个事件,这个事件通过其本身提供的 channel 进行通知。与之相关的主要方法如下:
// 创建 Timer
func NewTimer(d Duration) *Timer
// 停止 Timer
func (t *Timer) Stop() bool
// 重置 Timer
func (t *Timer) Reset(d Duration) bool
// 创建 Timer,返回它的 channel
func After(d Duration) <-chan Time
// 创建一个延迟执行 f 函数的 Timer
func AfterFunc(d Duration, f func()) *Timer
Timer 的主要使用场景有:
- 设定超时时间;
- 延迟执行某个方法。
// 创建 Ticker
func NewTicker(d Duration) *Ticker
// 停止 Ticker,Ticker 在使用完后务必要释放,否则会产生资源泄露
func (t *Ticker) Stop()
// 启动一个匿名的 Ticker(无法停止)
func Tick(d Duration) <-chan Time
Ticker 的使用场景都和定时任务有关,例如定时进行聚合等批量处理。
实现原理
我们先来看看 Timer 和 Ticker 的数据结构:
type Timer struct {
C <-chan Time
r runtimeTimer
}type Ticker struct {
C <-chan Time
r runtimeTimer
}
发现二者一模一样,而且都包含有 runtimeTimer 字段,这个 runtimeTimer 才是定时器底层真正的数据结构。翻看 Timer 和 Ticker 的相关实现代码,与这两个结构本身相关的逻辑都很简单,真正复杂的是底层 runtimeTimer 的设计与维护,这也是我们要重点介绍的内容。
演进历史 Go 语言的计时器实现经历过很多个版本的迭代,到最新的版本为止,计时器的实现分别经历了以下几段历史:
- Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护;
- Go 1.10 ~ 1.13 版本,全局使用 64 个四叉堆维护所有的计时器,每个处理器(P)创建的计时器会由对应的四叉堆维护;
- Go 1.14 版本之后,每个处理器单独管理计时器并通过网络轮询器触发。
Go 1.10 版本中将全局的四叉堆分割成了 64 个更小的四叉堆,这种分片的方式,降低了锁的粒度,解决了上面提到的第一个问题,但是第二个问题还是悬而未决。
在最新版本的实现中,所有的计时器都以最小四叉堆的形式存储在处理器 runtime.p 中,这样的设计方式让两个性能问题都迎刃而解。
数据结构 runtime.timer 是 Go 语言计时器的内部表示,每一个计时器都存储在对应处理器的最小四叉堆中,下面是运行时计时器对应的结构体:
type timer struct {
pp puintptrwhenint64
periodint64
ffunc(interface{}, uintptr)
arginterface{}
sequintptr
nextwhen int64
statusuint32
}
- pp:计时器所在的处理器 P 的指针地址。
- when:计时器被唤醒的时间。
- period:计时器再次被唤醒的时间间隔,只有 Ticker 会用到。
- f:回调函数,每次在计时器被唤醒时都会调用。
- arg:回调函数 f 的参数。
- seq:回调函数 f 的参数,该参数仅在 netpoll 的应用场景下使用。
- nextwhen:当计时器状态为 timerModifiedXX 时,将会使用 nextwhen 的值设置到 when 字段上。
- status:计时器的当前状态值。
状态 | 含义 |
---|---|
timerNoStatus | 计时器尚未设置状态 |
timerWaiting | 等待计时器启动 |
timerRunning | 运行计时器的回调方法 |
timerDeleted | 计时器已经被删除,但仍然在某些 P 的堆中 |
timerRemoving | 计时器正在被删除 |
timerRemoved | 计时器已经停止,且不在任何 P 的堆中 |
timerModifying | 计时器正在被修改 |
timerModifiedEarlier | 计时器已被修改为更早的时间 |
timerModifiedLater | 计时器已被修改为更晚的时间 |
timerMoving | 计时器已经被修改,正在被移动 |
- 处于 timerRunning、timerRemoving、timerModifying 和 timerMoving 状态的时间比较短。
- 处于 timerWaiting、timerRunning、timerDeleted、timerRemoving、timerModifying、timerModifiedEarlier、timerModifiedLater 和 timerMoving 状态时计时器在处理器(P)的堆上。
- 处于 timerNoStatus 和 timerRemoved 状态时计时器不在堆上。
- 处于 timerModifiedEarlier 和 timerModifiedLater 状态时计时器虽然在堆上,但是可能位于错误的位置上,需要重新排序。
func addtimer(t *timer) {
if t.when < 0 {
t.when = maxWhen
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaitingwhen := t.whenpp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp)
doaddtimer(pp, t)
unlock(&pp.timersLock)wakeNetPoller(when)
}
- 边界处理以及状态判断;
- 调用 cleantimers 清理处理器中的计时器;
- 调用 doaddtimer 初始化网络轮询器,并将当前计时器加入处理器的 timers 四叉堆中;
- 调用 wakeNetPoller 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status);
s {
case timerWaiting, timerModifiedLater:
// timerWaiting/timerModifiedLater -> timerDeleted
...
case timerModifiedEarlier:
// timerModifiedEarlier -> timerModifying -> timerDeleted
...
case timerDeleted, timerRemoving, timerRemoved:
// timerDeleted/timerRemoving/timerRemoved
return false
case timerRunning, timerMoving:
// timerRunning/timerMoving
osyield()
case timerNoStatus:
return false
case timerModifying:
osyield()
default:
badTimer()
}
}
}
在 deltimer 中遵循了基本的规则处理:
- timerWaiting/timerModifiedLater -> timerDeleted。
- timerModifiedEarlier -> timerModifying -> timerDeleted。
- timerDeleted/timerRemoving/timerRemoved -> 无需变更,已经满足条件。
- timerRunning/timerMoving/timerModifying -> 正在执行、移动中,无法停止,等待下一次状态检查再处理。
- timerNoStatus -> 无法停止,不满足条件。
- timerWaiting -> timerModifying -> timerModifiedXX。
- timerModifiedXX -> timerModifying -> timerModifiedYY。
- timerNoStatus -> timerModifying -> timerWaiting。
- timerRemoved -> timerModifying -> timerWaiting。
- timerDeleted -> timerModifying -> timerModifiedXX。
- timerRunning -> 等待状态改变,才可以进行下一步。
- timerMoving -> 等待状态改变,才可以进行下一步。
- timerRemoving -> 等待状态改变,才可以进行下一步。
- timerModifying -> 等待状态改变,才可以进行下一步。
- 待修改的计时器已经被删除:由于既有的计时器已经没有了,因此会调用 doaddtimer 方法创建一个新的计时器,并将原本的 timer 属性赋值过去,再调用 wakeNetPoller 方法在预定时间唤醒网络轮询器。
- 正常逻辑处理:如果修改后的计时器的触发时间小于原本的触发时间,则修改该计时器的状态为 timerModifiedEarlier,并且调用 wakeNetPoller 方法在预定时间唤醒网络轮询器。
- 调度器调度时会检查处理器中的计时器是否准备就绪;
- 系统监控会检查是否有未执行的到期计时器。
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if atomic.Load(&pp.adjustTimers) == 0 {
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
}lock(&pp.timersLock)adjusttimers(pp)
这一段是调整堆中计时器的过程:
- 起始先通过 pp.adjustTimers 检查当前处理器 P 中是否有需要调整的计时器,如果没有的话:
- 当没有需要执行的计时器时,直接返回;
- 当下一个计时器没有到期并且需要删除的计时器不多于总数的 1/4 时都会直接返回。
- 如果处理器中存在需要调整的计时器,会调用 runtime.adjusttimers 根据时间将 timers 切片重新排列。
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow);
tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
执行完调整阶段的逻辑后,就是运行计时器的代码。这一段通过 runtime.runtimer 查找并执行堆中需要执行的计时器:
- 如果成功执行,runtimer 返回 0;
- 如果没有需要执行的计时器,runtimer 返回最近的计时器的触发时间,记录这个时间并返回。
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
在最后的删除阶段,如果当前 Goroutine 的处理器和传入的处理器相同,并且处理器中被删除(timerDeleted 状态)的计时器占堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimers 清理处理器中全部被标记为 timerDeleted 的计时器。
即使是通过每次调度器调度和窃取的时候触发,但毕竟还是具有一定的不确定性,因此 Go 中使用系统监控触发来做一个兜底:
func sysmon() {
...
for {
...
now := nanotime()
next, _ := timeSleepUntil()
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0)
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {
startm(nil, false)
}
...
}
- 调用 runtime.timeSleepUntil 获取计时器的到期时间以及持有该计时器的堆;
- 如果超过 10ms 的时间没有网络轮询,调用 runtime.netpoll 轮询;
- 如果当前有应该运行的计时器没有执行,可能存在无法被抢占的处理器,则启动新的线程处理计时器。
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0]
switch s := atomic.Load(&t.status);
s {
case timerWaiting:
if t.when > now {
return t.when
}runOneTimer(pp, t, now)
return 0case timerDeleted:
// 删除堆中的计时器
case timerModifiedEarlier, timerModifiedLater:
// 修改计时器的时间
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
它会遵循以下的规则处理计时器:
- timerNoStatus -> 崩溃:未初始化的计时器
- timerWaiting -> timerWaiting
- timerWaiting -> timerRunning -> timerNoStatus
- timerWaiting -> timerRunning -> timerWaiting
- timerModifying -> 等待状态改变
- timerModifiedXX -> timerMoving -> timerWaiting
- timerDeleted -> timerRemoving -> timerRemoved
- timerRunning -> 崩溃:并发调用该函数
- timerRemoved、timerRemoving、timerMoving -> 崩溃:计时器堆不一致
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f
arg := t.arg
seq := t.seqif t.period > 0 {
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}unlock(&pp.timersLock)
f(arg, seq)
lock(&pp.timersLock)
}
根据计时器的 period 字段,上述函数会做出不同的处理:
- 如果 period 字段大于 0,则代表它是一个 Ticker,需要周期性触发:
- 修改计时器下一次触发的时间并更新其在堆中的位置;
- 将计时器的状态更新为 timerWaiting;
- 调用 runtime.updateTimer0When 函数设置处理器的 timer0When 字段。
- 如果 period 字段小于或者等于 0,说明它是一个 Timer,只需触发一次即可:
- 调用 runtime.dodeltimer0 函数删除计时器;
- 将计时器的状态更新至 timerNoStatus。
推荐阅读
- 云链合一,我的开源之路
- golang 开发框架文档集
- Golang|Golang——指针的使用、数组指针和指针数组、指针与切片、指针与结构体、多级指针
- golang 写入文件的四种方法
- golang 读取文件的几种方式
- goalng 中结构体方法的区别使用
- Golang中空接口的使用
- Goalng中 空结构体的使用
- golang 中 字符串转int, float转 string 总结