]> Cypherpunks repositories - gostls13.git/commitdiff
internal/trace: add end-of-generation signal to trace
authorMichael Anthony Knyszek <mknyszek@google.com>
Tue, 5 Aug 2025 21:37:07 +0000 (21:37 +0000)
committerMichael Knyszek <mknyszek@google.com>
Fri, 15 Aug 2025 21:01:30 +0000 (14:01 -0700)
This change takes the EvEndOfGeneration event and promotes it to a real
event that appears in the trace.

This allows the trace parser to unambiguously identify truncated traces
vs. broken traces. It also makes a lot of the logic around parsing
simpler, because there's no more batch spilling necessary.

Fixes #73904.

Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595
Reviewed-on: https://go-review.googlesource.com/c/go/+/693398
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>

src/internal/trace/batch.go
src/internal/trace/generation.go
src/internal/trace/internal/testgen/trace.go
src/internal/trace/reader.go
src/internal/trace/tracev2/events.go
src/internal/trace/version/version.go
src/runtime/trace.go
src/runtime/trace/batch.go
src/runtime/trace/flightrecorder.go
src/runtime/trace/recorder.go
src/runtime/trace/subscribe.go

index 3ff056f6041368ac0d53a0b1b3f94725143e75e3..1f50350273d3e7669d3b86d5288a46ca47a68c39 100644 (file)
@@ -44,6 +44,10 @@ func (b *batch) isSyncBatch(ver version.Version) bool {
                        (tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
 }
 
+func (b *batch) isEndOfGeneration() bool {
+       return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvEndOfGeneration
+}
+
 // readBatch reads the next full batch from r.
 func readBatch(r interface {
        io.Reader
@@ -54,6 +58,9 @@ func readBatch(r interface {
        if err != nil {
                return batch{}, 0, err
        }
+       if typ := tracev2.EventType(b); typ == tracev2.EvEndOfGeneration {
+               return batch{m: NoThread, exp: tracev2.NoExperiment, data: []byte{b}}, 0, nil
+       }
        if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
                return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
        }
index e91ba80f7dd31d6c53a5f48485ed7ca1927fe4f2..90ceef5f7488968791932468b30b13d3380d0b50 100644 (file)
@@ -9,6 +9,7 @@ import (
        "bytes"
        "cmp"
        "encoding/binary"
+       "errors"
        "fmt"
        "io"
        "slices"
@@ -32,22 +33,102 @@ type generation struct {
        *evTable
 }
 
+// readGeneration buffers and decodes the structural elements of a trace generation
+// out of r.
+func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
+       if ver < version.Go126 {
+               return nil, errors.New("internal error: readGeneration called for <1.26 trace")
+       }
+       g := &generation{
+               evTable: &evTable{
+                       pcs: make(map[uint64]frame),
+               },
+               batches: make(map[ThreadID][]batch),
+       }
+
+       // Read batches one at a time until we either hit the next generation.
+       for {
+               b, gen, err := readBatch(r)
+               if err == io.EOF {
+                       if len(g.batches) != 0 {
+                               return nil, errors.New("incomplete generation found; trace likely truncated")
+                       }
+                       return nil, nil // All done.
+               }
+               if err != nil {
+                       return nil, err
+               }
+               if g.gen == 0 {
+                       // Initialize gen.
+                       g.gen = gen
+               }
+               if b.isEndOfGeneration() {
+                       break
+               }
+               if gen == 0 {
+                       // 0 is a sentinel used by the runtime, so we'll never see it.
+                       return nil, fmt.Errorf("invalid generation number %d", gen)
+               }
+               if gen != g.gen {
+                       return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
+               }
+               if g.minTs == 0 || b.time < g.minTs {
+                       g.minTs = b.time
+               }
+               if err := processBatch(g, b, ver); err != nil {
+                       return nil, err
+               }
+       }
+
+       // Check some invariants.
+       if g.freq == 0 {
+               return nil, fmt.Errorf("no frequency event found")
+       }
+       if !g.hasClockSnapshot {
+               return nil, fmt.Errorf("no clock snapshot event found")
+       }
+
+       // N.B. Trust that the batch order is correct. We can't validate the batch order
+       // by timestamp because the timestamps could just be plain wrong. The source of
+       // truth is the order things appear in the trace and the partial order sequence
+       // numbers on certain events. If it turns out the batch order is actually incorrect
+       // we'll very likely fail to advance a partial order from the frontier.
+
+       // Compactify stacks and strings for better lookup performance later.
+       g.stacks.compactify()
+       g.strings.compactify()
+
+       // Validate stacks.
+       if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
+               return nil, err
+       }
+
+       // Now that we have the frequency, fix up CPU samples.
+       fixUpCPUSamples(g.cpuSamples, g.freq)
+       return g, nil
+}
+
 // spilledBatch represents a batch that was read out for the next generation,
 // while reading the previous one. It's passed on when parsing the next
 // generation.
+//
+// Used only for trace versions < Go126.
 type spilledBatch struct {
        gen uint64
        *batch
 }
 
-// readGeneration buffers and decodes the structural elements of a trace generation
+// readGenerationWithSpill buffers and decodes the structural elements of a trace generation
 // out of r. spill is the first batch of the new generation (already buffered and
 // parsed from reading the last generation). Returns the generation and the first
 // batch read of the next generation, if any.
 //
 // If gen is non-nil, it is valid and must be processed before handling the returned
 // error.
-func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
+func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
+       if ver >= version.Go126 {
+               return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
+       }
        g := &generation{
                evTable: &evTable{
                        pcs: make(map[uint64]frame),
@@ -56,6 +137,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
        }
        // Process the spilled batch.
        if spill != nil {
+               // Process the spilled batch, which contains real data.
                g.gen = spill.gen
                g.minTs = spill.batch.time
                if err := processBatch(g, *spill.batch, ver); err != nil {
@@ -63,8 +145,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
                }
                spill = nil
        }
-       // Read batches one at a time until we either hit EOF or
-       // the next generation.
+       // Read batches one at a time until we either hit the next generation.
        var spillErr error
        for {
                b, gen, err := readBatch(r)
@@ -73,7 +154,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
                }
                if err != nil {
                        if g.gen != 0 {
-                               // This is an error reading the first batch of the next generation.
+                               // This may be an error reading the first batch of the next generation.
                                // This is fine. Let's forge ahead assuming that what we've got so
                                // far is fine.
                                spillErr = err
@@ -89,7 +170,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
                        // Initialize gen.
                        g.gen = gen
                }
-               if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
+               if gen == g.gen+1 {
+                       // TODO: Increment the generation with wraparound the same way the runtime does.
                        spill = &spilledBatch{gen: gen, batch: &b}
                        break
                }
@@ -134,15 +216,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
                return nil, nil, err
        }
 
-       // Fix up the CPU sample timestamps, now that we have freq.
-       for i := range g.cpuSamples {
-               s := &g.cpuSamples[i]
-               s.time = g.freq.mul(timestamp(s.time))
-       }
-       // Sort the CPU samples.
-       slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
-               return cmp.Compare(a.time, b.time)
-       })
+       // Now that we have the frequency, fix up CPU samples.
+       fixUpCPUSamples(g.cpuSamples, g.freq)
        return g, spill, spillErr
 }
 
@@ -174,6 +249,8 @@ func processBatch(g *generation, b batch, ver version.Version) error {
                if err := addExperimentalBatch(g.expBatches, b); err != nil {
                        return err
                }
+       case b.isEndOfGeneration():
+               return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
        default:
                if _, ok := g.batches[b.m]; !ok {
                        g.batchMs = append(g.batchMs, b.m)
@@ -512,3 +589,15 @@ func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch,
        })
        return nil
 }
