"internal/cpu"
"internal/goarch"
"internal/runtime/atomic"
+ "internal/runtime/math"
"internal/runtime/sys"
"unsafe"
)
// cleanupQueue is a queue of ready-to-run cleanup functions.
type cleanupQueue struct {
// Stack of full cleanup blocks.
- full lfstack
- _ [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0))]byte
+ full lfstack
+ workUnits atomic.Uint64 // number of blocks in full; decremented before popping from full, incremented after pushing to full
+ _ [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0)) - unsafe.Sizeof(atomic.Uint64{})]byte
// Stack of free cleanup blocks.
free lfstack
all atomic.UnsafePointer // *cleanupBlock
_ [cpu.CacheLinePadSize - unsafe.Sizeof(atomic.UnsafePointer{})]byte
- state cleanupSleep
- _ [cpu.CacheLinePadSize - unsafe.Sizeof(cleanupSleep{})]byte
-
// Goroutine block state.
+ lock mutex
+
+ // sleeping is the list of sleeping cleanup goroutines.
//
- // lock protects sleeping and writes to ng. It is also the lock
- // used by cleanup goroutines to park atomically with updates to
- // sleeping and ng.
- lock mutex
+ // Protected by lock.
sleeping gList
- running atomic.Uint32
- ng atomic.Uint32
- needg atomic.Uint32
-}
-// cleanupSleep is an atomically-updatable cleanupSleepState.
-type cleanupSleep struct {
- u atomic.Uint64 // cleanupSleepState
-}
+ // asleep is the number of cleanup goroutines sleeping.
+ //
+ // Read without lock, written only with the lock held.
+ // When the lock is held, the lock holder may only observe
+ // asleep.Load() == sleeping.n.
+ //
+ // To make reading without the lock safe as a signal to wake up
+ // a goroutine and handle new work, it must always be greater
+ // than or equal to sleeping.n. In the periods of time that it
+ // is strictly greater, it may cause spurious calls to wake.
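+ //
+ // See dequeue: asleep is incremented, under the lock, before the final
+ // re-check of workUnits there, so newly added work cannot be missed by a
+ // goroutine that is about to go to sleep.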
+ asleep atomic.Uint32
+
+ // running indicates the number of cleanup goroutines actively
+ // executing user cleanup functions at any point in time.
+ //
+ // Read and written to without lock.
+ running atomic.Uint32
-func (s *cleanupSleep) load() cleanupSleepState {
- return cleanupSleepState(s.u.Load())
-}
+ // ng is the number of cleanup goroutines.
+ //
+ // Read without lock, written only with lock held.
+ ng atomic.Uint32
-// awaken indicates that N cleanup goroutines should be awoken.
-func (s *cleanupSleep) awaken(n int) {
- s.u.Add(int64(n))
+ // needg is the number of new cleanup goroutines that
+ // need to be created.
+ //
+ // Read without lock, written only with lock held.
+ needg atomic.Uint32
}
-// sleep indicates that a cleanup goroutine is about to go to sleep.
-func (s *cleanupSleep) sleep() {
- s.u.Add(1 << 32)
+// addWork indicates that n units of parallelizable work have been added to the queue.
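+//
+// Callers must make the work visible first (for example, push the full
+// block) and only then call addWork: dequeuers treat a claimed work unit
+// as a guarantee that a pop from full will succeed.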
+func (q *cleanupQueue) addWork(n int) {
+ q.workUnits.Add(int64(n))
}
-// take returns the number of goroutines to wake to handle
-// the cleanup load, and also how many extra wake signals
-// there were. The caller takes responsibility for waking
-// up "wake" cleanup goroutines.
-//
-// The number of goroutines to wake is guaranteed to be
-// bounded by the current sleeping goroutines, provided
-// they call sleep before going to sleep, and all wakeups
-// are preceded by a call to take.
-func (s *cleanupSleep) take() (wake, extra uint32) {
+// tryTakeWork attempts to claim one unit of work for a cleanup goroutine.
+// It returns false if there is no work to take.
+func (q *cleanupQueue) tryTakeWork() bool {
for {
- old := s.load()
- if old == 0 {
- return 0, 0
- }
- if old.wakes() > old.asleep() {
- wake = old.asleep()
- extra = old.wakes() - old.asleep()
- } else {
- wake = old.wakes()
- extra = 0
+ wu := q.workUnits.Load()
+ if wu == 0 {
+ return false
}
- new := cleanupSleepState(old.asleep()-wake) << 32
- if s.u.CompareAndSwap(uint64(old), uint64(new)) {
- return
+ // CAS rather than a blind decrement: another goroutine may have taken the
+ // last unit since our load above, and workUnits must not go negative.
+ if q.workUnits.CompareAndSwap(wu, wu-1) {
+ return true
}
}
}
-// cleanupSleepState consists of two fields: the number of
-// goroutines currently asleep (equivalent to len(q.sleeping)), and
-// the number of times a wakeup signal has been sent.
-// These two fields are packed together in a uint64, such
-// that they may be updated atomically as part of cleanupSleep.
-// The top 32 bits is the number of sleeping goroutines,
-// and the bottom 32 bits is the number of wakeup signals.
-type cleanupSleepState uint64
-
-func (s cleanupSleepState) asleep() uint32 {
- return uint32(s >> 32)
-}
-
-func (s cleanupSleepState) wakes() uint32 {
- return uint32(s)
-}
-
// enqueue queues a single cleanup for execution.
//
// Called by the sweeper, and only the sweeper.
if full := b.enqueue(fn); full {
q.full.push(&b.lfnode)
pp.cleanups = nil
- q.state.awaken(1)
+ q.addWork(1)
}
releasem(mp)
}
// and never returns nil.
func (q *cleanupQueue) dequeue() *cleanupBlock {
for {
- b := (*cleanupBlock)(q.full.pop())
- if b != nil {
- return b
+ if q.tryTakeWork() {
+ // Guaranteed to be non-nil.
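+ // workUnits is only incremented after the corresponding block is pushed
+ // to full, so claiming a work unit means a block is available to pop.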
+ return (*cleanupBlock)(q.full.pop())
}
lock(&q.lock)
+ // Increment asleep first. We may have to undo this if we abort the sleep.
+ // We must update asleep first because the scheduler might not try to wake
+ // us up when work comes in between the last check of workUnits and when we
+ // go to sleep. (It may see asleep as 0.) By incrementing it here, we guarantee
+ // after this point that if new work comes in, someone will try to grab the
+ // lock and wake us. However, this also means that if we back out, we may cause
+ // someone to spuriously grab the lock and try to wake us up, only to fail.
+ // This should be very rare because the window is small: just the span
+ // between now and when we decrement q.asleep below.
+ q.asleep.Add(1)
+
+ // Re-check workUnits under the lock and with asleep updated. If it's still zero,
+ // then no new work came in, and it's safe for us to go to sleep. If new work
+ // comes in after this point, then the scheduler will notice that we're sleeping
+ // and wake us up.
+ if q.workUnits.Load() > 0 {
+ // Undo the q.asleep update and try to take work again.
+ q.asleep.Add(-1)
+ unlock(&q.lock)
+ continue
+ }
q.sleeping.push(getg())
- q.state.sleep()
goparkunlock(&q.lock, waitReasonCleanupWait, traceBlockSystemGoroutine, 1)
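+ // We were woken: loop around and try to take work again.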
}
}
-// tryDequeue is a non-blocking attempt to dequeue a block of cleanups.
-// May return nil if there are no blocks to run.
-func (q *cleanupQueue) tryDequeue() *cleanupBlock {
- return (*cleanupBlock)(q.full.pop())
-}
-
// flush pushes all active cleanup blocks to the full list and wakes up cleanup
// goroutines to handle them.
//
flushed++
}
if flushed != 0 {
- q.state.awaken(flushed)
+ q.addWork(flushed)
}
if flushed+emptied+missing != len(allp) {
throw("failed to correctly flush all P-owned cleanup blocks")
releasem(mp)
}
-// needsWake returns true if cleanup goroutines need to be awoken or created to handle cleanup load.
+// needsWake returns true if cleanup goroutines may need to be awoken or created to handle cleanup load.
func (q *cleanupQueue) needsWake() bool {
- s := q.state.load()
- return s.wakes() > 0 && (s.asleep() > 0 || q.ng.Load() < maxCleanupGs())
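+ // There's pending work, and either a sleeping goroutine could take it, or
+ // we still have headroom to create new cleanup goroutines.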
+ return q.workUnits.Load() > 0 && (q.asleep.Load() > 0 || q.ng.Load() < maxCleanupGs())
}
// wake wakes up one or more goroutines to process the cleanup queue. If there aren't
// enough sleeping goroutines to handle the demand, wake will arrange for new goroutines
// to be created.
func (q *cleanupQueue) wake() {
- wake, extra := q.state.take()
+ lock(&q.lock)
+
+ // Figure out how many goroutines to wake, and how many extra goroutines to create.
+ // Wake one goroutine for each work unit, up to the number of goroutines
+ // currently asleep; any remaining demand is counted as extra.
+ var wake, extra uint32
+ work := q.workUnits.Load()
+ asleep := uint64(q.asleep.Load())
+ if work > asleep {
+ wake = uint32(asleep)
+ if work > uint64(math.MaxUint32) {
+ // Protect against overflow.
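+ // Capping extra is harmless: it is only used below, clamped to the
+ // number of new goroutines allowed by maxCleanupGs.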
+ extra = math.MaxUint32
+ } else {
+ extra = uint32(work - asleep)
+ }
+ } else {
+ wake = uint32(work)
+ extra = 0
+ }
if extra != 0 {
+ // Signal that we should create new goroutines, one for each extra work unit,
+ // up to maxCleanupGs.
newg := min(extra, maxCleanupGs()-q.ng.Load())
if newg > 0 {
q.needg.Add(int32(newg))
}
}
if wake == 0 {
+ // Nothing to do.
+ unlock(&q.lock)
return
}
- // By calling 'take', we've taken ownership of waking 'wake' goroutines.
+ // Take ownership of waking 'wake' goroutines.
+ //
// Nobody else will wake up these goroutines, so they're guaranteed
// to be sitting on q.sleeping, waiting for us to wake them.
- //
+ q.asleep.Add(-int32(wake))
+
// Collect them and schedule them.
var list gList
- lock(&q.lock)
for range wake {
list.push(q.sleeping.pop())
}