]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: prevent cleanup goroutines from missing work
authorMichael Anthony Knyszek <mknyszek@google.com>
Sat, 10 May 2025 18:53:14 +0000 (18:53 +0000)
committerGopher Robot <gobot@golang.org>
Fri, 16 May 2025 14:43:49 +0000 (07:43 -0700)
Currently, there's a window of time where each cleanup goroutine has
committed to going to sleep (immediately after full.pop() == nil) but
hasn't yet marked itself as asleep (state.sleep()). If new work arrives
in this window, it might get missed. This is what we see in #73642, and
I can reproduce it with stress2.

Side-note: even if the work gets missed by the existing sleeping
goroutines, needg is incremented. So in theory a new goroutine will
handle the work. Right now that doesn't happen in tests like the one
running in #73642, where there might never be another call to AddCleanup
to create the additional goroutine. Also, if we've hit the maximum on
cleanup goroutines and all of them are in this window simultaneously, we
can still end up missing work, it's just more rare. So this is still a
problem even if we choose to just be more aggressive about creating new
cleanup goroutines.

This change fixes the problem and also aims to make the cleanup
wake/sleep code clearer. The way this change fixes this problem is to
have cleanup goroutines re-check the work list before going to sleep,
but after having already marked themselves as sleeping. This way, if new
work comes in before the cleanup goroutine marks itself as going to
sleep, we can rely on the re-check to pick up that work. If new work
comes after the goroutine marks itself as going to sleep and after the
re-check, we can rely on the scheduler noticing that the goroutine is
asleep and waking it up. If work comes in between a goroutine marking
itself as sleeping and the re-check, then the re-check will catch that
piece of work. However, the scheduler might now get a false signal that
the goroutine is asleep and try to wake it up. This is OK. The sleeping
signal is now mutated and double-checked under the queue lock, so the
scheduler will grab the lock, may notice there are no sleeping
goroutines, and go on its way. This may cause spurious lock acquisitions
but it should be very rare. The window between a cleanup goroutine
marking itself as going to sleep and re-checking the work list is a
handful of instructions at most.

This seems subtle but overall it's a simplification of the code. We
rely more on the lock, which is easier to reason about, and we track two
separate atomic variables instead of the merged cleanupSleepState: the
length of the full list, and the number of cleanup goroutines that are
asleep. The former is now the primary way to acquire work. Cleanup
goroutines must decrement the length successfully to obtain an item off
the full list. The number of cleanup goroutines asleep, meanwhile, is
now only updated with the queue lock held. It can be checked without the
lock held, and the invariant to make that safe is simple: it must always
be an overestimate of the number of sleeping cleanup goroutines.

The changes here do change some other behaviors.

First, since we're tracking the length of the full list instead of the
abstract concept of a wake-up, the waker can't consume wake-ups anymore.
This means that cleanup goroutines may be created more aggressively. If
two threads in the scheduler see that there are goroutines that are
asleep, only one will win the race, but the other will observe zero
asleep goroutines but potentially many work units available. This will
cause it to signal many goroutines to be created. This is OK since we
have a cap on the number of cleanup goroutines, and the race should be
relatively rare.

