From: Austin Clements
Date: Mon, 15 Feb 2016 22:38:06 +0000 (-0500)
Subject: runtime: make shrinkstack concurrent-safe
X-Git-Tag: go1.7beta1~1282
X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=276b1777717ec5a0a02364a5aee806f8076bb60b;p=gostls13.git

runtime: make shrinkstack concurrent-safe

Currently shrinkstack is only safe during STW because it adjusts
channel-related stack pointers and moves send/receive stack slots
without synchronizing with the channel code. Make it safe to use when
the world isn't stopped by:

1) Locking all channels the G is blocked on while adjusting the
   sudogs and copying the area of the stack that may contain
   send/receive slots.

2) For any stack frames that may contain send/receive slots, using an
   atomic CAS to adjust pointers to prevent races between adjusting a
   pointer in a receive slot and a concurrent send writing to that
   receive slot.

In principle, the synchronization could be finer-grained. For example,
we considered synchronizing around the sudogs, which would allow
channel operations involving other Gs to continue if the G being
shrunk was far enough down the send/receive queue. However, using the
channel lock means no additional locks are necessary in the channel
code. Furthermore, the stack shrinking code holds the channel lock for
a very short time (much less than the time required to shrink the
stack).

This does not yet make stack shrinking concurrent; it merely makes
doing so safe.

This has negligible effect on the go1 and garbage benchmarks.

For #12967.

Change-Id: Ia49df3a8a7be4b36e365aac4155a2416b94b988c
Reviewed-on: https://go-review.googlesource.com/20042
Reviewed-by: Keith Randall
Run-TryBot: Austin Clements
---

diff --git a/src/runtime/chan_test.go b/src/runtime/chan_test.go
index 2cdfae866c..911821bea5 100644
--- a/src/runtime/chan_test.go
+++ b/src/runtime/chan_test.go
@@ -586,6 +586,67 @@ func TestSelectDuplicateChannel(t *testing.T) {
 	c <- 8 // wake up B. This operation used to fail because c.recvq was corrupted (it tries to wake up an already running G instead of B)
 }
 
+var selectSink interface{}
+
+func TestSelectStackAdjust(t *testing.T) {
+	// Test that channel receive slots that contain local stack
+	// pointers are adjusted correctly by stack shrinking.
+	c := make(chan *int)
+	d := make(chan *int)
+	ready := make(chan bool)
+	go func() {
+		// Temporarily grow the stack to 10K.
+		stackGrowthRecursive((10 << 10) / (128 * 8))
+
+		// We're ready to trigger GC and stack shrink.
+		ready <- true
+
+		val := 42
+		var cx *int
+		cx = &val
+		// Receive from d. cx won't be affected.
+		select {
+		case cx = <-c:
+		case <-d:
+		}
+
+		// Check that pointer in cx was adjusted correctly.
+		if cx != &val {
+			t.Error("cx no longer points to val")
+		} else if val != 42 {
+			t.Error("val changed")
+		} else {
+			*cx = 43
+			if val != 43 {
+				t.Error("changing *cx failed to change val")
+			}
+		}
+		ready <- true
+	}()
+
+	// Let the goroutine get into the select.
+	<-ready
+	time.Sleep(10 * time.Millisecond)
+
+	// Force concurrent GC a few times.
+	var before, after runtime.MemStats
+	runtime.ReadMemStats(&before)
+	for i := 0; i < 100; i++ {
+		selectSink = new([1 << 20]byte)
+		runtime.ReadMemStats(&after)
+		if after.NumGC-before.NumGC >= 2 {
+			goto done
+		}
+	}
+	t.Fatal("failed to trigger concurrent GC")
+done:
+	selectSink = nil
+
+	// Wake select.
+	d <- nil
+	<-ready
+}
+
 func BenchmarkChanNonblocking(b *testing.B) {
 	myc := make(chan int)
 	b.RunParallel(func(pb *testing.PB) {
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index eda258a992..1935270936 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -215,7 +215,8 @@ type gobuf struct {
 // selecttype.
 type sudog struct {
 	// The following fields are protected by the hchan.lock of the
-	// channel this sudog is blocking on.
+	// channel this sudog is blocking on. shrinkstack depends on
+	// this.
 	g          *g
 	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
diff --git a/src/runtime/stack.go b/src/runtime/stack.go
index 6450094ef7..06e6416617 100644
--- a/src/runtime/stack.go
+++ b/src/runtime/stack.go
@@ -516,6 +516,9 @@ type adjustinfo struct {
 	old   stack
 	delta uintptr // ptr distance from old to new stack (newbase - oldbase)
 	cache pcvalueCache
+
+	// sghi is the highest sudog.elem on the stack.
+	sghi uintptr
 }
 
 // Adjustpointer checks whether *vpp is in the old stack described by adjinfo.
@@ -564,12 +567,19 @@ func adjustpointers(scanp unsafe.Pointer, cbv *bitvector, adjinfo *adjustinfo, f
 	maxp := adjinfo.old.hi
 	delta := adjinfo.delta
 	num := bv.n
+	// If this frame might contain channel receive slots, use CAS
+	// to adjust pointers. If the slot hasn't been received into
+	// yet, it may contain stack pointers and a concurrent send
+	// could race with adjusting those pointers. (The sent value
+	// itself can never contain stack pointers.)
+	useCAS := uintptr(scanp) < adjinfo.sghi
 	for i := uintptr(0); i < num; i++ {
 		if stackDebug >= 4 {
 			print("        ", add(scanp, i*sys.PtrSize), ":", ptrnames[ptrbit(&bv, i)], ":", hex(*(*uintptr)(add(scanp, i*sys.PtrSize))), " # ", i, " ", bv.bytedata[i/8], "\n")
 		}
 		if ptrbit(&bv, i) == 1 {
 			pp := (*uintptr)(add(scanp, i*sys.PtrSize))
+		retry:
 			p := *pp
 			if f != nil && 0 < p && p < _PageSize && debug.invalidptr != 0 || p == poisonStack {
 				// Looks like a junk value in a pointer slot.
@@ -582,7 +592,14 @@ func adjustpointers(scanp unsafe.Pointer, cbv *bitvector, adjinfo *adjustinfo, f
 				if stackDebug >= 3 {
 					print("adjust ptr ", p, " ", funcname(f), "\n")
 				}
-				*pp = p + delta
+				if useCAS {
+					ppu := (*unsafe.Pointer)(unsafe.Pointer(pp))
+					if !atomic.Casp1(ppu, unsafe.Pointer(p), unsafe.Pointer(p+delta)) {
+						goto retry
+					}
+				} else {
+					*pp = p + delta
+				}
 			}
 		}
 	}
@@ -727,9 +744,68 @@ func fillstack(stk stack, b byte) {
 	}
 }
 
+func findsghi(gp *g, stk stack) uintptr {
+	var sghi uintptr
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		p := uintptr(sg.elem) + uintptr(sg.c.elemsize)
+		if stk.lo <= p && p < stk.hi && p > sghi {
+			sghi = p
+		}
+		p = uintptr(unsafe.Pointer(sg.selectdone)) + unsafe.Sizeof(sg.selectdone)
+		if stk.lo <= p && p < stk.hi && p > sghi {
+			sghi = p
+		}
+	}
+	return sghi
+}
+
+// syncadjustsudogs adjusts gp's sudogs and copies the part of gp's
+// stack they refer to while synchronizing with concurrent channel
+// operations. It returns the number of bytes of stack copied.
+func syncadjustsudogs(gp *g, used uintptr, adjinfo *adjustinfo) uintptr {
+	if gp.waiting == nil {
+		return 0
+	}
+
+	// Lock channels to prevent concurrent send/receive.
+	// It's important that we *only* do this for async
+	// copystack; otherwise, gp may be in the middle of
+	// putting itself on wait queues and this would
+	// self-deadlock.
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		lock(&sg.c.lock)
+	}
+
+	// Adjust sudogs.
+	adjustsudogs(gp, adjinfo)
+
+	// Copy the part of the stack the sudogs point in to
+	// while holding the lock to prevent races on
+	// send/receive slots.
+	var sgsize uintptr
+	if adjinfo.sghi != 0 {
+		oldBot := adjinfo.old.hi - used
+		newBot := oldBot + adjinfo.delta
+		sgsize = adjinfo.sghi - oldBot
+		memmove(unsafe.Pointer(newBot), unsafe.Pointer(oldBot), sgsize)
+	}
+
+	// Unlock channels.
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		unlock(&sg.c.lock)
+	}
+
+	return sgsize
+}
+
 // Copies gp's stack to a new stack of a different size.
 // Caller must have changed gp status to Gcopystack.
-func copystack(gp *g, newsize uintptr) {
+//
+// If sync is true, this is a self-triggered stack growth and, in
+// particular, no other G may be writing to gp's stack (e.g., via a
+// channel operation). If sync is false, copystack protects against
+// concurrent channel operations.
+func copystack(gp *g, newsize uintptr, sync bool) {
 	if gp.syscallsp != 0 {
 		throw("stack growth not allowed in system call")
 	}
@@ -753,21 +829,41 @@ func copystack(gp *g, newsize uintptr) {
 	adjinfo.old = old
 	adjinfo.delta = new.hi - old.hi
 
-	// copy the stack to the new location
-	memmove(unsafe.Pointer(new.hi-used), unsafe.Pointer(old.hi-used), used)
+	// Adjust sudogs, synchronizing with channel ops if necessary.
+	ncopy := used
+	if sync {
+		adjustsudogs(gp, &adjinfo)
+	} else {
+		// sudogs can point in to the stack. During concurrent
+		// shrinking, these areas may be written to. Find the
+		// highest such pointer so we can handle everything
+		// there and below carefully. (This shouldn't be far
+		// from the bottom of the stack, so there's little
+		// cost in handling everything below it carefully.)
+		adjinfo.sghi = findsghi(gp, old)
+
+		// Synchronize with channel ops and copy the part of
+		// the stack they may interact with.
+		ncopy -= syncadjustsudogs(gp, used, &adjinfo)
+	}
+
+	// Copy the stack (or the rest of it) to the new location
+	memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)
 
 	// Disallow sigprof scans of this stack and block if there's
 	// one in progress.
 	gcLockStackBarriers(gp)
 
-	// Adjust structures that have pointers into stacks. We have
-	// to do most of these before we traceback the new stack
-	// because gentraceback uses them.
+	// Adjust remaining structures that have pointers into stacks.
+	// We have to do most of these before we traceback the new
+	// stack because gentraceback uses them.
 	adjustctxt(gp, &adjinfo)
 	adjustdefers(gp, &adjinfo)
 	adjustpanics(gp, &adjinfo)
-	adjustsudogs(gp, &adjinfo)
 	adjuststkbar(gp, &adjinfo)
+	if adjinfo.sghi != 0 {
+		adjinfo.sghi += adjinfo.delta
+	}
 
 	// copy old stack barriers to new stack barrier array
 	newstkbar = newstkbar[:len(gp.stkbar)]
@@ -944,7 +1040,7 @@ func newstack() {
 
 	// The concurrent GC will not scan the stack while we are doing the copy since
 	// the gp is in a Gcopystack status.
-	copystack(gp, uintptr(newsize))
+	copystack(gp, uintptr(newsize), true)
 	if stackDebug >= 1 {
 		print("stack grow done\n")
 	}
@@ -971,6 +1067,7 @@ func gostartcallfn(gobuf *gobuf, fv *funcval) {
 
 // Maybe shrink the stack being used by gp.
 // Called at garbage collection time.
+// gp must be stopped, but the world need not be.
 func shrinkstack(gp *g) {
 	if readgstatus(gp) == _Gdead {
 		if gp.stack.lo != 0 {
@@ -1023,7 +1120,7 @@ func shrinkstack(gp *g) {
 	}
 
 	oldstatus := casgcopystack(gp)
-	copystack(gp, newsize)
+	copystack(gp, newsize, false)
 	casgstatus(gp, _Gcopystack, oldstatus)
 }
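
For illustration, the CAS path added to adjustpointers above boils down
to a load/compare-and-swap retry loop. Below is a minimal self-contained
sketch of that pattern using the standard sync/atomic package. The names
adjustSlot, oldLo, oldHi, and delta are hypothetical, chosen for this
example only; the real runtime code uses the runtime-internal
atomic.Casp1 on unsafe.Pointer values rather than uintptrs.

package main

import (
	"fmt"
	"sync/atomic"
)

// adjustSlot relocates a pointer-sized value in *slot by delta if it
// falls in the old stack range [oldLo, oldHi). If a concurrent writer
// (in the runtime's case, a channel send into a receive slot) replaces
// the value between the load and the CAS, the CAS fails and we reload
// and retry, mirroring the `goto retry` in the diff.
func adjustSlot(slot *uintptr, oldLo, oldHi, delta uintptr) {
	for {
		p := atomic.LoadUintptr(slot)
		if p < oldLo || p >= oldHi {
			return // not a pointer into the old stack; leave it alone
		}
		if atomic.CompareAndSwapUintptr(slot, p, p+delta) {
			return // adjusted without racing a concurrent write
		}
	}
}

func main() {
	slot := uintptr(0x1040) // pretend stack address in [0x1000, 0x2000)
	adjustSlot(&slot, 0x1000, 0x2000, 0x8000)
	fmt.Printf("slot adjusted to %#x\n", slot) // prints 0x9040
}

The retry loop is what makes the commit's point 2 safe: if a concurrent
send wins the race and fills the receive slot, the CAS observes the new
value (which, as the diff's comment notes, can never be a stack pointer)
and simply leaves it alone on the next iteration.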