首頁 > 軟體

Go中定時器實現原理及原始碼解析

2021-03-07 16:00:53

轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com

本文使用的go的原始碼15.7,需要注意的是由於timer是1.14版本進行改版,但是1.14和1.15版本的timer並無很大區別

我在春節期間寫了一篇文章有關時間輪的:https://www.luozhiyun.com/archives/444。後來有同學建議我去看看 1.14版本之後的 timer 優化。然後我就自己就時間輪和 timer 也做了個 benchmark:

goos: darwin
goarch: amd64
pkg: gin-test/api/main
BenchmarkTimingWheel_StartStop/N-1m-12   4582120               254 ns/op              85 B/op          2 allocs/op
BenchmarkTimingWheel_StartStop/N-5m-12   3356630               427 ns/op              46 B/op          1 allocs/op
BenchmarkTimingWheel_StartStop/N-10m-12                  2474842               483 ns/op              60 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-1m-12                 6777975               179 ns/op              84 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-5m-12                 6431217               231 ns/op              85 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-10m-12                5374492               266 ns/op              83 B/op          1 allocs/op
PASS
ok      gin-test/api/main       60.414s

從上面可以直接看出,在新增了一千萬個定時器後,時間輪的單次呼叫時間有明顯的上漲,但是 timer 卻依然很穩。

從官方的一個資料顯示:

name                      old time/op  new time/op  delta
AfterFunc-12              1.57ms ± 1%  0.07ms ± 1%  -95.42%  (p=0.000 n=10+8)
After-12                  1.63ms ± 3%  0.11ms ± 1%  -93.54%  (p=0.000 n=9+10)
Stop-12                   78.3µs ± 3%  73.6µs ± 3%   -6.01%  (p=0.000 n=9+10)
SimultaneousAfterFunc-12   138µs ± 1%   111µs ± 1%  -19.57%  (p=0.000 n=10+9)
StartStop-12              28.7µs ± 1%  31.5µs ± 5%   +9.64%  (p=0.000 n=10+7)
Reset-12                  6.78µs ± 1%  4.24µs ± 7%  -37.45%  (p=0.000 n=9+10)
Sleep-12                   183µs ± 1%   125µs ± 1%  -31.67%  (p=0.000 n=10+9)
Ticker-12                 5.40ms ± 2%  0.03ms ± 1%  -99.43%  (p=0.000 n=10+10)
...

在很多項測試中,效能確實得到了很大的增強。下面也就一起看看效能暴漲的原因。

介紹

1.13 版本的 timer

Go 在1.14版本之前是使用 64 個最小堆,執行時建立的所有計時器都會加入到最小堆中,每個處理器(P)建立的計時器會由對應的最小堆維護。

下面是1.13版本 runtime.time原始碼:

const timersLen = 64

var timers [timersLen]struct {
	timersBucket
	// padding, 防止false sharing 
	pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}
// 獲取 P 對應的 Bucket
func (t *timer) assignBucket() *timersBucket {
	id := uint8(getg().m.p.ptr().id) % timersLen
	t.tb = &timers[id].timersBucket
	return t.tb
}

type timersBucket struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
    // timer 列表
	t            []*timer
}

通過上面的 assignBucket 方法可以知道,如果當前機器上的處理器 P 的個數超過了 64,多個處理器上的計時器就可能儲存在同一個桶 timersBucket 中。

每個桶負責管理一堆這樣有序的 timer,同時每個桶都會有一個對應的 timerproc 非同步任務來負責不斷排程這些 timer。t

imerproc 會從 timersBucket 不斷取堆頂元素,如果堆頂的 timer 已到期則執行,沒有任務到期則 sleep,所有任務都消耗完了,那麼呼叫 gopark 掛起,直到有新的 timer 被新增到桶中時,才會被重新喚醒。

timerproc 在 sleep 的時候會呼叫 notetsleepg ,繼而引發entersyscallblock呼叫,該方法會主動呼叫 handoffp ,解綁 M 和 P。當下一個定時時間到來時,又會進行 M 和 P 的繫結,處理器 P 和執行緒 M 之間頻繁的上下文切換也是 timer 的首要效能影響因素之一。

1.14 版本後 timer 的變化

在Go 在1.14版本之後,移除了timersBucket,所有的計時器都以最小四叉堆的形式儲存 P 中。

