From d90ce588eac7b9105c0ca556a7c6e975fd5c1eca Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Tue, 11 Jun 2024 11:02:18 -0700 Subject: [PATCH] internal/synctest: new package for testing concurrent code Add an internal (for now) implementation of testing/synctest. The synctest.Run function executes a tree of goroutines in an isolated environment using a fake clock. The synctest.Wait function allows a test to wait for all other goroutines within the test to reach a blocking point. For #67434 For #69687 Change-Id: Icb39e54c54cece96517e58ef9cfb18bf68506cfc Reviewed-on: https://go-review.googlesource.com/c/go/+/591997 Reviewed-by: Michael Pratt LUCI-TryBot-Result: Go LUCI --- src/go/build/deps_test.go | 3 + src/internal/synctest/synctest.go | 62 ++++ src/internal/synctest/synctest_test.go | 407 ++++++++++++++++++++++ src/runtime/chan.go | 39 ++- src/runtime/coro.go | 18 +- src/runtime/lockrank.go | 23 +- src/runtime/mgc.go | 17 +- src/runtime/mgcmark.go | 11 + src/runtime/mklockrank.go | 4 + src/runtime/proc.go | 49 ++- src/runtime/runtime2.go | 35 +- src/runtime/select.go | 21 +- src/runtime/sema.go | 13 + src/runtime/sizeof_test.go | 2 +- src/runtime/synctest.go | 256 ++++++++++++++ src/runtime/synctest_test.go | 17 + src/runtime/testdata/testsynctest/main.go | 67 ++++ src/runtime/time.go | 101 +++++- src/runtime/time_linux_amd64.s | 2 +- src/runtime/traceback.go | 3 + src/runtime/traceruntime.go | 2 + src/sync/runtime.go | 3 + src/sync/waitgroup.go | 2 +- src/time/time.go | 28 +- src/time/zoneinfo_plan9.go | 2 +- src/time/zoneinfo_read.go | 2 +- 26 files changed, 1154 insertions(+), 35 deletions(-) create mode 100644 src/internal/synctest/synctest.go create mode 100644 src/internal/synctest/synctest_test.go create mode 100644 src/runtime/synctest.go create mode 100644 src/runtime/synctest_test.go create mode 100644 src/runtime/testdata/testsynctest/main.go diff --git a/src/go/build/deps_test.go b/src/go/build/deps_test.go index b999699a8d..5ab7ea8d0d 100644 --- a/src/go/build/deps_test.go +++ b/src/go/build/deps_test.go @@ -643,6 +643,9 @@ var depsRules = ` FMT, DEBUG, flag, runtime/trace, internal/sysinfo, math/rand < testing; + RUNTIME + < internal/synctest; + log/slog, testing < testing/slogtest; diff --git a/src/internal/synctest/synctest.go b/src/internal/synctest/synctest.go new file mode 100644 index 0000000000..eb317fbf82 --- /dev/null +++ b/src/internal/synctest/synctest.go @@ -0,0 +1,62 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package synctest provides support for testing concurrent code. +package synctest + +import ( + _ "unsafe" // for go:linkname +) + +// Run executes f in a new goroutine. +// +// The new goroutine and any goroutines transitively started by it form +// an isolated "bubble". +// Run waits for all goroutines in the bubble to exit before returning. +// +// Goroutines in the bubble use a synthetic time implementation. +// The initial time is midnight UTC 2000-01-01. +// +// Time advances when every goroutine in the bubble is blocked. +// For example, a call to time.Sleep will block until all other +// goroutines are blocked and return after the bubble's clock has +// advanced. See [Wait] for the specific definition of blocked. +// +// If every goroutine is blocked and there are no timers scheduled, +// Run panics. +// +// Channels, time.Timers, and time.Tickers created within the bubble +// are associated with it. Operating on a bubbled channel, timer, or ticker +// from outside the bubble panics. +// +//go:linkname Run +func Run(f func()) + +// Wait blocks until every goroutine within the current bubble, +// other than the current goroutine, is durably blocked. +// It panics if called from a non-bubbled goroutine, +// or if two goroutines in the same bubble call Wait at the same time. +// +// A goroutine is durably blocked if can only be unblocked by another +// goroutine in its bubble. The following operations durably block +// a goroutine: +// - a send or receive on a channel from within the bubble +// - a select statement where every case is a channel within the bubble +// - sync.Cond.Wait +// - time.Sleep +// +// A goroutine executing a system call or waiting for an external event +// such as a network operation is not durably blocked. +// For example, a goroutine blocked reading from an network connection +// is not durably blocked even if no data is currently available on the +// connection, because it may be unblocked by data written from outside +// the bubble or may be in the process of receiving data from a kernel +// network buffer. +// +// A goroutine is not durably blocked when blocked on a send or receive +// on a channel that was not created within its bubble, because it may +// be unblocked by a channel receive or send from outside its bubble. +// +//go:linkname Wait +func Wait() diff --git a/src/internal/synctest/synctest_test.go b/src/internal/synctest/synctest_test.go new file mode 100644 index 0000000000..2c4ac0ff64 --- /dev/null +++ b/src/internal/synctest/synctest_test.go @@ -0,0 +1,407 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package synctest_test + +import ( + "fmt" + "internal/synctest" + "iter" + "reflect" + "slices" + "strconv" + "sync" + "testing" + "time" +) + +func TestNow(t *testing.T) { + start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).In(time.Local) + synctest.Run(func() { + // Time starts at 2000-1-1 00:00:00. + if got, want := time.Now(), start; !got.Equal(want) { + t.Errorf("at start: time.Now = %v, want %v", got, want) + } + go func() { + // New goroutines see the same fake clock. + if got, want := time.Now(), start; !got.Equal(want) { + t.Errorf("time.Now = %v, want %v", got, want) + } + }() + // Time advances after a sleep. + time.Sleep(1 * time.Second) + if got, want := time.Now(), start.Add(1*time.Second); !got.Equal(want) { + t.Errorf("after sleep: time.Now = %v, want %v", got, want) + } + }) +} + +func TestRunEmpty(t *testing.T) { + synctest.Run(func() { + }) +} + +func TestSimpleWait(t *testing.T) { + synctest.Run(func() { + synctest.Wait() + }) +} + +func TestGoroutineWait(t *testing.T) { + synctest.Run(func() { + go func() {}() + synctest.Wait() + }) +} + +// TestWait starts a collection of goroutines. +// It checks that synctest.Wait waits for all goroutines to exit before returning. +func TestWait(t *testing.T) { + synctest.Run(func() { + done := false + ch := make(chan int) + var f func() + f = func() { + count := <-ch + if count == 0 { + done = true + } else { + go f() + ch <- count - 1 + } + } + go f() + ch <- 100 + synctest.Wait() + if !done { + t.Fatalf("done = false, want true") + } + }) +} + +func TestMallocs(t *testing.T) { + for i := 0; i < 100; i++ { + synctest.Run(func() { + done := false + ch := make(chan []byte) + var f func() + f = func() { + b := <-ch + if len(b) == 0 { + done = true + } else { + go f() + ch <- make([]byte, len(b)-1) + } + } + go f() + ch <- make([]byte, 100) + synctest.Wait() + if !done { + t.Fatalf("done = false, want true") + } + }) + } +} + +func TestTimer(t *testing.T) { + synctest.Run(func() { + start := time.Now() + tm := time.NewTimer(5 * time.Second) + <-tm.C + if got, want := time.Since(start), 5*time.Second; got != want { + t.Errorf("after sleep: time.Since(start) = %v, want %v", got, want) + } + }) +} + +func TestTimeAfter(t *testing.T) { + synctest.Run(func() { + i := 0 + time.AfterFunc(1*time.Second, func() { + // Ensure synctest group membership propagates through the AfterFunc. + i++ // 1 + go func() { + time.Sleep(1 * time.Second) + i++ // 2 + }() + }) + time.Sleep(3 * time.Second) + synctest.Wait() + if got, want := i, 2; got != want { + t.Errorf("after sleep and wait: i = %v, want %v", got, want) + } + }) +} + +func TestTimerFromOutsideBubble(t *testing.T) { + tm := time.NewTimer(10 * time.Millisecond) + synctest.Run(func() { + defer wantPanic(t, "timer moved between synctest groups") + <-tm.C + }) +} + +func TestChannelFromOutsideBubble(t *testing.T) { + choutside := make(chan struct{}) + for _, test := range []struct { + desc string + outside func(ch chan int) + inside func(ch chan int) + }{{ + desc: "read closed", + outside: func(ch chan int) { close(ch) }, + inside: func(ch chan int) { <-ch }, + }, { + desc: "read value", + outside: func(ch chan int) { ch <- 0 }, + inside: func(ch chan int) { <-ch }, + }, { + desc: "write value", + outside: func(ch chan int) { <-ch }, + inside: func(ch chan int) { ch <- 0 }, + }, { + desc: "select outside only", + outside: func(ch chan int) { close(ch) }, + inside: func(ch chan int) { + select { + case <-ch: + case <-choutside: + } + }, + }, { + desc: "select mixed", + outside: func(ch chan int) { close(ch) }, + inside: func(ch chan int) { + ch2 := make(chan struct{}) + select { + case <-ch: + case <-ch2: + } + }, + }} { + t.Run(test.desc, func(t *testing.T) { + ch := make(chan int) + time.AfterFunc(1*time.Millisecond, func() { + test.outside(ch) + }) + synctest.Run(func() { + test.inside(ch) + }) + }) + } +} + +func TestTimerFromInsideBubble(t *testing.T) { + for _, test := range []struct { + desc string + f func(tm *time.Timer) + wantPanic string + }{{ + desc: "read channel", + f: func(tm *time.Timer) { + <-tm.C + }, + wantPanic: "receive on synctest channel from outside bubble", + }, { + desc: "Reset", + f: func(tm *time.Timer) { + tm.Reset(1 * time.Second) + }, + wantPanic: "reset of synctest timer from outside bubble", + }, { + desc: "Stop", + f: func(tm *time.Timer) { + tm.Stop() + }, + wantPanic: "stop of synctest timer from outside bubble", + }} { + t.Run(test.desc, func(t *testing.T) { + donec := make(chan struct{}) + ch := make(chan *time.Timer) + go func() { + defer close(donec) + defer wantPanic(t, test.wantPanic) + test.f(<-ch) + }() + synctest.Run(func() { + tm := time.NewTimer(1 * time.Second) + ch <- tm + }) + <-donec + }) + } +} + +func TestDeadlockRoot(t *testing.T) { + defer wantPanic(t, "deadlock: all goroutines in bubble are blocked") + synctest.Run(func() { + select {} + }) +} + +func TestDeadlockChild(t *testing.T) { + defer wantPanic(t, "deadlock: all goroutines in bubble are blocked") + synctest.Run(func() { + go func() { + select {} + }() + }) +} + +func TestCond(t *testing.T) { + synctest.Run(func() { + var mu sync.Mutex + cond := sync.NewCond(&mu) + start := time.Now() + const waitTime = 1 * time.Millisecond + + go func() { + // Signal the cond. + time.Sleep(waitTime) + mu.Lock() + cond.Signal() + mu.Unlock() + + // Broadcast to the cond. + time.Sleep(waitTime) + mu.Lock() + cond.Broadcast() + mu.Unlock() + }() + + // Wait for cond.Signal. + mu.Lock() + cond.Wait() + mu.Unlock() + if got, want := time.Since(start), waitTime; got != want { + t.Errorf("after cond.Signal: time elapsed = %v, want %v", got, want) + } + + // Wait for cond.Broadcast in two goroutines. + waiterDone := false + go func() { + mu.Lock() + cond.Wait() + mu.Unlock() + waiterDone = true + }() + mu.Lock() + cond.Wait() + mu.Unlock() + synctest.Wait() + if !waiterDone { + t.Errorf("after cond.Broadcast: waiter not done") + } + if got, want := time.Since(start), 2*waitTime; got != want { + t.Errorf("after cond.Broadcast: time elapsed = %v, want %v", got, want) + } + }) +} + +func TestIteratorPush(t *testing.T) { + synctest.Run(func() { + seq := func(yield func(time.Time) bool) { + for yield(time.Now()) { + time.Sleep(1 * time.Second) + } + } + var got []time.Time + go func() { + for now := range seq { + got = append(got, now) + if len(got) >= 3 { + break + } + } + }() + want := []time.Time{ + time.Now(), + time.Now().Add(1 * time.Second), + time.Now().Add(2 * time.Second), + } + time.Sleep(5 * time.Second) + synctest.Wait() + if !slices.Equal(got, want) { + t.Errorf("got: %v; want: %v", got, want) + } + }) +} + +func TestIteratorPull(t *testing.T) { + synctest.Run(func() { + seq := func(yield func(time.Time) bool) { + for yield(time.Now()) { + time.Sleep(1 * time.Second) + } + } + var got []time.Time + go func() { + next, stop := iter.Pull(seq) + defer stop() + for len(got) < 3 { + now, _ := next() + got = append(got, now) + } + }() + want := []time.Time{ + time.Now(), + time.Now().Add(1 * time.Second), + time.Now().Add(2 * time.Second), + } + time.Sleep(5 * time.Second) + synctest.Wait() + if !slices.Equal(got, want) { + t.Errorf("got: %v; want: %v", got, want) + } + }) +} + +func TestReflectFuncOf(t *testing.T) { + mkfunc := func(name string, i int) { + reflect.FuncOf([]reflect.Type{ + reflect.StructOf([]reflect.StructField{{ + Name: name + strconv.Itoa(i), + Type: reflect.TypeOf(0), + }}), + }, nil, false) + } + go func() { + for i := 0; i < 100000; i++ { + mkfunc("A", i) + } + }() + synctest.Run(func() { + for i := 0; i < 100000; i++ { + mkfunc("A", i) + } + }) +} + +func TestWaitGroup(t *testing.T) { + synctest.Run(func() { + var wg sync.WaitGroup + wg.Add(1) + const delay = 1 * time.Second + go func() { + time.Sleep(delay) + wg.Done() + }() + start := time.Now() + wg.Wait() + if got := time.Since(start); got != delay { + t.Fatalf("WaitGroup.Wait() took %v, want %v", got, delay) + } + }) +} + +func wantPanic(t *testing.T, want string) { + if e := recover(); e != nil { + if got := fmt.Sprint(e); got != want { + t.Errorf("got panic message %q, want %q", got, want) + } + } else { + t.Errorf("got no panic, want one") + } +} diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 1702e555ac..8e09653707 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -36,6 +36,7 @@ type hchan struct { dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 + synctest bool // true if created in a synctest bubble closed uint32 timer *timer // timer feeding this chan elemtype *_type // element type @@ -112,6 +113,9 @@ func makechan(t *chantype, size int) *hchan { c.elemsize = uint16(elem.Size_) c.elemtype = elem c.dataqsiz = uint(size) + if getg().syncGroup != nil { + c.synctest = true + } lockInit(&c.lock, lockRankHchan) if debugChan { @@ -186,6 +190,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) } + if c.synctest && getg().syncGroup == nil { + panic(plainError("send on synctest channel from outside bubble")) + } + // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is @@ -268,7 +276,11 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) - gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) + reason := waitReasonChanSend + if c.synctest { + reason = waitReasonSynctestChanSend + } + gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the @@ -304,6 +316,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { + if c.synctest && sg.g.syncGroup != getg().syncGroup { + unlockf() + panic(plainError("send on synctest channel from outside bubble")) + } if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) @@ -518,6 +534,10 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) throw("unreachable") } + if c.synctest && getg().syncGroup == nil { + panic(plainError("receive on synctest channel from outside bubble")) + } + if c.timer != nil { c.timer.maybeRunChan() } @@ -637,7 +657,11 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) - gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) + reason := waitReasonChanReceive + if c.synctest { + reason = waitReasonSynctestChanReceive + } + gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2) // someone woke us up if mysg != gp.waiting { @@ -673,6 +697,10 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) // sg must already be dequeued from c. // A non-nil ep must point to the heap or the caller's stack. func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { + if c.synctest && sg.g.syncGroup != getg().syncGroup { + unlockf() + panic(plainError("receive on synctest channel from outside bubble")) + } if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) @@ -876,8 +904,11 @@ func (q *waitq) dequeue() *sudog { // We use a flag in the G struct to tell us when someone // else has won the race to signal this goroutine but the goroutine // hasn't removed itself from the queue yet. - if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) { - continue + if sgp.isSelect { + if !sgp.g.selectDone.CompareAndSwap(0, 1) { + // We lost the race to wake this goroutine. + continue + } } return sgp diff --git a/src/runtime/coro.go b/src/runtime/coro.go index d378e92de8..f2eb8c9802 100644 --- a/src/runtime/coro.go +++ b/src/runtime/coro.go @@ -137,6 +137,16 @@ func coroswitch_m(gp *g) { // emitting an event for every single transition. trace := traceAcquire() + canCAS := true + sg := gp.syncGroup + if sg != nil { + // If we're in a synctest group, always use casgstatus (which tracks + // group idleness) rather than directly CASing. Mark the group as active + // while we're in the process of transferring control. + canCAS = false + sg.incActive() + } + if locked { // Detach the goroutine from the thread; we'll attach to the goroutine we're // switching to before returning. @@ -155,7 +165,7 @@ func coroswitch_m(gp *g) { // If we can CAS ourselves directly from running to waiting, so do, // keeping the control transfer as lightweight as possible. gp.waitreason = waitReasonCoroutine - if !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) { + if !canCAS || !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) { // The CAS failed: use casgstatus, which will take care of // coordinating with the garbage collector about the state change. casgstatus(gp, _Grunning, _Gwaiting) @@ -223,7 +233,7 @@ func coroswitch_m(gp *g) { tryRecordGoroutineProfile(gnext, nil, osyield) } - if !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) { + if !canCAS || !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) { // The CAS failed: use casgstatus, which will take care of // coordinating with the garbage collector about the state change. casgstatus(gnext, _Gwaiting, _Grunnable) @@ -241,6 +251,10 @@ func coroswitch_m(gp *g) { traceRelease(trace) } + if sg != nil { + sg.decActive() + } + // Switch to gnext. Does not return. gogo(&gnext.sched) } diff --git a/src/runtime/lockrank.go b/src/runtime/lockrank.go index 373838332f..7a5a618517 100644 --- a/src/runtime/lockrank.go +++ b/src/runtime/lockrank.go @@ -43,6 +43,7 @@ const ( lockRankRoot lockRankItab lockRankReflectOffs + lockRankSynctest lockRankUserArenaState // TRACEGLOBAL lockRankTraceBuf @@ -116,6 +117,7 @@ var lockNames = []string{ lockRankRoot: "root", lockRankItab: "itab", lockRankReflectOffs: "reflectOffs", + lockRankSynctest: "synctest", lockRankUserArenaState: "userArenaState", lockRankTraceBuf: "traceBuf", lockRankTraceStrings: "traceStrings", @@ -196,6 +198,7 @@ var lockPartialOrder [][]lockRank = [][]lockRank{ lockRankRoot: {}, lockRankItab: {}, lockRankReflectOffs: {lockRankItab}, + lockRankSynctest: {lockRankSysmon, lockRankScavenge, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankRoot, lockRankItab, lockRankReflectOffs}, lockRankUserArenaState: {}, lockRankTraceBuf: {lockRankSysmon, lockRankScavenge}, lockRankTraceStrings: {lockRankSysmon, lockRankScavenge, lockRankTraceBuf}, @@ -208,16 +211,16 @@ var lockPartialOrder [][]lockRank = [][]lockRank{ lockRankProfBlock: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings}, lockRankProfMemActive: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings}, lockRankProfMemFuture: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankProfMemActive}, - lockRankGscan: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture}, - lockRankStackpool: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, - lockRankStackLarge: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, - lockRankHchanLeaf: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankHchanLeaf}, - lockRankWbufSpans: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, - lockRankMheap: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans}, - lockRankMheapSpecial: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap}, - lockRankGlobalAlloc: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap, lockRankMheapSpecial}, - lockRankTrace: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap}, - lockRankTraceStackTab: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap, lockRankTrace}, + lockRankGscan: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture}, + lockRankStackpool: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, + lockRankStackLarge: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, + lockRankHchanLeaf: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankHchanLeaf}, + lockRankWbufSpans: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan}, + lockRankMheap: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans}, + lockRankMheapSpecial: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap}, + lockRankGlobalAlloc: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap, lockRankMheapSpecial}, + lockRankTrace: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap}, + lockRankTraceStackTab: {lockRankSysmon, lockRankScavenge, lockRankForcegc, lockRankDefer, lockRankSweepWaiters, lockRankAssistQueue, lockRankStrongFromWeakQueue, lockRankSweep, lockRankTestR, lockRankTimerSend, lockRankExecW, lockRankCpuprof, lockRankPollCache, lockRankPollDesc, lockRankWakeableSleep, lockRankHchan, lockRankAllocmR, lockRankExecR, lockRankSched, lockRankAllg, lockRankAllp, lockRankNotifyList, lockRankSudog, lockRankTimers, lockRankTimer, lockRankNetpollInit, lockRankRoot, lockRankItab, lockRankReflectOffs, lockRankSynctest, lockRankUserArenaState, lockRankTraceBuf, lockRankTraceStrings, lockRankFin, lockRankSpanSetSpine, lockRankMspanSpecial, lockRankGcBitsArenas, lockRankProfInsert, lockRankProfBlock, lockRankProfMemActive, lockRankProfMemFuture, lockRankGscan, lockRankStackpool, lockRankStackLarge, lockRankWbufSpans, lockRankMheap, lockRankTrace}, lockRankPanic: {}, lockRankDeadlock: {lockRankPanic, lockRankDeadlock}, lockRankRaceFini: {lockRankPanic}, diff --git a/src/runtime/mgc.go b/src/runtime/mgc.go index b3741a2e59..48001cfdb9 100644 --- a/src/runtime/mgc.go +++ b/src/runtime/mgc.go @@ -639,6 +639,17 @@ func gcStart(trigger gcTrigger) { releasem(mp) mp = nil + if gp := getg(); gp.syncGroup != nil { + // Disassociate the G from its synctest bubble while allocating. + // This is less elegant than incrementing the group's active count, + // but avoids any contamination between GC and synctest. + sg := gp.syncGroup + gp.syncGroup = nil + defer func() { + gp.syncGroup = sg + }() + } + // Pick up the remaining unswept/not being swept spans concurrently // // This shouldn't happen if we're being invoked in background @@ -1774,8 +1785,12 @@ func boring_registerCache(p unsafe.Pointer) { //go:linkname unique_runtime_registerUniqueMapCleanup unique.runtime_registerUniqueMapCleanup func unique_runtime_registerUniqueMapCleanup(f func()) { + // Create the channel on the system stack so it doesn't inherit the current G's + // synctest bubble (if any). + systemstack(func() { + uniqueMapCleanup = make(chan struct{}, 1) + }) // Start the goroutine in the runtime so it's counted as a system goroutine. - uniqueMapCleanup = make(chan struct{}, 1) go func(cleanup func()) { for { <-uniqueMapCleanup diff --git a/src/runtime/mgcmark.go b/src/runtime/mgcmark.go index 6e2bd8b948..823b2bd7df 100644 --- a/src/runtime/mgcmark.go +++ b/src/runtime/mgcmark.go @@ -428,6 +428,17 @@ func gcAssistAlloc(gp *g) { return } + if gp := getg(); gp.syncGroup != nil { + // Disassociate the G from its synctest bubble while allocating. + // This is less elegant than incrementing the group's active count, + // but avoids any contamination between GC assist and synctest. + sg := gp.syncGroup + gp.syncGroup = nil + defer func() { + gp.syncGroup = sg + }() + } + // This extremely verbose boolean indicates whether we've // entered mark assist from the perspective of the tracer. // diff --git a/src/runtime/mklockrank.go b/src/runtime/mklockrank.go index 3391afc657..e4a749dd31 100644 --- a/src/runtime/mklockrank.go +++ b/src/runtime/mklockrank.go @@ -95,6 +95,9 @@ NONE < itab < reflectOffs; +# Synctest +hchan, root, timers, timer, notifyList, reflectOffs < synctest; + # User arena state NONE < userArenaState; @@ -145,6 +148,7 @@ gcBitsArenas, profInsert, profMemFuture, spanSetSpine, + synctest, fin, root # Anything that can grow the stack can acquire STACKGROW. diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 17c375de1a..e7f44c5b6c 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -1229,6 +1229,12 @@ func casgstatus(gp *g, oldval, newval uint32) { } } + if gp.syncGroup != nil { + systemstack(func() { + gp.syncGroup.changegstatus(gp, oldval, newval) + }) + } + if oldval == _Grunning { // Track every gTrackingPeriod time a goroutine transitions out of running. if casgstatusAlwaysTrack || gp.trackingSeq%gTrackingPeriod == 0 { @@ -1325,6 +1331,9 @@ func casgcopystack(gp *g) uint32 { throw("copystack: bad status, not Gwaiting or Grunnable") } if gp.atomicstatus.CompareAndSwap(oldstatus, _Gcopystack) { + if sg := gp.syncGroup; sg != nil { + sg.changegstatus(gp, oldstatus, _Gcopystack) + } return oldstatus } } @@ -1341,6 +1350,12 @@ func casGToPreemptScan(gp *g, old, new uint32) { acquireLockRankAndM(lockRankGscan) for !gp.atomicstatus.CompareAndSwap(_Grunning, _Gscan|_Gpreempted) { } + // We never notify gp.syncGroup that the goroutine state has moved + // from _Grunning to _Gpreempted. We call syncGroup.changegstatus + // after status changes happen, but doing so here would violate the + // ordering between the gscan and synctest locks. syncGroup doesn't + // distinguish between _Grunning and _Gpreempted anyway, so not + // notifying it is fine. } // casGFromPreempted attempts to transition gp from _Gpreempted to @@ -1351,7 +1366,13 @@ func casGFromPreempted(gp *g, old, new uint32) bool { throw("bad g transition") } gp.waitreason = waitReasonPreempted - return gp.atomicstatus.CompareAndSwap(_Gpreempted, _Gwaiting) + if !gp.atomicstatus.CompareAndSwap(_Gpreempted, _Gwaiting) { + return false + } + if sg := gp.syncGroup; sg != nil { + sg.changegstatus(gp, _Gpreempted, _Gwaiting) + } + return true } // stwReason is an enumeration of reasons the world is stopping. @@ -4093,6 +4114,15 @@ func park_m(gp *g) { trace := traceAcquire() + // If g is in a synctest group, we don't want to let the group + // become idle until after the waitunlockf (if any) has confirmed + // that the park is happening. + // We need to record gp.syncGroup here, since waitunlockf can change it. + sg := gp.syncGroup + if sg != nil { + sg.incActive() + } + if trace.ok() { // Trace the event before the transition. It may take a // stack trace, but we won't own the stack after the @@ -4115,6 +4145,9 @@ func park_m(gp *g) { if !ok { trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) + if sg != nil { + sg.decActive() + } if trace.ok() { trace.GoUnpark(gp, 2) traceRelease(trace) @@ -4122,6 +4155,11 @@ func park_m(gp *g) { execute(gp, true) // Schedule it back, never returns. } } + + if sg != nil { + sg.decActive() + } + schedule() } @@ -4275,6 +4313,9 @@ func goyield_m(gp *g) { // Finishes execution of the current goroutine. func goexit1() { if raceenabled { + if gp := getg(); gp.syncGroup != nil { + racereleasemergeg(gp, gp.syncGroup.raceaddr()) + } racegoend() } trace := traceAcquire() @@ -4313,6 +4354,7 @@ func gdestroy(gp *g) { gp.param = nil gp.labels = nil gp.timer = nil + gp.syncGroup = nil if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 { // Flush assist credit to the global pool. This gives @@ -5059,7 +5101,8 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreaso if isSystemGoroutine(newg, false) { sched.ngsys.Add(1) } else { - // Only user goroutines inherit pprof labels. + // Only user goroutines inherit synctest groups and pprof labels. + newg.syncGroup = callergp.syncGroup if mp.curg != nil { newg.labels = mp.curg.labels } @@ -5086,7 +5129,6 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreaso status = _Gwaiting newg.waitreason = waitreason } - casgstatus(newg, _Gdead, status) if pp.goidcache == pp.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. @@ -5096,6 +5138,7 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreaso pp.goidcacheend = pp.goidcache + _GoidCacheBatch } newg.goid = pp.goidcache + casgstatus(newg, _Gdead, status) pp.goidcache++ newg.trace.reset() if trace.ok() { diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index 03798d5699..e837c28af8 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -489,7 +489,8 @@ type g struct { // current in-progress goroutine profile goroutineProfiled goroutineProfileStateHolder - coroarg *coro // argument during coroutine transfers + coroarg *coro // argument during coroutine transfers + syncGroup *synctestGroup // Per-G tracer state. trace gTraceState @@ -1064,6 +1065,7 @@ const ( waitReasonSyncMutexLock // "sync.Mutex.Lock" waitReasonSyncRWMutexRLock // "sync.RWMutex.RLock" waitReasonSyncRWMutexLock // "sync.RWMutex.Lock" + waitReasonSyncWaitGroupWait // "sync.WaitGroup.Wait" waitReasonTraceReaderBlocked // "trace reader (blocked)" waitReasonWaitForGCCycle // "wait for GC cycle" waitReasonGCWorkerIdle // "GC worker (idle)" @@ -1078,6 +1080,11 @@ const ( waitReasonPageTraceFlush // "page trace flush" waitReasonCoroutine // "coroutine" waitReasonGCWeakToStrongWait // "GC weak to strong wait" + waitReasonSynctestRun // "synctest.Run" + waitReasonSynctestWait // "synctest.Wait" + waitReasonSynctestChanReceive // "chan receive (synctest)" + waitReasonSynctestChanSend // "chan send (synctest)" + waitReasonSynctestSelect // "select (synctest)" ) var waitReasonStrings = [...]string{ @@ -1105,6 +1112,7 @@ var waitReasonStrings = [...]string{ waitReasonSyncMutexLock: "sync.Mutex.Lock", waitReasonSyncRWMutexRLock: "sync.RWMutex.RLock", waitReasonSyncRWMutexLock: "sync.RWMutex.Lock", + waitReasonSyncWaitGroupWait: "sync.WaitGroup.Wait", waitReasonTraceReaderBlocked: "trace reader (blocked)", waitReasonWaitForGCCycle: "wait for GC cycle", waitReasonGCWorkerIdle: "GC worker (idle)", @@ -1119,6 +1127,11 @@ var waitReasonStrings = [...]string{ waitReasonPageTraceFlush: "page trace flush", waitReasonCoroutine: "coroutine", waitReasonGCWeakToStrongWait: "GC weak to strong wait", + waitReasonSynctestRun: "synctest.Run", + waitReasonSynctestWait: "synctest.Wait", + waitReasonSynctestChanReceive: "chan receive (synctest)", + waitReasonSynctestChanSend: "chan send (synctest)", + waitReasonSynctestSelect: "select (synctest)", } func (w waitReason) String() string { @@ -1157,6 +1170,26 @@ var isWaitingForGC = [len(waitReasonStrings)]bool{ waitReasonFlushProcCaches: true, } +func (w waitReason) isIdleInSynctest() bool { + return isIdleInSynctest[w] +} + +// isIdleInSynctest indicates that a goroutine is considered idle by synctest.Wait. +var isIdleInSynctest = [len(waitReasonStrings)]bool{ + waitReasonChanReceiveNilChan: true, + waitReasonChanSendNilChan: true, + waitReasonSelectNoCases: true, + waitReasonSleep: true, + waitReasonSyncCondWait: true, + waitReasonSyncWaitGroupWait: true, + waitReasonCoroutine: true, + waitReasonSynctestRun: true, + waitReasonSynctestWait: true, + waitReasonSynctestChanReceive: true, + waitReasonSynctestChanSend: true, + waitReasonSynctestSelect: true, +} + var ( allm *m gomaxprocs int32 diff --git a/src/runtime/select.go b/src/runtime/select.go index 2e86c85493..0b1d144951 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -120,6 +120,7 @@ func block() { // Also, if the chosen scase was a receive operation, it reports whether // a value was received. func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { + gp := getg() if debugSelect { print("select: cas0=", cas0, "\n") } @@ -165,6 +166,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo // generate permuted order norder := 0 + allSynctest := true for i := range scases { cas := &scases[i] @@ -174,6 +176,14 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo continue } + if cas.c.synctest { + if getg().syncGroup == nil { + panic(plainError("select on synctest channel from outside bubble")) + } + } else { + allSynctest = false + } + if cas.c.timer != nil { cas.c.timer.maybeRunChan() } @@ -186,6 +196,13 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo pollorder = pollorder[:norder] lockorder = lockorder[:norder] + waitReason := waitReasonSelect + if gp.syncGroup != nil && allSynctest { + // Every channel selected on is in a synctest bubble, + // so this goroutine will count as idle while selecting. + waitReason = waitReasonSynctestSelect + } + // sort the cases by Hchan address to get the locking order. // simple heap sort, to guarantee n log n time and constant stack footprint. for i := range lockorder { @@ -235,7 +252,6 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo sellock(scases, lockorder) var ( - gp *g sg *sudog c *hchan k *scase @@ -291,7 +307,6 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo } // pass 2 - enqueue on all chans - gp = getg() if gp.waiting != nil { throw("gp.waiting != nil") } @@ -333,7 +348,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) - gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1) + gopark(selparkcommit, nil, waitReason, traceBlockSelect, 1) gp.activeStackChans = false sellock(scases, lockorder) diff --git a/src/runtime/sema.go b/src/runtime/sema.go index 5057bb0b7d..18ada5a68b 100644 --- a/src/runtime/sema.go +++ b/src/runtime/sema.go @@ -105,6 +105,11 @@ func sync_runtime_SemacquireRWMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncRWMutexLock) } +//go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup +func sync_runtime_SemacquireWaitGroup(addr *uint32) { + semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait) +} + //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease func poll_runtime_Semrelease(addr *uint32) { semrelease(addr) @@ -624,6 +629,10 @@ func notifyListNotifyAll(l *notifyList) { for s != nil { next := s.next s.next = nil + if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup { + println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble") + panic("semaphore wake of synctest goroutine from outside bubble") + } readyWithTime(s, 4) s = next } @@ -677,6 +686,10 @@ func notifyListNotifyOne(l *notifyList) { } unlock(&l.lock) s.next = nil + if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup { + println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble") + panic("semaphore wake of synctest goroutine from outside bubble") + } readyWithTime(s, 4) return } diff --git a/src/runtime/sizeof_test.go b/src/runtime/sizeof_test.go index c1b201caf1..a5dc8aed34 100644 --- a/src/runtime/sizeof_test.go +++ b/src/runtime/sizeof_test.go @@ -20,7 +20,7 @@ func TestSizeof(t *testing.T) { _32bit uintptr // size on 32bit platforms _64bit uintptr // size on 64bit platforms }{ - {runtime.G{}, 276, 432}, // g, but exported for testing + {runtime.G{}, 280, 440}, // g, but exported for testing {runtime.Sudog{}, 56, 88}, // sudog, but exported for testing } diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go new file mode 100644 index 0000000000..b4934de853 --- /dev/null +++ b/src/runtime/synctest.go @@ -0,0 +1,256 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime + +import ( + "unsafe" +) + +// A synctestGroup is a group of goroutines started by synctest.Run. +type synctestGroup struct { + mu mutex + timers timers + now int64 // current fake time + root *g // caller of synctest.Run + waiter *g // caller of synctest.Wait + waiting bool // true if a goroutine is calling synctest.Wait + + // The group is active (not blocked) so long as running > 0 || active > 0. + // + // running is the number of goroutines which are not "durably blocked": + // Goroutines which are either running, runnable, or non-durably blocked + // (for example, blocked in a syscall). + // + // active is used to keep the group from becoming blocked, + // even if all goroutines in the group are blocked. + // For example, park_m can choose to immediately unpark a goroutine after parking it. + // It increments the active count to keep the group active until it has determined + // that the park operation has completed. + total int // total goroutines + running int // non-blocked goroutines + active int // other sources of activity +} + +// changegstatus is called when the non-lock status of a g changes. +// It is never called with a Gscanstatus. +func (sg *synctestGroup) changegstatus(gp *g, oldval, newval uint32) { + lock(&sg.mu) + wasRunning := true + switch oldval { + case _Gdead: + wasRunning = false + sg.total++ + case _Gwaiting: + if gp.waitreason.isIdleInSynctest() { + wasRunning = false + } + } + isRunning := true + switch newval { + case _Gdead: + isRunning = false + sg.total-- + case _Gwaiting: + if gp.waitreason.isIdleInSynctest() { + isRunning = false + } + } + if wasRunning != isRunning { + if isRunning { + sg.running++ + } else { + sg.running-- + if raceenabled && newval != _Gdead { + racereleasemergeg(gp, sg.raceaddr()) + } + } + } + if sg.total < 0 { + fatal("total < 0") + } + if sg.running < 0 { + fatal("running < 0") + } + wake := sg.maybeWakeLocked() + unlock(&sg.mu) + if wake != nil { + goready(wake, 0) + } +} + +// incActive increments the active-count for the group. +// A group does not become durably blocked while the active-count is non-zero. +func (sg *synctestGroup) incActive() { + lock(&sg.mu) + sg.active++ + unlock(&sg.mu) +} + +// decActive decrements the active-count for the group. +func (sg *synctestGroup) decActive() { + lock(&sg.mu) + sg.active-- + if sg.active < 0 { + throw("active < 0") + } + wake := sg.maybeWakeLocked() + unlock(&sg.mu) + if wake != nil { + goready(wake, 0) + } +} + +// maybeWakeLocked returns a g to wake if the group is durably blocked. +func (sg *synctestGroup) maybeWakeLocked() *g { + if sg.running > 0 || sg.active > 0 { + return nil + } + // Increment the group active count, since we've determined to wake something. + // The woken goroutine will decrement the count. + // We can't just call goready and let it increment sg.running, + // since we can't call goready with sg.mu held. + // + // Incrementing the active count here is only necessary if something has gone wrong, + // and a goroutine that we considered durably blocked wakes up unexpectedly. + // Two wakes happening at the same time leads to very confusing failure modes, + // so we take steps to avoid it happening. + sg.active++ + if gp := sg.waiter; gp != nil { + // A goroutine is blocked in Wait. Wake it. + return gp + } + // All goroutines in the group are durably blocked, and nothing has called Wait. + // Wake the root goroutine. + return sg.root +} + +func (sg *synctestGroup) raceaddr() unsafe.Pointer { + // Address used to record happens-before relationships created by the group. + // + // Wait creates a happens-before relationship between itself and + // the blocking operations which caused other goroutines in the group to park. + return unsafe.Pointer(sg) +} + +//go:linkname synctestRun internal/synctest.Run +func synctestRun(f func()) { + if debug.asynctimerchan.Load() != 0 { + panic("synctest.Run not supported with asynctimerchan!=0") + } + + gp := getg() + if gp.syncGroup != nil { + panic("synctest.Run called from within a synctest bubble") + } + gp.syncGroup = &synctestGroup{ + total: 1, + running: 1, + root: gp, + } + const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01 + gp.syncGroup.now = synctestBaseTime + gp.syncGroup.timers.syncGroup = gp.syncGroup + lockInit(&gp.syncGroup.mu, lockRankSynctest) + lockInit(&gp.syncGroup.timers.mu, lockRankTimers) + defer func() { + gp.syncGroup = nil + }() + + fv := *(**funcval)(unsafe.Pointer(&f)) + newproc(fv) + + sg := gp.syncGroup + lock(&sg.mu) + sg.active++ + for { + if raceenabled { + raceacquireg(gp, gp.syncGroup.raceaddr()) + } + unlock(&sg.mu) + systemstack(func() { + gp.syncGroup.timers.check(gp.syncGroup.now) + }) + gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0) + lock(&sg.mu) + if sg.active < 0 { + throw("active < 0") + } + next := sg.timers.wakeTime() + if next == 0 { + break + } + if next < sg.now { + throw("time went backwards") + } + sg.now = next + } + + total := sg.total + unlock(&sg.mu) + if total != 1 { + panic("deadlock: all goroutines in bubble are blocked") + } + if gp.timer != nil && gp.timer.isFake { + // Verify that we haven't marked this goroutine's sleep timer as fake. + // This could happen if something in Run were to call timeSleep. + throw("synctest root goroutine has a fake timer") + } +} + +func synctestidle_c(gp *g, _ unsafe.Pointer) bool { + lock(&gp.syncGroup.mu) + defer unlock(&gp.syncGroup.mu) + if gp.syncGroup.running == 0 && gp.syncGroup.active == 1 { + // All goroutines in the group have blocked or exited. + return false + } + gp.syncGroup.active-- + return true +} + +//go:linkname synctestWait internal/synctest.Wait +func synctestWait() { + gp := getg() + if gp.syncGroup == nil { + panic("goroutine is not in a bubble") + } + lock(&gp.syncGroup.mu) + // We use a syncGroup.waiting bool to detect simultaneous calls to Wait rather than + // checking to see if syncGroup.waiter is non-nil. This avoids a race between unlocking + // syncGroup.mu and setting syncGroup.waiter while parking. + if gp.syncGroup.waiting { + unlock(&gp.syncGroup.mu) + panic("wait already in progress") + } + gp.syncGroup.waiting = true + unlock(&gp.syncGroup.mu) + gopark(synctestwait_c, nil, waitReasonSynctestWait, traceBlockSynctest, 0) + + lock(&gp.syncGroup.mu) + gp.syncGroup.active-- + if gp.syncGroup.active < 0 { + throw("active < 0") + } + gp.syncGroup.waiter = nil + gp.syncGroup.waiting = false + unlock(&gp.syncGroup.mu) + + // Establish a happens-before relationship on the activity of the now-blocked + // goroutines in the group. + if raceenabled { + raceacquireg(gp, gp.syncGroup.raceaddr()) + } +} + +func synctestwait_c(gp *g, _ unsafe.Pointer) bool { + lock(&gp.syncGroup.mu) + if gp.syncGroup.running == 0 && gp.syncGroup.active == 0 { + // This shouldn't be possible, since gopark increments active during unlockf. + throw("running == 0 && active == 0") + } + gp.syncGroup.waiter = gp + unlock(&gp.syncGroup.mu) + return true +} diff --git a/src/runtime/synctest_test.go b/src/runtime/synctest_test.go new file mode 100644 index 0000000000..0fdd032fc9 --- /dev/null +++ b/src/runtime/synctest_test.go @@ -0,0 +1,17 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime_test + +import ( + "testing" +) + +func TestSynctest(t *testing.T) { + output := runTestProg(t, "testsynctest", "") + want := "success\n" + if output != want { + t.Fatalf("output:\n%s\n\nwanted:\n%s", output, want) + } +} diff --git a/src/runtime/testdata/testsynctest/main.go b/src/runtime/testdata/testsynctest/main.go new file mode 100644 index 0000000000..d2cbc99258 --- /dev/null +++ b/src/runtime/testdata/testsynctest/main.go @@ -0,0 +1,67 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "internal/synctest" + "runtime" + "runtime/metrics" +) + +// This program ensures system goroutines (GC workers, finalizer goroutine) +// started from within a synctest bubble do not participate in that bubble. +// +// To ensure none of these goroutines start before synctest.Run, +// it must have no dependencies on packages which may start system goroutines. +// This includes the os package, which creates finalizers at init time. + +func numGCCycles() uint64 { + samples := []metrics.Sample{{Name: "/gc/cycles/total:gc-cycles"}} + metrics.Read(samples) + if samples[0].Value.Kind() == metrics.KindBad { + panic("metric not supported") + } + return samples[0].Value.Uint64() +} + +func main() { + synctest.Run(func() { + // Start the finalizer goroutine. + p := new(int) + runtime.SetFinalizer(p, func(*int) {}) + + startingCycles := numGCCycles() + ch1 := make(chan *int) + ch2 := make(chan *int) + defer close(ch1) + go func() { + for i := range ch1 { + v := *i + 1 + ch2 <- &v + } + }() + for { + // Make a lot of short-lived allocations to get the GC working. + for i := 0; i < 1000; i++ { + v := new(int) + *v = i + // Set finalizers on these values, just for added stress. + runtime.SetFinalizer(v, func(*int) {}) + ch1 <- v + <-ch2 + } + + // If we've improperly put a GC goroutine into the synctest group, + // this Wait is going to hang. + synctest.Wait() + + // End the test after a couple of GC cycles have passed. + if numGCCycles()-startingCycles > 1 { + break + } + } + }) + println("success") +} diff --git a/src/runtime/time.go b/src/runtime/time.go index fb4136a018..7c6d798872 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -13,6 +13,25 @@ import ( "unsafe" ) +//go:linkname time_runtimeNow time.runtimeNow +func time_runtimeNow() (sec int64, nsec int32, mono int64) { + if sg := getg().syncGroup; sg != nil { + sec = sg.now / (1000 * 1000 * 1000) + nsec = int32(sg.now % (1000 * 1000 * 1000)) + return sec, nsec, sg.now + } + return time_now() +} + +//go:linkname time_runtimeNano time.runtimeNano +func time_runtimeNano() int64 { + gp := getg() + if gp.syncGroup != nil { + return gp.syncGroup.now + } + return nanotime() +} + // A timer is a potentially repeating trigger for calling t.f(t.arg, t.seq). // Timers are allocated by client code, often as part of other data structures. // Each P has a heap of pointers to timers that it manages. @@ -29,6 +48,7 @@ type timer struct { astate atomic.Uint8 // atomic copy of state bits at last unlock state uint8 // state bits isChan bool // timer has a channel; immutable; can be read without lock + isFake bool // timer is using fake time; immutable; can be read without lock blocked uint32 // number of goroutines blocked on timer's channel @@ -125,6 +145,8 @@ type timers struct { // heap[i].when over timers with the timerModified bit set. // If minWhenModified = 0, it means there are no timerModified timers in the heap. minWhenModified atomic.Int64 + + syncGroup *synctestGroup } type timerWhen struct { @@ -290,14 +312,31 @@ func timeSleep(ns int64) { if t == nil { t = new(timer) t.init(goroutineReady, gp) + if gp.syncGroup != nil { + t.isFake = true + } gp.timer = t } - when := nanotime() + ns + var now int64 + if sg := gp.syncGroup; sg != nil { + now = sg.now + } else { + now = nanotime() + } + when := now + ns if when < 0 { // check for overflow. when = maxWhen } gp.sleepWhen = when - gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1) + if t.isFake { + // Call timer.reset in this goroutine, since it's the one in a syncGroup. + // We don't need to worry about the timer function running before the goroutine + // is parked, because time won't advance until we park. + resetForSleep(gp, nil) + gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1) + } else { + gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1) + } } // resetForSleep is called after the goroutine is parked for timeSleep. @@ -337,6 +376,9 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg throw("invalid timer channel: no capacity") } } + if gr := getg().syncGroup; gr != nil { + t.isFake = true + } t.modify(when, period, f, arg, 0) t.init = true return t @@ -347,6 +389,9 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg // //go:linkname stopTimer time.stopTimer func stopTimer(t *timeTimer) bool { + if t.isFake && getg().syncGroup == nil { + panic("stop of synctest timer from outside bubble") + } return t.stop() } @@ -359,6 +404,9 @@ func resetTimer(t *timeTimer, when, period int64) bool { if raceenabled { racerelease(unsafe.Pointer(&t.timer)) } + if t.isFake && getg().syncGroup == nil { + panic("reset of synctest timer from outside bubble") + } return t.reset(when, period) } @@ -582,7 +630,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in // t must be locked. func (t *timer) needsAdd() bool { assertLockHeld(&t.mu) - need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) + need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0) if need { t.trace("needsAdd+") } else { @@ -620,7 +668,16 @@ func (t *timer) maybeAdd() { // Calling acquirem instead of using getg().m makes sure that // we end up locking and inserting into the current P's timers. mp := acquirem() - ts := &mp.p.ptr().timers + var ts *timers + if t.isFake { + sg := getg().syncGroup + if sg == nil { + throw("invalid timer: fake time but no syncgroup") + } + ts = &sg.timers + } else { + ts = &mp.p.ptr().timers + } ts.lock() ts.cleanHead() t.lock() @@ -1071,6 +1128,16 @@ func (t *timer) unlockAndRun(now int64) { ts.unlock() } + if ts != nil && ts.syncGroup != nil { + // Temporarily use the timer's synctest group for the G running this timer. + gp := getg() + if gp.syncGroup != nil { + throw("unexpected syncgroup set") + } + gp.syncGroup = ts.syncGroup + ts.syncGroup.changegstatus(gp, _Gdead, _Grunning) + } + if !async && t.isChan { // For a timer channel, we want to make sure that no stale sends // happen after a t.stop or t.modify, but we cannot hold t.mu @@ -1112,6 +1179,12 @@ func (t *timer) unlockAndRun(now int64) { unlock(&t.sendLock) } + if ts != nil && ts.syncGroup != nil { + gp := getg() + ts.syncGroup.changegstatus(gp, _Grunning, _Gdead) + gp.syncGroup = nil + } + if ts != nil { ts.lock() } @@ -1297,6 +1370,20 @@ func badTimer() { // to send a value to its associated channel. If so, it does. // The timer must not be locked. func (t *timer) maybeRunChan() { + if sg := getg().syncGroup; sg != nil || t.isFake { + t.lock() + var timerGroup *synctestGroup + if t.ts != nil { + timerGroup = t.ts.syncGroup + } + t.unlock() + if sg == nil || !t.isFake || sg != timerGroup { + panic(plainError("timer moved between synctest groups")) + } + // No need to do anything here. + // synctest.Run will run the timer when it advances its fake clock. + return + } if t.astate.Load()&timerHeaped != 0 { // If the timer is in the heap, the ordinary timer code // is in charge of sending when appropriate. @@ -1323,6 +1410,9 @@ func (t *timer) maybeRunChan() { // adding it if needed. func blockTimerChan(c *hchan) { t := c.timer + if t.isFake { + return + } t.lock() t.trace("blockTimerChan") if !t.isChan { @@ -1360,6 +1450,9 @@ func blockTimerChan(c *hchan) { // blocked on it anymore. func unblockTimerChan(c *hchan) { t := c.timer + if t.isFake { + return + } t.lock() t.trace("unblockTimerChan") if !t.isChan || t.blocked == 0 { diff --git a/src/runtime/time_linux_amd64.s b/src/runtime/time_linux_amd64.s index 1416d23230..fa9561b25b 100644 --- a/src/runtime/time_linux_amd64.s +++ b/src/runtime/time_linux_amd64.s @@ -10,7 +10,7 @@ #define SYS_clock_gettime 228 -// func time.now() (sec int64, nsec int32, mono int64) +// func now() (sec int64, nsec int32, mono int64) TEXT time·now(SB),NOSPLIT,$16-24 MOVQ SP, R12 // Save old SP; R12 unchanged by C code. diff --git a/src/runtime/traceback.go b/src/runtime/traceback.go index 3c2092ed99..91c0720dcc 100644 --- a/src/runtime/traceback.go +++ b/src/runtime/traceback.go @@ -1238,6 +1238,9 @@ func goroutineheader(gp *g) { if gp.lockedm != 0 { print(", locked to thread") } + if sg := gp.syncGroup; sg != nil { + print(", synctest group ", sg.root.goid) + } print("]:\n") } diff --git a/src/runtime/traceruntime.go b/src/runtime/traceruntime.go index 40c7eb224a..284e61301b 100644 --- a/src/runtime/traceruntime.go +++ b/src/runtime/traceruntime.go @@ -102,6 +102,7 @@ const ( traceBlockUntilGCEnds traceBlockSleep traceBlockGCWeakToStrongWait + traceBlockSynctest ) var traceBlockReasonStrings = [...]string{ @@ -121,6 +122,7 @@ var traceBlockReasonStrings = [...]string{ traceBlockUntilGCEnds: "wait until GC ends", traceBlockSleep: "sleep", traceBlockGCWeakToStrongWait: "GC weak to strong wait", + traceBlockSynctest: "synctest", } // traceGoStopReason is an enumeration of reasons a goroutine might yield. diff --git a/src/sync/runtime.go b/src/sync/runtime.go index b4289dd467..99e5bccbee 100644 --- a/src/sync/runtime.go +++ b/src/sync/runtime.go @@ -13,6 +13,9 @@ import "unsafe" // library and should not be used directly. func runtime_Semacquire(s *uint32) +// SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait. +func runtime_SemacquireWaitGroup(s *uint32) + // Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended // Mutexes and RWMutexes. // If lifo is true, queue waiter at the head of wait queue. diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index 872d6d87c0..b50ecd94d3 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -115,7 +115,7 @@ func (wg *WaitGroup) Wait() { // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } - runtime_Semacquire(&wg.sema) + runtime_SemacquireWaitGroup(&wg.sema) if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } diff --git a/src/time/time.go b/src/time/time.go index 6259eaac4c..14e79672ca 100644 --- a/src/time/time.go +++ b/src/time/time.go @@ -1297,11 +1297,32 @@ func daysIn(m Month, year int) int { } // Provided by package runtime. +// +// now returns the current real time, and is superseded by runtimeNow which returns +// the fake synctest clock when appropriate. +// +// now should be an internal detail, +// but widely used packages access it using linkname. +// Notable members of the hall of shame include: +// - gitee.com/quant1x/gox +// - github.com/phuslu/log +// - github.com/sethvargo/go-limiter +// - github.com/ulule/limiter/v3 +// +// Do not remove or change the type signature. +// See go.dev/issue/67401. func now() (sec int64, nsec int32, mono int64) +// runtimeNow returns the current time. +// When called within a synctest.Run bubble, it returns the group's fake clock. +// +//go:linkname runtimeNow +func runtimeNow() (sec int64, nsec int32, mono int64) + // runtimeNano returns the current value of the runtime clock in nanoseconds. +// When called within a synctest.Run bubble, it returns the group's fake clock. // -//go:linkname runtimeNano runtime.nanotime +//go:linkname runtimeNano func runtimeNano() int64 // Monotonic times are reported as offsets from startNano. @@ -1317,7 +1338,10 @@ var startNano int64 = runtimeNano() - 1 // Now returns the current local time. func Now() Time { - sec, nsec, mono := now() + sec, nsec, mono := runtimeNow() + if mono == 0 { + return Time{uint64(nsec), sec + unixToInternal, Local} + } mono -= startNano sec += unixToInternal - minWall if uint64(sec)>>33 != 0 { diff --git a/src/time/zoneinfo_plan9.go b/src/time/zoneinfo_plan9.go index d13b623a37..036f669d8d 100644 --- a/src/time/zoneinfo_plan9.go +++ b/src/time/zoneinfo_plan9.go @@ -96,7 +96,7 @@ func loadZoneDataPlan9(s string) (l *Location, err error) { // Fill in the cache with information about right now, // since that will be the most common lookup. - sec, _, _ := now() + sec, _, _ := runtimeNow() for i := range tx { if tx[i].when <= sec && (i+1 == len(tx) || sec < tx[i+1].when) { l.cacheStart = tx[i].when diff --git a/src/time/zoneinfo_read.go b/src/time/zoneinfo_read.go index 5314b6ff9a..047e360d11 100644 --- a/src/time/zoneinfo_read.go +++ b/src/time/zoneinfo_read.go @@ -320,7 +320,7 @@ func LoadLocationFromTZData(name string, data []byte) (*Location, error) { // Fill in the cache with information about right now, // since that will be the most common lookup. - sec, _, _ := now() + sec, _, _ := runtimeNow() for i := range tx { if tx[i].when <= sec && (i+1 == len(tx) || sec < tx[i+1].when) { l.cacheStart = tx[i].when -- 2.48.1