// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
- runqput(_g_.m.p.ptr(), gp)
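+ // next == true: gp goes into _p_.runnext, so it runs as soon as the
+ // current goroutine yields, inheriting the remainder of the time slice.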
+ runqput(_g_.m.p.ptr(), gp, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { // TODO: fast atomic
wakep()
}
// Preempt the current g
casgstatus(_g_, _Grunning, _Grunnable)
- runqput(_g_.m.p.ptr(), _g_)
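+ // next == false: the preempted goroutine goes to the tail of the run
+ // queue, while gp below is executed with a fresh time slice.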
+ runqput(_g_.m.p.ptr(), _g_, false)
dropg()
// Ready gp and switch to it
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp)
+ execute(gp, false)
})
}
}
// Schedules gp to run on the current M.
+// If inheritTime is true, gp inherits the remaining time in the
+// current time slice. Otherwise, it starts a new time slice.
// Never returns.
-func execute(gp *g) {
+func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
- _g_.m.p.ptr().schedtick++
+ if !inheritTime {
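+ // Only a fresh time slice bumps schedtick. An inherited slice keeps
+ // the old value, so sysmon's preemption check still fires on the
+ // original 10ms schedule and runnext handoffs cannot starve other
+ // goroutines.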
+ _g_.m.p.ptr().schedtick++
+ }
_g_.m.curg = gp
gp.m = _g_.m
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
-func findrunnable() *g {
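+// inheritTime is true if the returned G came from its P's runnext slot
+// and should finish the current time slice rather than start a new one.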
+func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
}
// local runq
- if gp := runqget(_g_.m.p.ptr()); gp != nil {
- return gp
+ if gp, inheritTime := runqget(_g_.m.p.ptr()); gp != nil {
+ return gp, inheritTime
}
// global runq
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
if gp != nil {
- return gp
+ return gp, false
}
}
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
}
_p_ := allp[fastrand1()%uint32(gomaxprocs)]
var gp *g
if _p_ == _g_.m.p.ptr() {
- gp = runqget(_p_)
+ gp, _ = runqget(_p_)
} else {
gp = runqsteal(_g_.m.p.ptr(), _p_)
}
if gp != nil {
- return gp
+ return gp, false
}
}
stop:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
// return P and block
if sched.runqsize != 0 {
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
- return gp
+ return gp, false
}
_p_ := releasep()
pidleput(_p_)
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
injectglist(gp)
}
if _g_.m.lockedg != nil {
stoplockedm()
- execute(_g_.m.lockedg) // Never returns.
+ execute(_g_.m.lockedg, false) // Never returns.
}
top:
}
var gp *g
+ var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
}
}
if gp == nil {
- gp = runqget(_g_.m.p.ptr())
+ gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
- gp = findrunnable() // blocks until work is available
+ gp, inheritTime = findrunnable() // blocks until work is available
resetspinning()
}
goto top
}
- execute(gp)
+ execute(gp, inheritTime)
}
// dropg removes the association between m and the current goroutine m->curg (gp for short).
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp) // Schedule it back, never returns.
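+ // inheritTime == true: gp never really stopped running, so it
+ // resumes in its own time slice.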
+ execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
if _g_.m.lockedg != nil {
// Wait until another thread schedules gp and so m again.
stoplockedm()
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
- runqput(_p_, newg)
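+ // next == true: a freshly created goroutine goes into runnext and
+ // runs before anything else already queued on this P.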
+ runqput(_p_, newg, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { // TODO: fast atomic
wakep()
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))]
// push onto head of global queue
- gp.schedlink = sched.runqhead
- sched.runqhead.set(gp)
- if sched.runqtail == 0 {
- sched.runqtail.set(gp)
- }
- sched.runqsize++
+ globrunqputhead(gp)
+ }
+ if p.runnext != 0 {
+ globrunqputhead(p.runnext.ptr())
+ p.runnext = 0
}
// if there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up
sched.runqsize++
}
+// Put gp at the head of the global runnable queue.
+// Sched must be locked.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrier
+func globrunqputhead(gp *g) {
+ gp.schedlink = sched.runqhead
+ sched.runqhead.set(gp)
+ if sched.runqtail == 0 {
+ sched.runqtail.set(gp)
+ }
+ sched.runqsize++
+}
+
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
- runqput(_p_, gp1)
+ runqput(_p_, gp1, false)
}
return gp
}
// runqempty returns true if _p_ has no Gs on its local run queue.
// Note that this test is generally racy.
func runqempty(_p_ *p) bool {
- return _p_.runqhead == _p_.runqtail
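+ // A G parked in _p_.runnext is not in the runq array, so it must
+ // be checked separately.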
+ return _p_.runqhead == _p_.runqtail && _p_.runnext == 0
}
-// Try to put g on local runnable queue.
-// If it's full, put onto global queue.
+// runqput tries to put g on the local runnable queue.
+// If next is false, runqput adds g to the tail of the runnable queue.
+// If next is true, runqput puts g in the _p_.runnext slot.
+// If the run queue is full, runqput puts g on the global queue.
// Executed only by the owner P.
-func runqput(_p_ *p, gp *g) {
+func runqput(_p_ *p, gp *g, next bool) {
+ if next {
+ retryNext:
+ oldnext := _p_.runnext
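+ // The cas fails only if a thief cleared runnext between the load
+ // and the swap (runqgrab steals runnext with a cas); retry.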
+ if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
+ goto retryNext
+ }
+ if oldnext == 0 {
+ return
+ }
+ // Kick the old runnext out to the regular run queue.
+ gp = oldnext.ptr()
+ }
+
retry:
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
}
// Get g from local runnable queue.
+// If inheritTime is true, gp should inherit the remaining time in the
+// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
-func runqget(_p_ *p) *g {
+func runqget(_p_ *p) (gp *g, inheritTime bool) {
+ // If there's a runnext, it's the next G to run.
+ for {
+ next := _p_.runnext
+ if next == 0 {
+ break
+ }
+ if _p_.runnext.cas(next, 0) {
+ return next.ptr(), true
+ }
+ }
+
for {
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
- return nil
+ return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))]
if cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
- return gp
+ return gp, false
}
}
}
n := t - h
n = n - n/2
if n == 0 {
+ // Try to steal from _p_.runnext.
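+ // The victim's queue is empty (n == 0), so runnext is its only
+ // runnable G.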
+ if next := _p_.runnext; next != 0 {
+ if !_p_.runnext.cas(next, 0) {
+ continue
+ }
+ batch[0] = next.ptr()
+ return 1
+ }
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
_p_ := new(p)
gs := make([]g, len(_p_.runq))
for i := 0; i < len(_p_.runq); i++ {
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty initially")
}
for j := 0; j < i; j++ {
- runqput(_p_, &gs[i])
+ runqput(_p_, &gs[i], false)
}
for j := 0; j < i; j++ {
- if runqget(_p_) != &gs[i] {
+ if g, _ := runqget(_p_); g != &gs[i] {
print("bad element at iter ", i, "/", j, "\n")
throw("bad element")
}
}
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty afterwards")
}
}
for i := 0; i < len(p1.runq); i++ {
for j := 0; j < i; j++ {
gs[j].sig = 0
- runqput(p1, &gs[j])
+ runqput(p1, &gs[j], false)
}
gp := runqsteal(p2, p1)
s := 0
gp.sig++
}
for {
- gp = runqget(p2)
+ gp, _ = runqget(p2)
if gp == nil {
break
}
gp.sig++
}
for {
- gp = runqget(p1)
+ gp, _ = runqget(p1)
if gp == nil {
break
}
}
+func TestPingPongHog(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping in -short mode")
+ }
+
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
+ done := make(chan bool)
+ hogChan, lightChan := make(chan bool), make(chan bool)
+ hogCount, lightCount := 0, 0
+
+ run := func(limit int, counter *int, wake chan bool) {
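+ // Wake up when the token arrives, do some work, then pass the
+ // token back to our partner and sleep until woken again.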
+ for {
+ select {
+ case <-done:
+ return
+
+ case <-wake:
+ for i := 0; i < limit; i++ {
+ *counter++
+ }
+ wake <- true
+ }
+ }
+ }
+
+ // Start two co-scheduled hog goroutines.
+ for i := 0; i < 2; i++ {
+ go run(1e6, &hogCount, hogChan)
+ }
+
+ // Start two co-scheduled light goroutines.
+ for i := 0; i < 2; i++ {
+ go run(1e3, &lightCount, lightChan)
+ }
+
+ // Start goroutine pairs and wait for a few preemption rounds.
+ hogChan <- true
+ lightChan <- true
+ time.Sleep(100 * time.Millisecond)
+ close(done)
+ <-hogChan
+ <-lightChan
+
+ // Check that hogCount and lightCount are within a factor of
+ // 2, which indicates that both pairs of goroutines handed off
+ // the P within a time-slice to their buddy.
+ if hogCount > lightCount*2 || lightCount > hogCount*2 {
+ t.Fatalf("want hogCount/lightCount in [0.5, 2]; got %d/%d = %g", hogCount, lightCount, float64(hogCount)/float64(lightCount))
+ }
+}
+
func BenchmarkPingPongHog(b *testing.B) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))