+
+func fixUpCPUSamples(samples []cpuSample, freq frequency) {
+       // Fix up the CPU sample timestamps.
+       for i := range samples {
+               s := &samples[i]
+               s.time = freq.mul(timestamp(s.time))
+       }
+       // Sort the CPU samples.
+       slices.SortFunc(samples, func(a, b cpuSample) int {
+               return cmp.Compare(a.time, b.time)
+       })
+}
index 38d2febb43b42635ea54e64f9640de0549ead750..2eade48de70e00da8490024cd81be121437d38b4 100644 (file)
@@ -322,6 +322,14 @@ func (g *Generation) writeEventsTo(tw *raw.TextWriter) {
                }
        }
        b.writeEventsTo(tw)
+
+       // Write end-of-generation event if necessary.
+       if g.trace.ver >= version.Go126 {
+               tw.WriteEvent(raw.Event{
+                       Version: g.trace.ver,
+                       Ev:      tracev2.EvEndOfGeneration,
+               })
+       }
 }
 
 func (g *Generation) newStructuralBatch() *Batch {
index 83b5a2f123118c6e0fa2e0841baac7a712eea4f4..5a094277fb5de64fc3e6c700819e0dade5d1e88f 100644 (file)
@@ -6,6 +6,7 @@ package trace
 
 import (
        "bufio"
+       "errors"
        "fmt"
        "io"
        "slices"
@@ -22,18 +23,28 @@ import (
 // event as the first event, and a Sync event as the last event.
 // (There may also be any number of Sync events in the middle, too.)
 type Reader struct {
-       version      version.Version
-       r            *bufio.Reader
-       lastTs       Time
-       gen          *generation
+       version    version.Version
+       r          *bufio.Reader
+       lastTs     Time
+       gen        *generation
+       frontier   []*batchCursor
+       cpuSamples []cpuSample
+       order      ordering
+       syncs      int
+       done       bool
+
+       // Spill state.
+       //
+       // Traces before Go 1.26 had no explicit end-of-generation signal, and
+       // so the first batch of the next generation needed to be parsed to identify
+       // a new generation. This batch is the "spilled" so we don't lose track
+       // of it when parsing the next generation.
+       //
+       // This is unnecessary after Go 1.26 because of an explicit end-of-generation
+       // signal.
        spill        *spilledBatch
        spillErr     error // error from reading spill
        spillErrSync bool  // whether we emitted a Sync before reporting spillErr
-       frontier     []*batchCursor
-       cpuSamples   []cpuSample
-       order        ordering
-       syncs        int
-       done         bool
 
        v1Events *traceV1Converter
 }
@@ -54,7 +65,7 @@ func NewReader(r io.Reader) (*Reader, error) {
                return &Reader{
                        v1Events: convertV1Trace(tr),
                }, nil
-       case version.Go122, version.Go123, version.Go125:
+       case version.Go122, version.Go123, version.Go125, version.Go126:
                return &Reader{
                        version: v,
                        r:       br,
@@ -139,52 +150,14 @@ func (r *Reader) ReadEvent() (e Event, err error) {
 
        // Check if we need to refresh the generation.
        if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
-               if r.spillErr != nil {
-                       if r.spillErrSync {
-                               return Event{}, r.spillErr
-                       }
-                       r.spillErrSync = true
-                       r.syncs++
-                       return syncEvent(nil, r.lastTs, r.syncs), nil
+               if r.version < version.Go126 {
+                       return r.nextGenWithSpill()
                }
-               if r.gen != nil && r.spill == nil {
-                       // If we have a generation from the last read,
-                       // and there's nothing left in the frontier, and
-                       // there's no spilled batch, indicating that there's
-                       // no further generation, it means we're done.
-                       // Emit the final sync event.
-                       r.done = true
-                       r.syncs++
-                       return syncEvent(nil, r.lastTs, r.syncs), nil
-               }
-               // Read the next generation.
-               r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
-               if r.gen == nil {
-                       r.spillErrSync = true
-                       r.syncs++
-                       return syncEvent(nil, r.lastTs, r.syncs), nil
-               }
-
-               // Reset CPU samples cursor.
-               r.cpuSamples = r.gen.cpuSamples
-
-               // Reset frontier.
-               for _, m := range r.gen.batchMs {
-                       batches := r.gen.batches[m]
-                       bc := &batchCursor{m: m}
-                       ok, err := bc.nextEvent(batches, r.gen.freq)
-                       if err != nil {
-                               return Event{}, err
-                       }
-                       if !ok {
-                               // Turns out there aren't actually any events in these batches.
-                               continue
-                       }
-                       r.frontier = heapInsert(r.frontier, bc)
+               gen, err := readGeneration(r.r, r.version)
+               if err != nil {
+                       return Event{}, err
                }
-               r.syncs++
-               // Always emit a sync event at the beginning of the generation.
-               return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+               return r.installGen(gen)
        }
        tryAdvance := func(i int) (bool, error) {
                bc := r.frontier[i]
@@ -251,6 +224,78 @@ func (r *Reader) ReadEvent() (e Event, err error) {
        return ev, nil
 }
 
+// nextGenWithSpill reads the generation and calls nextGen while
+// also handling any spilled batches.
+func (r *Reader) nextGenWithSpill() (Event, error) {
+       if r.version >= version.Go126 {
+               return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
+       }
+       if r.spillErr != nil {
+               if r.spillErrSync {
+                       return Event{}, r.spillErr
+               }
+               r.spillErrSync = true
+               r.syncs++
+               return syncEvent(nil, r.lastTs, r.syncs), nil
+       }
+       if r.gen != nil && r.spill == nil {
+               // If we have a generation from the last read,
+               // and there's nothing left in the frontier, and
+               // there's no spilled batch, indicating that there's
+               // no further generation, it means we're done.
+               // Emit the final sync event.
+               r.done = true
+               r.syncs++
+               return syncEvent(nil, r.lastTs, r.syncs), nil
+       }
+
+       // Read the next generation.
+       var gen *generation
+       gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
+       if gen == nil {
+               r.gen = nil
+               r.spillErrSync = true
+               r.syncs++
+               return syncEvent(nil, r.lastTs, r.syncs), nil
+       }
+       return r.installGen(gen)
+}
+
+// installGen installs the new generation into the Reader and returns
+// a Sync event for the new generation.
+func (r *Reader) installGen(gen *generation) (Event, error) {
+       if gen == nil {
+               // Emit the final sync event.
+               r.gen = nil
+               r.done = true
+               r.syncs++
+               return syncEvent(nil, r.lastTs, r.syncs), nil
+       }
+       r.gen = gen
+
+       // Reset CPU samples cursor.
+       r.cpuSamples = r.gen.cpuSamples
+
+       // Reset frontier.
+       for _, m := range r.gen.batchMs {
+               batches := r.gen.batches[m]
+               bc := &batchCursor{m: m}
+               ok, err := bc.nextEvent(batches, r.gen.freq)
+               if err != nil {
+                       return Event{}, err
+               }
+               if !ok {
+                       // Turns out there aren't actually any events in these batches.
+                       continue
+               }
+               r.frontier = heapInsert(r.frontier, bc)
+       }
+       r.syncs++
+
+       // Always emit a sync event at the beginning of the generation.
+       return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+}
+
 func dumpFrontier(frontier []*batchCursor) string {
        var sb strings.Builder
        for _, bc := range frontier {
index eab5a146261fc948c0075cebb4d8dd7188737013..c5e3c94136946b0202340097f0eeb86f3b8f3aa9 100644 (file)
@@ -87,8 +87,8 @@ const (
        EvSync          // start of a sync batch [...EvFrequency|EvClockSnapshot]
        EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec]
 
-       // Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25.
-       // This could be used as an explicit in-band end-of-generation signal in the future.
+       // In-band end-of-generation signal. Added in Go 1.26.
+       // Used in Go 1.25 only internally.
        EvEndOfGeneration
 
        NumEvents
index ce994bbf4a341b9a59be655ea20d5082cdf80870..328a261a93081f461253aa3e84e490792595e617 100644 (file)
@@ -21,7 +21,8 @@ const (
        Go122   Version = 22 // v2
        Go123   Version = 23 // v2
        Go125   Version = 25 // v2
-       Current         = Go125
+       Go126   Version = 26 // v2
+       Current         = Go126
 )
 
 var versions = map[Version][]tracev2.EventSpec{
@@ -33,7 +34,8 @@ var versions = map[Version][]tracev2.EventSpec{
 
        Go122: tracev2.Specs()[:tracev2.EvUserLog+1],           // All events after are Go 1.23+.
        Go123: tracev2.Specs()[:tracev2.EvExperimentalBatch+1], // All events after are Go 1.25+.
-       Go125: tracev2.Specs(),
+       Go125: tracev2.Specs()[:tracev2.EvClockSnapshot+1],     // All events after are Go 1.26+.
+       Go126: tracev2.Specs(),
 }
 
 // Specs returns the set of event.Specs for this version.
index 0d71ad445c2a4fcd03aff67b7c18036c3cc016bb..2c712469ea66bc0ea1c0567eeb3a2209fcb7958e 100644 (file)
@@ -754,24 +754,7 @@ func traceRegisterLabelsAndReasons(gen uintptr) {
 // was on has been returned, ReadTrace returns nil. The caller must copy the
 // returned data before calling ReadTrace again.
 // ReadTrace must be called from one goroutine at a time.
-func ReadTrace() []byte {
-       for {
-               buf := readTrace()
-
-               // Skip over the end-of-generation signal which must not appear
-               // in the final trace.
-               if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration {
-                       continue
-               }
-               return buf
-       }
-}
-
-// readTrace is the implementation of ReadTrace, except with an additional
-// in-band signal as to when the buffer is for a new generation.
-//
-//go:linkname readTrace runtime/trace.runtime_readTrace
-func readTrace() (buf []byte) {
+func ReadTrace() (buf []byte) {
 top:
        var park bool
        systemstack(func() {
@@ -842,7 +825,7 @@ func readTrace0() (buf []byte, park bool) {
        if !trace.headerWritten {
                trace.headerWritten = true
                unlock(&trace.lock)
-               return []byte("go 1.25 trace\x00\x00\x00"), false
+               return []byte("go 1.26 trace\x00\x00\x00"), false
        }
 
        // Read the next buffer.
index d726a3d375296e46b93d9c99c0c5ee86f01eeb8f..f8b0a96b3f06d99b05d0a94851f89ae022a607db 100644 (file)
@@ -12,72 +12,77 @@ import (
 // timestamp is an unprocessed timestamp.
 type timestamp uint64
 
-// batch represents a batch of trace events.
-// It is unparsed except for its header.
 type batch struct {
-       m    threadID
        time timestamp
+       gen  uint64
        data []byte
 }
 
-// threadID is the runtime-internal M structure's ID. This is unique
-// for each OS thread.
-type threadID int64
-
 // readBatch copies b and parses the trace batch header inside.
-// Returns the batch, the generation, bytes read, and an error.
-func readBatch(b []byte) (batch, uint64, uint64, error) {
+// Returns the batch, bytes read, and an error.
+func readBatch(b []byte) (batch, uint64, error) {
        if len(b) == 0 {
-               return batch{}, 0, 0, fmt.Errorf("batch is empty")
+               return batch{}, 0, fmt.Errorf("batch is empty")
        }
        data := make([]byte, len(b))
-       if nw := copy(data, b); nw != len(b) {
-               return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
-       }
+       copy(data, b)
+
        // Read batch header byte.
+       if typ := tracev2.EventType(b[0]); typ == tracev2.EvEndOfGeneration {
+               if len(b) != 1 {
+                       return batch{}, 1, fmt.Errorf("unexpected end of generation in batch of size >1")
+               }
+               return batch{data: data}, 1, nil
+       }
        if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
-               return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
+               return batch{}, 1, fmt.Errorf("expected batch event, got event %d", typ)
        }
-
-       // Read the batch header: gen (generation), thread (M) ID, base timestamp
-       // for the batch.
        total := 1
        b = b[1:]
+
+       // Read the generation
        gen, n, err := readUvarint(b)
        if err != nil {
-               return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
+               return batch{}, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
        }
        total += n
        b = b[n:]
-       m, n, err := readUvarint(b)
+
+       // Read the M (discard it).
+       _, n, err = readUvarint(b)
        if err != nil {
-               return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
+               return batch{}, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
        }
        total += n
        b = b[n:]
+
+       // Read the timestamp.
        ts, n, err := readUvarint(b)
        if err != nil {
-               return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
+               return batch{}, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
        }
        total += n
        b = b[n:]
 
-       // Read in the size of the batch to follow.
+       // Read the size of the batch to follow.
        size, n, err := readUvarint(b)
        if err != nil {
-               return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
+               return batch{}, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
        }
        if size > tracev2.MaxBatchSize {
-               return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
+               return batch{}, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
        }
        total += n
        total += int(size)
+       if total != len(data) {
+               return batch{}, uint64(total), fmt.Errorf("expected complete batch")
+       }
        data = data[:total]
 
        // Return the batch.
        return batch{
-               m:    threadID(m),
+               gen:  gen,
                time: timestamp(ts),
                data: data,
-       }, gen, uint64(total), nil
+       }, uint64(total), nil
 }
index b0b75ceb60bc0f8df98c2468283223ddbcc106a6..99ee4d060dcdf3faa39cc8c576c96e15325cdab1 100644 (file)
@@ -141,9 +141,9 @@ func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) {
 
        // Write all the data.
        for _, gen := range gens {
-               for _, batch := range gen.batches {
+               for _, data := range gen.batches {
                        // Write batch data.
-                       nw, err = w.Write(batch.data)
+                       nw, err = w.Write(data)
                        n += int64(nw)
                        if err != nil {
                                return n, err
index bf8d7ce647963a8f468e9ecd3187abdcb432896d..4f2d3aa92a412497a3b003736d32ae24c654ab5f 100644 (file)
@@ -41,21 +41,21 @@ func (w *recorder) Write(b []byte) (n int, err error) {
        if len(b) == n {
                return 0, nil
        }
-       ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
+       ba, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
        if err != nil {
                return len(b) - int(nb) - n, err
        }
        n += int(nb)
 
        // Append the batch to the current generation.
-       if r.active.gen == 0 {
-               r.active.gen = gen
+       if ba.gen != 0 && r.active.gen == 0 {
+               r.active.gen = ba.gen
        }
-       if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) {
+       if ba.time != 0 && (r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time)) {
                r.active.minTime = r.freq.mul(ba.time)
        }
        r.active.size += len(ba.data)
-       r.active.batches = append(r.active.batches, ba)
+       r.active.batches = append(r.active.batches, ba.data)
 
        return len(b), nil
 }
@@ -99,7 +99,7 @@ type rawGeneration struct {
        gen     uint64
        size    int
        minTime eventTime
-       batches []batch
+       batches [][]byte
 }
 
 func traceTimeNow(freq frequency) eventTime {
index 7e22b6abdb96a51c67731b109f2b1fb67c60164b..a4d653dcaece92df66856dba8deb50bf732482f3 100644 (file)
@@ -155,7 +155,7 @@ func (t *traceMultiplexer) startLocked() error {
        t.subscribersMu.Unlock()
 
        go func() {
-               header := runtime_readTrace()
+               header := runtime.ReadTrace()
                if traceStartWriter != nil {
                        traceStartWriter.Write(header)
                }
@@ -164,10 +164,16 @@ func (t *traceMultiplexer) startLocked() error {
                }
 
                for {
-                       data := runtime_readTrace()
+                       data := runtime.ReadTrace()
                        if data == nil {
                                break
                        }
+                       if traceStartWriter != nil {
+                               traceStartWriter.Write(data)
+                       }
+                       if flightRecorder != nil {
+                               flightRecorder.Write(data)
+                       }
                        if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
                                if flightRecorder != nil {
                                        flightRecorder.endGeneration()
@@ -187,13 +193,6 @@ func (t *traceMultiplexer) startLocked() error {
                                if frIsNew {
                                        flightRecorder.Write(header)
                                }
-                       } else {
-                               if traceStartWriter != nil {
-                                       traceStartWriter.Write(data)
-                               }
-                               if flightRecorder != nil {
-                                       flightRecorder.Write(data)
-                               }
                        }
                }
        }()