From 13df972f6885ebdeba1ea38f0acd99ea0f2bfb49 Mon Sep 17 00:00:00 2001 From: Michael Anthony Knyszek Date: Wed, 23 Jul 2025 17:35:54 +0000 Subject: [PATCH] runtime/metrics: add metrics for goroutine sched states This is largely a port of CL 38180. For #15490. Change-Id: I2726111e472e81e9f9f0f294df97872c2689f061 Reviewed-on: https://go-review.googlesource.com/c/go/+/690397 Reviewed-by: Michael Pratt Auto-Submit: Michael Knyszek LUCI-TryBot-Result: Go LUCI --- src/runtime/debug.go | 2 +- src/runtime/metrics.go | 112 ++++++++++++++++- src/runtime/metrics/description.go | 20 +++ src/runtime/metrics/doc.go | 20 +++ src/runtime/metrics_test.go | 193 +++++++++++++++++++++++++++++ src/runtime/mprof.go | 4 +- src/runtime/pipe_unix_test.go | 15 +++ src/runtime/pipe_windows_test.go | 13 ++ src/runtime/proc.go | 17 ++- src/runtime/runtime2.go | 3 +- 10 files changed, 391 insertions(+), 8 deletions(-) create mode 100644 src/runtime/pipe_unix_test.go create mode 100644 src/runtime/pipe_windows_test.go diff --git a/src/runtime/debug.go b/src/runtime/debug.go index dacadd2721..56b766f11f 100644 --- a/src/runtime/debug.go +++ b/src/runtime/debug.go @@ -177,7 +177,7 @@ func totalMutexWaitTimeNanos() int64 { // NumGoroutine returns the number of goroutines that currently exist. func NumGoroutine() int { - return int(gcount()) + return int(gcount(false)) } //go:linkname debug_modinfo runtime/debug.modinfo diff --git a/src/runtime/metrics.go b/src/runtime/metrics.go index ef3782b783..47b1b891e1 100644 --- a/src/runtime/metrics.go +++ b/src/runtime/metrics.go @@ -8,6 +8,7 @@ package runtime import ( "internal/godebugs" + "internal/runtime/atomic" "internal/runtime/gc" "unsafe" ) @@ -465,9 +466,38 @@ func initMetrics() { }, }, "/sched/goroutines:goroutines": { - compute: func(_ *statAggregate, out *metricValue) { + deps: makeStatDepSet(schedStatsDep), + compute: func(in *statAggregate, out *metricValue) { + out.kind = metricKindUint64 + out.scalar = uint64(in.schedStats.gTotal) + }, + }, + "/sched/goroutines/not-in-go:goroutines": { + deps: makeStatDepSet(schedStatsDep), + compute: func(in *statAggregate, out *metricValue) { + out.kind = metricKindUint64 + out.scalar = uint64(in.schedStats.gNonGo) + }, + }, + "/sched/goroutines/running:goroutines": { + deps: makeStatDepSet(schedStatsDep), + compute: func(in *statAggregate, out *metricValue) { + out.kind = metricKindUint64 + out.scalar = uint64(in.schedStats.gRunning) + }, + }, + "/sched/goroutines/runnable:goroutines": { + deps: makeStatDepSet(schedStatsDep), + compute: func(in *statAggregate, out *metricValue) { + out.kind = metricKindUint64 + out.scalar = uint64(in.schedStats.gRunnable) + }, + }, + "/sched/goroutines/waiting:goroutines": { + deps: makeStatDepSet(schedStatsDep), + compute: func(in *statAggregate, out *metricValue) { out.kind = metricKindUint64 - out.scalar = uint64(gcount()) + out.scalar = uint64(in.schedStats.gWaiting) }, }, "/sched/latencies:seconds": { @@ -547,6 +577,7 @@ const ( cpuStatsDep // corresponds to cpuStatsAggregate gcStatsDep // corresponds to gcStatsAggregate finalStatsDep // corresponds to finalStatsAggregate + schedStatsDep // corresponds to schedStatsAggregate numStatsDeps ) @@ -740,6 +771,80 @@ func (a *finalStatsAggregate) compute() { a.cleanupsQueued, a.cleanupsExecuted = gcCleanups.readQueueStats() } +// schedStatsAggregate contains stats about the scheduler, including +// an approximate count of goroutines in each state. 
+type schedStatsAggregate struct {
+	gTotal    uint64
+	gRunning  uint64
+	gRunnable uint64
+	gNonGo    uint64
+	gWaiting  uint64
+}
+
+// compute populates the schedStatsAggregate with values from the runtime.
+func (a *schedStatsAggregate) compute() {
+	// Lock the scheduler so the global run queue can't change and
+	// the number of Ps can't change. This doesn't prevent the
+	// local run queues from changing, so the results are still
+	// approximate.
+	lock(&sched.lock)
+
+	// Collect running/runnable from per-P run queues.
+	for _, p := range allp {
+		if p == nil || p.status == _Pdead {
+			break
+		}
+		switch p.status {
+		case _Prunning:
+			a.gRunning++
+		case _Psyscall:
+			a.gNonGo++
+		case _Pgcstop:
+			// The world is stopping or stopped.
+			// This is fine. The results will be
+			// slightly odd since nothing else
+			// is running, but it will be accurate.
+		}
+
+		for {
+			h := atomic.Load(&p.runqhead)
+			t := atomic.Load(&p.runqtail)
+			next := atomic.Loaduintptr((*uintptr)(&p.runnext))
+			runnable := int32(t - h)
+			if atomic.Load(&p.runqhead) != h || runnable < 0 {
+				continue
+			}
+			if next != 0 {
+				runnable++
+			}
+			a.gRunnable += uint64(runnable)
+			break
+		}
+	}
+
+	// Global run queue.
+	a.gRunnable += uint64(sched.runq.size)
+
+	// Account for Gs that are in _Gsyscall without a P in _Psyscall.
+	nGsyscallNoP := sched.nGsyscallNoP.Load()
+
+	// nGsyscallNoP can go negative during temporary races.
+	if nGsyscallNoP >= 0 {
+		a.gNonGo += uint64(nGsyscallNoP)
+	}
+
+	// Compute the number of blocked goroutines. We have to
+	// include system goroutines in this count because we included
+	// them above.
+	a.gTotal = uint64(gcount(true))
+	a.gWaiting = a.gTotal - (a.gRunning + a.gRunnable + a.gNonGo)
+	if a.gWaiting > a.gTotal { // racy counts; treat underflow as zero
+		a.gWaiting = 0
+	}
+
+	unlock(&sched.lock)
+}
+
 // nsToSec takes a duration in nanoseconds and converts it to seconds as
 // a float64.
 func nsToSec(ns int64) float64 {
@@ -758,6 +863,7 @@ type statAggregate struct {
 	cpuStats   cpuStatsAggregate
 	gcStats    gcStatsAggregate
 	finalStats finalStatsAggregate
+	schedStats schedStatsAggregate
 }
 
 // ensure populates statistics aggregates determined by deps if they
@@ -782,6 +888,8 @@ func (a *statAggregate) ensure(deps *statDepSet) {
 			a.gcStats.compute()
 		case finalStatsDep:
 			a.finalStats.compute()
+		case schedStatsDep:
+			a.schedStats.compute()
 		}
 	}
 	a.ensured = a.ensured.union(missing)
diff --git a/src/runtime/metrics/description.go b/src/runtime/metrics/description.go
index 4587f791e1..cf22bb73ad 100644
--- a/src/runtime/metrics/description.go
+++ b/src/runtime/metrics/description.go
@@ -437,6 +437,26 @@ var allDesc = []Description{
 		Description: "The current runtime.GOMAXPROCS setting, or the number of operating system threads that can execute user-level Go code simultaneously.",
 		Kind:        KindUint64,
 	},
+	{
+		Name:        "/sched/goroutines/not-in-go:goroutines",
+		Description: "Approximate count of goroutines running or blocked in a system call or cgo call. Not guaranteed to add up to /sched/goroutines:goroutines with other goroutine metrics.",
+		Kind:        KindUint64,
+	},
+	{
+		Name:        "/sched/goroutines/runnable:goroutines",
+		Description: "Approximate count of goroutines ready to execute, but not executing. Not guaranteed to add up to /sched/goroutines:goroutines with other goroutine metrics.",
+		Kind:        KindUint64,
+	},
+	{
+		Name:        "/sched/goroutines/running:goroutines",
+		Description: "Approximate count of goroutines executing. Always less than or equal to /sched/gomaxprocs:threads. 
Not guaranteed to add up to /sched/goroutines:goroutines with other goroutine metrics.", + Kind: KindUint64, + }, + { + Name: "/sched/goroutines/waiting:goroutines", + Description: "Approximate count of goroutines waiting on a resource (I/O or sync primitives). Not guaranteed to add up to /sched/goroutines:goroutines with other goroutine metrics.", + Kind: KindUint64, + }, { Name: "/sched/goroutines:goroutines", Description: "Count of live goroutines.", diff --git a/src/runtime/metrics/doc.go b/src/runtime/metrics/doc.go index 058769ac3a..c379b201b4 100644 --- a/src/runtime/metrics/doc.go +++ b/src/runtime/metrics/doc.go @@ -509,6 +509,26 @@ Below is the full list of supported metrics, ordered lexicographically. operating system threads that can execute user-level Go code simultaneously. + /sched/goroutines/not-in-go:goroutines + Approximate count of goroutines running or blocked in + a system call or cgo call. Not guaranteed to add up to + /sched/goroutines:goroutines with other goroutine metrics. + + /sched/goroutines/runnable:goroutines + Approximate count of goroutines ready to execute, + but not executing. Not guaranteed to add up to + /sched/goroutines:goroutines with other goroutine metrics. + + /sched/goroutines/running:goroutines + Approximate count of goroutines executing. Always less than or + equal to /sched/gomaxprocs:threads. Not guaranteed to add up to + /sched/goroutines:goroutines with other goroutine metrics. + + /sched/goroutines/waiting:goroutines + Approximate count of goroutines waiting on a resource + (I/O or sync primitives). Not guaranteed to add up to + /sched/goroutines:goroutines with other goroutine metrics. + /sched/goroutines:goroutines Count of live goroutines. diff --git a/src/runtime/metrics_test.go b/src/runtime/metrics_test.go index 5787c96084..5b16cbcb22 100644 --- a/src/runtime/metrics_test.go +++ b/src/runtime/metrics_test.go @@ -22,6 +22,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "testing" "time" "unsafe" @@ -1575,3 +1576,195 @@ func TestReadMetricsFinalizers(t *testing.T) { t.Errorf("expected %s difference to be exactly %d, got %d -> %d", before[1].Name, N, v0, v1) } } + +func TestReadMetricsSched(t *testing.T) { + const ( + notInGo = iota + runnable + running + waiting + ) + var s [4]metrics.Sample + s[0].Name = "/sched/goroutines/not-in-go:goroutines" + s[1].Name = "/sched/goroutines/runnable:goroutines" + s[2].Name = "/sched/goroutines/running:goroutines" + s[3].Name = "/sched/goroutines/waiting:goroutines" + + logMetrics := func(t *testing.T, s []metrics.Sample) { + for i := range s { + t.Logf("%s: %d", s[i].Name, s[i].Value.Uint64()) + } + } + + // generalSlack is the amount of goroutines we allow ourselves to be + // off by in any given category, either due to background system + // goroutines or testing package goroutines. + const generalSlack = 4 + + // waitingSlack is the max number of blocked goroutines left + // from other tests, the testing package, or system + // goroutines. + const waitingSlack = 100 + + // Make sure GC isn't running, since GC workers interfere with + // expected counts. 
+ defer debug.SetGCPercent(debug.SetGCPercent(-1)) + runtime.GC() + + check := func(t *testing.T, s *metrics.Sample, min, max uint64) { + val := s.Value.Uint64() + if val < min { + t.Errorf("%s too low; %d < %d", s.Name, val, min) + } + if val > max { + t.Errorf("%s too high; %d > %d", s.Name, val, max) + } + } + checkEq := func(t *testing.T, s *metrics.Sample, value uint64) { + check(t, s, value, value) + } + spinUntil := func(f func() bool, timeout time.Duration) bool { + start := time.Now() + for time.Since(start) < timeout { + if f() { + return true + } + time.Sleep(time.Millisecond) + } + return false + } + + // Check base values. + t.Run("base", func(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) + metrics.Read(s[:]) + logMetrics(t, s[:]) + check(t, &s[notInGo], 0, generalSlack) + check(t, &s[runnable], 0, generalSlack) + checkEq(t, &s[running], 1) + check(t, &s[waiting], 0, waitingSlack) + }) + + // Force Running count to be high. We'll use these goroutines + // for Runnable, too. + const count = 10 + var ready, exit atomic.Uint32 + for i := 0; i < count-1; i++ { + go func() { + ready.Add(1) + for exit.Load() == 0 { + // Spin to get us and keep us running, but check + // the exit condition so we exit out early if we're + // done. + start := time.Now() + for time.Since(start) < 10*time.Millisecond && exit.Load() == 0 { + } + runtime.Gosched() + } + }() + } + for ready.Load() < count-1 { + runtime.Gosched() + } + + // Be careful. We've entered a dangerous state for platforms + // that do not return back to the underlying system unless all + // goroutines are blocked, like js/wasm, since we have a bunch + // of runnable goroutines all spinning. We cannot write anything + // out. + if testenv.HasParallelism() { + t.Run("running", func(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(count + 4)) + // It can take a little bit for the scheduler to + // distribute the goroutines to Ps, so retry for a + // while. + spinUntil(func() bool { + metrics.Read(s[:]) + return s[running].Value.Uint64() >= count + }, time.Second) + logMetrics(t, s[:]) + check(t, &s[running], count, count+4) + }) + + // Force runnable count to be high. + t.Run("runnable", func(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) + metrics.Read(s[:]) + logMetrics(t, s[:]) + checkEq(t, &s[running], 1) + check(t, &s[runnable], count-1, count+generalSlack) + }) + + // Done with the running/runnable goroutines. + exit.Store(1) + } else { + // Read metrics and then exit all the other goroutines, + // so that system calls may proceed. + metrics.Read(s[:]) + + // Done with the running/runnable goroutines. + exit.Store(1) + + // Now we can check our invariants. + t.Run("running", func(t *testing.T) { + logMetrics(t, s[:]) + checkEq(t, &s[running], 1) + }) + t.Run("runnable", func(t *testing.T) { + logMetrics(t, s[:]) + check(t, &s[runnable], count-1, count+generalSlack) + }) + } + + // Force not-in-go count to be high. This is a little tricky since + // we try really hard not to let things block in system calls. + // We have to drop to the syscall package to do this reliably. + t.Run("not-in-go", func(t *testing.T) { + // Block a bunch of goroutines on an OS pipe. + pr, pw, err := pipe() + if err != nil { + switch runtime.GOOS { + case "js", "wasip1": + t.Skip("creating pipe:", err) + } + t.Fatal("creating pipe:", err) + } + for i := 0; i < count; i++ { + go syscall.Read(pr, make([]byte, 1)) + } + + // Let the goroutines block. 
+		spinUntil(func() bool {
+			metrics.Read(s[:])
+			return s[notInGo].Value.Uint64() >= count
+		}, time.Second)
+
+		metrics.Read(s[:])
+		logMetrics(t, s[:])
+		check(t, &s[notInGo], count, count+generalSlack)
+
+		syscall.Close(pw)
+		syscall.Close(pr)
+	})
+
+	t.Run("waiting", func(t *testing.T) {
+		// Force waiting count to be high.
+		const waitingCount = 1000
+		stop := make(chan bool)
+		for i := 0; i < waitingCount; i++ {
+			go func() { <-stop }()
+		}
+
+		// Let the goroutines block.
+		spinUntil(func() bool {
+			metrics.Read(s[:])
+			return s[waiting].Value.Uint64() >= waitingCount
+		}, time.Second)
+
+		metrics.Read(s[:])
+		logMetrics(t, s[:])
+		check(t, &s[waiting], waitingCount, waitingCount+waitingSlack)
+
+		close(stop)
+	})
+}
diff --git a/src/runtime/mprof.go b/src/runtime/mprof.go
index b2ff257f65..97b2907652 100644
--- a/src/runtime/mprof.go
+++ b/src/runtime/mprof.go
@@ -1308,7 +1308,7 @@ func goroutineProfileWithLabelsConcurrent(p []profilerecord.StackRecord, labels
 		// allocation estimate without bothering to STW. As long as
 		// this is close, then we'll only need to STW once (on the next
 		// call).
-		return int(gcount()), false
+		return int(gcount(false)), false
 	}
 
 	semacquire(&goroutineProfile.sema)
@@ -1324,7 +1324,7 @@ func goroutineProfileWithLabelsConcurrent(p []profilerecord.StackRecord, labels
 	// goroutines that can vary between user and system to ensure that the count
 	// doesn't change during the collection. So, check the finalizer goroutine
 	// and cleanup goroutines in particular.
-	n = int(gcount())
+	n = int(gcount(false))
 	if fingStatus.Load()&fingRunningFinalizer != 0 {
 		n++
 	}
diff --git a/src/runtime/pipe_unix_test.go b/src/runtime/pipe_unix_test.go
new file mode 100644
index 0000000000..82a49df339
--- /dev/null
+++ b/src/runtime/pipe_unix_test.go
@@ -0,0 +1,15 @@
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build !windows
+
+package runtime_test
+
+import "syscall"
+
+func pipe() (r, w int, err error) {
+	var p [2]int
+	err = syscall.Pipe(p[:])
+	return p[0], p[1], err
+}
diff --git a/src/runtime/pipe_windows_test.go b/src/runtime/pipe_windows_test.go
new file mode 100644
index 0000000000..ad84ec918a
--- /dev/null
+++ b/src/runtime/pipe_windows_test.go
@@ -0,0 +1,13 @@
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime_test
+
+import "syscall"
+
+func pipe() (r, w syscall.Handle, err error) {
+	var p [2]syscall.Handle
+	err = syscall.Pipe(p[:])
+	return p[0], p[1], err
+}
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 8d5f2fc793..15f10f087e 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -1648,6 +1648,7 @@ func stopTheWorldWithSema(reason stwReason) worldStop {
 			if trace.ok() {
 				trace.ProcSteal(pp, false)
 			}
+			sched.nGsyscallNoP.Add(1)
 			pp.syscalltick++
 			pp.gcStopTime = nanotime()
 			sched.stopwait--
@@ -2174,6 +2175,7 @@ func forEachPInternal(fn func(*p)) {
 				trace.ProcSteal(p2, false)
 				traceRelease(trace)
 			}
+			sched.nGsyscallNoP.Add(1)
 			p2.syscalltick++
 			handoffp(p2)
 		} else if trace.ok() {
@@ -2447,6 +2449,7 @@ func needm(signal bool) {
 	// mp.curg is now a real goroutine.
casgstatus(mp.curg, _Gdead, _Gsyscall) sched.ngsys.Add(-1) + sched.nGsyscallNoP.Add(1) if !signal { if trace.ok() { @@ -2582,6 +2585,7 @@ func dropm() { casgstatus(mp.curg, _Gsyscall, _Gdead) mp.curg.preemptStop = false sched.ngsys.Add(1) + sched.nGsyscallNoP.Add(-1) if !mp.isExtraInSig { if trace.ok() { @@ -4675,6 +4679,7 @@ func entersyscall_gcwait() { trace.ProcSteal(pp, true) traceRelease(trace) } + sched.nGsyscallNoP.Add(1) pp.gcStopTime = nanotime() pp.syscalltick++ if sched.stopwait--; sched.stopwait == 0 { @@ -4707,6 +4712,8 @@ func entersyscallblock() { gp.m.syscalltick = gp.m.p.ptr().syscalltick gp.m.p.ptr().syscalltick++ + sched.nGsyscallNoP.Add(1) + // Leave SP around for GC and traceback. pc := sys.GetCallerPC() sp := sys.GetCallerSP() @@ -4927,6 +4934,7 @@ func exitsyscallfast_pidle() bool { } unlock(&sched.lock) if pp != nil { + sched.nGsyscallNoP.Add(-1) acquirep(pp) return true } @@ -4953,6 +4961,7 @@ func exitsyscall0(gp *g) { trace.GoSysExit(true) traceRelease(trace) } + sched.nGsyscallNoP.Add(-1) dropg() lock(&sched.lock) var pp *p @@ -5528,8 +5537,11 @@ func badunlockosthread() { throw("runtime: internal error: misuse of lockOSThread/unlockOSThread") } -func gcount() int32 { - n := int32(atomic.Loaduintptr(&allglen)) - sched.gFree.stack.size - sched.gFree.noStack.size - sched.ngsys.Load() +func gcount(includeSys bool) int32 { + n := int32(atomic.Loaduintptr(&allglen)) - sched.gFree.stack.size - sched.gFree.noStack.size + if !includeSys { + n -= sched.ngsys.Load() + } for _, pp := range allp { n -= pp.gFree.size } @@ -6404,6 +6416,7 @@ func retake(now int64) uint32 { trace.ProcSteal(pp, false) traceRelease(trace) } + sched.nGsyscallNoP.Add(1) n++ pp.syscalltick++ handoffp(pp) diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index b5d2dcefad..c5d15754ec 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -792,7 +792,8 @@ type schedt struct { nmsys int32 // number of system m's not counted for deadlock nmfreed int64 // cumulative number of freed m's - ngsys atomic.Int32 // number of system goroutines + ngsys atomic.Int32 // number of system goroutines + nGsyscallNoP atomic.Int32 // number of goroutines in syscalls without a P pidle puintptr // idle p's npidle atomic.Int32 -- 2.51.0
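
For readers who want to try the new counts from user code, here is a minimal sketch (not part of this patch) of reading them through the public runtime/metrics API. It assumes a toolchain that already includes this change; the metric names are the ones registered above, and everything else is the standard runtime/metrics surface. As the descriptions note, the per-state counts are sampled approximately under sched.lock, so they are not guaranteed to sum exactly to /sched/goroutines:goroutines.

	package main

	import (
		"fmt"
		"runtime/metrics"
	)

	func main() {
		// The total plus the four per-state goroutine counts added by this CL.
		// All five metrics are KindUint64.
		samples := []metrics.Sample{
			{Name: "/sched/goroutines:goroutines"},
			{Name: "/sched/goroutines/running:goroutines"},
			{Name: "/sched/goroutines/runnable:goroutines"},
			{Name: "/sched/goroutines/waiting:goroutines"},
			{Name: "/sched/goroutines/not-in-go:goroutines"},
		}
		metrics.Read(samples)
		for _, s := range samples {
			fmt.Printf("%-45s %d\n", s.Name, s.Value.Uint64())
		}
	}

On a toolchain without this change, the four new samples come back with Value.Kind() == metrics.KindBad, which is how runtime/metrics signals an unsupported metric name.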