]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: synchronize P wakeup and dropping Ps
authorMichael Pratt <mpratt@google.com>
Tue, 1 Mar 2022 20:06:37 +0000 (15:06 -0500)
committerGopher Robot <gobot@golang.org>
Fri, 12 Aug 2022 03:22:59 +0000 (03:22 +0000)
CL 310850 dropped work re-checks on non-spinning Ms to fix #43997.

This introduced a new race condition: a non-spinning M may drop its P
and then park at the same time a spinning M attempts to wake a P to
handle some new work. The spinning M fails to find an idle P (because
the non-spinning M hasn't quite made its P idle yet), and does nothing
assuming that the system is fully loaded. This results in loss of work
conservation. In the worst case we could have a complete deadlock if
injectglist fails to wake anything just as all Ps are going idle.

sched.needspinning adds new synchronization to cover this case. If work
submission fails to find a P, it sets needspinning to indicate that a
spinning M is required. When non-spinning Ms prepare to drop their P,
they check needspinning and abort going idle to become a spinning M
instead. This addresses the race without extra spurious wakeups. In the
normal (non-racing case), an M will become spinning via the normal path
and clear the flag.

injectglist must change in addition to wakep because it is a similar
form of work submission, notably used following netpoll at a point when
we might not have a P that would guarantee the work runs.

Fixes #45867

Change-Id: Ieb623a6d4162fb8c2be7b4ff8acdebcc3a0d69a8
Reviewed-on: https://go-review.googlesource.com/c/go/+/389014
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Run-TryBot: Michael Pratt <mpratt@google.com>
Auto-Submit: Michael Pratt <mpratt@google.com>

src/runtime/proc.go
src/runtime/runtime2.go

index 04484da53f10b2f6798414a4b202a03bea06920f..a112c1b80c7b648540685db37eed05706e310dc7 100644 (file)
@@ -73,7 +73,7 @@ var modinfo string
 // If there is at least one spinning thread (sched.nmspinning>1), we don't
 // unpark new threads when submitting work. To compensate for that, if the last
 // spinning thread finds work and stops spinning, it must unpark a new spinning
-// thread.  This approach smooths out unjustified spikes of thread unparking,
+// thread. This approach smooths out unjustified spikes of thread unparking,
 // but at the same time guarantees eventual maximal CPU parallelism
 // utilization.
 //
@@ -827,6 +827,12 @@ func mcommoninit(mp *m, id int64) {
        }
 }
 