type p struct {
	... 
	// 互斥鎖
	timersLock mutex
	// 儲存計時器的最小四叉堆
	timers []*timer
	// 計時器數量
	numTimers uint32
	// 處於 timerModifiedEarlier 狀態的計時器數量
	adjustTimers uint32
	// 處於 timerDeleted 狀態的計時器數量
	deletedTimers uint32
	...
}

timer 不再使用 timerproc 非同步任務來排程,而是改用排程迴圈或系統監控排程的時候進行觸發執行,減少了執行緒之間上下文切換帶來的效能損失,並且通過使用 netpoll 阻塞喚醒機制可以讓 timer 更加及時的得到執行。

timer的使用

time.Timer計時器必須通過time.NewTimertime.AfterFunc或者 time.After 函數建立。

如下time.NewTimer

通過定時器的欄位C,我們可以及時得知定時器到期的這個事件來臨,C是一個chan time.Time型別的緩衝通道,一旦觸及到期時間,定時器就會向自己的C欄位傳送一個time.Time型別的元素值

func main() {
	//初始化定時器
	t := time.NewTimer(2 * time.Second)
	//當前時間
	now := time.Now()
	fmt.Printf("Now time : %v.n", now)

	expire := <- t.C
	fmt.Printf("Expiration time: %v.n", expire)
}

time.After一般是配合select來使用:

func main() { 
	ch1 := make(chan int, 1)
	select {
	case e1 := <-ch1:
		//如果ch1通道成功讀取資料,則執行該case處理語句
		fmt.Printf("1th case is selected. e1=%v",e1)
	case <- time.After(2 * time.Second):
		fmt.Println("Timed out")
	}
}

time.Afterfunc可以在設定時間過後執行一個函數:

func main() {
	f := func(){
		fmt.Printf("Expiration time : %v.n", time.Now())
	}
	time.AfterFunc(1*time.Second, f)
 	time.Sleep(2 * time.Second)
}

分析

初始化&Timer結構體

我們先看看NewTimer方法是如何建立一個Timer的:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

func NewTimer(d Duration) *Timer {
	// 初始化一個channel,用於返回
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	// 呼叫runtime.time的startTimer方法
	startTimer(&t.r)
	return t
}

func startTimer(*runtimeTimer)

NewTimer方法主要是初始化一個Timer,然後呼叫startTimer方法,並返回Timer。startTimer方法的真正邏輯並不在time包裡面,我們可以使用到上一節提到的使用dlv偵錯組合程式碼:

sleep.go:94     0xd8ea09        e872c7faff              call $time.startTimer

得知startTimer實際上呼叫的是runtime.time.startTimer方法。也就是說time.Timer只是對runtime包中timer的一層wrap。這層自身實現的最核心功能是將底層的超時回撥轉換為傳送channel訊息。

下面我們看看runtime.startTimer

func startTimer(t *timer) {
	...
	addtimer(t)
}

startTimer方法會將傳入的runtimeTimer轉為timer,然後呼叫addtimer方法。

在NewTimer方法中會初始化一個runtimeTimer結構體,這個結構體實際上會被當做runtime.time中的timer結構體傳入到startTimer方法中,所以下面我們來看看timer:

type timer struct {
	// 對應處理器P的指標
	pp puintptr 
	// 定時器被喚醒的時間
	when   int64
    // 喚醒的間隔時間
	period int64
	// 喚醒時被呼叫的函數
	f      func(interface{}, uintptr)
	// 被呼叫的函數的引數
	arg    interface{} 
	seq    uintptr
	// 處於timerModifiedXX狀態時用於設定when欄位 
	nextwhen int64 
	// 定時器的狀態
	status uint32
}

除此之外,timer還有一些標誌位來表示 status 狀態:

const (
	// 初始化狀態
	timerNoStatus = iota

	// 等待被呼叫
	// timer 已在 P 的列表中
	timerWaiting

	// 表示 timer 在執行中 
	timerRunning

	// timer 已被刪除 
	timerDeleted

	// timer 正在被移除 
	timerRemoving

	// timer 已被移除,並停止執行 
	timerRemoved

	// timer 被修改了 
	timerModifying
 
  	// 被修改到了更早的時間 
	timerModifiedEarlier 
  
  	// 被修改到了更晚的時間
	timerModifiedLater
 
  // 已經被修改,並且正在被移動
	timerMoving
)

