From 39a5ee52b9b41b1e4f4cf821c78ef5b7be68d181 Mon Sep 17 00:00:00 2001 From: Michael Anthony Knyszek Date: Mon, 2 Nov 2020 19:03:16 +0000 Subject: [PATCH] runtime: decouple consistent stats from mcache and allow P-less update This change modifies the consistent stats implementation to keep the per-P sequence counter on each P instead of each mcache. A valid mcache is not available everywhere that we want to call e.g. allocSpan, as per issue #42339. By decoupling these two, we can add a mechanism to allow contexts without a P to update stats consistently. In this CL, we achieve that with a mutex. In practice, it will be very rare for an M to update these stats without a P. Furthermore, the stats reader also only needs to hold the mutex across the update to "gen" since once that changes, writers are free to continue updating the new stats generation. Contention could thus only arise between writers without a P, and as mentioned earlier, those should be rare. A nice side-effect of this change is that the consistent stats acquire and release API becomes simpler. Fixes #42339. Change-Id: Ied74ab256f69abd54b550394c8ad7c4c40a5fe34 Reviewed-on: https://go-review.googlesource.com/c/go/+/267158 Run-TryBot: Michael Knyszek Trust: Michael Knyszek Reviewed-by: Michael Pratt --- src/runtime/mcache.go | 16 +++---- src/runtime/mgcscavenge.go | 8 +--- src/runtime/mgcsweep.go | 10 ++-- src/runtime/mheap.go | 27 +++-------- src/runtime/mstats.go | 95 ++++++++++++++++++++++++-------------- src/runtime/proc.go | 4 ++ src/runtime/runtime2.go | 8 +++- 7 files changed, 88 insertions(+), 80 deletions(-) diff --git a/src/runtime/mcache.go b/src/runtime/mcache.go index 847a5dedf3..bb7475b6f3 100644 --- a/src/runtime/mcache.go +++ b/src/runtime/mcache.go @@ -50,10 +50,6 @@ type mcache struct { // in this mcache are stale and need to the flushed so they // can be swept. This is done in acquirep. flushGen uint32 - - // statsSeq is a counter indicating whether this P is currently - // writing any stats. Its value is even when not, odd when it is. - statsSeq uint32 } // A gclink is a node in a linked list of blocks, like mlink, @@ -178,9 +174,9 @@ func (c *mcache) refill(spc spanClass) { // Assume all objects from this span will be allocated in the // mcache. If it gets uncached, we'll adjust this. - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.smallAllocCount[spc.sizeclass()], uintptr(s.nelems)-uintptr(s.allocCount)) - memstats.heapStats.release(c) + memstats.heapStats.release() // Update heap_live with the same assumption. usedBytes := uintptr(s.allocCount) * s.elemsize @@ -229,10 +225,10 @@ func (c *mcache) allocLarge(size uintptr, needzero bool, noscan bool) *mspan { if s == nil { throw("out of memory") } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.largeAlloc, npages*pageSize) atomic.Xadduintptr(&stats.largeAllocCount, 1) - memstats.heapStats.release(c) + memstats.heapStats.release() // Update heap_live and revise pacing if needed. atomic.Xadd64(&memstats.heap_live, int64(npages*pageSize)) @@ -263,9 +259,9 @@ func (c *mcache) releaseAll() { if s != &emptymspan { // Adjust nsmallalloc in case the span wasn't fully allocated. n := uintptr(s.nelems) - uintptr(s.allocCount) - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.smallAllocCount[spanClass(i).sizeclass()], -n) - memstats.heapStats.release(c) + memstats.heapStats.release() if s.sweepgen != sg+1 { // refill conservatively counted unallocated slots in heap_live. // Undo this. diff --git a/src/runtime/mgcscavenge.go b/src/runtime/mgcscavenge.go index ab4e28a60b..38f09309dc 100644 --- a/src/runtime/mgcscavenge.go +++ b/src/runtime/mgcscavenge.go @@ -733,14 +733,10 @@ func (p *pageAlloc) scavengeRangeLocked(ci chunkIdx, base, npages uint) uintptr atomic.Xadd64(&memstats.heap_released, nbytes) // Update consistent accounting too. - c := getMCache() - if c == nil { - throw("scavengeRangeLocked called without a P or outside bootstrapping") - } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xaddint64(&stats.committed, -nbytes) atomic.Xaddint64(&stats.released, nbytes) - memstats.heapStats.release(c) + memstats.heapStats.release() return addr } diff --git a/src/runtime/mgcsweep.go b/src/runtime/mgcsweep.go index 8391435630..76bc4246e5 100644 --- a/src/runtime/mgcsweep.go +++ b/src/runtime/mgcsweep.go @@ -339,8 +339,6 @@ func (s *mspan) sweep(preserve bool) bool { spc := s.spanclass size := s.elemsize - c := _g_.m.p.ptr().mcache - // The allocBits indicate which unmarked objects don't need to be // processed since they were free at the end of the last GC cycle // and were not allocated since then. @@ -505,9 +503,9 @@ func (s *mspan) sweep(preserve bool) bool { // wasn't totally filled, but then swept, still has all of its // free slots zeroed. s.needzero = 1 - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.smallFreeCount[spc.sizeclass()], uintptr(nfreed)) - memstats.heapStats.release(c) + memstats.heapStats.release() } if !preserve { // The caller may not have removed this span from whatever @@ -552,10 +550,10 @@ func (s *mspan) sweep(preserve bool) bool { } else { mheap_.freeSpan(s) } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.largeFreeCount, 1) atomic.Xadduintptr(&stats.largeFree, size) - memstats.heapStats.release(c) + memstats.heapStats.release() return true } diff --git a/src/runtime/mheap.go b/src/runtime/mheap.go index 6b29f34a82..b8429eee94 100644 --- a/src/runtime/mheap.go +++ b/src/runtime/mheap.go @@ -1246,12 +1246,7 @@ HaveSpan: memstats.heap_sys.add(-int64(nbytes)) } // Update consistent stats. - c := getMCache() - if c == nil { - // TODO(mknyszek): Remove this and handle this case to fix #42339. - throw("allocSpan called without P or outside bootstrapping") - } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xaddint64(&stats.committed, int64(scav)) atomic.Xaddint64(&stats.released, -int64(scav)) switch typ { @@ -1264,7 +1259,7 @@ HaveSpan: case spanAllocWorkBuf: atomic.Xaddint64(&stats.inWorkBufs, int64(nbytes)) } - memstats.heapStats.release(c) + memstats.heapStats.release() // Publish the span in various locations. @@ -1344,14 +1339,9 @@ func (h *mheap) grow(npage uintptr) bool { // size which is always > physPageSize, so its safe to // just add directly to heap_released. atomic.Xadd64(&memstats.heap_released, int64(asize)) - c := getMCache() - if c == nil { - // TODO(mknyszek): Remove this and handle this case to fix #42339. - throw("grow called without P or outside bootstrapping") - } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() atomic.Xaddint64(&stats.released, int64(asize)) - memstats.heapStats.release(c) + memstats.heapStats.release() // Recalculate nBase. // We know this won't overflow, because sysAlloc returned @@ -1447,12 +1437,7 @@ func (h *mheap) freeSpanLocked(s *mspan, typ spanAllocType) { memstats.heap_sys.add(int64(nbytes)) } // Update consistent stats. - c := getMCache() - if c == nil { - // TODO(mknyszek): Remove this and handle this case to fix #42339. - throw("freeSpanLocked called without P or outside bootstrapping") - } - stats := memstats.heapStats.acquire(c) + stats := memstats.heapStats.acquire() switch typ { case spanAllocHeap: atomic.Xaddint64(&stats.inHeap, -int64(nbytes)) @@ -1463,7 +1448,7 @@ func (h *mheap) freeSpanLocked(s *mspan, typ spanAllocType) { case spanAllocWorkBuf: atomic.Xaddint64(&stats.inWorkBufs, -int64(nbytes)) } - memstats.heapStats.release(c) + memstats.heapStats.release() // Mark the space as free. h.pages.free(s.base(), s.npages) diff --git a/src/runtime/mstats.go b/src/runtime/mstats.go index 3829355d7b..6defaedabe 100644 --- a/src/runtime/mstats.go +++ b/src/runtime/mstats.go @@ -158,7 +158,7 @@ type mstats struct { // heapStats is a set of statistics heapStats consistentHeapStats - _ uint32 // ensure gcPauseDist is aligned + // _ uint32 // ensure gcPauseDist is aligned // gcPauseDist represents the distribution of all GC-related // application pauses in the runtime. @@ -818,10 +818,11 @@ type consistentHeapStats struct { // Writers always atomically update the delta at index gen. // // Readers operate by rotating gen (0 -> 1 -> 2 -> 0 -> ...) - // and synchronizing with writers by observing each mcache's - // statsSeq field. If the reader observes a P (to which the - // mcache is bound) not writing, it can be sure that it will - // pick up the new gen value the next time it writes. + // and synchronizing with writers by observing each P's + // statsSeq field. If the reader observes a P not writing, + // it can be sure that it will pick up the new gen value the + // next time it writes. + // // The reader then takes responsibility by clearing space // in the ring buffer for the next reader to rotate gen to // that space (i.e. it merges in values from index (gen-2) mod 3 @@ -830,7 +831,7 @@ type consistentHeapStats struct { // Note that this means only one reader can be reading at a time. // There is no way for readers to synchronize. // - // This process is why we need ring buffer of size 3 instead + // This process is why we need a ring buffer of size 3 instead // of 2: one is for the writers, one contains the most recent // data, and the last one is clear so writers can begin writing // to it the moment gen is updated. @@ -840,24 +841,34 @@ type consistentHeapStats struct { // are writing, and can take on the value of 0, 1, or 2. // This value is updated atomically. gen uint32 + + // noPLock is intended to provide mutual exclusion for updating + // stats when no P is available. It does not block other writers + // with a P, only other writers without a P and the reader. Because + // stats are usually updated when a P is available, contention on + // this lock should be minimal. + noPLock mutex } // acquire returns a heapStatsDelta to be updated. In effect, // it acquires the shard for writing. release must be called -// as soon as the relevant deltas are updated. c must be -// a valid mcache not being used by any other thread. +// as soon as the relevant deltas are updated. // // The returned heapStatsDelta must be updated atomically. // -// Note however, that this is unsafe to call concurrently -// with other writers and there must be only one writer -// at a time. -func (m *consistentHeapStats) acquire(c *mcache) *heapStatsDelta { - seq := atomic.Xadd(&c.statsSeq, 1) - if seq%2 == 0 { - // Should have been incremented to odd. - print("runtime: seq=", seq, "\n") - throw("bad sequence number") +// The caller's P must not change between acquire and +// release. This also means that the caller should not +// acquire a P or release its P in between. +func (m *consistentHeapStats) acquire() *heapStatsDelta { + if pp := getg().m.p.ptr(); pp != nil { + seq := atomic.Xadd(&pp.statsSeq, 1) + if seq%2 == 0 { + // Should have been incremented to odd. + print("runtime: seq=", seq, "\n") + throw("bad sequence number") + } + } else { + lock(&m.noPLock) } gen := atomic.Load(&m.gen) % 3 return &m.stats[gen] @@ -868,14 +879,19 @@ func (m *consistentHeapStats) acquire(c *mcache) *heapStatsDelta { // acquire must no longer be accessed or modified after // release is called. // -// The mcache passed here must be the same as the one -// passed to acquire. -func (m *consistentHeapStats) release(c *mcache) { - seq := atomic.Xadd(&c.statsSeq, 1) - if seq%2 != 0 { - // Should have been incremented to even. - print("runtime: seq=", seq, "\n") - throw("bad sequence number") +// The caller's P must not change between acquire and +// release. This also means that the caller should not +// acquire a P or release its P in between. +func (m *consistentHeapStats) release() { + if pp := getg().m.p.ptr(); pp != nil { + seq := atomic.Xadd(&pp.statsSeq, 1) + if seq%2 != 0 { + // Should have been incremented to even. + print("runtime: seq=", seq, "\n") + throw("bad sequence number") + } + } else { + unlock(&m.noPLock) } } @@ -916,25 +932,33 @@ func (m *consistentHeapStats) read(out *heapStatsDelta) { // so it doesn't change out from under us. mp := acquirem() + // Get the current generation. We can be confident that this + // will not change since read is serialized and is the only + // one that modifies currGen. + currGen := atomic.Load(&m.gen) + prevGen := currGen - 1 + if currGen == 0 { + prevGen = 2 + } + + // Prevent writers without a P from writing while we update gen. + lock(&m.noPLock) + // Rotate gen, effectively taking a snapshot of the state of // these statistics at the point of the exchange by moving // writers to the next set of deltas. // // This exchange is safe to do because we won't race // with anyone else trying to update this value. - currGen := atomic.Load(&m.gen) atomic.Xchg(&m.gen, (currGen+1)%3) - prevGen := currGen - 1 - if currGen == 0 { - prevGen = 2 - } + + // Allow P-less writers to continue. They'll be writing to the + // next generation now. + unlock(&m.noPLock) + for _, p := range allp { - c := p.mcache - if c == nil { - continue - } // Spin until there are no more writers. - for atomic.Load(&c.statsSeq)%2 != 0 { + for atomic.Load(&p.statsSeq)%2 != 0 { } } @@ -951,5 +975,6 @@ func (m *consistentHeapStats) read(out *heapStatsDelta) { // Finally, copy out the complete delta. *out = m.stats[currGen] + releasem(mp) } diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 79529ac7ec..87949a2694 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -577,6 +577,10 @@ func schedinit() { lockInit(&trace.lock, lockRankTrace) lockInit(&cpuprof.lock, lockRankCpuprof) lockInit(&trace.stackTab.lock, lockRankTraceStackTab) + // Enforce that this lock is always a leaf lock. + // All of this lock's critical sections should be + // extremely short. + lockInit(&memstats.heapStats.noPLock, lockRankLeafRank) // raceinit must be the first call to race detector. // In particular, it must be done before mallocinit below calls racemapshadow. diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index 82fedd804b..c9376827da 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -654,8 +654,8 @@ type p struct { timerModifiedEarliest uint64 // Per-P GC state - gcAssistTime int64 // Nanoseconds in assistAlloc - gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic) + gcAssistTime int64 // Nanoseconds in assistAlloc + gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic) // gcMarkWorkerMode is the mode for the next mark worker to run in. // That is, this is used to communicate with the worker goroutine @@ -679,6 +679,10 @@ type p struct { runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point + // statsSeq is a counter indicating whether this P is currently + // writing any stats. Its value is even when not, odd when it is. + statsSeq uint32 + // Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. timersLock mutex -- 2.48.1