(tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
}
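+
+// isEndOfGeneration reports whether the batch is a lone in-band
+// end-of-generation signal (Go 1.26+).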
+func (b *batch) isEndOfGeneration() bool {
+ return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvEndOfGeneration
+}
+
// readBatch reads the next full batch from r.
func readBatch(r interface {
io.Reader
if err != nil {
return batch{}, 0, err
}
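+ // A lone EvEndOfGeneration byte marks the end of the current
+ // generation. Return it as a pseudo-batch attributed to no thread,
+ // carrying only the signal byte itself.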
+ if typ := tracev2.EventType(b); typ == tracev2.EvEndOfGeneration {
+ return batch{m: NoThread, exp: tracev2.NoExperiment, data: []byte{b}}, 0, nil
+ }
if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
}
"bytes"
"cmp"
"encoding/binary"
+ "errors"
"fmt"
"io"
"slices"
*evTable
}
+// readGeneration buffers and decodes the structural elements of a trace generation
+// out of r.
+func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
+ if ver < version.Go126 {
+ return nil, errors.New("internal error: readGeneration called for <1.26 trace")
+ }
+ g := &generation{
+ evTable: &evTable{
+ pcs: make(map[uint64]frame),
+ },
+ batches: make(map[ThreadID][]batch),
+ }
+
+ // Read batches one at a time until we hit EOF or the end of the generation.
+ for {
+ b, gen, err := readBatch(r)
+ if err == io.EOF {
+ if len(g.batches) != 0 {
+ return nil, errors.New("incomplete generation found; trace likely truncated")
+ }
+ return nil, nil // All done.
+ }
+ if err != nil {
+ return nil, err
+ }
+ if g.gen == 0 {
+ // Initialize gen.
+ g.gen = gen
+ }
+ if b.isEndOfGeneration() {
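+ // Go 1.26+ traces end each generation with an explicit in-band
+ // signal, so we never need to read into the next generation's batches.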
+ break
+ }
+ if gen == 0 {
+ // 0 is a sentinel used by the runtime, so a valid trace never contains it.
+ return nil, fmt.Errorf("invalid generation number %d", gen)
+ }
+ if gen != g.gen {
+ return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
+ }
+ if g.minTs == 0 || b.time < g.minTs {
+ g.minTs = b.time
+ }
+ if err := processBatch(g, b, ver); err != nil {
+ return nil, err
+ }
+ }
+
+ // Check some invariants.
+ if g.freq == 0 {
+ return nil, errors.New("no frequency event found")
+ }
+ if !g.hasClockSnapshot {
+ return nil, errors.New("no clock snapshot event found")
+ }
+
+ // N.B. Trust that the batch order is correct. We can't validate the batch order
+ // by timestamp because the timestamps could just be plain wrong. The source of
+ // truth is the order things appear in the trace and the partial order sequence
+ // numbers on certain events. If it turns out the batch order is actually incorrect
+ // we'll very likely fail to advance a partial order from the frontier.
+
+ // Compactify stacks and strings for better lookup performance later.
+ g.stacks.compactify()
+ g.strings.compactify()
+
+ // Validate stacks.
+ if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
+ return nil, err
+ }
+
+ // Now that we have the frequency, fix up CPU samples.
+ fixUpCPUSamples(g.cpuSamples, g.freq)
+ return g, nil
+}
+
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
+//
+// Used only for trace versions < Go126.
type spilledBatch struct {
gen uint64
*batch
}
-// readGeneration buffers and decodes the structural elements of a trace generation
+// readGenerationWithSpill buffers and decodes the structural elements of a trace generation
// out of r. spill is the first batch of the new generation (already buffered and
// parsed from reading the last generation). Returns the generation and the first
// batch read of the next generation, if any.
//
// If gen is non-nil, it is valid and must be processed before handling the returned
// error.
-func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
+func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
+ if ver >= version.Go126 {
+ return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
+ }
g := &generation{
evTable: &evTable{
pcs: make(map[uint64]frame),
}
// Process the spilled batch.
if spill != nil {
+ // The spilled batch was already read and parsed while scanning for
+ // the end of the previous generation.
g.gen = spill.gen
g.minTs = spill.batch.time
if err := processBatch(g, *spill.batch, ver); err != nil {
}
spill = nil
}
- // Read batches one at a time until we either hit EOF or
- // the next generation.
+ // Read batches one at a time until we hit EOF or the next generation.
var spillErr error
for {
b, gen, err := readBatch(r)
}
if err != nil {
if g.gen != 0 {
- // This is an error reading the first batch of the next generation.
+ // This may be an error reading the first batch of the next generation.
// This is fine. Let's forge ahead assuming that what we've got so
// far is fine.
spillErr = err
// Initialize gen.
g.gen = gen
}
- if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
+ if gen == g.gen+1 {
+ // TODO: Increment the generation with wraparound the same way the runtime does.
spill = &spilledBatch{gen: gen, batch: &b}
break
}
return nil, nil, err
}
- // Fix up the CPU sample timestamps, now that we have freq.
- for i := range g.cpuSamples {
- s := &g.cpuSamples[i]
- s.time = g.freq.mul(timestamp(s.time))
- }
- // Sort the CPU samples.
- slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
- return cmp.Compare(a.time, b.time)
- })
+ // Now that we have the frequency, fix up CPU samples.
+ fixUpCPUSamples(g.cpuSamples, g.freq)
return g, spill, spillErr
}
if err := addExperimentalBatch(g.expBatches, b); err != nil {
return err
}
+ case b.isEndOfGeneration():
+ return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
default:
if _, ok := g.batches[b.m]; !ok {
g.batchMs = append(g.batchMs, b.m)
})
return nil
}
+
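+// fixUpCPUSamples converts the CPU sample timestamps from raw ticks to
+// trace time using freq, then sorts the samples by time.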
+func fixUpCPUSamples(samples []cpuSample, freq frequency) {
+ // Fix up the CPU sample timestamps.
+ for i := range samples {
+ s := &samples[i]
+ s.time = freq.mul(timestamp(s.time))
+ }
+ // Sort the CPU samples.
+ slices.SortFunc(samples, func(a, b cpuSample) int {
+ return cmp.Compare(a.time, b.time)
+ })
+}
}
}
b.writeEventsTo(tw)
+
+ // Write the end-of-generation event for trace versions that have one (Go 1.26+).
+ if g.trace.ver >= version.Go126 {
+ tw.WriteEvent(raw.Event{
+ Version: g.trace.ver,
+ Ev: tracev2.EvEndOfGeneration,
+ })
+ }
}
func (g *Generation) newStructuralBatch() *Batch {
import (
"bufio"
+ "errors"
"fmt"
"io"
"slices"
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
type Reader struct {
- version version.Version
- r *bufio.Reader
- lastTs Time
- gen *generation
+ version version.Version
+ r *bufio.Reader
+ lastTs Time
+ gen *generation
+ frontier []*batchCursor
+ cpuSamples []cpuSample
+ order ordering
+ syncs int
+ done bool
+
+ // Spill state.
+ //
+ // Traces before Go 1.26 have no explicit end-of-generation signal,
+ // so the first batch of the next generation must be parsed just to
+ // detect the generation boundary. That batch is "spilled" so we don't
+ // lose track of it when parsing the next generation.
+ //
+ // This is unnecessary for Go 1.26 and later traces, which carry an
+ // explicit end-of-generation signal.
spill *spilledBatch
spillErr error // error from reading spill
spillErrSync bool // whether we emitted a Sync before reporting spillErr
- frontier []*batchCursor
- cpuSamples []cpuSample
- order ordering
- syncs int
- done bool
v1Events *traceV1Converter
}
return &Reader{
v1Events: convertV1Trace(tr),
}, nil
- case version.Go122, version.Go123, version.Go125:
+ case version.Go122, version.Go123, version.Go125, version.Go126:
return &Reader{
version: v,
r: br,
// Check if we need to refresh the generation.
if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
- if r.spillErr != nil {
- if r.spillErrSync {
- return Event{}, r.spillErr
- }
- r.spillErrSync = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
+ if r.version < version.Go126 {
+ return r.nextGenWithSpill()
}
- if r.gen != nil && r.spill == nil {
- // If we have a generation from the last read,
- // and there's nothing left in the frontier, and
- // there's no spilled batch, indicating that there's
- // no further generation, it means we're done.
- // Emit the final sync event.
- r.done = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
- }
- // Read the next generation.
- r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
- if r.gen == nil {
- r.spillErrSync = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
- }
-
- // Reset CPU samples cursor.
- r.cpuSamples = r.gen.cpuSamples
-
- // Reset frontier.
- for _, m := range r.gen.batchMs {
- batches := r.gen.batches[m]
- bc := &batchCursor{m: m}
- ok, err := bc.nextEvent(batches, r.gen.freq)
- if err != nil {
- return Event{}, err
- }
- if !ok {
- // Turns out there aren't actually any events in these batches.
- continue
- }
- r.frontier = heapInsert(r.frontier, bc)
+ gen, err := readGeneration(r.r, r.version)
+ if err != nil {
+ return Event{}, err
}
- r.syncs++
- // Always emit a sync event at the beginning of the generation.
- return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+ return r.installGen(gen)
}
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]
return ev, nil
}
+// nextGenWithSpill reads the next generation, handling any spilled
+// batch from the previous read, and installs it into the Reader.
+func (r *Reader) nextGenWithSpill() (Event, error) {
+ if r.version >= version.Go126 {
+ return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
+ }
+ if r.spillErr != nil {
+ if r.spillErrSync {
+ return Event{}, r.spillErr
+ }
+ r.spillErrSync = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ if r.gen != nil && r.spill == nil {
+ // If we have a generation from the last read,
+ // and there's nothing left in the frontier, and
+ // there's no spilled batch, indicating that there's
+ // no further generation, it means we're done.
+ // Emit the final sync event.
+ r.done = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+
+ // Read the next generation.
+ var gen *generation
+ gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
+ if gen == nil {
+ r.gen = nil
+ r.spillErrSync = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ return r.installGen(gen)
+}
+
+// installGen installs the new generation into the Reader and returns
+// a Sync event for the new generation.
+func (r *Reader) installGen(gen *generation) (Event, error) {
+ if gen == nil {
+ // Emit the final sync event.
+ r.gen = nil
+ r.done = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ r.gen = gen
+
+ // Reset CPU samples cursor.
+ r.cpuSamples = r.gen.cpuSamples
+
+ // Reset frontier.
+ for _, m := range r.gen.batchMs {
+ batches := r.gen.batches[m]
+ bc := &batchCursor{m: m}
+ ok, err := bc.nextEvent(batches, r.gen.freq)
+ if err != nil {
+ return Event{}, err
+ }
+ if !ok {
+ // Turns out there aren't actually any events in these batches.
+ continue
+ }
+ r.frontier = heapInsert(r.frontier, bc)
+ }
+ r.syncs++
+
+ // Always emit a sync event at the beginning of the generation.
+ return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+}
+
func dumpFrontier(frontier []*batchCursor) string {
var sb strings.Builder
for _, bc := range frontier {
EvSync // start of a sync batch [...EvFrequency|EvClockSnapshot]
EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec]
- // Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25.
- // This could be used as an explicit in-band end-of-generation signal in the future.
+ // In-band end-of-generation signal. Added in Go 1.25 as a reserved
+ // internal event; it appears in traces as of Go 1.26.
EvEndOfGeneration
NumEvents
Go122 Version = 22 // v2
Go123 Version = 23 // v2
Go125 Version = 25 // v2
- Current = Go125
+ Go126 Version = 26 // v2
+ Current = Go126
)
var versions = map[Version][]tracev2.EventSpec{
Go122: tracev2.Specs()[:tracev2.EvUserLog+1], // All events after are Go 1.23+.
Go123: tracev2.Specs()[:tracev2.EvExperimentalBatch+1], // All events after are Go 1.25+.
- Go125: tracev2.Specs(),
+ Go125: tracev2.Specs()[:tracev2.EvClockSnapshot+1], // All events after are Go 1.26+.
+ Go126: tracev2.Specs(),
}
// Specs returns the set of event.Specs for this version.
// was on has been returned, ReadTrace returns nil. The caller must copy the
// returned data before calling ReadTrace again.
// ReadTrace must be called from one goroutine at a time.
-func ReadTrace() []byte {
- for {
- buf := readTrace()
-
- // Skip over the end-of-generation signal which must not appear
- // in the final trace.
- if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration {
- continue
- }
- return buf
- }
-}
-
-// readTrace is the implementation of ReadTrace, except with an additional
-// in-band signal as to when the buffer is for a new generation.
-//
-//go:linkname readTrace runtime/trace.runtime_readTrace
-func readTrace() (buf []byte) {
+func ReadTrace() (buf []byte) {
top:
var park bool
systemstack(func() {
if !trace.headerWritten {
trace.headerWritten = true
unlock(&trace.lock)
- return []byte("go 1.25 trace\x00\x00\x00"), false
+ return []byte("go 1.26 trace\x00\x00\x00"), false
}
// Read the next buffer.
// timestamp is an unprocessed timestamp.
type timestamp uint64
-// batch represents a batch of trace events.
-// It is unparsed except for its header.
+// batch is a copy of a batch of trace events, together with the
+// generation and timestamp parsed from its header.
type batch struct {
- m threadID
time timestamp
+ gen uint64
data []byte
}
-// threadID is the runtime-internal M structure's ID. This is unique
-// for each OS thread.
-type threadID int64
-
// readBatch copies b and parses the trace batch header inside.
-// Returns the batch, the generation, bytes read, and an error.
-func readBatch(b []byte) (batch, uint64, uint64, error) {
+// Returns the batch, bytes read, and an error.
+func readBatch(b []byte) (batch, uint64, error) {
if len(b) == 0 {
- return batch{}, 0, 0, fmt.Errorf("batch is empty")
+ return batch{}, 0, fmt.Errorf("batch is empty")
}
data := make([]byte, len(b))
- if nw := copy(data, b); nw != len(b) {
- return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
- }
+ copy(data, b)
+
// Read batch header byte.
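+ // The runtime flushes the end-of-generation signal as a lone
+ // single-byte write, so it never shares a buffer with batch data.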
+ if typ := tracev2.EventType(b[0]); typ == tracev2.EvEndOfGeneration {
+ if len(b) != 1 {
+ return batch{}, 1, fmt.Errorf("unexpected end of generation in batch of size >1")
+ }
+ return batch{data: data}, 1, nil
+ }
if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
- return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
+ return batch{}, 1, fmt.Errorf("expected batch event, got event %d", typ)
}
-
- // Read the batch header: gen (generation), thread (M) ID, base timestamp
- // for the batch.
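+ // After the type byte, the batch header is a sequence of uvarints:
+ // the generation, the M ID, the batch's base timestamp, and the size
+ // of the event data that follows.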
total := 1
b = b[1:]
+
+ // Read the generation.
gen, n, err := readUvarint(b)
if err != nil {
- return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
+ return batch{}, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
}
total += n
b = b[n:]
- m, n, err := readUvarint(b)
+
+ // Read the M (discard it).
+ _, n, err = readUvarint(b)
if err != nil {
- return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
+ return batch{}, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
}
total += n
b = b[n:]
+
+ // Read the timestamp.
ts, n, err := readUvarint(b)
if err != nil {
- return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
+ return batch{}, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
}
total += n
b = b[n:]
- // Read in the size of the batch to follow.
+ // Read the size of the batch to follow.
size, n, err := readUvarint(b)
if err != nil {
- return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
+ return batch{}, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
}
if size > tracev2.MaxBatchSize {
- return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
+ return batch{}, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
}
total += n
total += int(size)
+ if total != len(data) {
+ return batch{}, uint64(total), fmt.Errorf("expected complete batch")
+ }
data = data[:total]
// Return the batch.
return batch{
- m: threadID(m),
+ gen: gen,
time: timestamp(ts),
data: data,
- }, gen, uint64(total), nil
+ }, uint64(total), nil
}
// Write all the data.
for _, gen := range gens {
- for _, batch := range gen.batches {
+ for _, data := range gen.batches {
// Write batch data.
- nw, err = w.Write(batch.data)
+ nw, err = w.Write(data)
n += int64(nw)
if err != nil {
return n, err
if len(b) == n {
return 0, nil
}
- ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
+ ba, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
if err != nil {
return len(b) - int(nb) - n, err
}
n += int(nb)
// Append the batch to the current generation.
- if r.active.gen == 0 {
- r.active.gen = gen
+ if ba.gen != 0 && r.active.gen == 0 {
+ r.active.gen = ba.gen
}
- if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) {
+ if ba.time != 0 && (r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time)) {
r.active.minTime = r.freq.mul(ba.time)
}
r.active.size += len(ba.data)
- r.active.batches = append(r.active.batches, ba)
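+ // Only the raw bytes need to be retained; the parsed header fields
+ // have already been folded into r.active above.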
+ r.active.batches = append(r.active.batches, ba.data)
return len(b), nil
}
gen uint64
size int
minTime eventTime
- batches []batch
+ batches [][]byte
}
func traceTimeNow(freq frequency) eventTime {
t.subscribersMu.Unlock()
go func() {
- header := runtime_readTrace()
+ header := runtime.ReadTrace()
if traceStartWriter != nil {
traceStartWriter.Write(header)
}
}
for {
- data := runtime_readTrace()
+ data := runtime.ReadTrace()
if data == nil {
break
}
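+ // Forward everything, including the end-of-generation signal, to
+ // subscribers: the signal is part of the Go 1.26 trace format.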
+ if traceStartWriter != nil {
+ traceStartWriter.Write(data)
+ }
+ if flightRecorder != nil {
+ flightRecorder.Write(data)
+ }
if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
if flightRecorder != nil {
flightRecorder.endGeneration()
if frIsNew {
flightRecorder.Write(header)
}
- } else {
- if traceStartWriter != nil {
- traceStartWriter.Write(data)
- }
- if flightRecorder != nil {
- flightRecorder.Write(data)
- }
}
}
}()