addtimer 新增 timer

runtime.addtimer

func addtimer(t *timer) {
	// 定時器被喚醒的時間的時間不能為負數
	if t.when < 0 {
		t.when = maxWhen
	}
	// 狀態必須為初始化
	if t.status != timerNoStatus {
		throw("addtimer called with initialized timer")
	}
	// 設定為等待排程
	t.status = timerWaiting

	when := t.when
	// 獲取當前 P
	pp := getg().m.p.ptr()
	lock(&pp.timersLock)
	// 清理 P 的 timer 列表頭中的 timer
	cleantimers(pp)
	// 將 timer 加入到 P 的最小堆中
	doaddtimer(pp, t)
	unlock(&pp.timersLock)
	// 喚醒 netpoller 中休眠的執行緒
	wakeNetPoller(when)
}
  1. addtimer 會對 timer 被喚醒的時間 when 進行校驗,以及校驗 status 必須是新出初始化的 timer;
  2. 接著會在加鎖後呼叫 cleantimers 對 P 中對應的 timer 列表的頭節點進行清理工作,清理完後呼叫 doaddtimer 將 timer 加入到 P 的最小堆中,並釋放鎖;
  3. 呼叫 wakeNetPoller 喚醒 netpoller 中休眠的執行緒。

下面分別來看看 addtimer 中幾個重要函數的具體實現:

runtime.cleantimers

func cleantimers(pp *p) {
	gp := getg()
	for {
		// 排程器列表為空,直接返回
		if len(pp.timers) == 0 {
			return
		}
		// 如果當前 G 被搶佔了,直接返回
		if gp.preemptStop {
			return
		}
		// 獲取第一個 timer
		t := pp.timers[0]
		if t.pp.ptr() != pp {
			throw("cleantimers: bad p")
		}
		switch s := atomic.Load(&t.status); s {
		case timerDeleted:
			// 設定 timer 的狀態
			if !atomic.Cas(&t.status, s, timerRemoving) {
				continue
			}
			// 刪除第一個 timer
			dodeltimer0(pp)
			// 刪除完畢後重置狀態為 timerRemoved
			if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
				badTimer()
			}
			atomic.Xadd(&pp.deletedTimers, -1)
		// timer 被修改到了更早或更晚的時間
		case timerModifiedEarlier, timerModifiedLater:
			// 將 timer 狀態設定為 timerMoving
			if !atomic.Cas(&t.status, s, timerMoving) {
				continue
			}
			// 重新設定 when 欄位
			t.when = t.nextwhen
			// 在列表中刪除後重新加入 
			dodeltimer0(pp)
			doaddtimer(pp, t)
			if s == timerModifiedEarlier {
				atomic.Xadd(&pp.adjustTimers, -1)
			}
			// 設定狀態為 timerWaiting
			if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
				badTimer()
			}
		default: 
			return
		}
	}
}

cleantimers 函數中使用了一個無限迴圈來獲取頭節點。如果頭節點的狀態是 timerDeleted ,那麼需要從 timer 列表中刪除;如果頭節點的狀態是 timerModifiedEarlier 或 timerModifiedLater ,表示頭節點的觸發的時間被修改到了更早或更晚的時間,那麼就先從 timer佇列中刪除再重新新增。

runtime.doaddtimer

func doaddtimer(pp *p, t *timer) { 
	// Timers 依賴於 netpoller
	// 所以如果 netpoller 沒有啟動,需要啟動一下
	if netpollInited == 0 {
		netpollGenericInit()
	}
	// 校驗是否早已在 timer 列表中
	if t.pp != 0 {
		throw("doaddtimer: P already set in timer")
	}
	// 設定 timer 與 P 的關聯
	t.pp.set(pp)
	i := len(pp.timers)
	// 將 timer 加入到 P 的 timer 列表中
	pp.timers = append(pp.timers, t)
	// 維護 timer 在 最小堆中的位置
	siftupTimer(pp.timers, i)
	// 如果 timer 是列表中頭節點,需要設定一下 timer0When
	if t == pp.timers[0] {
		atomic.Store64(&pp.timer0When, uint64(t.when))
	}
	atomic.Xadd(&pp.numTimers, 1)
}

doaddtimer 函數實際上很簡單,主要是將 timer 與 P 設定關聯關係,並將 timer 加入到 P 的 timer 列表中,並維護 timer 列表最小堆的順序。

