Cypherpunks repositories - gostls13.git/commitdiff
runtime, internal/synctest, sync: associate WaitGroups with bubbles
author Damien Neil <dneil@google.com>
Tue, 20 May 2025 22:56:43 +0000 (15:56 -0700)
committer Gopher Robot <gobot@golang.org>
Thu, 29 May 2025 17:26:00 +0000 (10:26 -0700)
Add support to internal/synctest for managing associations between
arbitrary pointers and synctest bubbles. (Implemented internally to
the runtime package by attaching a special to the pointer.)

Associate WaitGroups with bubbles.
Since WaitGroups don't have a constructor,
perform the association when Add is called.
All Add calls must be made from within the same bubble,
or outside any bubble.

When a bubbled goroutine calls WaitGroup.Wait,
the wait is durably blocking iff the WaitGroup is associated
with the current bubble.

Change-Id: I77e2701e734ac2fa2b32b28d5b0c853b7b2825c9
Reviewed-on: https://go-review.googlesource.com/c/go/+/676656
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Damien Neil <dneil@google.com>

src/go/build/deps_test.go
src/internal/synctest/synctest.go
src/internal/synctest/synctest_test.go
src/runtime/mheap.go
src/runtime/runtime2.go
src/runtime/sema.go
src/runtime/synctest.go
src/sync/runtime.go
src/sync/waitgroup.go
src/testing/synctest/synctest.go

index b2668a3d7d4fbe87ec8e8554d20f23d5fbebad58..6d92542e31b6526a5c9797f2d51bfb9f6a7c9ae2 100644 (file)
@@ -103,6 +103,7 @@ var depsRules = `
        < sync/atomic
        < internal/sync
        < weak
+       < internal/synctest
        < sync
        < internal/bisect
        < internal/godebug
@@ -136,9 +137,6 @@ var depsRules = `
 
        unicode !< path;
 
-       RUNTIME
-       < internal/synctest;
-
        # SYSCALL is RUNTIME plus the packages necessary for basic system calls.
        RUNTIME, unicode/utf8, unicode/utf16, internal/synctest
        < internal/syscall/windows/sysdll, syscall/js
index 19190d30f1602772a39ca5e677bcf28db959e847..4d7fa3730c455867f55ee58e2fe9caa8045d4d2c 100644 (file)
@@ -8,7 +8,7 @@
 package synctest
 
 import (
-       _ "unsafe" // for go:linkname
+       "unsafe"
 )
 
 //go:linkname Run
@@ -17,6 +17,36 @@ func Run(f func())
 //go:linkname Wait
 func Wait()
 
+// IsInBubble reports whether the current goroutine is in a bubble.
+//
+//go:linkname IsInBubble
+func IsInBubble() bool
+
+// Associate associates p with the current bubble.
+// It returns false if p has an existing association with a different bubble.
+func Associate[T any](p *T) (ok bool) {
+       return associate(unsafe.Pointer(p))
+}
+
+//go:linkname associate
+func associate(p unsafe.Pointer) bool
+
+// Disassociate disassociates p from any bubble.
+func Disassociate[T any](p *T) {
+       disassociate(unsafe.Pointer(p))
+}
+
+//go:linkname disassociate
+func disassociate(b unsafe.Pointer)
+
+// IsAssociated reports whether p is associated with the current bubble.
+func IsAssociated[T any](p *T) bool {
+       return isAssociated(unsafe.Pointer(p))
+}
+
+//go:linkname isAssociated
+func isAssociated(p unsafe.Pointer) bool
+
 //go:linkname acquire
 func acquire() any
 
index 7f71df1710056a8e7887ae6392bca2a80fc31a0e..8b2ade5630d205bd4acb33de891d9a504af4a6dc 100644 (file)
@@ -7,11 +7,14 @@ package synctest_test
 import (
        "fmt"
        "internal/synctest"
+       "internal/testenv"
        "iter"
+       "os"
        "reflect"
        "runtime"
        "slices"
        "strconv"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -523,7 +526,7 @@ func TestReflectFuncOf(t *testing.T) {
        })
 }
 
