]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: make bubbled timers more consistent with unbubbled
authorDamien Neil <dneil@google.com>
Mon, 2 Jun 2025 16:26:27 +0000 (09:26 -0700)
committerGopher Robot <gobot@golang.org>
Wed, 4 Jun 2025 16:20:21 +0000 (09:20 -0700)
This CL makes two changes to reduce the predictability
with which bubbled timers fire.

When asynctimerchan=0 (the default), regular timers with an associated
channel are only added to a timer heap when some channel operation
is blocked on that channel. This allows us to garbage collect
unreferenced, unstopped timers. Timers in a synctest bubble, in
contrast, are always added to the bubble's timer heap.

This CL changes bubbled timers with a channel to be handled the
same as unbubbled ones, adding them to the bubble's timer heap only
when some channel operation is blocked on the timer's channel.
This permits unstopped bubbled timers to be garbage collected,
but more importantly it makes all timers past their deadline
behave identically, regardless of whether they are in a bubble.

This CL also changes timer scheduling to execute bubbled timers
immediately when possible rather than adding them to a heap.
Timers in a bubble's heap are executed when the bubble is idle.
Executing timers immediately avoids creating a predictable
order of execution.

For #73850
Fixes #73934

Change-Id: If82e441546408f780f6af6fb7f6e416d3160295d
Reviewed-on: https://go-review.googlesource.com/c/go/+/678075
Auto-Submit: Damien Neil <dneil@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>

src/internal/synctest/synctest_test.go
src/runtime/chan.go
src/runtime/proc.go
src/runtime/select.go
src/runtime/synctest.go
src/runtime/time.go