runtime.wakeNetPoller

func wakeNetPoller(when int64) {
	if atomic.Load64(&sched.lastpoll) == 0 {  
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		// 如果計時器的觸發時間小於netpoller的下一次輪詢時間
		if pollerPollUntil == 0 || pollerPollUntil > when {
			// 向netpollBreakWr裡面寫入資料,立即中斷netpoll
			netpollBreak()
		}
	}
}

func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		for {
			var b byte
            // 向 netpollBreakWr 裡面寫入資料
			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
			if n == 1 {
				break
			}
			if n == -_EINTR {
				continue
			}
			if n == -_EAGAIN {
				return
			}
			println("runtime: netpollBreak write failed with", -n)
			throw("runtime: netpollBreak write failed")
		}
	}
}

wakeNetPoller 主要是將 timer 下次排程的時間和 netpoller 的下一次輪詢時間相比,如果小於的話,呼叫 netpollBreak 向 netpollBreakWr 裡面寫入資料,立即中斷netpoll。具體如何中斷的,我們下面再聊。

stopTimer 終止 timer

終止 timer 的邏輯主要是 timer 的狀態的變更:

如果該timer處於 timerWaiting 或 timerModifiedLater 或 timerModifiedEarlier:

  • timerModifying -> timerDeleted

如果該timer處於 其他狀態:

  • 待狀態改變或者直接返回

所以在終止 timer 的過程中不會去刪除 timer,而是標記一個狀態,等待被刪除。

modTimer 修改 timer

func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
	if when < 0 {
		when = maxWhen
	} 
	status := uint32(timerNoStatus)
	wasRemoved := false
	var pending bool
	var mp *m
loop:
	for {
		// 修改 timer 狀態
		switch status = atomic.Load(&t.status); status {
		...
	} 
	t.period = period
	t.f = f
	t.arg = arg
	t.seq = seq
	// 如果 timer 已被刪除,那麼需要重新新增到 timer 列表中
	if wasRemoved {
		t.when = when
		pp := getg().m.p.ptr()
		lock(&pp.timersLock)
		doaddtimer(pp, t)
		unlock(&pp.timersLock)
		if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
			badTimer()
		}
		releasem(mp)
		wakeNetPoller(when)
	} else {
		
		t.nextwhen = when

		newStatus := uint32(timerModifiedLater)
		// 如果修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier
		if when < t.when {
			newStatus = timerModifiedEarlier
		} 
		... 
		if !atomic.Cas(&t.status, timerModifying, newStatus) {
			badTimer()
		}
		releasem(mp)

		// 如果修改時間提前,那麼觸發 netpoll 中斷
		if newStatus == timerModifiedEarlier {
			wakeNetPoller(when)
		}
	}

	return pending
}

modtimer 進入到 for 迴圈後會根據不同的狀態做狀態設定以及必要欄位的處理;如果是 timer 已被刪除,那麼需要重新新增到 timer 列表中;如果 timer 修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier,修改時間提前,還需要觸發 netpoll 中斷。

timer 的執行

聊完了如何新增 timer,下面我們來看看 timer 是如何執行的。timer 的執行是交給 runtime.runtimer函數執行的,這個函數會檢查 P 上最小堆的最頂上的 timer 的狀態,根據狀態做不同的處理。

func runtimer(pp *p, now int64) int64 {
	for {
		// 獲取最小堆的第一個元素
		t := pp.timers[0]
		if t.pp.ptr() != pp {
			throw("runtimer: bad p")
		}
		// 獲取 timer 狀態
		switch s := atomic.Load(&t.status); s {
		// timerWaiting
		case timerWaiting:
			// 還沒到時間,返回下次執行時間
			if t.when > now {
				// Not ready to run.
				return t.when
			}
			// 修改狀態為 timerRunning
			if !atomic.Cas(&t.status, s, timerRunning) {
				continue
			}
			// 執行該 timer
			runOneTimer(pp, t, now)
			return 0
		// timerDeleted
		case timerDeleted:
			if !atomic.Cas(&t.status, s, timerRemoving) {
				continue
			}
			// 刪除最小堆的第一個 timer
			dodeltimer0(pp)
			if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
				badTimer()
			}
			atomic.Xadd(&pp.deletedTimers, -1)
			if len(pp.timers) == 0 {
				return -1
			}
		// 需要重新移動位置的 timer
		case timerModifiedEarlier, timerModifiedLater:
			if !atomic.Cas(&t.status, s, timerMoving) {
				continue
			}
			t.when = t.nextwhen
			// 刪除最小堆的第一個 timer
			dodeltimer0(pp)
			// 將該 timer 重新新增到最小堆
			doaddtimer(pp, t)
			if s == timerModifiedEarlier {
				atomic.Xadd(&pp.adjustTimers, -1)
			}
			if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
				badTimer()
			}

		case timerModifying: 
			osyield()

		case timerNoStatus, timerRemoved: 
			badTimer()
		case timerRunning, timerRemoving, timerMoving: 
			badTimer()
		default:
			badTimer()
		}
	}
}

