// everything out of the run
// queue so it can run
// somewhere else.
- if drainQ, n := runqdrain(pp); n > 0 {
- lock(&sched.lock)
- globrunqputbatch(&drainQ, int32(n))
- unlock(&sched.lock)
+ lock(&sched.lock)
+ for {
+ gp, _ := runqget(pp)
+ if gp == nil {
+ break
+ }
+ globrunqput(gp)
}
+ unlock(&sched.lock)
}
// Go back to draining, this time
// without preemption.
}
}
-// runqdrain drains the local runnable queue of _p_ and returns all g's in it.
-// Executed only by the owner P.
-func runqdrain(_p_ *p) (drainQ gQueue, n uint32) {
- var getNext bool
- oldNext := _p_.runnext
- if oldNext != 0 && _p_.runnext.cas(oldNext, 0) {
- drainQ.pushBack(oldNext.ptr())
- n++
- getNext = true
- }
-
- for {
- h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
- t := _p_.runqtail
- qn := t - h
- if qn == 0 {
- return
- }
- for i := uint32(0); i < qn; i++ {
- gp := _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
- drainQ.pushBack(gp)
- }
- if atomic.CasRel(&_p_.runqhead, h, h+qn) { // cas-release, commits consume
- n += qn
- return
- }
-
- // Clean up if it failed to drain _p_ in this round and start over until it succeed.
- drainQ = gQueue{}
- n = 0
- // Push the prior old _p_.runnext back into drainQ.
- if getNext {
- drainQ.pushBack(oldNext.ptr())
- n++
- }
- }
-}
-
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
- //
- // Note that while other P's may atomically CAS this to zero,
- // only the owner P can CAS it to a valid G.
runnext guintptr
// Available G's (status == Gdead)