-func TestWaitGroup(t *testing.T) {
+func TestWaitGroupInBubble(t *testing.T) {
        synctest.Run(func() {
                var wg sync.WaitGroup
                wg.Add(1)
@@ -540,6 +543,83 @@ func TestWaitGroup(t *testing.T) {
        })
 }
 
+func TestWaitGroupOutOfBubble(t *testing.T) {
+       var wg sync.WaitGroup
+       wg.Add(1)
+       donec := make(chan struct{})
+       go synctest.Run(func() {
+               // Since wg.Add was called outside the bubble, Wait is not durably blocking
+               // and this waits until wg.Done is called below.
+               wg.Wait()
+               close(donec)
+       })
+       select {
+       case <-donec:
+               t.Fatalf("synctest.Run finished before WaitGroup.Done called")
+       case <-time.After(1 * time.Millisecond):
+       }
+       wg.Done()
+       <-donec
+}
+
+func TestWaitGroupMovedIntoBubble(t *testing.T) {
+       wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
+               var wg sync.WaitGroup
+               wg.Add(1)
+               synctest.Run(func() {
+                       wg.Add(1)
+               })
+       })
+}
+
+func TestWaitGroupMovedOutOfBubble(t *testing.T) {
+       wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
+               var wg sync.WaitGroup
+               synctest.Run(func() {
+                       wg.Add(1)
+               })
+               wg.Add(1)
+       })
+}
+
+func TestWaitGroupMovedBetweenBubblesWithNonZeroCount(t *testing.T) {
+       wantFatal(t, "fatal error: sync: WaitGroup.Add called from multiple synctest bubbles", func() {
+               var wg sync.WaitGroup
+               synctest.Run(func() {
+                       wg.Add(1)
+               })
+               synctest.Run(func() {
+                       wg.Add(1)
+               })
+       })
+}
+
+func TestWaitGroupMovedBetweenBubblesWithZeroCount(t *testing.T) {
+       var wg sync.WaitGroup
+       synctest.Run(func() {
+               wg.Add(1)
+               wg.Done()
+       })
+       synctest.Run(func() {
+               // Reusing the WaitGroup is safe, because its count is zero.
+               wg.Add(1)
+               wg.Done()
+       })
+}
+
+func TestWaitGroupMovedBetweenBubblesAfterWait(t *testing.T) {
+       var wg sync.WaitGroup
+       synctest.Run(func() {
+               wg.Go(func() {})
+               wg.Wait()
+       })
+       synctest.Run(func() {
+               // Reusing the WaitGroup is safe, because its count is zero.
+               wg.Go(func() {})
+               wg.Wait()
+       })
+}
+
 func TestHappensBefore(t *testing.T) {
        // Use two parallel goroutines accessing different vars to ensure that
        // we correctly account for multiple goroutines in the bubble.
@@ -647,3 +727,23 @@ func wantPanic(t *testing.T, want string) {
                t.Errorf("got no panic, want one")
        }
 }