runtimer 裡面會啟動一個 for 迴圈,不停的檢查 P 的 timer 列表的第一個元素的狀態。

  • 如果該 timer 處於 timerWaiting,那麼判斷當前的時間大於 timer 執行的時間,則呼叫 runOneTimer 執行;
  • 如果該 timer 處於 timerDeleted,表示該 timer 是需要被刪除的,那麼呼叫 dodeltimer0 刪除最小堆的第一個 timer ,並修改其狀態;
  • 如果該 timer 狀態是 timerModifiedEarlier 、timerModifiedLater,那麼表示該 timer 的執行時間被修改過,需要重新調整它在最小堆中的位置,所以先呼叫 dodeltimer0 刪除該 timer,再呼叫 doaddtimer 將該 timer 重新新增到最小堆。

runtime.runOneTimer

func runOneTimer(pp *p, t *timer, now int64) {
	... 
	// 需要被執行的函數
	f := t.f
	// 被執行函數的引數
	arg := t.arg
	seq := t.seq
	// 表示該 timer 為 ticker,需要再次觸發
	if t.period > 0 { 
		// 放入堆中並調整觸發時間
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		siftdownTimer(pp.timers, 0)
		if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
			badTimer()
		}
		updateTimer0When(pp)
	// 一次性 timer
	} else {
		// 刪除該 timer.
		dodeltimer0(pp)
		if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
			badTimer()
		}
	}  
	unlock(&pp.timersLock)
	// 執行該函數
	f(arg, seq)
	lock(&pp.timersLock)
	...
}

runOneTimer 會根據 period 是否大於0判斷該 timer 是否需要反覆執行,如果是的話需要重新調整 when 下次執行時間後重新調整該 timer 在堆中的位置。一次性 timer 的話會執行 dodeltimer0 刪除該 timer ,最後執行 timer 中的函數;

timer 的觸發

下面這裡是我覺得比較有意思的地方,timer 的觸發有兩種:

  • 從排程迴圈中直接觸發;

  • 另一種是Go語言的後臺系統監控中會定時觸發;

排程迴圈觸發

排程迴圈,我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,不明白的同學可以自己再去看看。

整個排程迴圈會有三個地方去檢查是否有可執行的 timer:

  1. 呼叫 runtime.schedule 執行排程時;
  2. 呼叫runtime.findrunnable獲取可執行函數時;
  3. 呼叫runtime.findrunnable執行搶佔時;

runtime.schedule

func schedule() {
	_g_ := getg()
	...
top:
	pp := _g_.m.p.ptr()
	...
	// 檢查是否有可執行 timer 並執行
	checkTimers(pp, 0) 
	var gp *g
	...
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	... 
	execute(gp, inheritTime)
}

下面我們看看 checkTimers 做了什麼:

