//
// Design doc at https://golang.org/s/go11sched.
+// Worker thread parking/unparking.
+// We need to balance between keeping enough running worker threads to utilize
+// available hardware parallelism and parking excessive running worker threads
+// to conserve CPU resources and power. This is not simple for two reasons:
+// (1) scheduler state is intentionally distributed (in particular, per-P work
+// queues), so it is not possible to compute global predicates on fast paths;
+// (2) for optimal thread management we would need to know the future (don't park
+// a worker thread when a new goroutine will be readied in the near future).
+//
+// Three rejected approaches that would work badly:
+// 1. Centralize all scheduler state (would inhibit scalability).
+// 2. Direct goroutine handoff. That is, when we ready a new goroutine and there
+// is a spare P, unpark a thread and hand it the P and the goroutine.
+// This would lead to thread state thrashing, as the thread that readied the
+// goroutine can be out of work the very next moment, at which point we would
+// need to park it. It would also destroy locality of computation, since we want
+// to keep dependent goroutines on the same thread, and it would introduce
+// additional latency.
+// 3. Unpark an additional thread whenever we ready a goroutine and there is an
+// idle P, but don't do handoff. This would lead to excessive thread parking/
+// unparking as the additional threads will instantly park without discovering
+// any work to do.
+//
+// The current approach:
+// We unpark an additional thread when we ready a goroutine if there is an
+// idle P and there are no "spinning" worker threads. A worker thread is considered
+// spinning if it is out of local work and did not find work in global run queue/
+// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
+// Threads unparked this way are also considered spinning; we don't do goroutine
+// handoff so such threads are out of work initially. Spinning threads do some
+// spinning looking for work in per-P run queues before parking. If a spinning
+// thread finds work it takes itself out of the spinning state and proceeds to
+// execution. If it does not find work it takes itself out of the spinning state
+// and then parks.
+// If there is at least one spinning thread (sched.nmspinning>0), we don't unpark
+// new threads when readying goroutines. To compensate for that, if the last spinning
+// thread finds work and stops spinning, it must unpark a new spinning thread.
+// This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism utilization.
+//
+// The main implementation complication is that we need to be very careful during
+// the spinning->non-spinning thread transition. This transition can race with the
+// submission of a new goroutine, and one side or the other needs to unpark another
+// worker thread. If both fail to do that, we can end up with semi-persistent CPU
+// underutilization. The general pattern for goroutine readying is: submit a goroutine
+// to the local work queue, #StoreLoad-style memory barrier, check sched.nmspinning.
+// The general pattern for spinning->non-spinning transition is: decrement nmspinning,
+// #StoreLoad-style memory barrier, check all per-P work queues for new work.
+// Note that all this complexity does not apply to the global run queue, as we are
+// not sloppy about thread unparking when submitting to the global queue. Also see
+// comments for nmspinning manipulation.
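+//
+// For illustration, the two racing paths have roughly the following shape.
+// This is a simplified sketch, not the actual runtime code: readyGoroutine and
+// checkAllRunqueues are made-up names and the signatures are simplified.
+//
+//	// Readying side: submit the goroutine first, then check for spinners.
+//	func readyGoroutine(_p_ *p, gp *g) {
+//		runqput(_p_, gp) // submit to the local work queue
+//		// ... #StoreLoad-style memory barrier ...
+//		if atomic.Load(&sched.nmspinning) == 0 && atomic.Load(&sched.npidle) > 0 {
+//			wakep() // unpark an additional spinning worker
+//		}
+//	}
+//
+//	// Parking side: drop the spinning state first, then re-check all queues.
+//	func stopSpinning() {
+//		_g_ := getg()
+//		_g_.m.spinning = false
+//		atomic.Xadd(&sched.nmspinning, -1)
+//		// ... #StoreLoad-style memory barrier ...
+//		if gp := checkAllRunqueues(); gp != nil {
+//			// Found new work: become spinning again so that resetspinning
+//			// can unpark another worker if more work shows up.
+//			_g_.m.spinning = true
+//			atomic.Xadd(&sched.nmspinning, 1)
+//		}
+//	}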
+
var (
m0 m
g0 g
throw("stopm holding p")
}
if _g_.m.spinning {
- _g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ throw("stopm spinning")
}
retry:
}
func mspinning() {
- gp := getg()
- if !runqempty(gp.m.nextp.ptr()) {
- // Something (presumably the GC) was readied while the
- // runtime was starting up this M, so the M is no
- // longer spinning.
- if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
- throw("mspinning: nmspinning underflowed")
- }
- } else {
- gp.m.spinning = true
- }
+ // startm's caller incremented nmspinning. Set the new M's spinning.
+ getg().m.spinning = true
}
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
+// If spinning is set, the caller has incremented nmspinning and startm will
+// either decrement nmspinning or set m.spinning in the newly started M.
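+// A typical caller is therefore expected to pair the increment with the call,
+// roughly like this (a sketch of the caller side, e.g. wakep, not necessarily
+// the exact code):
+//
+//	if atomic.Cas(&sched.nmspinning, 0, 1) {
+//		startm(nil, true) // startm now owns the spinning count
+//	}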
//go:nowritebarrier
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
unlock(&sched.lock)
if spinning {
- atomic.Xadd(&sched.nmspinning, -1)
+ // The caller incremented nmspinning, but there are no idle Ps,
+ // so it's okay to just undo the increment and give up.
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("startm: negative nmspinning")
+ }
}
return
}
if mp == nil {
var fn func()
if spinning {
+ // The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
+ // The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
}
if _g_.m.spinning {
_g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ // OK to just drop nmspinning here,
+ // startTheWorld will unpark threads as necessary.
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("gcstopm: negative nmspinning")
+ }
}
_p_ := releasep()
lock(&sched.lock)
_p_ := releasep()
pidleput(_p_)
unlock(&sched.lock)
+
+ // Delicate dance: thread transitions from spinning to non-spinning state,
+ // potentially concurrently with submission of new goroutines. We must
+ // drop nmspinning first and then check all per-P queues again (with
+ // #StoreLoad memory barrier in between). If we do it the other way around,
+ // another thread can submit a goroutine after we've checked all run queues
+	// but before we drop nmspinning; as a result, nobody will unpark a thread
+ // to run the goroutine.
+ // If we discover new work below, we need to restore m.spinning as a signal
+ // for resetspinning to unpark a new worker thread (because there can be more
+ // than one starving goroutine). However, if after discovering new work
+ // we also observe no idle Ps, it is OK to just park the current thread:
+ // the system is fully loaded so no spinning threads are required.
+ // Also see "Worker thread parking/unparking" comment at the top of the file.
+ wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("findrunnable: negative nmspinning")
+ }
}
// check all runqueues once again
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
+ if wasSpinning {
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
+ }
goto top
}
break
func resetspinning() {
_g_ := getg()
-
- var nmspinning uint32
- if _g_.m.spinning {
- _g_.m.spinning = false
- nmspinning = atomic.Xadd(&sched.nmspinning, -1)
- if int32(nmspinning) < 0 {
- throw("findrunnable: negative nmspinning")
- }
- } else {
- nmspinning = atomic.Load(&sched.nmspinning)
+ if !_g_.m.spinning {
+ throw("resetspinning: not a spinning m")
}
-
- // M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
- // so see if we need to wakeup another P here.
+ _g_.m.spinning = false
+ nmspinning := atomic.Xadd(&sched.nmspinning, -1)
+ if int32(nmspinning) < 0 {
+		throw("resetspinning: negative nmspinning")
+ }
+ // M wakeup policy is deliberately somewhat conservative, so check if we
+	// need to wake up another P here. See "Worker thread parking/unparking"
+ // comment at the top of the file for details.
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
- resetspinning()
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
- if gp != nil {
- resetspinning()
- }
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
- if gp != nil {
- resetspinning()
- }
}
}
if gp == nil {
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
+ }
+
+ // This thread is going to run a goroutine and is not spinning anymore,
+ // so if it was marked as spinning we need to reset it now and potentially
+ // start a new spinning M.
+ if _g_.m.spinning {
resetspinning()
}
import (
"math"
+ "net"
"runtime"
"runtime/debug"
"sync"
}
}
+// Test that all runnable goroutines are scheduled at the same time.
+func TestGoroutineParallelism2(t *testing.T) {
+ //testGoroutineParallelism2(t, false, false)
+ testGoroutineParallelism2(t, true, false)
+ testGoroutineParallelism2(t, false, true)
+ testGoroutineParallelism2(t, true, true)
+}
+
+func testGoroutineParallelism2(t *testing.T, load, netpoll bool) {
+ if runtime.NumCPU() == 1 {
+ // Takes too long, too easy to deadlock, etc.
+ t.Skip("skipping on uniprocessor")
+ }
+ P := 4
+ N := 10
+ if testing.Short() {
+ N = 3
+ }
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+ // If runtime triggers a forced GC during this test then it will deadlock,
+ // since the goroutines can't be stopped/preempted.
+ // Disable GC for this test (see issue #10958).
+ defer debug.SetGCPercent(debug.SetGCPercent(-1))
+ for try := 0; try < N; try++ {
+ if load {
+ // Create P goroutines and wait until they all run.
+ // When we run the actual test below, worker threads
+ // running the goroutines will start parking.
+ done := make(chan bool)
+ x := uint32(0)
+ for p := 0; p < P; p++ {
+ go func() {
+ if atomic.AddUint32(&x, 1) == uint32(P) {
+ done <- true
+ return
+ }
+ for atomic.LoadUint32(&x) != uint32(P) {
+ }
+ }()
+ }
+ <-done
+ }
+ if netpoll {
+			// Enable the netpoller, which affects scheduler behavior.
+ ln, err := net.Listen("tcp", "localhost:0")
+			if err == nil {
+ defer ln.Close() // yup, defer in a loop
+ }
+ }
+ done := make(chan bool)
+ x := uint32(0)
+ // Spawn P goroutines in a nested fashion just to differ from TestGoroutineParallelism.
+ for p := 0; p < P/2; p++ {
+ go func(p int) {
+ for p2 := 0; p2 < 2; p2++ {
+ go func(p2 int) {
+ for i := 0; i < 3; i++ {
+ expected := uint32(P*i + p*2 + p2)
+ for atomic.LoadUint32(&x) != expected {
+ }
+ atomic.StoreUint32(&x, expected+1)
+ }
+ done <- true
+ }(p2)
+ }
+ }(p)
+ }
+ for p := 0; p < P; p++ {
+ <-done
+ }
+ }
+}
+
func TestBlockLocked(t *testing.T) {
const N = 10
c := make(chan bool)