+func (mp *m) becomeSpinning() {
+       mp.spinning = true
+       sched.nmspinning.Add(1)
+       sched.needspinning.Store(0)
+}
+
 var fastrandseed uintptr
 
 func fastrandinit() {
@@ -2242,8 +2248,8 @@ func mspinning() {
 // Schedules some M to run the p (creates an M if necessary).
 // If p==nil, tries to get an idle P, if no idle P's does nothing.
 // May run with m.p==nil, so write barriers are not allowed.
-// If spinning is set, the caller has incremented nmspinning and startm will
-// either decrement nmspinning or set m.spinning in the newly started M.
+// If spinning is set, the caller has incremented nmspinning and must provide a
+// P. startm will set m.spinning in the newly started M.
 //
 // Callers passing a non-nil P must call from a non-preemptible context. See
 // comment on acquirem below.
@@ -2271,16 +2277,15 @@ func startm(pp *p, spinning bool) {
        mp := acquirem()
        lock(&sched.lock)
        if pp == nil {
+               if spinning {
+                       // TODO(prattmic): All remaining calls to this function
+                       // with _p_ == nil could be cleaned up to find a P
+                       // before calling startm.
+                       throw("startm: P required for spinning=true")
+               }
                pp, _ = pidleget(0)
                if pp == nil {
                        unlock(&sched.lock)
-                       if spinning {
-                               // The caller incremented nmspinning, but there are no idle Ps,
-                               // so it's okay to just undo the increment and give up.
-                               if sched.nmspinning.Add(-1) < 0 {
-                                       throw("startm: negative nmspinning")
-                               }
-                       }
                        releasem(mp)
                        return
                }
@@ -2358,6 +2363,7 @@ func handoffp(pp *p) {
        // no local work, check that there are no spinning/idle M's,
        // otherwise our help is not required
        if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) { // TODO: fast atomic
+               sched.needspinning.Store(0)
                startm(pp, true)
                return
        }
@@ -2404,15 +2410,41 @@ func handoffp(pp *p) {
 
 // Tries to add one more P to execute G's.
 // Called when a G is made runnable (newproc, ready).
+// Must be called with a P.
 func wakep() {
-       if sched.npidle.Load() == 0 {
+       // Be conservative about spinning threads, only start one if none exist
+       // already.
+       if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
                return
        }
-       // be conservative about spinning threads
-       if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
+
+       // Disable preemption until ownership of pp transfers to the next M in
+       // startm. Otherwise preemption here would leave pp stuck waiting to
+       // enter _Pgcstop.
+       //
+       // See preemption comment on acquirem in startm for more details.
+       mp := acquirem()
+
+       var pp *p
+       lock(&sched.lock)
+       pp, _ = pidlegetSpinning(0)
+       if pp == nil {
+               if sched.nmspinning.Add(-1) < 0 {
+                       throw("wakep: negative nmspinning")
+               }
+               unlock(&sched.lock)
+               releasem(mp)
                return
        }
-       startm(nil, true)
+       // Since we always have a P, the race in the "No M is available"
+       // comment in startm doesn't apply during the small window between the
+       // unlock here and lock in startm. A checkdead in between will always
+       // see at least one running M (ours).
+       unlock(&sched.lock)
+
+       startm(pp, true)
+
+       releasem(mp)
 }
 
 // Stops execution of the current m that is locked to a g until the g is runnable again.
@@ -2646,8 +2678,7 @@ top:
        // GOMAXPROCS>>1 but the program parallelism is low.
        if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
                if !mp.spinning {
-                       mp.spinning = true
-                       sched.nmspinning.Add(1)
+                       mp.becomeSpinning()
                }
 
                gp, inheritTime, tnow, w, newWork := stealWork(now)
@@ -2723,6 +2754,12 @@ top:
                unlock(&sched.lock)
                return gp, false, false
        }
+       if !mp.spinning && sched.needspinning.Load() == 1 {
+               // See "Delicate dance" comment below.
+               mp.becomeSpinning()
+               unlock(&sched.lock)
+               goto top
+       }
        if releasep() != pp {
                throw("findrunnable: wrong p")
        }
@@ -2743,12 +2780,28 @@ top:
        // * New/modified-earlier timers on a per-P timer heap.
        // * Idle-priority GC work (barring golang.org/issue/19112).
        //
-       // If we discover new work below, we need to restore m.spinning as a signal
-       // for resetspinning to unpark a new worker thread (because there can be more
-       // than one starving goroutine). However, if after discovering new work
-       // we also observe no idle Ps it is OK to skip unparking a new worker
-       // thread: the system is fully loaded so no spinning threads are required.
-       // Also see "Worker thread parking/unparking" comment at the top of the file.
+       // If we discover new work below, we need to restore m.spinning as a
+       // signal for resetspinning to unpark a new worker thread (because
+       // there can be more than one starving goroutine).
+       //
+       // However, if after discovering new work we also observe no idle Ps
+       // (either here or in resetspinning), we have a problem. We may be
+       // racing with a non-spinning M in the block above, having found no
+       // work and preparing to release its P and park. Allowing that P to go
+       // idle will result in loss of work conservation (idle P while there is
+       // runnable work). This could result in complete deadlock in the
+       // unlikely event that we discover new work (from netpoll) right as we
+       // are racing with _all_ other Ps going idle.
+       //
+       // We use sched.needspinning to synchronize with non-spinning Ms going
+       // idle. If needspinning is set when they are about to drop their P,
+       // they abort the drop and instead become a new spinning M on our
+       // behalf. If we are not racing and the system is truly fully loaded
+       // then no spinning threads are required, and the next thread to
+       // naturally become spinning will clear the flag.
+       //
+       // Also see "Worker thread parking/unparking" comment at the top of the
+       // file.
        wasSpinning := mp.spinning
        if mp.spinning {
                mp.spinning = false
@@ -2758,16 +2811,18 @@ top:
 
                // Note the for correctness, only the last M transitioning from
                // spinning to non-spinning must perform these rechecks to
-               // ensure no missed work. We are performing it on every M that
-               // transitions as a conservative change to monitor effects on
-               // latency. See golang.org/issue/43997.
+               // ensure no missed work. However, the runtime has some cases
+               // of transient increments of nmspinning that are decremented
+               // without going through this path, so we must be conservative
+               // and perform the check on all spinning Ms.
+               //
+               // See https://go.dev/issue/43997.
 
                // Check all runqueues once again.
                pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
                if pp != nil {
                        acquirep(pp)
-                       mp.spinning = true
-                       sched.nmspinning.Add(1)
+                       mp.becomeSpinning()
                        goto top
                }
 
@@ -2775,8 +2830,7 @@ top:
                pp, gp := checkIdleGCNoP()
                if pp != nil {
                        acquirep(pp)
-                       mp.spinning = true
-                       sched.nmspinning.Add(1)
+                       mp.becomeSpinning()
 
                        // Run the idle worker.
                        pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
@@ -2844,8 +2898,7 @@ top:
                                return gp, false, false
                        }
                        if wasSpinning {
-                               mp.spinning = true
-                               sched.nmspinning.Add(1)
+                               mp.becomeSpinning()
                        }
                        goto top
                }
@@ -2964,17 +3017,18 @@ func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
        for id, p2 := range allpSnapshot {
                if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
                        lock(&sched.lock)
-                       pp, _ := pidleget(0)
-                       unlock(&sched.lock)
-                       if pp != nil {
-                               return pp
+                       pp, _ := pidlegetSpinning(0)
+                       if pp == nil {
+                               // Can't get a P, don't bother checking remaining Ps.
+                               unlock(&sched.lock)
+                               return nil
                        }
-
-                       // Can't get a P, don't bother checking remaining Ps.
-                       break
+                       unlock(&sched.lock)
+                       return pp
                }
        }
 
+       // No work available.
        return nil
 }
 
@@ -3030,7 +3084,7 @@ func checkIdleGCNoP() (*p, *g) {
        // the assumption in gcControllerState.findRunnableGCWorker that an
        // empty gcBgMarkWorkerPool is only possible if gcMarkDone is running.
        lock(&sched.lock)
-       pp, now := pidleget(0)
+       pp, now := pidlegetSpinning(0)
        if pp == nil {
                unlock(&sched.lock)
                return nil, nil
@@ -3130,8 +3184,20 @@ func injectglist(glist *gList) {
        *glist = gList{}
 
        startIdle := func(n int) {
-               for ; n != 0 && sched.npidle.Load() != 0; n-- {
-                       startm(nil, false)
+               for i := 0; i < n; i++ {
+                       mp := acquirem() // See comment in startm.
+                       lock(&sched.lock)
+
+                       pp, _ := pidlegetSpinning(0)
+                       if pp == nil {
+                               unlock(&sched.lock)
+                               releasem(mp)
+                               break
+                       }
+
+                       unlock(&sched.lock)
+                       startm(pp, false)
+                       releasem(mp)
                }
        }
 
@@ -5406,7 +5472,7 @@ func schedtrace(detailed bool) {
        }
 
        lock(&sched.lock)
-       print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle.Load(), " threads=", mcount(), " spinningthreads=", sched.nmspinning.Load(), " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
+       print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle.Load(), " threads=", mcount(), " spinningthreads=", sched.nmspinning.Load(), " needspinning=", sched.needspinning.Load(), " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
        if detailed {
                print(" gcwaiting=", sched.gcwaiting.Load(), " nmidlelocked=", sched.nmidlelocked, " stopwait=", sched.stopwait, " sysmonwait=", sched.sysmonwait.Load(), "\n")
        }
@@ -5742,6 +5808,31 @@ func pidleget(now int64) (*p, int64) {
        return pp, now
 }
 
+// pidlegetSpinning tries to get a p from the _Pidle list, acquiring ownership.
+// This is called by spinning Ms (or callers than need a spinning M) that have
+// found work. If no P is available, this must synchronized with non-spinning
+// Ms that may be preparing to drop their P without discovering this work.
+//
+// sched.lock must be held.
+//
+// May run during STW, so write barriers are not allowed.
+//
+//go:nowritebarrierrec
+func pidlegetSpinning(now int64) (*p, int64) {
+       assertLockHeld(&sched.lock)
+
+       pp, now := pidleget(now)
+       if pp == nil {
+               // See "Delicate dance" comment in findrunnable. We found work
+               // that we cannot take, we must synchronize with non-spinning
+               // Ms that may be preparing to drop their P.
+               sched.needspinning.Store(1)
+               return nil, now
+       }
+
+       return pp, now
+}
+
 // runqempty reports whether pp has no Gs on its local run queue.
 // It never returns true spuriously.
 func runqempty(pp *p) bool {
index e706cf735453f461b77f1673424d0bba4545f757..884d6cc096082cd641e144c996caa760a9c2607e 100644 (file)
@@ -777,9 +777,10 @@ type schedt struct {
 
        ngsys atomic.Int32 // number of system goroutines
 
-       pidle      puintptr // idle p's
-       npidle     atomic.Int32
-       nmspinning atomic.Int32 // See "Worker thread parking/unparking" comment in proc.go.
+       pidle        puintptr // idle p's
+       npidle       atomic.Int32
+       nmspinning   atomic.Int32 // See "Worker thread parking/unparking" comment in proc.go.
+       needspinning atomic.Uint32 // See "Delicate dance" comment in proc.go. Boolean. Must hold sched.lock to set to 1.
 
        // Global runnable queue.
        runq     gQueue
@@ -840,6 +841,8 @@ type schedt struct {
        // with the rest of the runtime.
        sysmonlock mutex
 
+       _ uint32 // ensure timeToRun has 8-byte alignment
+
        // timeToRun is a distribution of scheduling latencies, defined
        // as the sum of time a G spends in the _Grunnable state before
        // it transitions to _Grunning.