< sync/atomic
< internal/sync
< weak
+ < internal/synctest
< sync
< internal/bisect
< internal/godebug
unicode !< path;
- RUNTIME
- < internal/synctest;
-
# SYSCALL is RUNTIME plus the packages necessary for basic system calls.
RUNTIME, unicode/utf8, unicode/utf16, internal/synctest
< internal/syscall/windows/sysdll, syscall/js
package synctest
import (
- _ "unsafe" // for go:linkname
+ "unsafe"
)
//go:linkname Run
//go:linkname Wait
func Wait()
+// IsInBubble reports whether the current goroutine is in a bubble.
+//
+//go:linkname IsInBubble
+func IsInBubble() bool
+
+// Associate associates p with the current bubble.
+// It returns false if p has an existing association with a different bubble.
+func Associate[T any](p *T) (ok bool) {
+ return associate(unsafe.Pointer(p))
+}
+
+//go:linkname associate
+func associate(p unsafe.Pointer) bool
+
+// Disassociate disassociates p from any bubble.
+func Disassociate[T any](p *T) {
+ disassociate(unsafe.Pointer(p))
+}
+
+//go:linkname disassociate
+func disassociate(b unsafe.Pointer)
+
+// IsAssociated reports whether p is associated with the current bubble.
+func IsAssociated[T any](p *T) bool {
+ return isAssociated(unsafe.Pointer(p))
+}
+
+//go:linkname isAssociated
+func isAssociated(p unsafe.Pointer) bool
+
//go:linkname acquire
func acquire() any
import (
"fmt"
"internal/synctest"
+ "internal/testenv"
"iter"
+ "os"
"reflect"
"runtime"
"slices"
"strconv"
+ "strings"
"sync"
"testing"
"time"
})
}
-func TestWaitGroup(t *testing.T) {
+func TestWaitGroupInBubble(t *testing.T) {
synctest.Run(func() {
var wg sync.WaitGroup
wg.Add(1)
})
}
+func TestWaitGroupOutOfBubble(t *testing.T) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ donec := make(chan struct{})
+ go synctest.Run(func() {
+ // Since wg.Add was called outside the bubble, Wait is not durably blocking
+ // and this waits until wg.Done is called below.
+ wg.Wait()
+ close(donec)
+ })
+ select {
+ case <-donec:
+ t.Fatalf("synctest.Run finished before WaitGroup.Done called")
+ case <-time.After(1 * time.Millisecond):
+ }
+ wg.Done()
+ <-donec
+}
+
+func TestWaitGroupMovedIntoBubble(t *testing.T) {
+ wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ synctest.Run(func() {
+ wg.Add(1)
+ })
+ })
+}
+
+func TestWaitGroupMovedOutOfBubble(t *testing.T) {
+ wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
+ var wg sync.WaitGroup
+ synctest.Run(func() {
+ wg.Add(1)
+ })
+ wg.Add(1)
+ })
+}
+
+func TestWaitGroupMovedBetweenBubblesWithNonZeroCount(t *testing.T) {
+ wantFatal(t, "fatal error: sync: WaitGroup.Add called from multiple synctest bubbles", func() {
+ var wg sync.WaitGroup
+ synctest.Run(func() {
+ wg.Add(1)
+ })
+ synctest.Run(func() {
+ wg.Add(1)
+ })
+ })
+}
+
+func TestWaitGroupMovedBetweenBubblesWithZeroCount(t *testing.T) {
+ var wg sync.WaitGroup
+ synctest.Run(func() {
+ wg.Add(1)
+ wg.Done()
+ })
+ synctest.Run(func() {
+ // Reusing the WaitGroup is safe, because its count is zero.
+ wg.Add(1)
+ wg.Done()
+ })
+}
+
+func TestWaitGroupMovedBetweenBubblesAfterWait(t *testing.T) {
+ var wg sync.WaitGroup
+ synctest.Run(func() {
+ wg.Go(func() {})
+ wg.Wait()
+ })
+ synctest.Run(func() {
+ // Reusing the WaitGroup is safe, because its count is zero.
+ wg.Go(func() {})
+ wg.Wait()
+ })
+}
+
func TestHappensBefore(t *testing.T) {
// Use two parallel goroutines accessing different vars to ensure that
// we correctly account for multiple goroutines in the bubble.
t.Errorf("got no panic, want one")
}
}
+
+func wantFatal(t *testing.T, want string, f func()) {
+ t.Helper()
+
+ if os.Getenv("GO_WANT_HELPER_PROCESS") == "1" {
+ f()
+ return
+ }
+
+ cmd := testenv.Command(t, testenv.Executable(t), "-test.run=^"+t.Name()+"$")
+ cmd = testenv.CleanCmdEnv(cmd)
+ cmd.Env = append(cmd.Env, "GO_WANT_HELPER_PROCESS=1")
+ out, err := cmd.CombinedOutput()
+ if err == nil {
+ t.Errorf("expected test function to panic, but test returned successfully")
+ }
+ if !strings.Contains(string(out), want) {
+ t.Errorf("wanted test output contaiing %q; got %q", want, string(out))
+ }
+}
specialReachableAlloc fixalloc // allocator for specialReachable
specialPinCounterAlloc fixalloc // allocator for specialPinCounter
specialWeakHandleAlloc fixalloc // allocator for specialWeakHandle
+ specialBubbleAlloc fixalloc // allocator for specialBubble
speciallock mutex // lock for special record allocators.
arenaHintAlloc fixalloc // allocator for arenaHints
h.specialReachableAlloc.init(unsafe.Sizeof(specialReachable{}), nil, nil, &memstats.other_sys)
h.specialPinCounterAlloc.init(unsafe.Sizeof(specialPinCounter{}), nil, nil, &memstats.other_sys)
h.specialWeakHandleAlloc.init(unsafe.Sizeof(specialWeakHandle{}), nil, nil, &memstats.gcMiscSys)
+ h.specialBubbleAlloc.init(unsafe.Sizeof(specialBubble{}), nil, nil, &memstats.other_sys)
h.arenaHintAlloc.init(unsafe.Sizeof(arenaHint{}), nil, nil, &memstats.other_sys)
// Don't zero mspan allocations. Background sweeping can
// _KindSpecialCheckFinalizer adds additional context to a finalizer or cleanup.
// Used only if debug.checkfinalizers != 0.
_KindSpecialCheckFinalizer = 8
+ // _KindSpecialBubble is used to associate objects with synctest bubbles.
+ _KindSpecialBubble = 9
)
type special struct {
lock(&mheap_.speciallock)
mheap_.specialTinyBlockAlloc.free(unsafe.Pointer(st))
unlock(&mheap_.speciallock)
+ case _KindSpecialBubble:
+ st := (*specialBubble)(unsafe.Pointer(s))
+ lock(&mheap_.speciallock)
+ mheap_.specialBubbleAlloc.free(unsafe.Pointer(st))
+ unlock(&mheap_.speciallock)
default:
throw("bad special kind")
panic("not reached")
waitReasonSynctestChanReceive // "chan receive (synctest)"
waitReasonSynctestChanSend // "chan send (synctest)"
waitReasonSynctestSelect // "select (synctest)"
+ waitReasonSynctestWaitGroupWait // "sync.WaitGroup.Wait (synctest)"
waitReasonCleanupWait // "cleanup wait"
)
waitReasonSynctestChanReceive: "chan receive (synctest)",
waitReasonSynctestChanSend: "chan send (synctest)",
waitReasonSynctestSelect: "select (synctest)",
+ waitReasonSynctestWaitGroupWait: "sync.WaitGroup.Wait (synctest)",
waitReasonCleanupWait: "cleanup wait",
}
// isIdleInSynctest indicates that a goroutine is considered idle by synctest.Wait.
var isIdleInSynctest = [len(waitReasonStrings)]bool{
- waitReasonChanReceiveNilChan: true,
- waitReasonChanSendNilChan: true,
- waitReasonSelectNoCases: true,
- waitReasonSleep: true,
- waitReasonSyncCondWait: true,
- waitReasonSyncWaitGroupWait: true,
- waitReasonCoroutine: true,
- waitReasonSynctestRun: true,
- waitReasonSynctestWait: true,
- waitReasonSynctestChanReceive: true,
- waitReasonSynctestChanSend: true,
- waitReasonSynctestSelect: true,
+ waitReasonChanReceiveNilChan: true,
+ waitReasonChanSendNilChan: true,
+ waitReasonSelectNoCases: true,
+ waitReasonSleep: true,
+ waitReasonSyncCondWait: true,
+ waitReasonSynctestWaitGroupWait: true,
+ waitReasonCoroutine: true,
+ waitReasonSynctestRun: true,
+ waitReasonSynctestWait: true,
+ waitReasonSynctestChanReceive: true,
+ waitReasonSynctestChanSend: true,
+ waitReasonSynctestSelect: true,
}
var (
}
//go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup
-func sync_runtime_SemacquireWaitGroup(addr *uint32) {
- semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait)
+func sync_runtime_SemacquireWaitGroup(addr *uint32, synctestDurable bool) {
+ reason := waitReasonSyncWaitGroupWait
+ if synctestDurable {
+ reason = waitReasonSynctestWaitGroupWait
+ }
+ semacquire1(addr, false, semaBlockProfile, 0, reason)
}
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
package runtime
import (
+ "internal/runtime/atomic"
"internal/runtime/sys"
"unsafe"
)
type synctestBubble struct {
mu mutex
timers timers
- now int64 // current fake time
- root *g // caller of synctest.Run
- waiter *g // caller of synctest.Wait
- main *g // goroutine started by synctest.Run
- waiting bool // true if a goroutine is calling synctest.Wait
- done bool // true if main has exited
+ id uint64 // unique id
+ now int64 // current fake time
+ root *g // caller of synctest.Run
+ waiter *g // caller of synctest.Wait
+ main *g // goroutine started by synctest.Run
+ waiting bool // true if a goroutine is calling synctest.Wait
+ done bool // true if main has exited
// The bubble is active (not blocked) so long as running > 0 || active > 0.
//
return unsafe.Pointer(bubble)
}
+var bubbleGen atomic.Uint64 // bubble ID counter
+
//go:linkname synctestRun internal/synctest.Run
func synctestRun(f func()) {
if debug.asynctimerchan.Load() != 0 {
panic("synctest.Run called from within a synctest bubble")
}
bubble := &synctestBubble{
+ id: bubbleGen.Add(1),
total: 1,
running: 1,
root: gp,
return true
}
+//go:linkname synctest_isInBubble internal/synctest.IsInBubble
+func synctest_isInBubble() bool {
+ return getg().bubble != nil
+}
+
//go:linkname synctest_acquire internal/synctest.acquire
func synctest_acquire() any {
if bubble := getg().bubble; bubble != nil {
}()
f()
}
+
+// specialBubble is a special used to associate objects with bubbles.
+type specialBubble struct {
+ _ sys.NotInHeap
+ special special
+ bubbleid uint64
+}
+
+// getOrSetBubbleSpecial checks the special record for p's bubble membership.
+//
+// If add is true and p is not associated with any bubble,
+// it adds a special record for p associating it with bubbleid.
+//
+// It returns ok==true if p is associated with bubbleid
+// (including if a new association was added),
+// and ok==false if not.
+func getOrSetBubbleSpecial(p unsafe.Pointer, bubbleid uint64, add bool) (ok bool) {
+ span := spanOfHeap(uintptr(p))
+ if span == nil {
+ throw("getOrSetBubbleSpecial on invalid pointer")
+ }
+
+ // Ensure that the span is swept.
+ // Sweeping accesses the specials list w/o locks, so we have
+ // to synchronize with it. And it's just much safer.
+ mp := acquirem()
+ span.ensureSwept()
+
+ offset := uintptr(p) - span.base()
+
+ lock(&span.speciallock)
+
+ // Find splice point, check for existing record.
+ iter, exists := span.specialFindSplicePoint(offset, _KindSpecialBubble)
+ if exists {
+ // p is already associated with a bubble.
+ // Return true iff it's the same bubble.
+ s := (*specialBubble)((unsafe.Pointer)(*iter))
+ ok = s.bubbleid == bubbleid
+ } else if add {
+ // p is not associated with a bubble,
+ // and we've been asked to add an association.
+ s := (*specialBubble)(mheap_.specialBubbleAlloc.alloc())
+ s.bubbleid = bubbleid
+ s.special.kind = _KindSpecialBubble
+ s.special.offset = offset
+ s.special.next = *iter
+ *iter = (*special)(unsafe.Pointer(s))
+ spanHasSpecials(span)
+ ok = true
+ } else {
+ // p is not associated with a bubble.
+ ok = false
+ }
+
+ unlock(&span.speciallock)
+ releasem(mp)
+
+ return ok
+}
+
+// synctest_associate associates p with the current bubble.
+// It returns false if p is already associated with a different bubble.
+//
+//go:linkname synctest_associate internal/synctest.associate
+func synctest_associate(p unsafe.Pointer) (ok bool) {
+ return getOrSetBubbleSpecial(p, getg().bubble.id, true)
+}
+
+// synctest_disassociate disassociates p from its bubble.
+//
+//go:linkname synctest_disassociate internal/synctest.disassociate
+func synctest_disassociate(p unsafe.Pointer) {
+ removespecial(p, _KindSpecialBubble)
+}
+
+// synctest_isAssociated reports whether p is associated with the current bubble.
+//
+//go:linkname synctest_isAssociated internal/synctest.isAssociated
+func synctest_isAssociated(p unsafe.Pointer) bool {
+ return getOrSetBubbleSpecial(p, getg().bubble.id, false)
+}
func runtime_Semacquire(s *uint32)
// SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait.
-func runtime_SemacquireWaitGroup(s *uint32)
+func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool)
// Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended
// Mutexes and RWMutexes.
import (
"internal/race"
+ "internal/synctest"
"sync/atomic"
"unsafe"
)
type WaitGroup struct {
noCopy noCopy
- state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
+ // Bits (high to low):
+ // bits[0:32] counter
+ // bits[32] flag: synctest bubble membership
+ // bits[33:64] wait count
+ state atomic.Uint64
sema uint32
}
+// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
+const waitGroupBubbleFlag = 0x8000_0000
+
// Add adds delta, which may be negative, to the [WaitGroup] task counter.
// If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.
// If the counter goes negative, Add panics.
race.Disable()
defer race.Enable()
}
+ if synctest.IsInBubble() {
+ // If Add is called from within a bubble, then all Add calls must be made
+ // from the same bubble.
+ if !synctest.Associate(wg) {
+ // wg is already associated with a different bubble.
+ fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
+ } else {
+ state := wg.state.Or(waitGroupBubbleFlag)
+ if state != 0 && state&waitGroupBubbleFlag == 0 {
+ // Add has been called from outside this bubble.
+ fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+ }
+ }
+ }
state := wg.state.Add(uint64(delta) << 32)
+ if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() {
+ // Add has been called from within a synctest bubble (and we aren't in one).
+ fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+ }
v := int32(state >> 32)
- w := uint32(state)
+ w := uint32(state & 0x7fffffff)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
+ if v == 0 && state&waitGroupBubbleFlag != 0 {
+ // Disassociate the WaitGroup from its bubble.
+ synctest.Disassociate(wg)
+ if w == 0 {
+ wg.state.Store(0)
+ }
+ }
if v > 0 || w == 0 {
return
}
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
- runtime_SemacquireWaitGroup(&wg.sema)
+ synctestDurable := false
+ if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
+ if race.Enabled {
+ race.Enable()
+ }
+ if synctest.IsAssociated(wg) {
+ // Add was called within the current bubble,
+ // so this Wait is durably blocking.
+ synctestDurable = true
+ }
+ if race.Enabled {
+ race.Disable()
+ }
+ }
+ runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
// - a blocking select statement where every case is a channel created
// within the bubble
// - [sync.Cond.Wait]
-// - [sync.WaitGroup.Wait]
+// - [sync.WaitGroup.Wait], when [sync.WaitGroup.Add] was called within the bubble
// - [time.Sleep]
//
-// Locking a [sync.Mutex] or [sync.RWMutex] is not durably blocking.
+// Operations not in the above list are not durably blocking.
+// In particular, the following operations may block a goroutine,
+// but are not durably blocking because the goroutine can be unblocked
+// by an event occurring outside its bubble:
+//
+// - locking a [sync.Mutex] or [sync.RWMutex]
+// - blocking on I/O, such as reading from a network socket
+// - system calls
//
// # Isolation
//
// is associated with it. Operating on a bubbled channel, timer, or
// ticker from outside the bubble panics.
//
+// A [sync.WaitGroup] becomes associated with a bubble on the first
+// call to Add or Go. Once a WaitGroup is associated with a bubble,
+// calling Add or Go from outside that bubble panics.
+//
// Cleanup functions and finalizers registered with
// [runtime.AddCleanup] and [runtime.SetFinalizer]
// run outside of any bubble.