From 3432c68467d50ffc622fed230a37cd401d82d4bf Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Mon, 2 Jun 2025 09:26:27 -0700 Subject: [PATCH] runtime: make bubbled timers more consistent with unbubbled This CL makes two changes to reduce the predictability with which bubbled timers fire. When asynctimerchan=0 (the default), regular timers with an associated channel are only added to a timer heap when some channel operation is blocked on that channel. This allows us to garbage collect unreferenced, unstopped timers. Timers in a synctest bubble, in contrast, are always added to the bubble's timer heap. This CL changes bubbled timers with a channel to be handled the same as unbubbled ones, adding them to the bubble's timer heap only when some channel operation is blocked on the timer's channel. This permits unstopped bubbled timers to be garbage collected, but more importantly it makes all timers past their deadline behave identically, regardless of whether they are in a bubble. This CL also changes timer scheduling to execute bubbled timers immediately when possible rather than adding them to a heap. Timers in a bubble's heap are executed when the bubble is idle. Executing timers immediately avoids creating a predictable order of execution. For #73850 Fixes #73934 Change-Id: If82e441546408f780f6af6fb7f6e416d3160295d Reviewed-on: https://go-review.googlesource.com/c/go/+/678075 Auto-Submit: Damien Neil Reviewed-by: Michael Pratt LUCI-TryBot-Result: Go LUCI --- src/internal/synctest/synctest_test.go | 55 ++++++++++++++- src/runtime/chan.go | 6 +- src/runtime/proc.go | 4 +- src/runtime/select.go | 2 +- src/runtime/synctest.go | 3 +- src/runtime/time.go | 97 ++++++++++++++------------ 6 files changed, 113 insertions(+), 54 deletions(-) diff --git a/src/internal/synctest/synctest_test.go b/src/internal/synctest/synctest_test.go index 53c7c89716..c2f84be736 100644 --- a/src/internal/synctest/synctest_test.go +++ b/src/internal/synctest/synctest_test.go @@ -226,8 +226,8 @@ func TestTimerNondeterminism(t *testing.T) { const iterations = 1000 var seen1, seen2 bool for range iterations { - tm1 := time.NewTimer(0) - tm2 := time.NewTimer(0) + tm1 := time.NewTimer(1) + tm2 := time.NewTimer(1) select { case <-tm1.C: seen1 = true @@ -278,6 +278,57 @@ func TestSleepNondeterminism(t *testing.T) { }) } +// TestTimerRunsImmediately verifies that a 0-duration timer sends on its channel +// without waiting for the bubble to block. +func TestTimerRunsImmediately(t *testing.T) { + synctest.Run(func() { + start := time.Now() + tm := time.NewTimer(0) + select { + case got := <-tm.C: + if !got.Equal(start) { + t.Errorf("<-tm.C = %v, want %v", got, start) + } + default: + t.Errorf("0-duration timer channel is not readable; want it to be") + } + }) +} + +// TestTimerRunsLater verifies that reading from a timer's channel receives the +// timer fired, even when that time is in reading from a timer's channel receives the +// time the timer fired, even when that time is in the past. +func TestTimerRanInPast(t *testing.T) { + synctest.Run(func() { + delay := 1 * time.Second + want := time.Now().Add(delay) + tm := time.NewTimer(delay) + time.Sleep(2 * delay) + select { + case got := <-tm.C: + if !got.Equal(want) { + t.Errorf("<-tm.C = %v, want %v", got, want) + } + default: + t.Errorf("0-duration timer channel is not readable; want it to be") + } + }) +} + +// TestAfterFuncRunsImmediately verifies that a 0-duration AfterFunc is scheduled +// without waiting for the bubble to block. +func TestAfterFuncRunsImmediately(t *testing.T) { + synctest.Run(func() { + var b atomic.Bool + time.AfterFunc(0, func() { + b.Store(true) + }) + for !b.Load() { + runtime.Gosched() + } + }) +} + func TestChannelFromOutsideBubble(t *testing.T) { choutside := make(chan struct{}) for _, test := range []struct { diff --git a/src/runtime/chan.go b/src/runtime/chan.go index df48267e97..bb554ebfdb 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -497,7 +497,7 @@ func empty(c *hchan) bool { // c.timer is also immutable (it is set after make(chan) but before any channel operations). // All timer channels have dataqsiz > 0. if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } return atomic.Loaduint(&c.qcount) == 0 } @@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) } if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } // Fast path: check for failed non-blocking operation without acquiring the lock. @@ -821,7 +821,7 @@ func chanlen(c *hchan) int { } async := debug.asynctimerchan.Load() != 0 if c.timer != nil && async { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } if c.timer != nil && !async { // timer channels have a buffered implementation diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 5b8db2bee4..37a7b7f684 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -3341,7 +3341,7 @@ top: // which may steal timers. It's important that between now // and then, nothing blocks, so these numbers remain mostly // relevant. - now, pollUntil, _ := pp.timers.check(0) + now, pollUntil, _ := pp.timers.check(0, nil) // Try to schedule the trace reader. if traceEnabled() || traceShuttingDown() { @@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo // timerpMask tells us whether the P may have timers at all. If it // can't, no need to check at all. if stealTimersOrRunNextG && timerpMask.read(enum.position()) { - tnow, w, ran := p2.timers.check(now) + tnow, w, ran := p2.timers.check(now, nil) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w diff --git a/src/runtime/select.go b/src/runtime/select.go index 19256df6a6..ae7754b173 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo } if cas.c.timer != nil { - cas.c.timer.maybeRunChan() + cas.c.timer.maybeRunChan(cas.c) } j := cheaprandn(uint32(norder + 1)) diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go index f676afa20d..c837c792a5 100644 --- a/src/runtime/synctest.go +++ b/src/runtime/synctest.go @@ -185,7 +185,6 @@ func synctestRun(f func()) { } const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01 bubble.now = synctestBaseTime - bubble.timers.bubble = bubble lockInit(&bubble.mu, lockRankSynctest) lockInit(&bubble.timers.mu, lockRankTimers) @@ -213,7 +212,7 @@ func synctestRun(f func()) { // so timer goroutines inherit their child race context from g0. curg := gp.m.curg gp.m.curg = nil - gp.bubble.timers.check(gp.bubble.now) + gp.bubble.timers.check(bubble.now, bubble) gp.m.curg = curg }) gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0) diff --git a/src/runtime/time.go b/src/runtime/time.go index a1f8351a1e..4880dce8cd 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -157,8 +157,6 @@ type timers struct { // heap[i].when over timers with the timerModified bit set. // If minWhenModified = 0, it means there are no timerModified timers in the heap. minWhenModified atomic.Int64 - - bubble *synctestBubble } type timerWhen struct { @@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg throw("invalid timer channel: no capacity") } } - if gr := getg().bubble; gr != nil { + if bubble := getg().bubble; bubble != nil { t.isFake = true } t.modify(when, period, f, arg, 0) @@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() { // timer ourselves now is fine.) if now := nanotime(); t.when <= now { systemstack(func() { - t.unlockAndRun(now) // resets t.when + t.unlockAndRun(now, nil) // resets t.when }) t.lock() } @@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in add := t.needsAdd() + if add && t.isFake { + // If this is a bubbled timer scheduled to fire immediately, + // run it now rather than waiting for the bubble's timer scheduler. + // This avoids deferring timer execution until after the bubble + // becomes durably blocked. + // + // Don't do this for non-bubbled timers: It isn't necessary, + // and there may be cases where the runtime executes timers with + // the expectation the timer func will not run in the current goroutine. + // Bubbled timers are always created by the time package, and are + // safe to run in the current goroutine. + bubble := getg().bubble + if bubble == nil { + throw("fake timer executing with no bubble") + } + if t.state&timerHeaped == 0 && when <= bubble.now { + systemstack(func() { + t.unlockAndRun(bubble.now, bubble) + }) + return pending + } + } + if !async && t.isChan { // Stop any future sends with stale values. // See timer.unlockAndRun. @@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in // t must be locked. func (t *timer) needsAdd() bool { assertLockHeld(&t.mu) - need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0) + need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) if need { t.trace("needsAdd+") } else { @@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 { // We pass now in and out to avoid extra calls of nanotime. // //go:yeswritebarrierrec -func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { +func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) { ts.trace("check") // If it's not yet time for the first timer, or the first adjusted // timer, then there is nothing to do. @@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { ts.adjust(now, false) for len(ts.heap) > 0 { // Note that runtimer may temporarily unlock ts. - if tw := ts.run(now); tw != 0 { + if tw := ts.run(now, bubble); tw != 0 { if tw > 0 { pollUntil = tw } @@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { // If a timer is run, this will temporarily unlock ts. // //go:systemstack -func (ts *timers) run(now int64) int64 { +func (ts *timers) run(now int64, bubble *synctestBubble) int64 { ts.trace("run") assertLockHeld(&ts.mu) Redo: @@ -1081,7 +1102,7 @@ Redo: return t.when } - t.unlockAndRun(now) + t.unlockAndRun(now, bubble) assertLockHeld(&ts.mu) // t is unlocked now, but not ts return 0 } @@ -1092,7 +1113,7 @@ Redo: // unlockAndRun returns with t unlocked and t.ts (re-)locked. // //go:systemstack -func (t *timer) unlockAndRun(now int64) { +func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) { t.trace("unlockAndRun") assertLockHeld(&t.mu) if t.ts != nil { @@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) { // out from under us while this function executes. gp := getg() var tsLocal *timers - if t.ts == nil || t.ts.bubble == nil { + if bubble == nil { tsLocal = &gp.m.p.ptr().timers } else { - tsLocal = &t.ts.bubble.timers + tsLocal = &bubble.timers } if tsLocal.raceCtx == 0 { tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum) @@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) { if gp.racectx != 0 { throw("unexpected racectx") } - if ts == nil || ts.bubble == nil { + if bubble == nil { gp.racectx = gp.m.p.ptr().timers.raceCtx } else { - gp.racectx = ts.bubble.timers.raceCtx + gp.racectx = bubble.timers.raceCtx } } @@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) { ts.unlock() } - if ts != nil && ts.bubble != nil { + if bubble != nil { // Temporarily use the timer's synctest group for the G running this timer. gp := getg() if gp.bubble != nil { throw("unexpected syncgroup set") } - gp.bubble = ts.bubble - ts.bubble.changegstatus(gp, _Gdead, _Grunning) + gp.bubble = bubble + bubble.changegstatus(gp, _Gdead, _Grunning) } if !async && t.isChan { @@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) { unlock(&t.sendLock) } - if ts != nil && ts.bubble != nil { + if bubble != nil { gp := getg() - ts.bubble.changegstatus(gp, _Grunning, _Gdead) + bubble.changegstatus(gp, _Grunning, _Gdead) if raceenabled { // Establish a happens-before between this timer event and // the next synctest.Wait call. - racereleasemergeg(gp, ts.bubble.raceaddr()) + racereleasemergeg(gp, bubble.raceaddr()) } gp.bubble = nil } @@ -1415,24 +1436,10 @@ func badTimer() { // maybeRunChan checks whether the timer needs to run // to send a value to its associated channel. If so, it does. // The timer must not be locked. -func (t *timer) maybeRunChan() { - if t.isFake { - t.lock() - var timerBubble *synctestBubble - if t.ts != nil { - timerBubble = t.ts.bubble - } - t.unlock() - bubble := getg().bubble - if bubble == nil { - panic(plainError("synctest timer accessed from outside bubble")) - } - if timerBubble != nil && bubble != timerBubble { - panic(plainError("timer moved between synctest bubbles")) - } - // No need to do anything here. - // synctest.Run will run the timer when it advances its fake clock. - return +func (t *timer) maybeRunChan(c *hchan) { + if t.isFake && getg().bubble != c.bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } if t.astate.Load()&timerHeaped != 0 { // If the timer is in the heap, the ordinary timer code @@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() { t.lock() now := nanotime() + if t.isFake { + now = getg().bubble.now + } if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { t.trace("maybeRunChan-") // Timer in the heap, or not running at all, or not triggered. @@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() { } t.trace("maybeRunChan+") systemstack(func() { - t.unlockAndRun(now) + t.unlockAndRun(now, c.bubble) }) } @@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() { // adding it if needed. func blockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return + if t.isFake && c.bubble != getg().bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } + t.lock() t.trace("blockTimerChan") if !t.isChan { @@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) { // blocked on it anymore. func unblockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return - } t.lock() t.trace("unblockTimerChan") if !t.isChan || t.blocked == 0 { -- 2.50.0