// everything out of the run
// queue so it can run
// somewhere else.
- lock(&sched.lock)
- for {
- gp, _ := runqget(pp)
- if gp == nil {
- break
- }
- globrunqput(gp)
+ if drainQ, n := runqdrain(pp); n > 0 {
+ lock(&sched.lock)
+ globrunqputbatch(&drainQ, int32(n))
+ unlock(&sched.lock)
}
- unlock(&sched.lock)
}
// Go back to draining, this time
// without preemption.
}
}
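The caller above now moves the whole local run queue in one step: runqdrain collects everything into a gQueue off-lock, and globrunqputbatch publishes it under a single sched.lock acquisition instead of one lock round trip per goroutine. A minimal sketch of that batching pattern, with hypothetical stand-ins (queue, putBatch, and ints in place of g's are illustrative names, not runtime API):

package main

import "sync"

// queue is a hypothetical stand-in for the global run queue: a shared
// structure guarded by a mutex, like sched.runq under sched.lock.
type queue struct {
	mu    sync.Mutex
	items []int
}

// putBatch publishes an entire locally collected batch under one lock
// acquisition, the way globrunqputbatch inserts a whole gQueue at once.
func (q *queue) putBatch(batch []int) {
	q.mu.Lock()
	q.items = append(q.items, batch...)
	q.mu.Unlock()
}

func main() {
	local := []int{1, 2, 3} // stands in for the drained gQueue
	var global queue
	global.putBatch(local) // one lock round trip for the whole batch
}
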
+// runqdrain drains the local runnable queue of _p_ and returns all g's in it.
+// Executed only by the owner P.
+ func runqdrain(_p_ *p) (drainQ gQueue, n uint32) {
+ 	oldNext := _p_.runnext
+ 	if oldNext != 0 && _p_.runnext.cas(oldNext, 0) {
+ 		drainQ.pushBack(oldNext.ptr())
+ 		n++
+ 	}
+ 
+ 	for {
+ 		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
+ 		t := _p_.runqtail
+ 		qn := t - h
+ 		if qn == 0 {
+ 			return
+ 		}
+ 		if !atomic.CasRel(&_p_.runqhead, h, h+qn) { // cas-release, commits consume
+ 			// Another consumer advanced the head first; retry from a fresh snapshot.
+ 			continue
+ 		}
+ 		// Link the goroutines into drainQ only after the consume has been
+ 		// committed. Once runqhead has advanced, no other P can steal these
+ 		// slots, so writing gp.schedlink here cannot race with a thief.
+ 		for i := uint32(0); i < qn; i++ {
+ 			gp := _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
+ 			drainQ.pushBack(gp)
+ 		}
+ 		n += qn
+ 		return
+ 	}
+ }
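The load-acquire/cas-release pair is the core of the drain. Below is a self-contained sketch of the same consume protocol, using sync/atomic (the exported, at-least-as-strong counterpart of the runtime-internal LoadAcq and CasRel); ring, drain, and the int payloads are hypothetical stand-ins, not runtime types:

package main

import (
	"fmt"
	"sync/atomic"
)

// ring mimics a P's local run queue: a fixed ring buffer where only
// the owner advances tail and any consumer may advance head.
type ring struct {
	head uint32
	tail uint32
	buf  [8]int
}

// drain claims every slot between head and tail with a single CAS,
// then copies the claimed slots out. Claiming before copying mirrors
// runqdrain: once the head has advanced, no other consumer can touch
// those slots.
func (r *ring) drain() []int {
	for {
		h := atomic.LoadUint32(&r.head) // synchronize with other consumers
		t := r.tail                     // only the owner mutates tail
		n := t - h
		if n == 0 {
			return nil
		}
		if !atomic.CompareAndSwapUint32(&r.head, h, h+n) {
			continue // another consumer advanced head; retry
		}
		out := make([]int, 0, n)
		for i := uint32(0); i < n; i++ {
			out = append(out, r.buf[(h+i)%uint32(len(r.buf))])
		}
		return out
	}
}

func main() {
	r := ring{tail: 3, buf: [8]int{10, 20, 30}}
	fmt.Println(r.drain()) // [10 20 30]
}

The retry loop never blocks: a failed CAS only means a stealer got there first, so the next snapshot simply sees fewer (or zero) remaining slots.
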
+
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
+ //
+ // Note that while other P's may atomically CAS this to zero,
+ // only the owner P can CAS it to a valid G.
runnext guintptr
// Available G's (status == Gdead)
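The ownership rule stated in the new runnext comment can be shown in isolation. A hedged sketch, with uint64 standing in for guintptr and ownerSet/steal as hypothetical names (the owner's plain store below is a simplification of its CAS loop): any P may CAS an observed non-zero value to zero, but only the owner installs a new one.

package main

import (
	"fmt"
	"sync/atomic"
)

// runnext stands in for p.runnext: a single slot that many P's may
// race to empty, but that only the owner ever fills.
var runnext uint64

// ownerSet installs a new g pointer; only the owning P may do this.
func ownerSet(g uint64) { atomic.StoreUint64(&runnext, g) }

// steal is the pattern runqdrain itself uses on runnext: snapshot the
// slot, then CAS the observed value to zero so exactly one racer wins.
func steal() (uint64, bool) {
	old := atomic.LoadUint64(&runnext)
	if old != 0 && atomic.CompareAndSwapUint64(&runnext, old, 0) {
		return old, true
	}
	return 0, false
}

func main() {
	ownerSet(0x1234)
	g, ok := steal()
	fmt.Printf("stole %#x: %v\n", g, ok)
}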