2021-05-12 14:32:11
Go中定時器實現原理及原始碼解析
轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
本文使用的go的原始碼15.7,需要注意的是由於timer是1.14版本進行改版,但是1.14和1.15版本的timer並無很大區別
我在春節期間寫了一篇文章有關時間輪的:https://www.luozhiyun.com/archives/444。後來有同學建議我去看看 1.14版本之後的 timer 優化。然後我就自己就時間輪和 timer 也做了個 benchmark:
goos: darwin
goarch: amd64
pkg: gin-test/api/main
BenchmarkTimingWheel_StartStop/N-1m-12 4582120 254 ns/op 85 B/op 2 allocs/op
BenchmarkTimingWheel_StartStop/N-5m-12 3356630 427 ns/op 46 B/op 1 allocs/op
BenchmarkTimingWheel_StartStop/N-10m-12 2474842 483 ns/op 60 B/op 1 allocs/op
BenchmarkStandardTimer_StartStop/N-1m-12 6777975 179 ns/op 84 B/op 1 allocs/op
BenchmarkStandardTimer_StartStop/N-5m-12 6431217 231 ns/op 85 B/op 1 allocs/op
BenchmarkStandardTimer_StartStop/N-10m-12 5374492 266 ns/op 83 B/op 1 allocs/op
PASS
ok gin-test/api/main 60.414s
從上面可以直接看出,在新增了一千萬個定時器後,時間輪的單次呼叫時間有明顯的上漲,但是 timer 卻依然很穩。
從官方的一個資料顯示:
name old time/op new time/op delta
AfterFunc-12 1.57ms ± 1% 0.07ms ± 1% -95.42% (p=0.000 n=10+8)
After-12 1.63ms ± 3% 0.11ms ± 1% -93.54% (p=0.000 n=9+10)
Stop-12 78.3µs ± 3% 73.6µs ± 3% -6.01% (p=0.000 n=9+10)
SimultaneousAfterFunc-12 138µs ± 1% 111µs ± 1% -19.57% (p=0.000 n=10+9)
StartStop-12 28.7µs ± 1% 31.5µs ± 5% +9.64% (p=0.000 n=10+7)
Reset-12 6.78µs ± 1% 4.24µs ± 7% -37.45% (p=0.000 n=9+10)
Sleep-12 183µs ± 1% 125µs ± 1% -31.67% (p=0.000 n=10+9)
Ticker-12 5.40ms ± 2% 0.03ms ± 1% -99.43% (p=0.000 n=10+10)
...
在很多項測試中,效能確實得到了很大的增強。下面也就一起看看效能暴漲的原因。
介紹
1.13 版本的 timer
Go 在1.14版本之前是使用 64 個最小堆,執行時建立的所有計時器都會加入到最小堆中,每個處理器(P)建立的計時器會由對應的最小堆維護。
下面是1.13版本 runtime.time
原始碼:
const timersLen = 64
var timers [timersLen]struct {
timersBucket
// padding, 防止false sharing
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}
// 獲取 P 對應的 Bucket
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}
type timersBucket struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
sleepUntil int64
waitnote note
// timer 列表
t []*timer
}
通過上面的 assignBucket 方法可以知道,如果當前機器上的處理器 P 的個數超過了 64,多個處理器上的計時器就可能儲存在同一個桶 timersBucket 中。
每個桶負責管理一堆這樣有序的 timer,同時每個桶都會有一個對應的 timerproc 非同步任務來負責不斷排程這些 timer。t
imerproc 會從 timersBucket 不斷取堆頂元素,如果堆頂的 timer 已到期則執行,沒有任務到期則 sleep,所有任務都消耗完了,那麼呼叫 gopark 掛起,直到有新的 timer 被新增到桶中時,才會被重新喚醒。
timerproc 在 sleep 的時候會呼叫 notetsleepg ,繼而引發entersyscallblock呼叫,該方法會主動呼叫 handoffp ,解綁 M 和 P。當下一個定時時間到來時,又會進行 M 和 P 的繫結,處理器 P 和執行緒 M 之間頻繁的上下文切換也是 timer 的首要效能影響因素之一。
1.14 版本後 timer 的變化
在Go 在1.14版本之後,移除了timersBucket,所有的計時器都以最小四叉堆的形式儲存 P 中。
type p struct {
...
// 互斥鎖
timersLock mutex
// 儲存計時器的最小四叉堆
timers []*timer
// 計時器數量
numTimers uint32
// 處於 timerModifiedEarlier 狀態的計時器數量
adjustTimers uint32
// 處於 timerDeleted 狀態的計時器數量
deletedTimers uint32
...
}
timer 不再使用 timerproc 非同步任務來排程,而是改用排程迴圈或系統監控排程的時候進行觸發執行,減少了執行緒之間上下文切換帶來的效能損失,並且通過使用 netpoll 阻塞喚醒機制可以讓 timer 更加及時的得到執行。
timer的使用
time.Timer
計時器必須通過time.NewTimer
、time.AfterFunc
或者 time.After
函數建立。
如下time.NewTimer
:
通過定時器的欄位C,我們可以及時得知定時器到期的這個事件來臨,C是一個chan time.Time型別的緩衝通道,一旦觸及到期時間,定時器就會向自己的C欄位傳送一個time.Time型別的元素值
func main() {
//初始化定時器
t := time.NewTimer(2 * time.Second)
//當前時間
now := time.Now()
fmt.Printf("Now time : %v.n", now)
expire := <- t.C
fmt.Printf("Expiration time: %v.n", expire)
}
time.After
一般是配合select來使用:
func main() {
ch1 := make(chan int, 1)
select {
case e1 := <-ch1:
//如果ch1通道成功讀取資料,則執行該case處理語句
fmt.Printf("1th case is selected. e1=%v",e1)
case <- time.After(2 * time.Second):
fmt.Println("Timed out")
}
}
time.Afterfunc
可以在設定時間過後執行一個函數:
func main() {
f := func(){
fmt.Printf("Expiration time : %v.n", time.Now())
}
time.AfterFunc(1*time.Second, f)
time.Sleep(2 * time.Second)
}
分析
初始化&Timer結構體
我們先看看NewTimer方法是如何建立一個Timer的:
type Timer struct {
C <-chan Time
r runtimeTimer
}
func NewTimer(d Duration) *Timer {
// 初始化一個channel,用於返回
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
// 呼叫runtime.time的startTimer方法
startTimer(&t.r)
return t
}
func startTimer(*runtimeTimer)
NewTimer方法主要是初始化一個Timer,然後呼叫startTimer方法,並返回Timer。startTimer方法的真正邏輯並不在time包裡面,我們可以使用到上一節提到的使用dlv偵錯組合程式碼:
sleep.go:94 0xd8ea09 e872c7faff call $time.startTimer
得知startTimer實際上呼叫的是runtime.time.startTimer
方法。也就是說time.Timer
只是對runtime包中timer的一層wrap。這層自身實現的最核心功能是將底層的超時回撥轉換為傳送channel訊息。
下面我們看看runtime.startTimer
:
func startTimer(t *timer) {
...
addtimer(t)
}
startTimer方法會將傳入的runtimeTimer轉為timer,然後呼叫addtimer方法。
在NewTimer方法中會初始化一個runtimeTimer結構體,這個結構體實際上會被當做runtime.time
中的timer結構體傳入到startTimer方法中,所以下面我們來看看timer:
type timer struct {
// 對應處理器P的指標
pp puintptr
// 定時器被喚醒的時間
when int64
// 喚醒的間隔時間
period int64
// 喚醒時被呼叫的函數
f func(interface{}, uintptr)
// 被呼叫的函數的引數
arg interface{}
seq uintptr
// 處於timerModifiedXX狀態時用於設定when欄位
nextwhen int64
// 定時器的狀態
status uint32
}
除此之外,timer還有一些標誌位來表示 status 狀態:
const (
// 初始化狀態
timerNoStatus = iota
// 等待被呼叫
// timer 已在 P 的列表中
timerWaiting
// 表示 timer 在執行中
timerRunning
// timer 已被刪除
timerDeleted
// timer 正在被移除
timerRemoving
// timer 已被移除,並停止執行
timerRemoved
// timer 被修改了
timerModifying
// 被修改到了更早的時間
timerModifiedEarlier
// 被修改到了更晚的時間
timerModifiedLater
// 已經被修改,並且正在被移動
timerMoving
)
addtimer 新增 timer
runtime.addtimer
func addtimer(t *timer) {
// 定時器被喚醒的時間的時間不能為負數
if t.when < 0 {
t.when = maxWhen
}
// 狀態必須為初始化
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
// 設定為等待排程
t.status = timerWaiting
when := t.when
// 獲取當前 P
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// 清理 P 的 timer 列表頭中的 timer
cleantimers(pp)
// 將 timer 加入到 P 的最小堆中
doaddtimer(pp, t)
unlock(&pp.timersLock)
// 喚醒 netpoller 中休眠的執行緒
wakeNetPoller(when)
}
- addtimer 會對 timer 被喚醒的時間 when 進行校驗,以及校驗 status 必須是新出初始化的 timer;
- 接著會在加鎖後呼叫 cleantimers 對 P 中對應的 timer 列表的頭節點進行清理工作,清理完後呼叫 doaddtimer 將 timer 加入到 P 的最小堆中,並釋放鎖;
- 呼叫 wakeNetPoller 喚醒 netpoller 中休眠的執行緒。
下面分別來看看 addtimer 中幾個重要函數的具體實現:
runtime.cleantimers
func cleantimers(pp *p) {
gp := getg()
for {
// 排程器列表為空,直接返回
if len(pp.timers) == 0 {
return
}
// 如果當前 G 被搶佔了,直接返回
if gp.preemptStop {
return
}
// 獲取第一個 timer
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("cleantimers: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// 設定 timer 的狀態
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
// 刪除第一個 timer
dodeltimer0(pp)
// 刪除完畢後重置狀態為 timerRemoved
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
// timer 被修改到了更早或更晚的時間
case timerModifiedEarlier, timerModifiedLater:
// 將 timer 狀態設定為 timerMoving
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
// 重新設定 when 欄位
t.when = t.nextwhen
// 在列表中刪除後重新加入
dodeltimer0(pp)
doaddtimer(pp, t)
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
// 設定狀態為 timerWaiting
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
return
}
}
}
cleantimers 函數中使用了一個無限迴圈來獲取頭節點。如果頭節點的狀態是 timerDeleted ,那麼需要從 timer 列表中刪除;如果頭節點的狀態是 timerModifiedEarlier 或 timerModifiedLater ,表示頭節點的觸發的時間被修改到了更早或更晚的時間,那麼就先從 timer佇列中刪除再重新新增。
runtime.doaddtimer
func doaddtimer(pp *p, t *timer) {
// Timers 依賴於 netpoller
// 所以如果 netpoller 沒有啟動,需要啟動一下
if netpollInited == 0 {
netpollGenericInit()
}
// 校驗是否早已在 timer 列表中
if t.pp != 0 {
throw("doaddtimer: P already set in timer")
}
// 設定 timer 與 P 的關聯
t.pp.set(pp)
i := len(pp.timers)
// 將 timer 加入到 P 的 timer 列表中
pp.timers = append(pp.timers, t)
// 維護 timer 在 最小堆中的位置
siftupTimer(pp.timers, i)
// 如果 timer 是列表中頭節點,需要設定一下 timer0When
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1)
}
doaddtimer 函數實際上很簡單,主要是將 timer 與 P 設定關聯關係,並將 timer 加入到 P 的 timer 列表中,並維護 timer 列表最小堆的順序。
runtime.wakeNetPoller
func wakeNetPoller(when int64) {
if atomic.Load64(&sched.lastpoll) == 0 {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
// 如果計時器的觸發時間小於netpoller的下一次輪詢時間
if pollerPollUntil == 0 || pollerPollUntil > when {
// 向netpollBreakWr裡面寫入資料,立即中斷netpoll
netpollBreak()
}
}
}
func netpollBreak() {
if atomic.Cas(&netpollWakeSig, 0, 1) {
for {
var b byte
// 向 netpollBreakWr 裡面寫入資料
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
}
wakeNetPoller 主要是將 timer 下次排程的時間和 netpoller 的下一次輪詢時間相比,如果小於的話,呼叫 netpollBreak 向 netpollBreakWr 裡面寫入資料,立即中斷netpoll。具體如何中斷的,我們下面再聊。
stopTimer 終止 timer
終止 timer 的邏輯主要是 timer 的狀態的變更:
如果該timer處於 timerWaiting 或 timerModifiedLater 或 timerModifiedEarlier:
- timerModifying -> timerDeleted
如果該timer處於 其他狀態:
- 待狀態改變或者直接返回
所以在終止 timer 的過程中不會去刪除 timer,而是標記一個狀態,等待被刪除。
modTimer 修改 timer
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when < 0 {
when = maxWhen
}
status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
// 修改 timer 狀態
switch status = atomic.Load(&t.status); status {
...
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
// 如果 timer 已被刪除,那麼需要重新新增到 timer 列表中
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
// 如果修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier
if when < t.when {
newStatus = timerModifiedEarlier
}
...
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)
// 如果修改時間提前,那麼觸發 netpoll 中斷
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
return pending
}
modtimer 進入到 for 迴圈後會根據不同的狀態做狀態設定以及必要欄位的處理;如果是 timer 已被刪除,那麼需要重新新增到 timer 列表中;如果 timer 修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier,修改時間提前,還需要觸發 netpoll 中斷。
timer 的執行
聊完了如何新增 timer,下面我們來看看 timer 是如何執行的。timer 的執行是交給 runtime.runtimer
函數執行的,這個函數會檢查 P 上最小堆的最頂上的 timer 的狀態,根據狀態做不同的處理。
func runtimer(pp *p, now int64) int64 {
for {
// 獲取最小堆的第一個元素
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
// 獲取 timer 狀態
switch s := atomic.Load(&t.status); s {
// timerWaiting
case timerWaiting:
// 還沒到時間,返回下次執行時間
if t.when > now {
// Not ready to run.
return t.when
}
// 修改狀態為 timerRunning
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// 執行該 timer
runOneTimer(pp, t, now)
return 0
// timerDeleted
case timerDeleted:
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
// 刪除最小堆的第一個 timer
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
if len(pp.timers) == 0 {
return -1
}
// 需要重新移動位置的 timer
case timerModifiedEarlier, timerModifiedLater:
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
t.when = t.nextwhen
// 刪除最小堆的第一個 timer
dodeltimer0(pp)
// 將該 timer 重新新增到最小堆
doaddtimer(pp, t)
if s == timerModifiedEarlier {
atomic.Xadd(&pp.adjustTimers, -1)
}
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}
runtimer 裡面會啟動一個 for 迴圈,不停的檢查 P 的 timer 列表的第一個元素的狀態。
- 如果該 timer 處於 timerWaiting,那麼判斷當前的時間大於 timer 執行的時間,則呼叫 runOneTimer 執行;
- 如果該 timer 處於 timerDeleted,表示該 timer 是需要被刪除的,那麼呼叫 dodeltimer0 刪除最小堆的第一個 timer ,並修改其狀態;
- 如果該 timer 狀態是 timerModifiedEarlier 、timerModifiedLater,那麼表示該 timer 的執行時間被修改過,需要重新調整它在最小堆中的位置,所以先呼叫 dodeltimer0 刪除該 timer,再呼叫 doaddtimer 將該 timer 重新新增到最小堆。
runtime.runOneTimer
func runOneTimer(pp *p, t *timer, now int64) {
...
// 需要被執行的函數
f := t.f
// 被執行函數的引數
arg := t.arg
seq := t.seq
// 表示該 timer 為 ticker,需要再次觸發
if t.period > 0 {
// 放入堆中並調整觸發時間
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
// 一次性 timer
} else {
// 刪除該 timer.
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
unlock(&pp.timersLock)
// 執行該函數
f(arg, seq)
lock(&pp.timersLock)
...
}
runOneTimer 會根據 period 是否大於0判斷該 timer 是否需要反覆執行,如果是的話需要重新調整 when 下次執行時間後重新調整該 timer 在堆中的位置。一次性 timer 的話會執行 dodeltimer0 刪除該 timer ,最後執行 timer 中的函數;
timer 的觸發
下面這裡是我覺得比較有意思的地方,timer 的觸發有兩種:
-
從排程迴圈中直接觸發;
-
另一種是Go語言的後臺系統監控中會定時觸發;
排程迴圈觸發
排程迴圈,我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,不明白的同學可以自己再去看看。
整個排程迴圈會有三個地方去檢查是否有可執行的 timer:
- 呼叫
runtime.schedule
執行排程時; - 呼叫
runtime.findrunnable
獲取可執行函數時; - 呼叫
runtime.findrunnable
執行搶佔時;
runtime.schedule
func schedule() {
_g_ := getg()
...
top:
pp := _g_.m.p.ptr()
...
// 檢查是否有可執行 timer 並執行
checkTimers(pp, 0)
var gp *g
...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
...
execute(gp, inheritTime)
}
下面我們看看 checkTimers 做了什麼:
runtime.checkTimers
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// 如果沒有需要調整的 timer
if atomic.Load(&pp.adjustTimers) == 0 {
// 獲取 timer0 的執行時間
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
// 下次執行大於當前時間,
if now < next {
// 需要刪除的 timer 個數小於 timer列表個數的4分之1,直接返回
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
}
lock(&pp.timersLock)
// 進行調整 timer
adjusttimers(pp)
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
// 查詢堆中是否存在需要執行的 timer
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// 如果需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼清理需要刪除的 timer
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
checkTimers 中主要做了這麼幾件事:
- 檢查是否有需要進行調整的 timer, 如果沒有需要執行的計時器時,直接返回;如果下一個要執行的 timer 沒有到期並且需要刪除的計時器較少(四分之一)時也會直接返回;
- 呼叫 adjusttimers 進行 timer 列表的調整,主要是維護 timer 列表的最小堆的順序;
- 呼叫
runtime.runtimer
查詢堆中是否存在需要執行的timer,runtime.runtimer
上面已經講過了,這裡不再贅述; - 如果當前 Goroutine 的 P 和傳入的 P 相同,並且需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼呼叫 clearDeletedTimers 清理需要刪除的 timer;
runtime.findrunnable
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
...
// 檢查 P 中可執行的 timer
now, pollUntil, _ := checkTimers(_p_, 0)
...
// 如果 netpoll 已被初始化,並且 Waiters 大於零,並且 lastpoll 不為0
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 嘗試從netpoller獲取Glist
if list := netpoll(0); !list.empty() { // 無阻塞
gp := list.pop()
//將其餘佇列放入 P 的可執行G佇列
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
...
// 開始竊取
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
// 如果 i>2 表示如果其他 P 執行佇列中沒有 G ,將要從其他佇列的 runnext 中獲取
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 隨機獲取一個 P
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 從其他 P 的執行佇列中獲取一般的 G 到當前佇列中
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
// 如果執行佇列中沒有 G,那麼從 timers 中獲取可執行的 timer
if i > 2 || (i > 1 && shouldStealTimers(p2)) {
// ran 為 true 表示有執行過 timer
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// 因為已經執行過 timer 了,說不定已經有準備就緒的 G 了
// 再次檢查本地佇列嘗試獲取 G
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
}
}
if ranTimer {
// 執行完一個 timer 後可能存在已經就緒的 G
goto top
}
stop:
...
delta := int64(-1)
if pollUntil != 0 {
// checkTimers ensures that polluntil > now.
delta = pollUntil - now
}
...
// poll network
// 休眠前再次檢查 poll 網路
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
...
list := netpoll(delta) // 阻塞呼叫
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil {
injectglist(&list)
} else {
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
}
// 休眠當前 M
stopm()
goto top
}
findrunnable 我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,這裡提取 timer 相關的程式碼分析一下:
- findrunnable 在竊取前先會呼叫 checkTimers 檢查 P 中可執行的 timer;
- 如果 netpoll 中有等待的 waiter,那麼會呼叫 netpoll 嘗試無阻塞的從netpoller獲取Glist;
- 如果獲取不到可執行的 G,那麼就會開始執行竊取。竊取的時候會呼叫 checkTimers 隨機從其他的 P 中獲取 timer;
- 竊取完畢後也沒有可執行的 timer,那麼會繼續往下,休眠前再次檢查 netpoll 網路,呼叫 netpoll(delta) 函數進行阻塞呼叫。
系統監控觸發
系統監控其實就是 Go 語言的守護行程,它們能夠在後臺監控系統的執行狀態,在出現意外情況時及時響應。它會每隔一段時間檢查 Go 語言執行時狀態,確保沒有異常發生。我們這裡不主要去講系統監控,只抽離出其中的和 timer 相關的程式碼進行講解。
runtime.sysmon
func sysmon() {
...
for {
...
now := nanotime()
// 返回下次需要排程 timer 到期時間
next, _ := timeSleepUntil()
...
// 如果超過 10ms 沒有 poll,則 poll 一下網路
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // 非阻塞,返回 G 列表
// G 列表不為空
if !list.empty() {
incidlelocked(-1)
// 將獲取到的 G 列表插入到空閒的 P 中或全域性列表中
injectglist(&list)
incidlelocked(1)
}
}
// 如果有 timer 到期
if next < now {
// 啟動新的 M 處理 timer
startm(nil, false)
}
...
}
}
- sysmon 會通過 timeSleepUntil 遍歷所有的 P 的 timer 列表,找到下一個需要執行的 timer;
- 如果超過 10ms 沒有 poll,則 poll 一下網路;
- 如果有 timer 到期,這個時候直接啟動新的 M 處理 timer;
netpoll 的作用
我們從一開始呼叫 runtime.addtimer
新增 timer 的時候,就會 runtime.wakeNetPoller
來中斷 netpoll ,那麼它是如何做到的?我們下面先來看一個官方的例子:
func TestNetpollBreak(t *testing.T) {
if runtime.GOMAXPROCS(0) == 1 {
t.Skip("skipping: GOMAXPROCS=1")
}
// 初始化 netpoll
runtime.NetpollGenericInit()
start := time.Now()
c := make(chan bool, 2)
go func() {
c <- true
// netpoll 等待時間
runtime.Netpoll(10 * time.Second.Nanoseconds())
c <- true
}()
<-c
loop:
for {
runtime.Usleep(100)
// 中斷netpoll 等待
runtime.NetpollBreak()
runtime.NetpollBreak()
select {
case <-c:
break loop
default:
}
}
if dur := time.Since(start); dur > 5*time.Second {
t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
}
}
在上面這個例子中,首先會呼叫 runtime.Netpoll
進行阻塞等待,然後迴圈排程 runtime.NetpollBreak
進行中斷阻塞。
runtime.netpoll
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
// 因為傳入delay單位是納秒,下面將納秒轉換成毫秒
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
// 等待檔案描述符轉換成可讀或者可寫
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
// 返回負值,那麼重新呼叫epollwait進行等待
if n < 0 {
...
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// 如果是 NetpollBreak 中斷的,那麼執行 continue 跳過
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
...
}
return toRun
}
在 呼叫runtime.findrunnable
執行搶佔時,最後會傳入一個時間,超時阻塞呼叫 netpoll,如果沒有事件中斷,那麼迴圈排程會一直等待直到 netpoll 超時後才往下進行:
func findrunnable() (gp *g, inheritTime bool) {
...
delta := int64(-1)
if pollUntil != 0 {
// checkTimers ensures that polluntil > now.
delta = pollUntil - now
}
...
// poll network
// 休眠前再次檢查 poll 網路
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
...
// 阻塞呼叫
list := netpoll(delta)
}
...
// 休眠當前 M
stopm()
goto top
}
所以在呼叫 runtime.addtimer
新增 timer 的時候進行 netpoll 的中斷操作可以更加靈敏的響應 timer 這類時間敏感的任務。
總結
我們通過 timer 的 1.13版本以及1.14版本後的對比可以發現,即使是一個定時器 go 語言都做了相當多的優化工作。從原來的需要維護 64 個桶,然後每個桶裡面跑非同步任務,到現在的將 timer列表直接掛到了 P 上面,這不僅減少了上下文切換帶來的效能損耗,也減少了在鎖之間的爭搶問題,通過這些優化後有了可以媲美時間輪的效能表現。
Reference
go1.14基於netpoll優化timer定時器實現原理 http://xiaorui.cc/archives/6483
https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2
https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81
計時器 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/timer/
《Golang》Netpoll解析 https://www.pefish.club/2020/05/04/Golang/1011Netpoll解析/
time.Timer 原始碼分析 https://docs.google.com/presentation/d/1c2mRWA-FiihgpbGsE4uducou7X5d4WoiiLVab-ewsT8/edit
相關文章