runtime.checkTimers

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { 
	// 如果沒有需要調整的 timer
	if atomic.Load(&pp.adjustTimers) == 0 {
		// 獲取 timer0 的執行時間 
		next := int64(atomic.Load64(&pp.timer0When))
		if next == 0 {
			return now, 0, false
		}
		if now == 0 {
			now = nanotime()
		}
		// 下次執行大於當前時間,
		if now < next { 
			// 需要刪除的 timer 個數小於 timer列表個數的4分之1,直接返回
			if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
				return now, next, false
			}
		}
	} 
	lock(&pp.timersLock)
	// 進行調整 timer
	adjusttimers(pp) 
	rnow = now
	if len(pp.timers) > 0 {
		if rnow == 0 {
			rnow = nanotime()
		}
		for len(pp.timers) > 0 { 
			// 查詢堆中是否存在需要執行的 timer
			if tw := runtimer(pp, rnow); tw != 0 {
				if tw > 0 {
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}

	// 如果需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼清理需要刪除的 timer
	if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
		clearDeletedTimers(pp)
	}

	unlock(&pp.timersLock)

	return rnow, pollUntil, ran
}

checkTimers 中主要做了這麼幾件事:

  1. 檢查是否有需要進行調整的 timer, 如果沒有需要執行的計時器時,直接返回;如果下一個要執行的 timer 沒有到期並且需要刪除的計時器較少(四分之一)時也會直接返回;
  2. 呼叫 adjusttimers 進行 timer 列表的調整,主要是維護 timer 列表的最小堆的順序;
  3. 呼叫 runtime.runtimer查詢堆中是否存在需要執行的timer, runtime.runtimer上面已經講過了,這裡不再贅述;
  4. 如果當前 Goroutine 的 P 和傳入的 P 相同,並且需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼呼叫 clearDeletedTimers 清理需要刪除的 timer;

runtime.findrunnable

func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	...
	// 檢查 P 中可執行的 timer
	now, pollUntil, _ := checkTimers(_p_, 0)
	... 
    // 如果 netpoll 已被初始化,並且 Waiters 大於零,並且 lastpoll 不為0
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		// 嘗試從netpoller獲取Glist
		if list := netpoll(0); !list.empty() { // 無阻塞
			gp := list.pop()
			//將其餘佇列放入 P 的可執行G佇列
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}
	...
	// 開始竊取
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			// 如果 i>2 表示如果其他 P 執行佇列中沒有 G ,將要從其他佇列的 runnext 中獲取
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			// 隨機獲取一個 P
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			// 從其他 P 的執行佇列中獲取一般的 G 到當前佇列中
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}

			// 如果執行佇列中沒有 G,那麼從 timers 中獲取可執行的 timer
			if i > 2 || (i > 1 && shouldStealTimers(p2)) {
				// ran 為 true 表示有執行過 timer
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
				if ran {
					// 因為已經執行過 timer 了,說不定已經有準備就緒的 G 了
					// 再次檢查本地佇列嘗試獲取 G
					if gp, inheritTime := runqget(_p_); gp != nil {
						return gp, inheritTime
					}
					ranTimer = true
				}
			}
		}
	} 

	if ranTimer {
		// 執行完一個 timer 後可能存在已經就緒的 G
		goto top
	}

stop:  
	...
	delta := int64(-1)
	if pollUntil != 0 {
		// checkTimers ensures that polluntil > now.
		delta = pollUntil - now
	}
	...
	// poll network
	// 休眠前再次檢查 poll 網路
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		...
		list := netpoll(delta) // 阻塞呼叫
		lock(&sched.lock)
		_p_ = pidleget()
		unlock(&sched.lock)
		if _p_ == nil {
			injectglist(&list)
		} else {
			acquirep(_p_)
			if !list.empty() {
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			 
			goto top
		}
	} else if pollUntil != 0 && netpollinited() {
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
			netpollBreak()
		}
	}
	// 休眠當前 M
	stopm()
	goto top
}

findrunnable 我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,這裡提取 timer 相關的程式碼分析一下:

  1. findrunnable 在竊取前先會呼叫 checkTimers 檢查 P 中可執行的 timer;
  2. 如果 netpoll 中有等待的 waiter,那麼會呼叫 netpoll 嘗試無阻塞的從netpoller獲取Glist;
  3. 如果獲取不到可執行的 G,那麼就會開始執行竊取。竊取的時候會呼叫 checkTimers 隨機從其他的 P 中獲取 timer;
  4. 竊取完畢後也沒有可執行的 timer,那麼會繼續往下,休眠前再次檢查 netpoll 網路,呼叫 netpoll(delta) 函數進行阻塞呼叫。

系統監控觸發

系統監控其實就是 Go 語言的守護行程,它們能夠在後臺監控系統的執行狀態,在出現意外情況時及時響應。它會每隔一段時間檢查 Go 語言執行時狀態,確保沒有異常發生。我們這裡不主要去講系統監控,只抽離出其中的和 timer 相關的程式碼進行講解。

runtime.sysmon