+
+func wantFatal(t *testing.T, want string, f func()) {
+       t.Helper()
+
+       if os.Getenv("GO_WANT_HELPER_PROCESS") == "1" {
+               f()
+               return
+       }
+
+       cmd := testenv.Command(t, testenv.Executable(t), "-test.run=^"+t.Name()+"$")
+       cmd = testenv.CleanCmdEnv(cmd)
+       cmd.Env = append(cmd.Env, "GO_WANT_HELPER_PROCESS=1")
+       out, err := cmd.CombinedOutput()
+       if err == nil {
+               t.Errorf("expected test function to panic, but test returned successfully")
+       }
+       if !strings.Contains(string(out), want) {
+               t.Errorf("wanted test output contaiing %q; got %q", want, string(out))
+       }
+}
index 3612d71e666a0f9b713be654e11049079e29c5a6..9361089b801e18616eee72128703365c42b599ac 100644 (file)
@@ -223,6 +223,7 @@ type mheap struct {
        specialReachableAlloc      fixalloc // allocator for specialReachable
        specialPinCounterAlloc     fixalloc // allocator for specialPinCounter
        specialWeakHandleAlloc     fixalloc // allocator for specialWeakHandle
+       specialBubbleAlloc         fixalloc // allocator for specialBubble
        speciallock                mutex    // lock for special record allocators.
        arenaHintAlloc             fixalloc // allocator for arenaHints
 
@@ -799,6 +800,7 @@ func (h *mheap) init() {
        h.specialReachableAlloc.init(unsafe.Sizeof(specialReachable{}), nil, nil, &memstats.other_sys)
        h.specialPinCounterAlloc.init(unsafe.Sizeof(specialPinCounter{}), nil, nil, &memstats.other_sys)
        h.specialWeakHandleAlloc.init(unsafe.Sizeof(specialWeakHandle{}), nil, nil, &memstats.gcMiscSys)
+       h.specialBubbleAlloc.init(unsafe.Sizeof(specialBubble{}), nil, nil, &memstats.other_sys)
        h.arenaHintAlloc.init(unsafe.Sizeof(arenaHint{}), nil, nil, &memstats.other_sys)
 
        // Don't zero mspan allocations. Background sweeping can
@@ -2003,6 +2005,8 @@ const (
        // _KindSpecialCheckFinalizer adds additional context to a finalizer or cleanup.
        // Used only if debug.checkfinalizers != 0.
        _KindSpecialCheckFinalizer = 8
+       // _KindSpecialBubble is used to associate objects with synctest bubbles.
+       _KindSpecialBubble = 9
 )
 
 type special struct {
@@ -2839,6 +2843,11 @@ func freeSpecial(s *special, p unsafe.Pointer, size uintptr) {
                lock(&mheap_.speciallock)
                mheap_.specialTinyBlockAlloc.free(unsafe.Pointer(st))
                unlock(&mheap_.speciallock)
+       case _KindSpecialBubble:
+               st := (*specialBubble)(unsafe.Pointer(s))
+               lock(&mheap_.speciallock)
+               mheap_.specialBubbleAlloc.free(unsafe.Pointer(st))
+               unlock(&mheap_.speciallock)
        default:
                throw("bad special kind")
                panic("not reached")
index 94ab87f6db218dd6a611617984d80d1b5b3b94ba..cd40586bc27daebdf2dea6237f6e53c67611e8b7 100644 (file)
@@ -1096,6 +1096,7 @@ const (
        waitReasonSynctestChanReceive                     // "chan receive (synctest)"
        waitReasonSynctestChanSend                        // "chan send (synctest)"
        waitReasonSynctestSelect                          // "select (synctest)"
+       waitReasonSynctestWaitGroupWait                   // "sync.WaitGroup.Wait (synctest)"
        waitReasonCleanupWait                             // "cleanup wait"
 )
 
@@ -1145,6 +1146,7 @@ var waitReasonStrings = [...]string{
        waitReasonSynctestChanReceive:   "chan receive (synctest)",
        waitReasonSynctestChanSend:      "chan send (synctest)",
        waitReasonSynctestSelect:        "select (synctest)",
+       waitReasonSynctestWaitGroupWait: "sync.WaitGroup.Wait (synctest)",
        waitReasonCleanupWait:           "cleanup wait",
 }
 
@@ -1190,18 +1192,18 @@ func (w waitReason) isIdleInSynctest() bool {
 
 // isIdleInSynctest indicates that a goroutine is considered idle by synctest.Wait.
 var isIdleInSynctest = [len(waitReasonStrings)]bool{
-       waitReasonChanReceiveNilChan:  true,
-       waitReasonChanSendNilChan:     true,
-       waitReasonSelectNoCases:       true,
-       waitReasonSleep:               true,
-       waitReasonSyncCondWait:        true,
-       waitReasonSyncWaitGroupWait:   true,
-       waitReasonCoroutine:           true,
-       waitReasonSynctestRun:         true,
-       waitReasonSynctestWait:        true,
-       waitReasonSynctestChanReceive: true,
-       waitReasonSynctestChanSend:    true,
-       waitReasonSynctestSelect:      true,
+       waitReasonChanReceiveNilChan:    true,
+       waitReasonChanSendNilChan:       true,
+       waitReasonSelectNoCases:         true,
+       waitReasonSleep:                 true,
+       waitReasonSyncCondWait:          true,
+       waitReasonSynctestWaitGroupWait: true,
+       waitReasonCoroutine:             true,
+       waitReasonSynctestRun:           true,
+       waitReasonSynctestWait:          true,
+       waitReasonSynctestChanReceive:   true,
+       waitReasonSynctestChanSend:      true,
+       waitReasonSynctestSelect:        true,
 }
 
 var (
index 4890df346407f4049a4f585050744a48134b7fca..7d6fc6d57d9309d52c6f0d64c26cad789f6038f0 100644 (file)
@@ -106,8 +106,12 @@ func sync_runtime_SemacquireRWMutex(addr *uint32, lifo bool, skipframes int) {
 }
 
 //go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup
-func sync_runtime_SemacquireWaitGroup(addr *uint32) {
-       semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait)
+func sync_runtime_SemacquireWaitGroup(addr *uint32, synctestDurable bool) {
+       reason := waitReasonSyncWaitGroupWait
+       if synctestDurable {
+               reason = waitReasonSynctestWaitGroupWait
+       }
+       semacquire1(addr, false, semaBlockProfile, 0, reason)
 }
 
 //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
index ff1979a8d81db5f6895cd9a7d9f1871e38a4c02e..f676afa20d09e2fabd1dc8299ed7db46c9a787e3 100644 (file)
@@ -5,6 +5,7 @@
 package runtime
 
 import (
+       "internal/runtime/atomic"
        "internal/runtime/sys"
        "unsafe"
 )
@@ -13,12 +14,13 @@ import (
 type synctestBubble struct {
        mu      mutex
        timers  timers
-       now     int64 // current fake time
-       root    *g    // caller of synctest.Run
-       waiter  *g    // caller of synctest.Wait
-       main    *g    // goroutine started by synctest.Run
-       waiting bool  // true if a goroutine is calling synctest.Wait
-       done    bool  // true if main has exited
+       id      uint64 // unique id
+       now     int64  // current fake time
+       root    *g     // caller of synctest.Run
+       waiter  *g     // caller of synctest.Wait
+       main    *g     // goroutine started by synctest.Run
+       waiting bool   // true if a goroutine is calling synctest.Wait
+       done    bool   // true if main has exited
 
        // The bubble is active (not blocked) so long as running > 0 || active > 0.
        //
@@ -163,6 +165,8 @@ func (bubble *synctestBubble) raceaddr() unsafe.Pointer {
        return unsafe.Pointer(bubble)
 }
 
+var bubbleGen atomic.Uint64 // bubble ID counter
+
 //go:linkname synctestRun internal/synctest.Run
 func synctestRun(f func()) {
        if debug.asynctimerchan.Load() != 0 {
@@ -174,6 +178,7 @@ func synctestRun(f func()) {
                panic("synctest.Run called from within a synctest bubble")
        }
        bubble := &synctestBubble{
+               id:      bubbleGen.Add(1),
                total:   1,
                running: 1,
                root:    gp,
@@ -313,6 +318,11 @@ func synctestwait_c(gp *g, _ unsafe.Pointer) bool {
        return true
 }
 
+//go:linkname synctest_isInBubble internal/synctest.IsInBubble
+func synctest_isInBubble() bool {
+       return getg().bubble != nil
+}
+
 //go:linkname synctest_acquire internal/synctest.acquire
 func synctest_acquire() any {
        if bubble := getg().bubble; bubble != nil {
@@ -339,3 +349,85 @@ func synctest_inBubble(bubble any, f func()) {
        }()
        f()
 }
+
+// specialBubble is a special used to associate objects with bubbles.
+type specialBubble struct {
+       _        sys.NotInHeap
+       special  special // special record header; kind is _KindSpecialBubble
+       bubbleid uint64  // id of the bubble the object is associated with
+}
+
+// getOrSetBubbleSpecial checks the special record for p's bubble membership.
+//
+// If add is true and p is not associated with any bubble,
+// it adds a special record for p associating it with bubbleid.
+//
+// It returns ok==true if p is associated with bubbleid
+// (including if a new association was added),
+// and ok==false if not.
+func getOrSetBubbleSpecial(p unsafe.Pointer, bubbleid uint64, add bool) (ok bool) {
+       span := spanOfHeap(uintptr(p))
+       if span == nil {
+               throw("getOrSetBubbleSpecial on invalid pointer")
+       }
+
+       // Ensure that the span is swept.
+       // Sweeping accesses the specials list w/o locks, so we have
+       // to synchronize with it. And it's just much safer.
+       mp := acquirem()
+       span.ensureSwept()
+
+       offset := uintptr(p) - span.base()
+
+       lock(&span.speciallock)
+
+       // Find splice point, check for existing record.
+       iter, exists := span.specialFindSplicePoint(offset, _KindSpecialBubble)
+       if exists {
+               // p is already associated with a bubble.
+               // Return true iff it's the same bubble.
+               s := (*specialBubble)((unsafe.Pointer)(*iter))
+               ok = s.bubbleid == bubbleid
+       } else if add {
+               // p is not associated with a bubble,
+               // and we've been asked to add an association.
+               s := (*specialBubble)(mheap_.specialBubbleAlloc.alloc())
+               s.bubbleid = bubbleid
+               s.special.kind = _KindSpecialBubble
+               s.special.offset = offset
+               s.special.next = *iter
+               *iter = (*special)(unsafe.Pointer(s))
+               spanHasSpecials(span)
+               ok = true
+       } else {
+               // p is not associated with a bubble.
+               ok = false
+       }
+
+       unlock(&span.speciallock)
+       releasem(mp)
+
+       return ok
+}
+
+// synctest_associate associates p with the current bubble.
+// It returns false if p is already associated with a different bubble.
+//
+// The calling goroutine must be in a bubble: getg().bubble is
+// dereferenced without a nil check.
+//
+//go:linkname synctest_associate internal/synctest.associate
+func synctest_associate(p unsafe.Pointer) (ok bool) {
+       return getOrSetBubbleSpecial(p, getg().bubble.id, true)
+}
+
+// synctest_disassociate disassociates p from its bubble.
+//
+//go:linkname synctest_disassociate internal/synctest.disassociate
+func synctest_disassociate(p unsafe.Pointer) {
+       removespecial(p, _KindSpecialBubble)
+}
+
+// synctest_isAssociated reports whether p is associated with the current bubble.
+// It returns false if p has no association or is associated with a
+// different bubble. It never adds an association.
+//
+// The calling goroutine must be in a bubble: getg().bubble is
+// dereferenced without a nil check.
+//
+//go:linkname synctest_isAssociated internal/synctest.isAssociated
+func synctest_isAssociated(p unsafe.Pointer) bool {
+       return getOrSetBubbleSpecial(p, getg().bubble.id, false)
+}
index 99e5bccbee40be768f6c7541b36da97384ece85f..ae3368e58d024802a14abf315225bf3312d43453 100644 (file)
@@ -14,7 +14,7 @@ import "unsafe"
 func runtime_Semacquire(s *uint32)
 
 // SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait.
-func runtime_SemacquireWaitGroup(s *uint32)
+func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool)
 
 // Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended
 // Mutexes and RWMutexes.
index c850f58ed14e41817f5911b2d35b23f6a9127602..efc63be0990af2726402d832916c8baa96d8802b 100644 (file)
@@ -6,6 +6,7 @@ package sync
 
 import (
        "internal/race"
+       "internal/synctest"
        "sync/atomic"
        "unsafe"
 )
@@ -47,10 +48,17 @@ import (
 type WaitGroup struct {
        noCopy noCopy
 
-       state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
+       // Bits (high to low):
+       //   bits[0:32]  counter
+       //   bits[32]    flag: synctest bubble membership
+       //   bits[33:64] wait count
+       state atomic.Uint64
        sema  uint32
 }
 
+// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
+const waitGroupBubbleFlag = 0x8000_0000
+
 // Add adds delta, which may be negative, to the [WaitGroup] task counter.
 // If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.
 // If the counter goes negative, Add panics.
@@ -75,9 +83,27 @@ func (wg *WaitGroup) Add(delta int) {
                race.Disable()
                defer race.Enable()
        }
+       if synctest.IsInBubble() {
+               // If Add is called from within a bubble, then all Add calls must be made
+               // from the same bubble.
+               if !synctest.Associate(wg) {
+                       // wg is already associated with a different bubble.
+                       fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
+               } else {
+                       state := wg.state.Or(waitGroupBubbleFlag)
+                       if state != 0 && state&waitGroupBubbleFlag == 0 {
+                               // Add has been called from outside this bubble.
+                               fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+                       }
+               }
+       }
        state := wg.state.Add(uint64(delta) << 32)
+       if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() {
+               // Add has been called from within a synctest bubble (and we aren't in one).
+               fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+       }
        v := int32(state >> 32)
-       w := uint32(state)
+       w := uint32(state & 0x7fffffff)
        if race.Enabled && delta > 0 && v == int32(delta) {
                // The first increment must be synchronized with Wait.
                // Need to model this as a read, because there can be
@@ -90,6 +116,13 @@ func (wg *WaitGroup) Add(delta int) {
        if w != 0 && delta > 0 && v == int32(delta) {
                panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
+       if v == 0 && state&waitGroupBubbleFlag != 0 {
+               // Disassociate the WaitGroup from its bubble.
+               synctest.Disassociate(wg)
+               if w == 0 {
+                       wg.state.Store(0)
+               }
+       }
        if v > 0 || w == 0 {
                return
        }
@@ -147,7 +180,21 @@ func (wg *WaitGroup) Wait() {
                                // otherwise concurrent Waits will race with each other.
                                race.Write(unsafe.Pointer(&wg.sema))
                        }
-                       runtime_SemacquireWaitGroup(&wg.sema)
+                       synctestDurable := false
+                       if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
+                               if race.Enabled {
+                                       race.Enable()
+                               }
+                               if synctest.IsAssociated(wg) {
+                                       // Add was called within the current bubble,
+                                       // so this Wait is durably blocking.
+                                       synctestDurable = true
+                               }
+                               if race.Enabled {
+                                       race.Disable()
+                               }
+                       }
+                       runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
                        if wg.state.Load() != 0 {
                                panic("sync: WaitGroup is reused before previous Wait has returned")
                        }
index c7e93b22013d7b7ba6b20a35c203eaa2200d9610..1664cb8484a5c19c5b3e141114d440dc04d73c13 100644 (file)
 //   - a blocking select statement where every case is a channel created
 //     within the bubble
 //   - [sync.Cond.Wait]
-//   - [sync.WaitGroup.Wait]
+//   - [sync.WaitGroup.Wait], when [sync.WaitGroup.Add] was called within the bubble
 //   - [time.Sleep]
 //
-// Locking a [sync.Mutex] or [sync.RWMutex] is not durably blocking.
+// Operations not in the above list are not durably blocking.
+// In particular, the following operations may block a goroutine,
+// but are not durably blocking because the goroutine can be unblocked
+// by an event occurring outside its bubble:
+//
+//   - locking a [sync.Mutex] or [sync.RWMutex]
+//   - blocking on I/O, such as reading from a network socket
+//   - system calls
 //
 // # Isolation
 //
 // is associated with it. Operating on a bubbled channel, timer, or
 // ticker from outside the bubble panics.
 //
+// A [sync.WaitGroup] becomes associated with a bubble on the first
+// call to Add or Go. Once a WaitGroup is associated with a bubble,
+// calling Add or Go from outside that bubble is a fatal error.
+//
 // Cleanup functions and finalizers registered with
 // [runtime.AddCleanup] and [runtime.SetFinalizer]
 // run outside of any bubble.