index 53c7c89716dded7d1b29529dfeeea3e625abffa5..c2f84be736d5757d5afd34ace4c747823abdf764 100644 (file)
@@ -226,8 +226,8 @@ func TestTimerNondeterminism(t *testing.T) {
                const iterations = 1000
                var seen1, seen2 bool
                for range iterations {
-                       tm1 := time.NewTimer(0)
-                       tm2 := time.NewTimer(0)
+                       tm1 := time.NewTimer(1)
+                       tm2 := time.NewTimer(1)
                        select {
                        case <-tm1.C:
                                seen1 = true
@@ -278,6 +278,57 @@ func TestSleepNondeterminism(t *testing.T) {
        })
 }
 
+// TestTimerRunsImmediately verifies that a 0-duration timer sends on its channel
+// without waiting for the bubble to block.
+func TestTimerRunsImmediately(t *testing.T) {
+       synctest.Run(func() {
+               start := time.Now()
+               tm := time.NewTimer(0)
+               select {
+               case got := <-tm.C:
+                       if !got.Equal(start) {
+                               t.Errorf("<-tm.C = %v, want %v", got, start)
+                       }
+               default:
+                       t.Errorf("0-duration timer channel is not readable; want it to be")
+               }
+       })
+}
+
+// TestTimerRunsLater verifies that reading from a timer's channel receives the
+// timer fired, even when that time is in reading from a timer's channel receives the
+// time the timer fired, even when that time is in the past.
+func TestTimerRanInPast(t *testing.T) {
+       synctest.Run(func() {
+               delay := 1 * time.Second
+               want := time.Now().Add(delay)
+               tm := time.NewTimer(delay)
+               time.Sleep(2 * delay)
+               select {
+               case got := <-tm.C:
+                       if !got.Equal(want) {
+                               t.Errorf("<-tm.C = %v, want %v", got, want)
+                       }
+               default:
+                       t.Errorf("0-duration timer channel is not readable; want it to be")
+               }
+       })
+}
+
+// TestAfterFuncRunsImmediately verifies that a 0-duration AfterFunc is scheduled
+// without waiting for the bubble to block.
+func TestAfterFuncRunsImmediately(t *testing.T) {
+       synctest.Run(func() {
+               var b atomic.Bool
+               time.AfterFunc(0, func() {
+                       b.Store(true)
+               })
+               for !b.Load() {
+                       runtime.Gosched()
+               }
+       })
+}
+
 func TestChannelFromOutsideBubble(t *testing.T) {
        choutside := make(chan struct{})
        for _, test := range []struct {
index df48267e97909da83a4c16783c414da72fee862b..bb554ebfdb1f3a0f9d96df19adce1248f23c9d70 100644 (file)
@@ -497,7 +497,7 @@ func empty(c *hchan) bool {
        // c.timer is also immutable (it is set after make(chan) but before any channel operations).
        // All timer channels have dataqsiz > 0.
        if c.timer != nil {
-               c.timer.maybeRunChan()
+               c.timer.maybeRunChan(c)
        }
        return atomic.Loaduint(&c.qcount) == 0
 }
@@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
        }
 
        if c.timer != nil {
-               c.timer.maybeRunChan()
+               c.timer.maybeRunChan(c)
        }
 
        // Fast path: check for failed non-blocking operation without acquiring the lock.
@@ -821,7 +821,7 @@ func chanlen(c *hchan) int {
        }
        async := debug.asynctimerchan.Load() != 0
        if c.timer != nil && async {
-               c.timer.maybeRunChan()
+               c.timer.maybeRunChan(c)
        }
        if c.timer != nil && !async {
                // timer channels have a buffered implementation
index 5b8db2bee4ecca08dfd3bde00cfb84e6fe180030..37a7b7f6849e7d8d1ec6faf76836707e617e8911 100644 (file)
@@ -3341,7 +3341,7 @@ top:
        // which may steal timers. It's important that between now
        // and then, nothing blocks, so these numbers remain mostly
        // relevant.
-       now, pollUntil, _ := pp.timers.check(0)
+       now, pollUntil, _ := pp.timers.check(0, nil)
 
        // Try to schedule the trace reader.
        if traceEnabled() || traceShuttingDown() {
@@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo
                        // timerpMask tells us whether the P may have timers at all. If it
                        // can't, no need to check at all.
                        if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
-                               tnow, w, ran := p2.timers.check(now)
+                               tnow, w, ran := p2.timers.check(now, nil)
                                now = tnow
                                if w != 0 && (pollUntil == 0 || w < pollUntil) {
                                        pollUntil = w
index 19256df6a60172b958650c455121b3a732b40f38..ae7754b17377ddc254ce55b5779ea0a62c46fe3b 100644 (file)
@@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo
                }
 
                if cas.c.timer != nil {
-                       cas.c.timer.maybeRunChan()
+                       cas.c.timer.maybeRunChan(cas.c)
                }
 
                j := cheaprandn(uint32(norder + 1))
index f676afa20d09e2fabd1dc8299ed7db46c9a787e3..c837c792a53e8301ff02044085dd25555a9914f7 100644 (file)
@@ -185,7 +185,6 @@ func synctestRun(f func()) {
        }
        const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01
        bubble.now = synctestBaseTime
-       bubble.timers.bubble = bubble
        lockInit(&bubble.mu, lockRankSynctest)
        lockInit(&bubble.timers.mu, lockRankTimers)
 
@@ -213,7 +212,7 @@ func synctestRun(f func()) {
                        // so timer goroutines inherit their child race context from g0.
                        curg := gp.m.curg
                        gp.m.curg = nil
-                       gp.bubble.timers.check(gp.bubble.now)
+                       gp.bubble.timers.check(bubble.now, bubble)
                        gp.m.curg = curg
                })
                gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0)
index a1f8351a1e316db4d72fff8440673e799286220d..4880dce8cddc799f91c1d7b0b88f624aa28c0eb1 100644 (file)
@@ -157,8 +157,6 @@ type timers struct {
        // heap[i].when over timers with the timerModified bit set.
        // If minWhenModified = 0, it means there are no timerModified timers in the heap.
        minWhenModified atomic.Int64
-
-       bubble *synctestBubble
 }
 
 type timerWhen struct {
@@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg
                        throw("invalid timer channel: no capacity")
                }
        }
-       if gr := getg().bubble; gr != nil {
+       if bubble := getg().bubble; bubble != nil {
                t.isFake = true
        }
        t.modify(when, period, f, arg, 0)
@@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() {
                // timer ourselves now is fine.)
                if now := nanotime(); t.when <= now {
                        systemstack(func() {
-                               t.unlockAndRun(now) // resets t.when
+                               t.unlockAndRun(now, nil) // resets t.when
                        })
                        t.lock()
                }
@@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
 
        add := t.needsAdd()
 
+       if add && t.isFake {
+               // If this is a bubbled timer scheduled to fire immediately,
+               // run it now rather than waiting for the bubble's timer scheduler.
+               // This avoids deferring timer execution until after the bubble
+               // becomes durably blocked.
+               //
+               // Don't do this for non-bubbled timers: It isn't necessary,
+               // and there may be cases where the runtime executes timers with
+               // the expectation the timer func will not run in the current goroutine.
+               // Bubbled timers are always created by the time package, and are
+               // safe to run in the current goroutine.
+               bubble := getg().bubble
+               if bubble == nil {
+                       throw("fake timer executing with no bubble")
+               }
+               if t.state&timerHeaped == 0 && when <= bubble.now {
+                       systemstack(func() {
+                               t.unlockAndRun(bubble.now, bubble)
+                       })
+                       return pending
+               }
+       }
+
        if !async && t.isChan {
                // Stop any future sends with stale values.
                // See timer.unlockAndRun.
@@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
 // t must be locked.
 func (t *timer) needsAdd() bool {
        assertLockHeld(&t.mu)
-       need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0)
+       need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0)
        if need {
                t.trace("needsAdd+")
        } else {
@@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 {
 // We pass now in and out to avoid extra calls of nanotime.
 //
 //go:yeswritebarrierrec
-func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
+func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) {
        ts.trace("check")
        // If it's not yet time for the first timer, or the first adjusted
        // timer, then there is nothing to do.
@@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
                ts.adjust(now, false)
                for len(ts.heap) > 0 {
                        // Note that runtimer may temporarily unlock ts.
-                       if tw := ts.run(now); tw != 0 {
+                       if tw := ts.run(now, bubble); tw != 0 {
                                if tw > 0 {
                                        pollUntil = tw
                                }
@@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
 // If a timer is run, this will temporarily unlock ts.
 //
 //go:systemstack
-func (ts *timers) run(now int64) int64 {
+func (ts *timers) run(now int64, bubble *synctestBubble) int64 {
        ts.trace("run")
        assertLockHeld(&ts.mu)
 Redo:
@@ -1081,7 +1102,7 @@ Redo:
                return t.when
        }
 
-       t.unlockAndRun(now)
+       t.unlockAndRun(now, bubble)
        assertLockHeld(&ts.mu) // t is unlocked now, but not ts
        return 0
 }
@@ -1092,7 +1113,7 @@ Redo:
 // unlockAndRun returns with t unlocked and t.ts (re-)locked.
 //
 //go:systemstack
-func (t *timer) unlockAndRun(now int64) {
+func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) {
        t.trace("unlockAndRun")
        assertLockHeld(&t.mu)
        if t.ts != nil {
@@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) {
                // out from under us while this function executes.
                gp := getg()
                var tsLocal *timers
-               if t.ts == nil || t.ts.bubble == nil {
+               if bubble == nil {
                        tsLocal = &gp.m.p.ptr().timers
                } else {
-                       tsLocal = &t.ts.bubble.timers
+                       tsLocal = &bubble.timers
                }
                if tsLocal.raceCtx == 0 {
                        tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum)
@@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) {
                if gp.racectx != 0 {
                        throw("unexpected racectx")
                }
-               if ts == nil || ts.bubble == nil {
+               if bubble == nil {
                        gp.racectx = gp.m.p.ptr().timers.raceCtx
                } else {
-                       gp.racectx = ts.bubble.timers.raceCtx
+                       gp.racectx = bubble.timers.raceCtx
                }
        }
 
@@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) {
                ts.unlock()
        }
 
-       if ts != nil && ts.bubble != nil {
+       if bubble != nil {
                // Temporarily use the timer's synctest group for the G running this timer.
                gp := getg()
                if gp.bubble != nil {
                        throw("unexpected syncgroup set")
                }
-               gp.bubble = ts.bubble
-               ts.bubble.changegstatus(gp, _Gdead, _Grunning)
+               gp.bubble = bubble
+               bubble.changegstatus(gp, _Gdead, _Grunning)
        }
 
        if !async && t.isChan {
@@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) {
                unlock(&t.sendLock)
        }
 
-       if ts != nil && ts.bubble != nil {
+       if bubble != nil {
                gp := getg()
-               ts.bubble.changegstatus(gp, _Grunning, _Gdead)
+               bubble.changegstatus(gp, _Grunning, _Gdead)
                if raceenabled {
                        // Establish a happens-before between this timer event and
                        // the next synctest.Wait call.
-                       racereleasemergeg(gp, ts.bubble.raceaddr())
+                       racereleasemergeg(gp, bubble.raceaddr())
                }
                gp.bubble = nil
        }
@@ -1415,24 +1436,10 @@ func badTimer() {
 // maybeRunChan checks whether the timer needs to run
 // to send a value to its associated channel. If so, it does.
 // The timer must not be locked.
-func (t *timer) maybeRunChan() {
-       if t.isFake {
-               t.lock()
-               var timerBubble *synctestBubble
-               if t.ts != nil {
-                       timerBubble = t.ts.bubble
-               }
-               t.unlock()
-               bubble := getg().bubble
-               if bubble == nil {
-                       panic(plainError("synctest timer accessed from outside bubble"))
-               }
-               if timerBubble != nil && bubble != timerBubble {
-                       panic(plainError("timer moved between synctest bubbles"))
-               }
-               // No need to do anything here.
-               // synctest.Run will run the timer when it advances its fake clock.
-               return
+func (t *timer) maybeRunChan(c *hchan) {
+       if t.isFake && getg().bubble != c.bubble {
+               // This should have been checked by the caller, but check just in case.
+               fatal("synctest timer accessed from outside bubble")
        }
        if t.astate.Load()&timerHeaped != 0 {
                // If the timer is in the heap, the ordinary timer code
@@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() {
 
        t.lock()
        now := nanotime()
+       if t.isFake {
+               now = getg().bubble.now
+       }
        if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
                t.trace("maybeRunChan-")
                // Timer in the heap, or not running at all, or not triggered.
@@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() {
        }
        t.trace("maybeRunChan+")
        systemstack(func() {
-               t.unlockAndRun(now)
+               t.unlockAndRun(now, c.bubble)
        })
 }
 
@@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() {
 // adding it if needed.
 func blockTimerChan(c *hchan) {
        t := c.timer
-       if t.isFake {
-               return
+       if t.isFake && c.bubble != getg().bubble {
+               // This should have been checked by the caller, but check just in case.
+               fatal("synctest timer accessed from outside bubble")
        }
+
        t.lock()
        t.trace("blockTimerChan")
        if !t.isChan {
@@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) {
 // blocked on it anymore.
 func unblockTimerChan(c *hchan) {
        t := c.timer
-       if t.isFake {
-               return
-       }
        t.lock()
        t.trace("unblockTimerChan")
        if !t.isChan || t.blocked == 0 {