Second, because cleanup goroutines can now fail to go to sleep if any
units of work come in, they might spend more time contended on the lock.
For example, if we have N cleanup goroutines and work comes in at *just*
the wrong rate, in the worst case we'll have each of G goroutines loop
N times for N blocks, resulting in O(G*N) thread time to handle each
block in the worst case. To paint a picture, imagine each goroutine
trying to go to sleep, fail because a new block of work came in, and
only one goroutine will get that block. Then once that goroutine is
done, we all try again, fail because a new block of work came in, and so
on and so forth. This case is unlikely, though, and probably not worth
worrying about until it actually becomes a problem. (A similar problem
exists with parking (and exists before this change, too) but at least in
that case each goroutine parks, so it doesn't block the thread.)

Fixes #73642.

Change-Id: I6bbe1b789e7eb7e8168e56da425a6450fbad9625
Reviewed-on: https://go-review.googlesource.com/c/go/+/671676
Auto-Submit: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
src/internal/runtime/math/math.go
src/runtime/mcleanup.go

index b2e55086511d3f934daa77d994988012d5f6c64e..e0fdc3438d3a9d144fcc9a2e2c71c878f73c06ce 100644 (file)
@@ -6,7 +6,10 @@ package math
 
 import "internal/goarch"
 
-const MaxUintptr = ^uintptr(0)
+const (
+       MaxUint32  = ^uint32(0)
+       MaxUintptr = ^uintptr(0)
+)
 
 // MulUintptr returns a * b and whether the multiplication overflowed.
 // On supported platforms this is an intrinsic lowered by the compiler.
index a488d50f475ee6fdacd36c809d8d539e8d195d31..ca110284321f5e9e3e821c59fda44e399dc82dfe 100644 (file)
@@ -9,6 +9,7 @@ import (
        "internal/cpu"
        "internal/goarch"
        "internal/runtime/atomic"
+       "internal/runtime/math"
        "internal/runtime/sys"
        "unsafe"
 )
@@ -272,8 +273,9 @@ func (a *cleanupBlock) take(b *cleanupBlock) {
 // cleanupQueue is a queue of ready-to-run cleanup functions.
 type cleanupQueue struct {
        // Stack of full cleanup blocks.
-       full lfstack
-       _    [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0))]byte
+       full      lfstack
+       workUnits atomic.Uint64 // length of full; decrement before pop from full, increment after push to full
+       _         [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0)) - unsafe.Sizeof(atomic.Uint64{})]byte
 
        // Stack of free cleanup blocks.
        free lfstack
@@ -290,86 +292,64 @@ type cleanupQueue struct {
        all atomic.UnsafePointer // *cleanupBlock
        _   [cpu.CacheLinePadSize - unsafe.Sizeof(atomic.UnsafePointer{})]byte
 
-       state cleanupSleep
-       _     [cpu.CacheLinePadSize - unsafe.Sizeof(cleanupSleep{})]byte
-
        // Goroutine block state.
+       lock mutex
+
+       // sleeping is the list of sleeping cleanup goroutines.
        //
-       // lock protects sleeping and writes to ng. It is also the lock
-       // used by cleanup goroutines to park atomically with updates to
-       // sleeping and ng.
-       lock     mutex
+       // Protected by lock.
        sleeping gList
-       running  atomic.Uint32
-       ng       atomic.Uint32
-       needg    atomic.Uint32
-}
 
-// cleanupSleep is an atomically-updatable cleanupSleepState.
-type cleanupSleep struct {
-       u atomic.Uint64 // cleanupSleepState
-}
+       // asleep is the number of cleanup goroutines sleeping.
+       //
+       // Read without lock, written only with the lock held.
+       // When the lock is held, the lock holder may only observe
+       // asleep.Load() == sleeping.n.
+       //
+       // To make reading without the lock safe as a signal to wake up
+       // a goroutine and handle new work, it must always be greater
+       // than or equal to sleeping.n. In the periods of time that it
+       // is strictly greater, it may cause spurious calls to wake.
+       asleep atomic.Uint32
+
+       // running indicates the number of cleanup goroutines actively
+       // executing user cleanup functions at any point in time.
+       //
+       // Read and written to without lock.
+       running atomic.Uint32
 
-func (s *cleanupSleep) load() cleanupSleepState {
-       return cleanupSleepState(s.u.Load())
-}
+       // ng is the number of cleanup goroutines.
+       //
+       // Read without lock, written only with lock held.
+       ng atomic.Uint32
 
