go-goroutine

goroutine调度

抢占式调度源码解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// proc.go
// main (excerpt: the rest of the body is omitted in this listing) starts the
// sysmon monitor on every architecture except wasm.
func main(){
// ... (omitted)
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {// On the system stack, launch the sysmon monitor; it is the driving force behind Go's preemptive scheduling.
newm(sysmon, nil)
})
}
}

/*
 * Summary of what sysmon does:
 * 1: polls netpoll for goroutines that have become ready and injects them
 *    back into the scheduler (blocked -> runnable).
 * 2: calls retake, which reclaims Ps stuck in syscalls and requests preemption
 *    of goroutines that have been running longer than forcePreemptNS
 *    (10ms in the Go runtime; the constant is defined outside this excerpt).
 * It also triggers time-based forced GC, periodic heap scavenging and, when
 * enabled, scheduler tracing.
 */
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead() // deadlock check
unlock(&sched.lock)

// 5 minutes in nanoseconds; passed to mheap_.scavenge below as the limit
// (presumably heap memory left unused this long after a GC is returned to the OS).
scavengelimit := int64(5 * 60 * 1e9)

if debug.scavenge > 0 {
// Scavenge-a-lot for testing.
forcegcperiod = 10 * 1e6
scavengelimit = 20 * 1e6
}

lastscavenge := nanotime()
nscavenge := 0

lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0)
for {// loop forever, sleeping between checks
if idle == 0 { // the previous cycle retook something (or this is the first cycle): start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay) // sysmon sleeps for delay microseconds
// trigger libc interceptors if needed
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(false) // non-blocking - returns list of goroutines
if !list.empty() {
// Goroutines that netpoll reports as ready (blocked -> runnable) are
// handed to the scheduler so they can run again.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
// Core of preemption: retake inspects every P, using now as the reference
// time, and requests preemption of any goroutine that has run for longer
// than forcePreemptNS. A non-zero result resets the idle counter.
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check whether a time-triggered forced GC is due
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
// scavenge heap once in a while
if lastscavenge+scavengelimit/2 < now {
mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
lastscavenge = now
nscavenge++
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}

// retake walks every P and, depending on its status, either takes the P back
// from a blocking syscall or requests preemption of a goroutine that has been
// running too long (a time-slice-like mechanism). now is the timestamp taken
// by the caller for this pass. The return value is the number of Ps retaken
// from syscalls; preemption requests are not counted.
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
// (The index-based loop re-reads len(allp) and allp[i] on every iteration.)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
// First time this syscall tick is observed: record it and check again next pass.
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
} else if s == _Prunning {
// Preempt a goroutine that has been running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
// The scheduler tick changed since the last pass: restart the clock.
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
// Has not yet run for forcePreemptNS (10ms in the Go runtime; defined outside this excerpt).
if pd.schedwhen+forcePreemptNS > now {
continue
}
// schedwhen + forcePreemptNS <= now: the same scheduler tick has been
// observed for at least forcePreemptNS, so request preemption. This is
// how goroutine "time slices" are implemented.
preemptone(_p_)
}
}
unlock(&allpLock)
return uint32(n)
}
// preemptone is the core of preemption: it posts a preemption request to the
// goroutine currently running on the given P.
//
// It does not stop anything itself. It sets the goroutine's preempt field and
// overwrites its stackguard0 with stackPreempt. As this write-up explains,
// function calls in a goroutine perform a stack check (go:nosplit functions
// skip it); with stackguard0 replaced, that check fails and the goroutine is
// rescheduled - Go's take on "time slice" scheduling.
//
// It returns false when the P has no M, when that M is the caller's own
// (getg().m), or when the M has no current goroutine or is running its g0.
// It returns true once the request has been posted.
func preemptone(_p_ *p) bool {
	owner := _p_.m.ptr()
	switch {
	case owner == nil:
		return false
	case owner == getg().m:
		return false
	}

	target := owner.curg
	if target == nil {
		return false
	}
	if target == owner.g0 {
		return false
	}

	// Keep this order: mark the request first, then poison the stack guard.
	target.preempt = true
	target.stackguard0 = stackPreempt
	return true
}
总结golang抢占式调度:

1:类时间片调度
2:守护线程(sysmon)会检查netpoll中已就绪的goroutine,并将其状态由阻塞态改为就绪态,重新加入调度
3:通过在系统栈(systemstack)上启动的守护线程(sysmon),对连续运行超过10ms(forcePreemptNS)的goroutine修改其栈标志(stackguard0 = stackPreempt),变相进行抢占式调度
// go编译器会在编译时期,为所有非内联(且未标注go:nosplit)的函数调用嵌入一段检查栈标志的代码,
// 所以当sysmon修改了该栈标志之后,在goroutine运行的过程中,只要发生非内联函数的调用,该goroutine就会被抢占调度