go之timer
Timer
🔗数据结构
type timer struct {
//timer所在的处理器P
pp puintptr
when int64
period int64
f func(any, uintptr)
arg any
seq uintptr
nextwhen int64 //最大触发时刻 1<<63 - 1
status uint32
}
type p struct {
timersLock mutex
timers []*timer
}
🔗实现
🔗Go 1.9 版本之前(全局四叉堆)
实现:
全局四叉堆+timerproc Goroutine loop check
缺点:
全局唯一的互斥锁,严重影响计时器的性能
🔗Go 1.10 ~ 1.13(分片四叉堆)
实现:
64组(四叉堆+timerproc Goroutine loop check)
缺点:
CPU 密集计算任务会导致 timer 唤醒延迟
将全局计时器分片的方式,虽然能够降低锁的粒度,提高计时器的性能,但是timerproc Goroutine造成的频繁的上下文切换却成为了影响计时器性能的首要因素
🔗Go 1.14 版本之后(与P绑定)
原本用于管理计时器的 runtime.timerproc 也已经被移除,目前计时器都交由处理器的网络轮询器和调度器触发,这种方式能够充分利用本地性、减少上下文的切换开销,也是目前性能最好的实现方式。
🔗创建timer
//Ticker
func NewTicker(d Duration) *Ticker {
c := make(chan Time, 1)
t := &Ticker{
C: c, //对外暴露的C。用法:<-ticker.C
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime, //runOneTimer的时候调用,向c中写入当前时间
arg: c,
},
}
startTimer(&t.r) //向P对应四叉堆中添加timer
return t
}
//Timer
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c, //对外暴露的C。用法:<-ticker.C
r: runtimeTimer{
when: when(d),
f: sendTime, //runOneTimer的时候调用,向c中写入当前时间
arg: c,
},
}
startTimer(&t.r) //向P对应四叉堆中添加timer
return t
}
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
🔗触发时机
🔗schedule期间
runtime.schedule会检查P中的timer是否准备就绪,就绪执行
func schedule() {
...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
...
}
func findrunnable() (gp *g, inheritTime bool) {
top:
......
//真正执行timer的地方
now, pollUntil, _ := checkTimers(_p_, 0)//pollUntil为最早到期时间
......
//防止阻塞期间有timer到期错过触发时机,delay+now不能大于pollUntil
list := netpoll(delay) //找不到可运行的goroutine时,调用netpoll阻塞等待
if faketime != 0 && list.empty() {
stopm()
goto top
}
...
}
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
}
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0]
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
//执行就绪的timer
runOneTimer(pp, t, now)
return 0
}
}
}
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f
arg := t.arg
seq := t.seq
if t.period > 0 { //说明是ticker。period是时间间隔
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period) //调整下一次ticker触发时间when
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {// 说明是timer,从四叉堆删除
// Remove from heap.
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
unlock(&pp.timersLock)
f(arg, seq) //调用sendTime,唤醒阻塞在chan C上的Goroutine
//如果是time.sleep,调用goready,唤醒阻塞的Goroutine
lock(&pp.timersLock)
}
// 创建time.Ticker或time.Timer时传入的回调函数,当timer就绪时调用
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
// 创建time.Sleep时传入的回调函数,当timer就绪时调用
// Ready the goroutine arg.
func goroutineReady(arg any, seq uintptr) {
goready(arg.(*g), 0)
}
🔗sysmon期间
runtime.sysmon 会检查是否有未执行的到期timer;如果有,启动新的M处理timer
🔗与netpoll的关系
个人理解:调度器如果暂时找不到工作,会阻塞调用netpoll。调度器通过记录sched.pollUntil(下一次调netpoll的时刻)保证netpoll阻塞期间不会有timer到期。
🔗time.Sleep
type g struct{
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
sched gobuf
timer *timer // cached timer for time.Sleep
}
func timeSleep(ns int64) {
if ns <= 0 {
return
}
//获取当前阻塞goroutine
gp := getg()
t := gp.timer
if t == nil {
t = new(timer)
gp.timer = t
}
t.f = goroutineReady
t.arg = gp
t.nextwhen = nanotime() + ns
if t.nextwhen < 0 { // check for overflow.
t.nextwhen = maxWhen
}
//goroutine被park后,通过resetForSleep将timer放入四叉堆
gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceEvGoSleep, 1)
}
//runOneTimer中调用,将goroutine唤醒
func goroutineReady(arg any, seq uintptr) {
goready(arg.(*g), 0)
}
Read other posts