b := newProfileBuilder(w)
var err error
for {
- time.Sleep(100 * time.Millisecond)
+ if runtime.GOOS == "darwin" || runtime.GOOS == "ios" {
+ // see runtime_pprof_readProfile
+ time.Sleep(100 * time.Millisecond)
+ }
data, tags, eof := readProfile()
if e := b.addCPUData(data, tags); e != nil && err == nil {
err = e
// be returned by read. By definition, r ≤ rNext ≤ w (before wraparound),
// and rNext is only used by the reader, so it can be accessed without atomics.
//
+// If the reader is blocked waiting for more data, the writer will wake it up if
+// either the buffer is more than half full, or when the writer sets the eof
+// marker or writes overflow entries (described below.)
+//
// If the writer gets ahead of the reader, so that the buffer fills,
// future writes are discarded and replaced in the output stream by an
// overflow entry, which has size 2+hdrsize+1, time set to the time of
// Racing with reader setting flag bits in b.w, to avoid lost wakeups.
old := b.w.load()
new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1)
+ // We re-load b.r here to reduce the likelihood of early wakeups
+ // if the reader already consumed some data between the last
+ // time we read b.r and now. This isn't strictly necessary.
+ unread := countSub(new.dataCount(), b.r.load().dataCount())
+ if unread < 0 {
+ // The new count overflowed and wrapped around.
+ unread += len(b.data)
+ }
+ wakeupThreshold := len(b.data) / 2
+ if unread < wakeupThreshold {
+ // Carry over the sleeping flag since we're not planning
+ // to wake the reader yet
+ new |= old & profReaderSleeping
+ }
if !b.w.cas(old, new) {
continue
}
- // If there was a reader, wake it up.
- if old&profReaderSleeping != 0 {
+ // If we've hit our high watermark for data in the buffer,
+ // and there is a reader, wake it up.
+ if unread >= wakeupThreshold && old&profReaderSleeping != 0 {
+ // NB: if we reach this point, then the sleeping bit is
+ // cleared in the new b.w value
notewakeup(&b.wait)
}
break
package runtime_test
import (
+ "fmt"
+ "regexp"
+ "runtime"
. "runtime"
"slices"
+ "sync"
"testing"
"time"
"unsafe"
}
}
}
+
+func TestProfBufWakeup(t *testing.T) {
+ b := NewProfBuf(2, 16, 2)
+ var wg sync.WaitGroup
+ wg.Go(func() {
+ read := 0
+ for {
+ rdata, _, eof := b.Read(ProfBufBlocking)
+ if read == 0 && len(rdata) < 8 {
+ t.Errorf("first wake up at less than half full, got %x", rdata)
+ }
+ read += len(rdata)
+ if eof {
+ return
+ }
+ }
+ })
+
+ // Under the hood profBuf uses notetsleepg when the reader blocks.
+ // Different platforms have different implementations, leading to
+ // different statuses we need to look for to determine whether the
+ // reader is blocked.
+ var waitStatus string
+ switch runtime.GOOS {
+ case "js":
+ waitStatus = "waiting"
+ case "wasip1":
+ waitStatus = "runnable"
+ default:
+ waitStatus = "syscall"
+ }
+
+ // Ensure that the reader is blocked
+ awaitBlockedGoroutine(waitStatus, "TestProfBufWakeup.func1")
+ // NB: this writes 6 words not 4. Fine for the test.
+ // The reader shouldn't wake up for this
+ b.Write(nil, 1, []uint64{1, 2}, []uintptr{3, 4})
+
+ // The reader should still be blocked
+ //
+ // TODO(nick): this is racy. We could Gosched and still have the reader
+ // blocked in a buggy implementation because it just didn't get a chance
+ // to run
+ awaitBlockedGoroutine(waitStatus, "TestProfBufWakeup.func1")
+ b.Write(nil, 1, []uint64{5, 6}, []uintptr{7, 8})
+ b.Close()
+
+ // Wait here so we can be sure the reader got the data
+ wg.Wait()
+}
+
+// see also runtime/pprof tests
+func awaitBlockedGoroutine(state, fName string) {
+ re := fmt.Sprintf(`(?m)^goroutine \d+ \[%s\]:\n(?:.+\n\t.+\n)*runtime_test\.%s`, regexp.QuoteMeta(state), fName)
+ r := regexp.MustCompile(re)
+
+ buf := make([]byte, 64<<10)
+ for {
+ Gosched()
+ n := Stack(buf, true)
+ if n == len(buf) {
+ // Buffer wasn't large enough for a full goroutine dump.
+ // Resize it and try again.
+ buf = make([]byte, 2*len(buf))
+ continue
+ }
+ const count = 1
+ if len(r.FindAll(buf[:n], -1)) >= count {
+ return
+ }
+ }
+}