-// awaken indicates that N cleanup goroutines should be awoken.
-func (s *cleanupSleep) awaken(n int) {
-       s.u.Add(int64(n))
+       // needg is the number of new cleanup goroutines that
+       // need to be created.
+       //
+       // Read without lock, written only with lock held.
+       needg atomic.Uint32
 }
 
-// sleep indicates that a cleanup goroutine is about to go to sleep.
-func (s *cleanupSleep) sleep() {
-       s.u.Add(1 << 32)
+// addWork indicates that n units of parallelizable work have been added to the queue.
+func (q *cleanupQueue) addWork(n int) {
+       q.workUnits.Add(int64(n))
 }
 
-// take returns the number of goroutines to wake to handle
-// the cleanup load, and also how many extra wake signals
-// there were. The caller takes responsibility for waking
-// up "wake" cleanup goroutines.
-//
-// The number of goroutines to wake is guaranteed to be
-// bounded by the current sleeping goroutines, provided
-// they call sleep before going to sleep, and all wakeups
-// are preceded by a call to take.
-func (s *cleanupSleep) take() (wake, extra uint32) {
+// tryTakeWork is an attempt to dequeue some work by a cleanup goroutine.
+// This might fail if there's no work to do.
+func (q *cleanupQueue) tryTakeWork() bool {
        for {
-               old := s.load()
-               if old == 0 {
-                       return 0, 0
-               }
-               if old.wakes() > old.asleep() {
-                       wake = old.asleep()
-                       extra = old.wakes() - old.asleep()
-               } else {
-                       wake = old.wakes()
-                       extra = 0
+               wu := q.workUnits.Load()
+               if wu == 0 {
+                       return false
                }
-               new := cleanupSleepState(old.asleep()-wake) << 32
-               if s.u.CompareAndSwap(uint64(old), uint64(new)) {
-                       return
+               // CAS to prevent us from going negative.
+               if q.workUnits.CompareAndSwap(wu, wu-1) {
+                       return true
                }
        }
 }
 
-// cleanupSleepState consists of two fields: the number of
-// goroutines currently asleep (equivalent to len(q.sleeping)), and
-// the number of times a wakeup signal has been sent.
-// These two fields are packed together in a uint64, such
-// that they may be updated atomically as part of cleanupSleep.
-// The top 32 bits is the number of sleeping goroutines,
-// and the bottom 32 bits is the number of wakeup signals.
-type cleanupSleepState uint64
-
-func (s cleanupSleepState) asleep() uint32 {
-       return uint32(s >> 32)
-}
-
-func (s cleanupSleepState) wakes() uint32 {
-       return uint32(s)
-}
-
 // enqueue queues a single cleanup for execution.
 //
 // Called by the sweeper, and only the sweeper.
@@ -397,7 +377,7 @@ func (q *cleanupQueue) enqueue(fn *funcval) {
        if full := b.enqueue(fn); full {
                q.full.push(&b.lfnode)
                pp.cleanups = nil
-               q.state.awaken(1)
+               q.addWork(1)
        }
        releasem(mp)
 }
@@ -406,23 +386,37 @@ func (q *cleanupQueue) enqueue(fn *funcval) {
 // and never returns nil.
 func (q *cleanupQueue) dequeue() *cleanupBlock {
        for {
-               b := (*cleanupBlock)(q.full.pop())
-               if b != nil {
-                       return b
+               if q.tryTakeWork() {
+                       // Guaranteed to be non-nil.
+                       return (*cleanupBlock)(q.full.pop())
                }
                lock(&q.lock)
+               // Increment asleep first. We may have to undo this if we abort the sleep.
+               // We must update asleep first because the scheduler might not try to wake
+               // us up when work comes in between the last check of workUnits and when we
+               // go to sleep. (It may see asleep as 0.) By incrementing it here, we guarantee
+               // after this point that if new work comes in, someone will try to grab the
+               // lock and wake us. However, this also means that if we back out, we may cause
+               // someone to spuriously grab the lock and try to wake us up, only to fail.
+               // This should be very rare because the window here is incredibly small: the
+               // window between now and when we decrement q.asleep below.
+               q.asleep.Add(1)
+
+               // Re-check workUnits under the lock and with asleep updated. If it's still zero,
+               // then no new work came in, and it's safe for us to go to sleep. If new work
+               // comes in after this point, then the scheduler will notice that we're sleeping
+               // and wake us up.
+               if q.workUnits.Load() > 0 {
+                       // Undo the q.asleep update and try to take work again.
+                       q.asleep.Add(-1)
+                       unlock(&q.lock)
+                       continue
+               }
                q.sleeping.push(getg())
-               q.state.sleep()
                goparkunlock(&q.lock, waitReasonCleanupWait, traceBlockSystemGoroutine, 1)
        }
 }
 
-// tryDequeue is a non-blocking attempt to dequeue a block of cleanups.
-// May return nil if there are no blocks to run.
-func (q *cleanupQueue) tryDequeue() *cleanupBlock {
-       return (*cleanupBlock)(q.full.pop())
-}
-
 // flush pushes all active cleanup blocks to the full list and wakes up cleanup
 // goroutines to handle them.
 //
@@ -468,7 +462,7 @@ func (q *cleanupQueue) flush() {
                flushed++
        }
        if flushed != 0 {
-               q.state.awaken(flushed)
+               q.addWork(flushed)
        }
        if flushed+emptied+missing != len(allp) {
                throw("failed to correctly flush all P-owned cleanup blocks")
@@ -477,34 +471,56 @@ func (q *cleanupQueue) flush() {
        releasem(mp)
 }
 
-// needsWake returns true if cleanup goroutines need to be awoken or created to handle cleanup load.
+// needsWake returns true if cleanup goroutines may need to be awoken or created to handle cleanup load.
 func (q *cleanupQueue) needsWake() bool {
-       s := q.state.load()
-       return s.wakes() > 0 && (s.asleep() > 0 || q.ng.Load() < maxCleanupGs())
+       return q.workUnits.Load() > 0 && (q.asleep.Load() > 0 || q.ng.Load() < maxCleanupGs())
 }
 
 // wake wakes up one or more goroutines to process the cleanup queue. If there aren't
 // enough sleeping goroutines to handle the demand, wake will arrange for new goroutines
 // to be created.
 func (q *cleanupQueue) wake() {
-       wake, extra := q.state.take()
+       lock(&q.lock)
+
+       // Figure out how many goroutines to wake, and how many extra goroutines to create.
+       // Wake one goroutine for each work unit.
+       var wake, extra uint32
+       work := q.workUnits.Load()
+       asleep := uint64(q.asleep.Load())
+       if work > asleep {
+               wake = uint32(asleep)
+               if work > uint64(math.MaxUint32) {
+                       // Protect against overflow.
+                       extra = math.MaxUint32
+               } else {
+                       extra = uint32(work - asleep)
+               }
+       } else {
+               wake = uint32(work)
+               extra = 0
+       }
        if extra != 0 {
+               // Signal that we should create new goroutines, one for each extra work unit,
+               // up to maxCleanupGs.
                newg := min(extra, maxCleanupGs()-q.ng.Load())
                if newg > 0 {
                        q.needg.Add(int32(newg))
                }
        }
        if wake == 0 {
+               // Nothing to do.
+               unlock(&q.lock)
                return
        }
 
-       // By calling 'take', we've taken ownership of waking 'wake' goroutines.
+       // Take ownership of waking 'wake' goroutines.
+       //
        // Nobody else will wake up these goroutines, so they're guaranteed
        // to be sitting on q.sleeping, waiting for us to wake them.
-       //
+       q.asleep.Add(-int32(wake))
+
        // Collect them and schedule them.
        var list gList
-       lock(&q.lock)
        for range wake {
                list.push(q.sleeping.pop())
        }