]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: remove unnecessary wakeups of worker threads
authorDmitry Vyukov <dvyukov@google.com>
Tue, 8 Dec 2015 14:11:27 +0000 (15:11 +0100)
committerDmitry Vyukov <dvyukov@google.com>
Fri, 11 Dec 2015 11:31:12 +0000 (11:31 +0000)
Currently we wake up new worker threads whenever we pass
through the scheduler with nmspinning==0. This leads to
lots of unnecessary thread wake ups.
Instead let only spinning threads wake up new spinning threads.

For the following program:

package main
import "runtime"
func main() {
for i := 0; i < 1e7; i++ {
runtime.Gosched()
}
}

Before:
$ time ./test
real 0m4.278s
user 0m7.634s
sys 0m1.423s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
 99.93    9.314936           3   2685009     17536 futex

After:
$ time ./test
real 0m1.200s
user 0m1.181s
sys 0m0.024s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
  3.11    0.000049          25         2           futex

Fixes #13527

Change-Id: Ia1f5bf8a896dcc25d8b04beb1f4317aa9ff16f74
Reviewed-on: https://go-review.googlesource.com/17540
Reviewed-by: Austin Clements <austin@google.com>
Run-TryBot: Dmitry Vyukov <dvyukov@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/runtime/proc.go
src/runtime/proc_test.go
src/runtime/runtime2.go

index 9ef7bfb9541c8d4c400411eeaae3156696c5268f..c0df6f1d05f6c456b8493d24e1549f84880a1321 100644 (file)
@@ -22,6 +22,57 @@ import (
 //
 // Design doc at https://golang.org/s/go11sched.
 
+// Worker thread parking/unparking.
+// We need to balance between keeping enough running worker threads to utilize
+// available hardware parallelism and parking excessive running worker threads
+// to conserve CPU resources and power. This is not simple for two reasons:
+// (1) scheduler state is intentionally distributed (in particular, per-P work
+// queues), so it is not possible to compute global predicates on fast paths;
+// (2) for optimal thread management we would need to know the future (don't park
+// a worker thread when a new goroutine will be readied in near future).
+//
+// Three rejected approaches that would work badly:
+// 1. Centralize all scheduler state (would inhibit scalability).
+// 2. Direct goroutine handoff. That is, when we ready a new goroutine and there
+//    is a spare P, unpark a thread and handoff it the thread and the goroutine.
+//    This would lead to thread state thrashing, as the thread that readied the
+//    goroutine can be out of work the very next moment, we will need to park it.
+//    Also, it would destroy locality of computation as we want to preserve
+//    dependent goroutines on the same thread; and introduce additional latency.
+// 3. Unpark an additional thread whenever we ready a goroutine and there is an
+//    idle P, but don't do handoff. This would lead to excessive thread parking/
+//    unparking as the additional threads will instantly park without discovering
+//    any work to do.
+//
+// The current approach:
+// We unpark an additional thread when we ready a goroutine if (1) there is an
+// idle P and there are no "spinning" worker threads. A worker thread is considered
+// spinning if it is out of local work and did not find work in global run queue/
+// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
+// Threads unparked this way are also considered spinning; we don't do goroutine
+// handoff so such threads are out of work initially. Spinning threads do some
+// spinning looking for work in per-P run queues before parking. If a spinning
+// thread finds work it takes itself out of the spinning state and proceeds to
+// execution. If it does not find work it takes itself out of the spinning state
+// and then parks.
+// If there is at least one spinning thread (sched.nmspinning>1), we don't unpark
+// new threads when readying goroutines. To compensate for that, if the last spinning
+// thread finds work and stops spinning, it must unpark a new spinning thread.
+// This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism utilization.
+//
+// The main implementation complication is that we need to be very careful during
+// spinning->non-spinning thread transition. This transition can race with submission
+// of a new goroutine, and either one part or another needs to unpark another worker
+// thread. If they both fail to do that, we can end up with semi-persistent CPU
+// underutilization. The general pattern for goroutine readying is: submit a goroutine
+// to local work queue, #StoreLoad-style memory barrier, check sched.nmspinning.
+// The general pattern for spinning->non-spinning transition is: decrement nmspinning,
+// #StoreLoad-style memory barrier, check all per-P work queues for new work.
+// Note that all this complexity does not apply to global run queue as we are not
+// sloppy about thread unparking when submitting to global queue. Also see comments
+// for nmspinning manipulation.
+
 var (
        m0 m
        g0 g
@@ -1454,8 +1505,7 @@ func stopm() {
                throw("stopm holding p")
        }
        if _g_.m.spinning {
-               _g_.m.spinning = false
-               atomic.Xadd(&sched.nmspinning, -1)
+               throw("stopm spinning")
        }
 
 retry:
@@ -1476,22 +1526,15 @@ retry:
 }
 
 func mspinning() {
-       gp := getg()
-       if !runqempty(gp.m.nextp.ptr()) {
-               // Something (presumably the GC) was readied while the
-               // runtime was starting up this M, so the M is no
-               // longer spinning.
-               if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
-                       throw("mspinning: nmspinning underflowed")
-               }
-       } else {
-               gp.m.spinning = true
-       }
+       // startm's caller incremented nmspinning. Set the new M's spinning.
+       getg().m.spinning = true
 }
 
 // Schedules some M to run the p (creates an M if necessary).
 // If p==nil, tries to get an idle P, if no idle P's does nothing.
 // May run with m.p==nil, so write barriers are not allowed.