func sysmon() {
	...
	for {
		...
		now := nanotime()
		// 返回下次需要排程 timer 到期時間
		next, _ := timeSleepUntil()
		...  
		// 如果超過 10ms 沒有 poll,則 poll 一下網路
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // 非阻塞,返回 G 列表
			// G 列表不為空
			if !list.empty() { 
				incidlelocked(-1)
				// 將獲取到的 G 列表插入到空閒的 P 中或全域性列表中
				injectglist(&list)
				incidlelocked(1)
			}
		}
		// 如果有 timer 到期
		if next < now {
			// 啟動新的 M 處理 timer
			startm(nil, false)
		}
		...
	}
}
  1. sysmon 會通過 timeSleepUntil 遍歷所有的 P 的 timer 列表,找到下一個需要執行的 timer;
  2. 如果超過 10ms 沒有 poll,則 poll 一下網路;
  3. 如果有 timer 到期,這個時候直接啟動新的 M 處理 timer;

netpoll 的作用

我們從一開始呼叫 runtime.addtimer 新增 timer 的時候,就會 runtime.wakeNetPoller來中斷 netpoll ,那麼它是如何做到的?我們下面先來看一個官方的例子:

func TestNetpollBreak(t *testing.T) {
	if runtime.GOMAXPROCS(0) == 1 {
		t.Skip("skipping: GOMAXPROCS=1")
	}
	// 初始化 netpoll
	runtime.NetpollGenericInit()

	start := time.Now()
	c := make(chan bool, 2)
	go func() {
		c <- true
		// netpoll 等待時間
		runtime.Netpoll(10 * time.Second.Nanoseconds())
		c <- true
	}()
	<-c 
loop:
	for {
		runtime.Usleep(100)
		// 中斷netpoll 等待
		runtime.NetpollBreak()
		runtime.NetpollBreak()
		select {
		case <-c:
			break loop
		default:
		}
	}
	if dur := time.Since(start); dur > 5*time.Second {
		t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
	}
}

在上面這個例子中,首先會呼叫 runtime.Netpoll進行阻塞等待,然後迴圈排程 runtime.NetpollBreak進行中斷阻塞。

runtime.netpoll

func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
	var waitms int32
	// 因為傳入delay單位是納秒,下面將納秒轉換成毫秒
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else { 
		waitms = 1e9
	}
	var events [128]epollevent
retry:
	// 等待檔案描述符轉換成可讀或者可寫
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	// 返回負值,那麼重新呼叫epollwait進行等待
	if n < 0 {
		...
		goto retry
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		// 如果是 NetpollBreak 中斷的,那麼執行 continue 跳過
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			if ev.events != _EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 { 
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		} 
		...
	}
	return toRun
}

在 呼叫runtime.findrunnable執行搶佔時,最後會傳入一個時間,超時阻塞呼叫 netpoll,如果沒有事件中斷,那麼迴圈排程會一直等待直到 netpoll 超時後才往下進行:

func findrunnable() (gp *g, inheritTime bool) {
	...
	delta := int64(-1)
	if pollUntil != 0 {
		// checkTimers ensures that polluntil > now.
		delta = pollUntil - now
	}
	...
	// poll network
	// 休眠前再次檢查 poll 網路
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		...
		// 阻塞呼叫
		list := netpoll(delta) 
		
	}  
	...
	// 休眠當前 M
	stopm()
	goto top
}

所以在呼叫 runtime.addtimer 新增 timer 的時候進行 netpoll 的中斷操作可以更加靈敏的響應 timer 這類時間敏感的任務。

總結

我們通過 timer 的 1.13版本以及1.14版本後的對比可以發現,即使是一個定時器 go 語言都做了相當多的優化工作。從原來的需要維護 64 個桶,然後每個桶裡面跑非同步任務,到現在的將 timer列表直接掛到了 P 上面,這不僅減少了上下文切換帶來的效能損耗,也減少了在鎖之間的爭搶問題,通過這些優化後有了可以媲美時間輪的效能表現。

Reference

go1.14基於netpoll優化timer定時器實現原理 http://xiaorui.cc/archives/6483

https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2

https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81

計時器 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/timer/

《Golang》Netpoll解析 https://www.pefish.club/2020/05/04/Golang/1011Netpoll解析/

time.Timer 原始碼分析 https://docs.google.com/presentation/d/1c2mRWA-FiihgpbGsE4uducou7X5d4WoiiLVab-ewsT8/edit


IT145.com E-mail:sddin#qq.com