Timer

🔗数据结构

type timer struct {
  //timer所在的处理器P
   pp puintptr

   when   int64
   period int64
   f      func(any, uintptr)
   arg    any
   seq    uintptr

   nextwhen int64 //最大触发时刻 1<<63 - 1

   status uint32
}
type p struct {
  	timersLock mutex
  	timers []*timer
}

🔗实现

🔗Go 1.9 版本之前(全局四叉堆)

实现:

全局四叉堆+timerproc Goroutine loop check

缺点:

全局唯一的互斥锁,严重影响计时器的性能

🔗Go 1.10 ~ 1.13(分片四叉堆)

实现:

64组(四叉堆+timerproc Goroutine loop check)

缺点:

CPU 密集计算任务会导致 timer 唤醒延迟

将全局计时器分片的方式,虽然能够降低锁的粒度,提高计时器的性能,但是timerproc Goroutine造成的频繁的上下文切换却成为了影响计时器性能的首要因素

🔗Go 1.14 版本之后(与P绑定)

原本用于管理计时器的 runtime.timerproc 也已经被移除,目前计时器都交由处理器的网络轮询器调度器触发,这种方式能够充分利用本地性、减少上下文的切换开销,也是目前性能最好的实现方式。

🔗创建timer

//Ticker
func NewTicker(d Duration) *Ticker {
   c := make(chan Time, 1)
   t := &Ticker{
      C: c, //对外暴露的C。用法:<-ticker.C
      r: runtimeTimer{
         when:   when(d),
         period: int64(d),
         f:      sendTime, //runOneTimer的时候调用,向c中写入当前时间
         arg:    c,
      },
   }
   startTimer(&t.r) //向P对应四叉堆中添加timer
   return t
}

//Timer
func NewTimer(d Duration) *Timer {
   c := make(chan Time, 1)
   t := &Timer{
      C: c, //对外暴露的C。用法:<-ticker.C
      r: runtimeTimer{
         when: when(d),
         f:    sendTime,  //runOneTimer的时候调用,向c中写入当前时间
         arg:  c,
      },
   }
   startTimer(&t.r) //向P对应四叉堆中添加timer
   return t
}
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

🔗触发时机

🔗schedule期间

runtime.schedule会检查P中的timer是否准备就绪,就绪执行

func schedule() {
   ...
  	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}
   ...
}
func findrunnable() (gp *g, inheritTime bool) {
  top:
    ......
    //真正执行timer的地方
    now, pollUntil, _ := checkTimers(_p_, 0)//pollUntil为最早到期时间
    ......
    //防止阻塞期间有timer到期错过触发时机,delay+now不能大于pollUntil
    list := netpoll(delay)   //找不到可运行的goroutine时,调用netpoll阻塞等待
		if faketime != 0 && list.empty() {
			stopm()
			goto top
		}
  ...
}
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { 
	if len(pp.timers) > 0 {
		adjusttimers(pp, now)
		for len(pp.timers) > 0 {
			// Note that runtimer may temporarily unlock
			// pp.timersLock.
			if tw := runtimer(pp, now); tw != 0 {
				if tw > 0 {
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}
}

func runtimer(pp *p, now int64) int64 {
	for {
		t := pp.timers[0]
		switch s := atomic.Load(&t.status); s {
		case timerWaiting:
			if t.when > now {
				// Not ready to run.
				return t.when
			}

			if !atomic.Cas(&t.status, s, timerRunning) {
				continue
			}
      //执行就绪的timer
			runOneTimer(pp, t, now)
			return 0
    }
  }
}

func runOneTimer(pp *p, t *timer, now int64) {
   f := t.f
   arg := t.arg
   seq := t.seq

   if t.period > 0 { //说明是ticker。period是时间间隔
      // Leave in heap but adjust next time to fire.
      delta := t.when - now
      t.when += t.period * (1 + -delta/t.period) //调整下一次ticker触发时间when
      if t.when < 0 { // check for overflow.
         t.when = maxWhen
      }
      siftdownTimer(pp.timers, 0)
      if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
         badTimer()
      }
      updateTimer0When(pp)
   } else {// 说明是timer,从四叉堆删除
      // Remove from heap.
      dodeltimer0(pp)
      if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
         badTimer()
      }
   }

   unlock(&pp.timersLock)

   f(arg, seq) //调用sendTime,唤醒阻塞在chan C上的Goroutine
               //如果是time.sleep,调用goready,唤醒阻塞的Goroutine
   lock(&pp.timersLock)
}

// 创建time.Ticker或time.Timer时传入的回调函数,当timer就绪时调用
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

// 创建time.Sleep时传入的回调函数,当timer就绪时调用
// Ready the goroutine arg.
func goroutineReady(arg any, seq uintptr) {
	goready(arg.(*g), 0)
}

🔗sysmon期间

runtime.sysmon 会检查是否有未执行的到期timer;如果有,启动新的M处理timer

🔗与netpoll的关系

个人理解:调度器如果暂时找不到工作,会阻塞调用netpoll。调度器通过记录sched.pollUntil(下一次调netpoll的时刻)保证netpoll阻塞期间不会有timer到期。

🔗time.Sleep

type g struct{
  stack       stack   // offset known to runtime/cgo
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink
  
	_panic    *_panic // innermost panic - offset known to liblink
	_defer    *_defer // innermost defer
	m         *m      // current m; offset known to arm liblink
	sched     gobuf
  timer          *timer         // cached timer for time.Sleep
}


func timeSleep(ns int64) {
   if ns <= 0 {
      return
   }

   //获取当前阻塞goroutine
   gp := getg()
   t := gp.timer
   if t == nil {
      t = new(timer)
      gp.timer = t
   }
   t.f = goroutineReady
   t.arg = gp
   t.nextwhen = nanotime() + ns
   if t.nextwhen < 0 { // check for overflow.
      t.nextwhen = maxWhen
   }
   //goroutine被park后,通过resetForSleep将timer放入四叉堆
   gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceEvGoSleep, 1)
}

//runOneTimer中调用,将goroutine唤醒
func goroutineReady(arg any, seq uintptr) {
	goready(arg.(*g), 0)
}