This change adds the flight recorder to the trace package.
Flight recording is a technique in which trace data is kept
in a circular buffer and can be flushed upon request. The
implementation will be added in follow-up CLs.
The flight recorder has already been implemented inside of the
golang.org/x/exp/trace package. This copies the current implementation
and modifies it to work within the runtime/trace package.
The changes include:
This adds the ability for multiple consumers (both the execution
tracer and the flight recorder) to subscribe to tracing events. This
change allows us to add multiple consumers without making major
modifications to the runtime. Future optimizations are planned
for this functionality.
This removes the use of byte readers from the process that
parses and processes the trace batches.
This modifies the flight recorder to not parse out the trace
clock frequency, since that requires knowledge of the format that's
unfortunate to encode in yet another place. Right now, the trace clock
frequency is considered stable for the lifetime of the program, so just
grab it directly from the runtime.
This change adds an in-band end-of-generation signal to the internal
implementation of runtime.ReadTrace. The internal implementation is
exported via linkname to runtime/trace, so the flight recorder can
identify exactly when a generation has ended. This signal is also useful
for ensuring that subscribers to runtime trace data always see complete
generations, by starting or stopping data streaming only at generation
boundaries.
For #63185
Change-Id: I5c15345981a6bbe9764a3d623448237e983c64ec
Reviewed-on: https://go-review.googlesource.com/c/go/+/673116
Auto-Submit: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
--- /dev/null
+pkg runtime/trace, func NewFlightRecorder(FlightRecorderConfig) *FlightRecorder #63185
+pkg runtime/trace, method (*FlightRecorder) Enabled() bool #63185
+pkg runtime/trace, method (*FlightRecorder) Start() error #63185
+pkg runtime/trace, method (*FlightRecorder) Stop() #63185
+pkg runtime/trace, method (*FlightRecorder) WriteTo(io.Writer) (int64, error) #63185
+pkg runtime/trace, type FlightRecorder struct #63185
+pkg runtime/trace, type FlightRecorderConfig struct #63185
+pkg runtime/trace, type FlightRecorderConfig struct, MaxBytes uint64 #63185
+pkg runtime/trace, type FlightRecorderConfig struct, MinAge time.Duration #63185
--- /dev/null
+<!-- go.dev/issue/63185 -->
+TODO The flight recorder has been added to the runtime/trace package.
if v.GoVersion >= version.Go125 && !(s.N > 1 && s.ClockSnapshot == nil) {
if s.ClockSnapshot == nil {
e.Errorf("sync %d has no clock snapshot", s.N)
- }
- if s.ClockSnapshot.Wall.IsZero() {
- e.Errorf("sync %d has zero wall time", s.N)
- }
- if s.ClockSnapshot.Mono == 0 {
- e.Errorf("sync %d has zero mono time", s.N)
- }
- if s.ClockSnapshot.Trace == 0 {
- e.Errorf("sync %d has zero trace time", s.N)
- }
- if !v.skipClockSnapshotChecks {
- if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) {
- e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall)
+ } else {
+ if s.ClockSnapshot.Wall.IsZero() {
+ e.Errorf("sync %d has zero wall time", s.N)
}
- if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) {
- e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono)
+ if s.ClockSnapshot.Mono == 0 {
+ e.Errorf("sync %d has zero mono time", s.N)
}
- if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) {
- e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace)
+ if s.ClockSnapshot.Trace == 0 {
+ e.Errorf("sync %d has zero trace time", s.N)
+ }
+ if !v.skipClockSnapshotChecks {
+ if s.N >= 2 && !s.ClockSnapshot.Wall.After(v.lastSync.ClockSnapshot.Wall) {
+ e.Errorf("sync %d has non-increasing wall time: %v vs %v", s.N, s.ClockSnapshot.Wall, v.lastSync.ClockSnapshot.Wall)
+ }
+ if s.N >= 2 && !(s.ClockSnapshot.Mono > v.lastSync.ClockSnapshot.Mono) {
+ e.Errorf("sync %d has non-increasing mono time: %v vs %v", s.N, s.ClockSnapshot.Mono, v.lastSync.ClockSnapshot.Mono)
+ }
+ if s.N >= 2 && !(s.ClockSnapshot.Trace > v.lastSync.ClockSnapshot.Trace) {
+ e.Errorf("sync %d has non-increasing trace time: %v vs %v", s.N, s.ClockSnapshot.Trace, v.lastSync.ClockSnapshot.Trace)
+ }
}
}
}
EvSync // start of a sync batch [...EvFrequency|EvClockSnapshot]
EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec]
+ // Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25.
+ // This could be used as an explicit in-band end-of-generation signal in the future.
+ EvEndOfGeneration
+
NumEvents
)
EvSync: {
Name: "Sync",
},
+ EvEndOfGeneration: {
+ Name: "EndOfGeneration",
+ },
// "Timed" Events.
EvProcsChange: {
import (
"internal/runtime/atomic"
+ "internal/trace/tracev2"
"unsafe"
)
// State for the trace reader goroutine.
//
// Protected by trace.lock.
- readerGen atomic.Uintptr // the generation the reader is currently reading for
- flushedGen atomic.Uintptr // the last completed generation
- headerWritten bool // whether ReadTrace has emitted trace header
+ readerGen atomic.Uintptr // the generation the reader is currently reading for
+ flushedGen atomic.Uintptr // the last completed generation
+ headerWritten bool // whether ReadTrace has emitted trace header
+ endOfGenerationWritten bool // whether readTrace has emitted the end of the generation signal
// doneSema is used to synchronize the reader and traceAdvance. Specifically,
// it notifies traceAdvance that the reader is done with a generation.
// returned data before calling ReadTrace again.
// ReadTrace must be called from one goroutine at a time.
func ReadTrace() []byte {
+ for {
+ buf := readTrace()
+
+ // Skip over the end-of-generation signal which must not appear
+ // in the final trace.
+ if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration {
+ continue
+ }
+ return buf
+ }
+}
+
+// readTrace is the implementation of ReadTrace, except with an additional
+// in-band signal as to when the buffer is for a new generation.
+//
+//go:linkname readTrace runtime/trace.runtime_readTrace
+func readTrace() (buf []byte) {
top:
- var buf []byte
var park bool
systemstack(func() {
buf, park = readTrace0()
}, nil, waitReasonTraceReaderBlocked, traceBlockSystemGoroutine, 2)
goto top
}
-
return buf
}
// is waiting on the reader to finish flushing the last generation so that it
// can continue to advance.
if trace.flushedGen.Load() == gen {
+ // Write out the internal in-band end-of-generation signal.
+ if !trace.endOfGenerationWritten {
+ trace.endOfGenerationWritten = true
+ unlock(&trace.lock)
+ return []byte{byte(tracev2.EvEndOfGeneration)}, false
+ }
+
+ // Reset the flag.
+ trace.endOfGenerationWritten = false
+
+ // Handle shutdown.
if trace.shutdown.Load() {
unlock(&trace.lock)
// read. We're done.
return nil, false
}
+ // Handle advancing to the next generation.
+
// The previous gen has had all of its buffers flushed, and
// there's nothing else for us to read. Advance the generation
// we're reading from and try again.
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+ "fmt"
+ "internal/trace/tracev2"
+)
+
+// timestamp is an unprocessed timestamp.
+type timestamp uint64
+
+// batch represents a batch of trace events.
+// It is unparsed except for its header.
+type batch struct {
+ m threadID
+ time timestamp
+ data []byte
+}
+
+// threadID is the runtime-internal M structure's ID. This is unique
+// for each OS thread.
+type threadID int64
+
+// readBatch copies b and parses the trace batch header inside.
+// Returns the batch, the generation, bytes read, and an error.
+func readBatch(b []byte) (batch, uint64, uint64, error) {
+ if len(b) == 0 {
+ return batch{}, 0, 0, fmt.Errorf("batch is empty")
+ }
+ data := make([]byte, len(b))
+ if nw := copy(data, b); nw != len(b) {
+ return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
+ }
+ // Read batch header byte.
+ if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
+ return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
+ }
+
+ // Read the batch header: gen (generation), thread (M) ID, base timestamp
+ // for the batch.
+ total := 1
+ b = b[1:]
+ gen, n, err := readUvarint(b)
+ if err != nil {
+ return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
+ }
+ total += n
+ b = b[n:]
+ m, n, err := readUvarint(b)
+ if err != nil {
+ return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
+ }
+ total += n
+ b = b[n:]
+ ts, n, err := readUvarint(b)
+ if err != nil {
+ return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
+ }
+ total += n
+ b = b[n:]
+
+ // Read in the size of the batch to follow.
+ size, n, err := readUvarint(b)
+ if err != nil {
+ return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
+ }
+ if size > tracev2.MaxBatchSize {
+ return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
+ }
+ total += n
+ total += int(size)
+ data = data[:total]
+
+ // Return the batch.
+ return batch{
+ m: threadID(m),
+ time: timestamp(ts),
+ data: data,
+ }, gen, uint64(total), nil
+}
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+ "errors"
+)
+
+// maxVarintLenN is the maximum length of a varint-encoded N-bit integer.
+const maxVarintLen64 = 10
+
+var (
+ errOverflow = errors.New("binary: varint overflows a 64-bit integer")
+ errEOB = errors.New("binary: end of buffer")
+)
+
+// TODO deduplicate this function.
+func readUvarint(b []byte) (uint64, int, error) {
+ var x uint64
+ var s uint
+ var byt byte
+ for i := 0; i < maxVarintLen64 && i < len(b); i++ {
+ byt = b[i]
+ if byt < 0x80 {
+ if i == maxVarintLen64-1 && byt > 1 {
+ return x, i, errOverflow
+ }
+ return x | uint64(byt)<<s, i + 1, nil
+ }
+ x |= uint64(byt&0x7f) << s
+ s += 7
+ }
+ return x, len(b), errOverflow
+}
+
+// putUvarint encodes a uint64 into buf and returns the number of bytes written.
+// If the buffer is too small, PutUvarint will panic.
+// TODO deduplicate this function.
+func putUvarint(buf []byte, x uint64) int {
+ i := 0
+ for x >= 0x80 {
+ buf[i] = byte(x) | 0x80
+ x >>= 7
+ i++
+ }
+ buf[i] = byte(x)
+ return i + 1
+}
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+ "fmt"
+ "io"
+ "sync"
+ "time"
+ _ "unsafe" // added for go linkname usage
+)
+
+// FlightRecorder represents a single consumer of a Go execution
+// trace.
+// It tracks a moving window over the execution trace produced by
+// the runtime, always containing the most recent trace data.
+//
+// At most one flight recorder may be active at any given time,
+// though flight recording is allowed to be concurrently active
+// with a trace consumer using trace.Start.
+// This restriction of only a single flight recorder may be removed
+// in the future.
+type FlightRecorder struct {
+ err error
+
+ // State specific to the recorder.
+ header [16]byte
+ active rawGeneration
+ ringMu sync.Mutex
+ ring []rawGeneration
+ freq frequency // timestamp conversion factor, from the runtime
+
+ // Externally-set options.
+ targetSize uint64
+ targetPeriod time.Duration
+
+ enabled bool // whether the flight recorder is enabled.
+ writing sync.Mutex // protects concurrent calls to WriteTo
+
+ // The values of targetSize and targetPeriod we've committed to since the last Start.
+ wantSize uint64
+ wantDur time.Duration
+}
+
+// NewFlightRecorder creates a new flight recorder from the provided configuration.
+func NewFlightRecorder(cfg FlightRecorderConfig) *FlightRecorder {
+ fr := new(FlightRecorder)
+ if cfg.MaxBytes != 0 {
+ fr.targetSize = cfg.MaxBytes
+ } else {
+ fr.targetSize = 10 << 20 // 10 MiB.
+ }
+
+ if cfg.MinAge != 0 {
+ fr.targetPeriod = cfg.MinAge
+ } else {
+ fr.targetPeriod = 10 * time.Second
+ }
+ return fr
+}
+
+// Start activates the flight recorder and begins recording trace data.
+// Only one call to trace.Start may be active at any given time.
+// In addition, currently only one flight recorder may be active in the program.
+// Returns an error if the flight recorder cannot be started or is already started.
+func (fr *FlightRecorder) Start() error {
+ if fr.enabled {
+ return fmt.Errorf("cannot enable a enabled flight recorder")
+ }
+ fr.wantSize = fr.targetSize
+ fr.wantDur = fr.targetPeriod
+ fr.err = nil
+ fr.freq = frequency(1.0 / (float64(runtime_traceClockUnitsPerSecond()) / 1e9))
+
+ // Start tracing, data is sent to a recorder which forwards it to our own
+ // storage.
+ if err := tracing.subscribeFlightRecorder(&recorder{r: fr}); err != nil {
+ return err
+ }
+
+ fr.enabled = true
+ return nil
+}
+
+// Stop ends recording of trace data. It blocks until any concurrent WriteTo calls complete.
+func (fr *FlightRecorder) Stop() {
+ if !fr.enabled {
+ return
+ }
+ fr.enabled = false
+ tracing.unsubscribeFlightRecorder()
+
+ // Reset all state. No need to lock because the reader has already exited.
+ fr.active = rawGeneration{}
+ fr.ring = nil
+}
+
+// Enabled returns true if the flight recorder is active.
+// Specifically, it will return true if Start did not return an error, and Stop has not yet been called.
+// It is safe to call from multiple goroutines simultaneously.
+func (fr *FlightRecorder) Enabled() bool { return fr.enabled }
+
+// WriteTo snapshots the moving window tracked by the flight recorder.
+// The snapshot is expected to contain data that is up-to-date as of when WriteTo is called,
+// though this is not a hard guarantee.
+// Only one goroutine may execute WriteTo at a time.
+// An error is returned upon failure to write to w, if another WriteTo call is already in-progress,
+// or if the flight recorder is inactive.
+func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) {
+ if !fr.enabled {
+ return 0, fmt.Errorf("cannot snapshot a disabled flight recorder")
+ }
+ if !fr.writing.TryLock() {
+ // Indicates that a call to WriteTo was made while one was already in progress.
+ // If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo.
+ return 0, fmt.Errorf("call to WriteTo for trace.FlightRecorder already in progress")
+ }
+ defer fr.writing.Unlock()
+
+ // Force a global buffer flush.
+ runtime_traceAdvance(false)
+
+ // Now that everything has been flushed and written, grab whatever we have.
+ //
+ // N.B. traceAdvance blocks until the tracer goroutine has actually written everything
+ // out, which means the generation we just flushed must have been already been observed
+ // by the recorder goroutine. Because we flushed twice, the first flush is guaranteed to
+ // have been both completed *and* processed by the recorder goroutine.
+ fr.ringMu.Lock()
+ gens := fr.ring
+ fr.ringMu.Unlock()
+
+ // Write the header.
+ nw, err := w.Write(fr.header[:])
+ if err != nil {
+ return int64(nw), err
+ }
+ n += int64(nw)
+
+ // Write all the data.
+ for _, gen := range gens {
+ for _, batch := range gen.batches {
+ // Write batch data.
+ nw, err = w.Write(batch.data)
+ n += int64(nw)
+ if err != nil {
+ return n, err
+ }
+ }
+ }
+ return n, nil
+}
+
+type FlightRecorderConfig struct {
+ // MinAge is a lower bound on the age of an event in the flight recorder's window.
+ //
+ // The flight recorder will strive to promptly discard events older than the minimum age,
+ // but older events may appear in the window snapshot. The age setting will always be
+ // overridden by MaxSize.
+ //
+ // If this is 0, the minimum age is implementation defined, but can be assumed to be on the order
+ // of seconds.
+ MinAge time.Duration
+
+ // MaxBytes is an upper bound on the size of the window in bytes.
+ //
+ // This setting takes precedence over MinAge.
+ // However, it does not make any guarantees on the size of the data WriteTo will write,
+ // nor does it guarantee memory overheads will always stay below MaxBytes. Treat it
+ // as a hint.
+ //
+ // If this is 0, the maximum size is implementation defined.
+ MaxBytes uint64
+}
+
+//go:linkname runtime_traceClockUnitsPerSecond
+func runtime_traceClockUnitsPerSecond() uint64
+
+//go:linkname runtime_traceAdvance runtime.traceAdvance
+func runtime_traceAdvance(stopTrace bool)
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace_test
+
+import (
+ "bytes"
+ "context"
+ inttrace "internal/trace"
+ "internal/trace/testtrace"
+ "io"
+ "runtime/trace"
+ "slices"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestFlightRecorderDoubleStart(t *testing.T) {
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
+ if err := fr.Start(); err != nil {
+ t.Fatalf("unexpected error on Start: %v", err)
+ }
+ if err := fr.Start(); err == nil {
+ t.Fatalf("expected error from double Start: %v", err)
+ }
+ fr.Stop()
+}
+
+func TestFlightRecorderEnabled(t *testing.T) {
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
+
+ if fr.Enabled() {
+ t.Fatal("flight recorder is enabled, but never started")
+ }
+ if err := fr.Start(); err != nil {
+ t.Fatalf("unexpected error on Start: %v", err)
+ }
+ if !fr.Enabled() {
+ t.Fatal("flight recorder is not enabled, but started")
+ }
+ fr.Stop()
+ if fr.Enabled() {
+ t.Fatal("flight recorder is enabled, but stopped")
+ }
+}
+
+func TestFlightRecorderWriteToDisabled(t *testing.T) {
+ var buf bytes.Buffer
+
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
+ if n, err := fr.WriteTo(&buf); err == nil {
+ t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
+ }
+ if err := fr.Start(); err != nil {
+ t.Fatalf("unexpected error on Start: %v", err)
+ }
+ fr.Stop()
+ if n, err := fr.WriteTo(&buf); err == nil {
+ t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
+ }
+}
+
+func TestFlightRecorderConcurrentWriteTo(t *testing.T) {
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
+ if err := fr.Start(); err != nil {
+ t.Fatalf("unexpected error on Start: %v", err)
+ }
+
+ // Start two goroutines to write snapshots.
+ //
+ // Most of the time one will fail and one will succeed, but we don't require this.
+ // Due to a variety of factors, it's definitely possible for them both to succeed.
+ // However, at least one should succeed.
+ var bufs [2]bytes.Buffer
+ var wg sync.WaitGroup
+ var successes atomic.Uint32
+ for i := range bufs {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ n, err := fr.WriteTo(&bufs[i])
+ // TODO(go.dev/issue/63185) was an exported error. Consider refactoring.
+ if err != nil && err.Error() == "call to WriteTo for trace.FlightRecorder already in progress" {
+ if n != 0 {
+ t.Errorf("(goroutine %d) WriteTo bytes written is non-zero for early bail out: %d", i, n)
+ }
+ return
+ }
+ if err != nil {
+ t.Errorf("(goroutine %d) failed to write snapshot for unexpected reason: %v", i, err)
+ }
+ successes.Add(1)
+
+ if n == 0 {
+ t.Errorf("(goroutine %d) wrote invalid trace of zero bytes in size", i)
+ }
+ if n != int64(bufs[i].Len()) {
+ t.Errorf("(goroutine %d) trace length doesn't match WriteTo result: got %d, want %d", i, n, int64(bufs[i].Len()))
+ }
+ }()
+ }
+ wg.Wait()
+
+ // Stop tracing.
+ fr.Stop()
+
+ // Make sure at least one succeeded to write.
+ if successes.Load() == 0 {
+ t.Fatal("expected at least one success to write a snapshot, got zero")
+ }
+
+ // Validate the traces that came out.
+ for i := range bufs {
+ buf := &bufs[i]
+ if buf.Len() == 0 {
+ continue
+ }
+ testReader(t, buf, testtrace.ExpectSuccess())
+ }
+}
+
+func TestFlightRecorder(t *testing.T) {
+ testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) {
+ snapshot()
+ })
+}
+
+func TestFlightRecorderStartStop(t *testing.T) {
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{})
+ for i := 0; i < 5; i++ {
+ testFlightRecorder(t, fr, func(snapshot func()) {
+ snapshot()
+ })
+ }
+}
+
+func TestFlightRecorderLog(t *testing.T) {
+ tr := testFlightRecorder(t, trace.NewFlightRecorder(trace.FlightRecorderConfig{}), func(snapshot func()) {
+ trace.Log(context.Background(), "message", "hello")
+ snapshot()
+ })
+
+ // Prepare to read the trace snapshot.
+ r, err := inttrace.NewReader(bytes.NewReader(tr))
+ if err != nil {
+ t.Fatalf("unexpected error creating trace reader: %v", err)
+ }
+
+ // Find the log message in the trace.
+ found := false
+ for {
+ ev, err := r.ReadEvent()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ t.Fatalf("unexpected error reading trace: %v", err)
+ }
+ if !found && ev.Kind() == inttrace.EventLog {
+ log := ev.Log()
+ found = log.Category == "message" && log.Message == "hello"
+ }
+ }
+ if !found {
+ t.Errorf("failed to find expected log message (%q, %q) in snapshot", "message", "hello")
+ }
+}
+
+func TestFlightRecorderOneGeneration(t *testing.T) {
+ test := func(t *testing.T, fr *trace.FlightRecorder) {
+ tr := testFlightRecorder(t, fr, func(snapshot func()) {
+ // Sleep to let a few generations pass.
+ time.Sleep(3 * time.Second)
+ snapshot()
+ })
+
+ // Prepare to read the trace snapshot.
+ r, err := inttrace.NewReader(bytes.NewReader(tr))
+ if err != nil {
+ t.Fatalf("unexpected error creating trace reader: %v", err)
+ }
+
+ // Make sure there are exactly two Sync events: at the start and end.
+ var syncs []int
+ evs := 0
+ for {
+ ev, err := r.ReadEvent()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ t.Fatalf("unexpected error reading trace: %v", err)
+ }
+ if ev.Kind() == inttrace.EventSync {
+ syncs = append(syncs, evs)
+ }
+ evs++
+ }
+ if ends := []int{0, evs - 1}; !slices.Equal(syncs, ends) {
+ t.Errorf("expected two sync events (one at each end of the trace), found %d at %d instead of %d",
+ len(syncs), syncs[:min(len(syncs), 5)], ends)
+ }
+ }
+ t.Run("SetMinAge", func(t *testing.T) {
+ t.Skip("issue 63185: flaky test")
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MinAge: time.Millisecond})
+ test(t, fr)
+ })
+ t.Run("MaxBytes", func(t *testing.T) {
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16})
+ test(t, fr)
+ })
+}
+
+type flightRecorderTestFunc func(snapshot func())
+
+func testFlightRecorder(t *testing.T, fr *trace.FlightRecorder, f flightRecorderTestFunc) []byte {
+ if trace.IsEnabled() {
+ t.Skip("cannot run flight recorder tests when tracing is enabled")
+ }
+
+ // Start the flight recorder.
+ if err := fr.Start(); err != nil {
+ t.Fatalf("unexpected error on Start: %v", err)
+ }
+
+ // Set up snapshot callback.
+ var buf bytes.Buffer
+ callback := func() {
+ n, err := fr.WriteTo(&buf)
+ if err != nil {
+ t.Errorf("unexpected failure during flight recording: %v", err)
+ return
+ }
+ if n < 16 {
+ t.Errorf("expected a trace size of at least 16 bytes, got %d", n)
+ }
+ if n != int64(buf.Len()) {
+ t.Errorf("WriteTo result doesn't match trace size: got %d, want %d", n, int64(buf.Len()))
+ }
+ }
+
+ // Call the test function.
+ f(callback)
+
+ // Stop the flight recorder.
+ fr.Stop()
+
+ // Get the trace bytes; we don't want to use the Buffer as a Reader directly
+ // since we may want to consume this data more than once.
+ traceBytes := buf.Bytes()
+
+ // Parse the trace to make sure it's not broken.
+ testReader(t, bytes.NewReader(traceBytes), testtrace.ExpectSuccess())
+ return traceBytes
+}
+
+func testReader(t *testing.T, tr io.Reader, exp *testtrace.Expectation) {
+ r, err := inttrace.NewReader(tr)
+ if err != nil {
+ if err := exp.Check(err); err != nil {
+ t.Error(err)
+ }
+ return
+ }
+ v := testtrace.NewValidator()
+ v.SkipClockSnapshotChecks()
+ for {
+ ev, err := r.ReadEvent()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ if err := exp.Check(err); err != nil {
+ t.Error(err)
+ }
+ return
+ }
+ if err := v.Event(ev); err != nil {
+ t.Error(err)
+ }
+ }
+ if err := exp.Check(nil); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestTraceAndFlightRecorder(t *testing.T) {
+ var tBuf, frBuf bytes.Buffer
+ if err := trace.Start(&tBuf); err != nil {
+ t.Errorf("unable to start execution tracer: %s", err)
+ }
+ fr := trace.NewFlightRecorder(trace.FlightRecorderConfig{MaxBytes: 16})
+ fr.Start()
+ fr.WriteTo(&frBuf)
+ fr.Stop()
+ trace.Stop()
+ if tBuf.Len() == 0 || frBuf.Len() == 0 {
+ t.Errorf("None of these should be equal to zero: %d %d", tBuf.Len(), frBuf.Len())
+ }
+ if tBuf.Len() <= frBuf.Len() {
+ t.Errorf("trace should be longer than the flight recorder: trace=%d flight record=%d", tBuf.Len(), frBuf.Len())
+ }
+}
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+ "fmt"
+ "slices"
+ "time"
+ _ "unsafe" // added for go linkname usage
+)
+
+// A recorder receives bytes from the runtime tracer, processes it.
+type recorder struct {
+ r *FlightRecorder
+
+ headerReceived bool
+}
+
+func (w *recorder) Write(b []byte) (n int, err error) {
+ r := w.r
+
+ defer func() {
+ if err != nil {
+ // Propagate errors to the flightrecorder.
+ if r.err == nil {
+ r.err = err
+ }
+ }
+ }()
+
+ if !w.headerReceived {
+ if len(b) < len(r.header) {
+ return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header))
+ }
+ r.header = ([16]byte)(b[:16])
+ n += 16
+ w.headerReceived = true
+ }
+ if len(b) == n {
+ return 0, nil
+ }
+ ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
+ if err != nil {
+ return len(b) - int(nb) - n, err
+ }
+ n += int(nb)
+
+ // Append the batch to the current generation.
+ if r.active.gen == 0 {
+ r.active.gen = gen
+ }
+ if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) {
+ r.active.minTime = r.freq.mul(ba.time)
+ }
+ r.active.size += len(ba.data)
+ r.active.batches = append(r.active.batches, ba)
+
+ return len(b), nil
+}
+
+func (w *recorder) endGeneration() {
+ r := w.r
+
+ // Check if we're entering a new generation.
+ r.ringMu.Lock()
+
+ // Get the current trace clock time.
+ now := traceTimeNow(r.freq)
+
+ // Add the current generation to the ring. Make sure we always have at least one
+ // complete generation by putting the active generation onto the new list, regardless
+ // of whatever our settings are.
+ //
+ // N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
+ // and not worry about aliasing. This creates allocations, but at a very low rate.
+ newRing := []rawGeneration{r.active}
+ size := r.active.size
+ for i := len(r.ring) - 1; i >= 0; i-- {
+ // Stop adding older generations if the new ring already exceeds the thresholds.
+ // This ensures we keep generations that cross a threshold, but not any that lie
+ // entirely outside it.
+ if uint64(size) > r.wantSize || now.Sub(newRing[len(newRing)-1].minTime) > r.wantDur {
+ break
+ }
+ size += r.ring[i].size
+ newRing = append(newRing, r.ring[i])
+ }
+ slices.Reverse(newRing)
+ r.ring = newRing
+ r.ringMu.Unlock()
+
+ // Start a new active generation.
+ r.active = rawGeneration{}
+}
+
+type rawGeneration struct {
+ gen uint64
+ size int
+ minTime eventTime
+ batches []batch
+}
+
+func traceTimeNow(freq frequency) eventTime {
+ return freq.mul(timestamp(runtime_traceClockNow()))
+}
+
+//go:linkname runtime_traceClockNow runtime.traceClockNow
+func runtime_traceClockNow() int64
+
+// frequency is nanoseconds per timestamp unit.
+type frequency float64
+
+// mul multiplies an unprocessed to produce a time in nanoseconds.
+func (f frequency) mul(t timestamp) eventTime {
+ return eventTime(float64(t) * float64(f))
+}
+
+// eventTime is a timestamp in nanoseconds.
+//
+// It corresponds to the monotonic clock on the platform that the
+// trace was taken, and so is possible to correlate with timestamps
+// for other traces taken on the same machine using the same clock
+// (i.e. no reboots in between).
+//
+// The actual absolute value of the timestamp is only meaningful in
+// relation to other timestamps from the same clock.
+//
+// BUG: Timestamps coming from traces on Windows platforms are
+// only comparable with timestamps from the same trace. Timestamps
+// across traces cannot be compared, because the system clock is
+// not used as of Go 1.22.
+//
+// BUG: Traces produced by Go versions 1.21 and earlier cannot be
+// compared with timestamps from other traces taken on the same
+// machine. This is because the system clock was not used at all
+// to collect those timestamps.
+type eventTime int64
+
+// Sub subtracts t0 from t, returning the duration in nanoseconds.
+func (t eventTime) Sub(t0 eventTime) time.Duration {
+ return time.Duration(int64(t) - int64(t0))
+}
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+ "fmt"
+ "internal/trace/tracev2"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ _ "unsafe"
+)
+
+var tracing traceMultiplexer
+
+type traceMultiplexer struct {
+ sync.Mutex
+ enabled atomic.Bool
+ subscribers int
+
+ subscribersMu sync.Mutex
+ traceStartWriter io.Writer
+ flightRecorder *recorder
+}
+
+func (t *traceMultiplexer) subscribeFlightRecorder(r *recorder) error {
+ t.Lock()
+ defer t.Unlock()
+
+ t.subscribersMu.Lock()
+ if t.flightRecorder != nil {
+ t.subscribersMu.Unlock()
+ return fmt.Errorf("flight recorder already enabled")
+ }
+ t.flightRecorder = r
+ t.subscribersMu.Unlock()
+
+ if err := t.addedSubscriber(); err != nil {
+ t.subscribersMu.Lock()
+ t.flightRecorder = nil
+ t.subscribersMu.Unlock()
+ return err
+ }
+ return nil
+}
+
+func (t *traceMultiplexer) unsubscribeFlightRecorder() error {
+ t.Lock()
+ defer t.Unlock()
+
+ t.removingSubscriber()
+
+ t.subscribersMu.Lock()
+ if t.flightRecorder == nil {
+ t.subscribersMu.Unlock()
+ return fmt.Errorf("attempt to unsubscribe missing flight recorder")
+ }
+ t.flightRecorder = nil
+ t.subscribersMu.Unlock()
+
+ t.removedSubscriber()
+ return nil
+}
+
+func (t *traceMultiplexer) subscribeTraceStartWriter(w io.Writer) error {
+ t.Lock()
+ defer t.Unlock()
+
+ t.subscribersMu.Lock()
+ if t.traceStartWriter != nil {
+ t.subscribersMu.Unlock()
+ return fmt.Errorf("execution tracer already enabled")
+ }
+ t.traceStartWriter = w
+ t.subscribersMu.Unlock()
+
+ if err := t.addedSubscriber(); err != nil {
+ t.subscribersMu.Lock()
+ t.traceStartWriter = nil
+ t.subscribersMu.Unlock()
+ return err
+ }
+ return nil
+}
+
+func (t *traceMultiplexer) unsubscribeTraceStartWriter() {
+ t.Lock()
+ defer t.Unlock()
+
+ t.removingSubscriber()
+
+ t.subscribersMu.Lock()
+ if t.traceStartWriter == nil {
+ t.subscribersMu.Unlock()
+ return
+ }
+ t.traceStartWriter = nil
+ t.subscribersMu.Unlock()
+
+ t.removedSubscriber()
+ return
+}
+
+func (t *traceMultiplexer) addedSubscriber() error {
+ if t.enabled.Load() {
+ // This is necessary for the trace reader goroutine to pick up on the new subscriber.
+ runtime_traceAdvance(false)
+ } else {
+ if err := t.startLocked(); err != nil {
+ return err
+ }
+ }
+ t.subscribers++
+ return nil
+}
+
+func (t *traceMultiplexer) removingSubscriber() {
+ if t.subscribers == 0 {
+ return
+ }
+ t.subscribers--
+ if t.subscribers == 0 {
+ runtime.StopTrace()
+ t.enabled.Store(false)
+ } else {
+ // This is necessary to avoid missing trace data when the system is under high load.
+ runtime_traceAdvance(false)
+ }
+}
+
+func (t *traceMultiplexer) removedSubscriber() {
+ if t.subscribers > 0 {
+ // This is necessary for the trace reader goroutine to pick up on the new subscriber.
+ runtime_traceAdvance(false)
+ }
+}
+
+func (t *traceMultiplexer) startLocked() error {
+ if err := runtime.StartTrace(); err != nil {
+ return err
+ }
+
+ // Grab the trace reader goroutine's subscribers.
+ //
+ // We only update our subscribers if we see an end-of-generation
+ // signal from the runtime after this, so any new subscriptions
+ // or unsubscriptions must call traceAdvance to ensure the reader
+ // goroutine sees an end-of-generation signal.
+ t.subscribersMu.Lock()
+ flightRecorder := t.flightRecorder
+ traceStartWriter := t.traceStartWriter
+ t.subscribersMu.Unlock()
+
+ go func() {
+ for {
+ data := runtime_readTrace()
+ if data == nil {
+ break
+ }
+ if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
+ if flightRecorder != nil {
+ flightRecorder.endGeneration()
+ }
+
+ // Pick up any changes.
+ t.subscribersMu.Lock()
+ flightRecorder = t.flightRecorder
+ traceStartWriter = t.traceStartWriter
+ t.subscribersMu.Unlock()
+ } else {
+ if traceStartWriter != nil {
+ traceStartWriter.Write(data)
+ }
+ if flightRecorder != nil {
+ flightRecorder.Write(data)
+ }
+ }
+ }
+ }()
+ t.enabled.Store(true)
+ return nil
+}
+
+//go:linkname runtime_readTrace
+func runtime_readTrace() (buf []byte)
import (
"io"
- "runtime"
- "sync"
- "sync/atomic"
)
// Start enables tracing for the current program.
// While tracing, the trace will be buffered and written to w.
// Start returns an error if tracing is already enabled.
func Start(w io.Writer) error {
- tracing.Lock()
- defer tracing.Unlock()
-
- if err := runtime.StartTrace(); err != nil {
- return err
- }
- go func() {
- for {
- data := runtime.ReadTrace()
- if data == nil {
- break
- }
- w.Write(data)
- }
- }()
- tracing.enabled.Store(true)
- return nil
+ return tracing.subscribeTraceStartWriter(w)
}
// Stop stops the current tracing, if any.
// Stop only returns after all the writes for the trace have completed.
func Stop() {
- tracing.Lock()
- defer tracing.Unlock()
- tracing.enabled.Store(false)
-
- runtime.StopTrace()
-}
-
-var tracing struct {
- sync.Mutex // gate mutators (Start, Stop)
- enabled atomic.Bool
+ tracing.unsubscribeTraceStartWriter()
}
}
// Try to install a new block.
- lock(&a.lock)
-
- // Check block again under the lock. Someone may
- // have gotten here first.
- block = (*traceRegionAllocBlock)(a.current.Load())
- if block != nil {
- r := block.off.Add(n)
- if r <= uintptr(len(block.data)) {
- unlock(&a.lock)
- return (*notInHeap)(unsafe.Pointer(&block.data[r-n]))
+ var x *notInHeap
+ systemstack(func() {
+ // Acquire a.lock on the systemstack to avoid stack growth
+ // and accidentally entering the tracer again.
+ lock(&a.lock)
+
+ // Check block again under the lock. Someone may
+ // have gotten here first.
+ block = (*traceRegionAllocBlock)(a.current.Load())
+ if block != nil {
+ r := block.off.Add(n)
+ if r <= uintptr(len(block.data)) {
+ unlock(&a.lock)
+ x = (*notInHeap)(unsafe.Pointer(&block.data[r-n]))
+ return
+ }
+
+ // Add the existing block to the full list.
+ block.next = a.full
+ a.full = block
}
- // Add the existing block to the full list.
- block.next = a.full
- a.full = block
- }
-
- // Allocate a new block.
- block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc"))
- if block == nil {
- throw("traceRegion: out of memory")
- }
+ // Allocate a new block.
+ block = (*traceRegionAllocBlock)(sysAlloc(unsafe.Sizeof(traceRegionAllocBlock{}), &memstats.other_sys, "trace arena alloc"))
+ if block == nil {
+ throw("traceRegion: out of memory")
+ }
- // Allocate space for our current request, so we always make
- // progress.
- block.off.Store(n)
- x := (*notInHeap)(unsafe.Pointer(&block.data[0]))
+ // Allocate space for our current request, so we always make
+ // progress.
+ block.off.Store(n)
+ x = (*notInHeap)(unsafe.Pointer(&block.data[0]))
- // Publish the new block.
- a.current.Store(unsafe.Pointer(block))
- unlock(&a.lock)
+ // Publish the new block.
+ a.current.Store(unsafe.Pointer(block))
+ unlock(&a.lock)
+ })
return x
}
// traceClockUnitsPerSecond estimates the number of trace clock units per
// second that elapse.
+//
+//go:linkname traceClockUnitsPerSecond runtime/trace.runtime_traceClockUnitsPerSecond
func traceClockUnitsPerSecond() uint64 {
if osHasLowResClock {
// We're using cputicks as our clock, so we need a real estimate.