+// If spinning is set, the caller has incremented nmspinning and startm will
+// either decrement nmspinning or set m.spinning in the newly started M.
 //go:nowritebarrier
 func startm(_p_ *p, spinning bool) {
        lock(&sched.lock)
@@ -1500,7 +1543,11 @@ func startm(_p_ *p, spinning bool) {
                if _p_ == nil {
                        unlock(&sched.lock)
                        if spinning {
-                               atomic.Xadd(&sched.nmspinning, -1)
+                               // The caller incremented nmspinning, but there are no idle Ps,
+                               // so it's okay to just undo the increment and give up.
+                               if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+                                       throw("startm: negative nmspinning")
+                               }
                        }
                        return
                }
@@ -1510,6 +1557,7 @@ func startm(_p_ *p, spinning bool) {
        if mp == nil {
                var fn func()
                if spinning {
+                       // The caller incremented nmspinning, so set m.spinning in the new M.
                        fn = mspinning
                }
                newm(fn, _p_)
@@ -1524,6 +1572,7 @@ func startm(_p_ *p, spinning bool) {
        if spinning && !runqempty(_p_) {
                throw("startm: p has runnable gs")
        }
+       // The caller incremented nmspinning, so set m.spinning in the new M.
        mp.spinning = spinning
        mp.nextp.set(_p_)
        notewakeup(&mp.park)
@@ -1645,7 +1694,11 @@ func gcstopm() {
        }
        if _g_.m.spinning {
                _g_.m.spinning = false
-               atomic.Xadd(&sched.nmspinning, -1)
+               // OK to just drop nmspinning here,
+               // startTheWorld will unpark threads as necessary.
+               if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+                       throw("gcstopm: negative nmspinning")
+               }
        }
        _p_ := releasep()
        lock(&sched.lock)
@@ -1818,9 +1871,26 @@ stop:
        _p_ := releasep()
        pidleput(_p_)
        unlock(&sched.lock)
+
+       // Delicate dance: thread transitions from spinning to non-spinning state,
+       // potentially concurrently with submission of new goroutines. We must
+       // drop nmspinning first and then check all per-P queues again (with
+       // #StoreLoad memory barrier in between). If we do it the other way around,
+       // another thread can submit a goroutine after we've checked all run queues
+       // but before we drop nmspinning; as the result nobody will unpark a thread
+       // to run the goroutine.
+       // If we discover new work below, we need to restore m.spinning as a signal
+       // for resetspinning to unpark a new worker thread (because there can be more
+       // than one starving goroutine). However, if after discovering new work
+       // we also observe no idle Ps, it is OK to just park the current thread:
+       // the system is fully loaded so no spinning threads are required.
+       // Also see "Worker thread parking/unparking" comment at the top of the file.
+       wasSpinning := _g_.m.spinning
        if _g_.m.spinning {
                _g_.m.spinning = false
-               atomic.Xadd(&sched.nmspinning, -1)
+               if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+                       throw("findrunnable: negative nmspinning")
+               }
        }
 
        // check all runqueues once again
@@ -1832,6 +1902,10 @@ stop:
                        unlock(&sched.lock)
                        if _p_ != nil {
                                acquirep(_p_)
+                               if wasSpinning {
+                                       _g_.m.spinning = true
+                                       atomic.Xadd(&sched.nmspinning, 1)
+                               }
                                goto top
                        }
                        break
@@ -1870,20 +1944,17 @@ stop:
 
 func resetspinning() {
        _g_ := getg()
-
-       var nmspinning uint32
-       if _g_.m.spinning {
-               _g_.m.spinning = false
-               nmspinning = atomic.Xadd(&sched.nmspinning, -1)
-               if int32(nmspinning) < 0 {
-                       throw("findrunnable: negative nmspinning")
-               }
-       } else {
-               nmspinning = atomic.Load(&sched.nmspinning)
+       if !_g_.m.spinning {
+               throw("resetspinning: not a spinning m")
        }
-
-       // M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
-       // so see if we need to wakeup another P here.
+       _g_.m.spinning = false
+       nmspinning := atomic.Xadd(&sched.nmspinning, -1)
+       if int32(nmspinning) < 0 {
+               throw("findrunnable: negative nmspinning")
+       }
+       // M wakeup policy is deliberately somewhat conservative, so check if we
+       // need to wakeup another P here. See "Worker thread parking/unparking"
+       // comment at the top of the file for details.
        if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
                wakep()
        }
@@ -1944,14 +2015,10 @@ top:
                if gp != nil {
                        casgstatus(gp, _Gwaiting, _Grunnable)
                        traceGoUnpark(gp, 0)
-                       resetspinning()
                }
        }
        if gp == nil && gcBlackenEnabled != 0 {
                gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
-               if gp != nil {
-                       resetspinning()
-               }
        }
        if gp == nil {
                // Check the global runnable queue once in a while to ensure fairness.
@@ -1961,9 +2028,6 @@ top:
                        lock(&sched.lock)
                        gp = globrunqget(_g_.m.p.ptr(), 1)
                        unlock(&sched.lock)
-                       if gp != nil {
-                               resetspinning()
-                       }
                }
        }
        if gp == nil {
@@ -1974,6 +2038,12 @@ top:
        }
        if gp == nil {
                gp, inheritTime = findrunnable() // blocks until work is available
+       }
+
+       // This thread is going to run a goroutine and is not spinning anymore,
+       // so if it was marked as spinning we need to reset it now and potentially
+       // start a new spinning M.
+       if _g_.m.spinning {
                resetspinning()
        }
 
index 2be103e3a6b680d3984c7a18746d4031bfa0190d..c0213086b3874849257547decdb132aabb9d1737 100644 (file)
@@ -6,6 +6,7 @@ package runtime_test
 
 import (
        "math"
+       "net"
        "runtime"
        "runtime/debug"
        "sync"
@@ -132,6 +133,79 @@ func TestGoroutineParallelism(t *testing.T) {
        }
 }
 
+// Test that all runnable goroutines are scheduled at the same time.
+func TestGoroutineParallelism2(t *testing.T) {
+       //testGoroutineParallelism2(t, false, false)
+       testGoroutineParallelism2(t, true, false)
+       testGoroutineParallelism2(t, false, true)
+       testGoroutineParallelism2(t, true, true)
+}
+
+func testGoroutineParallelism2(t *testing.T, load, netpoll bool) {
+       if runtime.NumCPU() == 1 {
+               // Takes too long, too easy to deadlock, etc.
+               t.Skip("skipping on uniprocessor")
+       }
+       P := 4
+       N := 10
+       if testing.Short() {
+               N = 3
+       }
+       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+       // If runtime triggers a forced GC during this test then it will deadlock,
+       // since the goroutines can't be stopped/preempted.
+       // Disable GC for this test (see issue #10958).
+       defer debug.SetGCPercent(debug.SetGCPercent(-1))
+       for try := 0; try < N; try++ {
+               if load {
+                       // Create P goroutines and wait until they all run.
+                       // When we run the actual test below, worker threads
+                       // running the goroutines will start parking.
+                       done := make(chan bool)
+                       x := uint32(0)
+                       for p := 0; p < P; p++ {
+                               go func() {
+                                       if atomic.AddUint32(&x, 1) == uint32(P) {
+                                               done <- true
+                                               return
+                                       }
+                                       for atomic.LoadUint32(&x) != uint32(P) {
+                                       }
+                               }()
+                       }
+                       <-done
+               }
+               if netpoll {
+                       // Enable netpoller, affects schedler behavior.
+                       ln, err := net.Listen("tcp", "localhost:0")
+                       if err != nil {
+                               defer ln.Close() // yup, defer in a loop
+                       }
+               }
+               done := make(chan bool)
+               x := uint32(0)
+               // Spawn P goroutines in a nested fashion just to differ from TestGoroutineParallelism.
+               for p := 0; p < P/2; p++ {
+                       go func(p int) {
+                               for p2 := 0; p2 < 2; p2++ {
+                                       go func(p2 int) {
+                                               for i := 0; i < 3; i++ {
+                                                       expected := uint32(P*i + p*2 + p2)
+                                                       for atomic.LoadUint32(&x) != expected {
+                                                       }
+                                                       atomic.StoreUint32(&x, expected+1)
+                                               }
+                                               done <- true
+                                       }(p2)
+                               }
+                       }(p)
+               }
+               for p := 0; p < P; p++ {
+                       <-done
+               }
+       }
+}
+
 func TestBlockLocked(t *testing.T) {
        const N = 10
        c := make(chan bool)
index cfe45894480e33b7cbaaa475ba9a005d6d166288..86ed8460640cdf7787d68463330f62c55b5fe5f7 100644 (file)
@@ -419,7 +419,7 @@ type schedt struct {
 
        pidle      puintptr // idle p's
        npidle     uint32
-       nmspinning uint32 // limited to [0, 2^31-1]
+       nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
 
        // Global runnable queue.
        runqhead guintptr