]> Cypherpunks repositories - gostls13.git/commitdiff
cmd/trace: generate jsontrace data in a streaming fashion
authorHana Kim <hakim@google.com>
Tue, 6 Feb 2018 19:34:32 +0000 (14:34 -0500)
committerHyang-Ah Hana Kim <hyangah@gmail.com>
Wed, 7 Mar 2018 14:33:54 +0000 (14:33 +0000)
Update #21870

The Sys went down to 4.25G from 6.2G.

$ DEBUG_MEMORY_USAGE=1 go tool trace trace.out
2018/03/07 08:49:01 Parsing trace...
after parsing trace
 Alloc: 3385757184 Bytes
 Sys: 3661195896 Bytes
 HeapReleased: 0 Bytes
 HeapSys: 3488841728 Bytes
 HeapInUse: 3426516992 Bytes
 HeapAlloc: 3385757184 Bytes
Enter to continue...
2018/03/07 08:49:18 Splitting trace...
after spliting trace
 Alloc: 2352071904 Bytes
 Sys: 4243825464 Bytes
 HeapReleased: 0 Bytes
 HeapSys: 4025712640 Bytes
 HeapInUse: 2377703424 Bytes
 HeapAlloc: 2352071904 Bytes
Enter to continue...
after httpJsonTrace
 Alloc: 3228697832 Bytes
 Sys: 4250379064 Bytes
 HeapReleased: 0 Bytes
 HeapSys: 4025647104 Bytes
 HeapInUse: 3260014592 Bytes
 HeapAlloc: 3228697832 Bytes

Change-Id: I546f26bdbc68b1e58f1af1235a0e299dc0ff115e
Reviewed-on: https://go-review.googlesource.com/92375
Run-TryBot: Hyang-Ah Hana Kim <hyangah@gmail.com>
Reviewed-by: Peter Weinberger <pjw@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/cmd/trace/main.go
src/cmd/trace/trace.go
src/cmd/trace/trace_test.go
src/cmd/trace/trace_unix_test.go

index 856d411f869ec0d30529ac1d9d5cf6b4dcfefe66..c7b6a647e36e0f651b1ebd70a451585706f058a5 100644 (file)
@@ -120,19 +120,8 @@ func main() {
        }
        reportMemoryUsage("after parsing trace")
 
-       log.Print("Serializing trace...")
-       params := &traceParams{
-               parsed:  res,
-               endTime: int64(1<<63 - 1),
-       }
-       data, err := generateTrace(params)
-       if err != nil {
-               dief("%v\n", err)
-       }
-       reportMemoryUsage("after generating trace")
-
        log.Print("Splitting trace...")
-       ranges = splitTrace(data)
+       ranges = splitTrace(res)
        reportMemoryUsage("after spliting trace")
 
        addr := "http://" + ln.Addr().String()
index 9a675aececeb423022226995854a49dcb78ad4f0..fb2d3058de37a63126159bae7619243bed8ecb4a 100644 (file)
@@ -8,7 +8,9 @@ import (
        "encoding/json"
        "fmt"
        "internal/trace"
+       "io"
        "log"
+       "math"
        "net/http"
        "path/filepath"
        "runtime"
@@ -173,7 +175,7 @@ func httpJsonTrace(w http.ResponseWriter, r *http.Request) {
 
        params := &traceParams{
                parsed:  res,
-               endTime: int64(1<<63 - 1),
+               endTime: math.MaxInt64,
        }
 
        if goids := r.FormValue("goid"); goids != "" {
@@ -218,33 +220,25 @@ func httpJsonTrace(w http.ResponseWriter, r *http.Request) {
                params.gs = gs
        }
 
-       data, err := generateTrace(params)
-       if err != nil {
-               log.Printf("failed to generate trace: %v", err)
-               return
-       }
-
+       start := int64(0)
+       end := int64(math.MaxInt64)
        if startStr, endStr := r.FormValue("start"), r.FormValue("end"); startStr != "" && endStr != "" {
                // If start/end arguments are present, we are rendering a range of the trace.
-               start, err := strconv.ParseUint(startStr, 10, 64)
+               start, err = strconv.ParseInt(startStr, 10, 64)
                if err != nil {
                        log.Printf("failed to parse start parameter '%v': %v", startStr, err)
                        return
                }
-               end, err := strconv.ParseUint(endStr, 10, 64)
+               end, err = strconv.ParseInt(endStr, 10, 64)
                if err != nil {
                        log.Printf("failed to parse end parameter '%v': %v", endStr, err)
                        return
                }
-               if start >= uint64(len(data.Events)) || end <= start || end > uint64(len(data.Events)) {
-                       log.Printf("bogus start/end parameters: %v/%v, trace size %v", start, end, len(data.Events))
-                       return
-               }
-               data.Events = append(data.Events[start:end], data.Events[data.footer:]...)
        }
-       err = json.NewEncoder(w).Encode(data)
-       if err != nil {
-               log.Printf("failed to serialize trace: %v", err)
+
+       c := viewerDataTraceConsumer(w, start, end)
+       if err := generateTrace(params, c); err != nil {
+               log.Printf("failed to generate trace: %v", err)
                return
        }
 }
@@ -256,36 +250,98 @@ type Range struct {
 }
 
 // splitTrace splits the trace into a number of ranges,
-// each resulting in approx 100MB of json output (trace viewer can hardly handle more).
-func splitTrace(data ViewerData) []Range {
-       const rangeSize = 100 << 20
-       var ranges []Range
-       cw := new(countingWriter)
-       enc := json.NewEncoder(cw)
-       // First calculate size of the mandatory part of the trace.
-       // This includes stack traces and thread names.
-       data1 := data
-       data1.Events = data.Events[data.footer:]
-       enc.Encode(data1)
-       auxSize := cw.size
-       cw.size = 0
-       // Then calculate size of each individual event and group them into ranges.
-       for i, start := 0, 0; i < data.footer; i++ {
-               enc.Encode(data.Events[i])
-               if cw.size+auxSize > rangeSize || i == data.footer-1 {
-                       ranges = append(ranges, Range{
-                               Name:  fmt.Sprintf("%v-%v", time.Duration(data.Events[start].Time*1000), time.Duration(data.Events[i].Time*1000)),
-                               Start: start,
-                               End:   i + 1,
-                       })
-                       start = i + 1
-                       cw.size = 0
-               }
+// each resulting in approx 100MB of json output
+// (trace viewer can hardly handle more).
+func splitTrace(res trace.ParseResult) []Range {
+       params := &traceParams{
+               parsed:  res,
+               endTime: math.MaxInt64,
        }
-       if len(ranges) == 1 {
-               ranges = nil
+       s, c := splittingTraceConsumer(100 << 20) // 100M
+       if err := generateTrace(params, c); err != nil {
+               dief("%v\n", err)
+       }
+       return s.Ranges
+}
+
+type splitter struct {
+       Ranges []Range
+}
+
+func splittingTraceConsumer(max int) (*splitter, traceConsumer) {
+       type eventSz struct {
+               Time float64
+               Sz   int
+       }
+
+       var (
+               data = ViewerData{Frames: make(map[string]ViewerFrame)}
+
+               sizes []eventSz
+               cw    countingWriter
+       )
+
+       s := new(splitter)
+
+       return s, traceConsumer{
+               consumeTimeUnit: func(unit string) {
+                       data.TimeUnit = unit
+               },
+               consumeViewerEvent: func(v *ViewerEvent, required bool) {
+                       if required {
+                               // Store required events inside data
+                               // so flush can include them in the required
+                               // part of the trace.
+                               data.Events = append(data.Events, v)
+                       }
+                       enc := json.NewEncoder(&cw)
+                       enc.Encode(v)
+                       sizes = append(sizes, eventSz{v.Time, cw.size + 1}) // +1 for ",".
+                       cw.size = 0
+               },
+               consumeViewerFrame: func(k string, v ViewerFrame) {
+                       data.Frames[k] = v
+               },
+               flush: func() {
+                       // Calculate size of the mandatory part of the trace.
+                       // This includes stack traces and thread names.
+                       cw.size = 0
+                       enc := json.NewEncoder(&cw)
+                       enc.Encode(data)
+                       minSize := cw.size
+
+                       // Then calculate size of each individual event
+                       // and group them into ranges.
+                       sum := minSize
+                       start := 0
+                       for i, ev := range sizes {
+                               if sum+ev.Sz > max {
+                                       ranges = append(ranges, Range{
+                                               Name:  fmt.Sprintf("%v-%v", time.Duration(sizes[start].Time*1000), time.Duration(ev.Time*1000)),
+                                               Start: start,
+                                               End:   i + 1,
+                                       })
+                                       start = i + 1
+                                       sum = minSize
+                               } else {
+                                       sum += ev.Sz + 1
+                               }
+                       }
+                       if len(ranges) <= 1 {
+                               s.Ranges = nil
+                               return
+                       }
+
+                       if end := len(sizes) - 1; start < end {
+                               ranges = append(ranges, Range{
+                                       Name:  fmt.Sprintf("%v-%v", time.Duration(sizes[start].Time*1000), time.Duration(sizes[end].Time*1000)),
+                                       Start: start,
+                                       End:   end,
+                               })
+                       }
+                       s.Ranges = ranges
+               },
        }
-       return ranges
 }
 
 type countingWriter struct {
@@ -317,7 +373,7 @@ const (
 
 type traceContext struct {
        *traceParams
-       data      ViewerData
+       consumer  traceConsumer
        frameTree frameNode
        frameSeq  int
        arrowSeq  uint64
@@ -402,6 +458,13 @@ type SortIndexArg struct {
        Index int `json:"sort_index"`
 }
 
+type traceConsumer struct {
+       consumeTimeUnit    func(unit string)
+       consumeViewerEvent func(v *ViewerEvent, required bool)
+       consumeViewerFrame func(key string, f ViewerFrame)
+       flush              func()
+}
+
 // generateTrace generates json trace for trace-viewer:
 // https://github.com/google/trace-viewer
 // Trace format is described at:
@@ -409,11 +472,14 @@ type SortIndexArg struct {
 // If mode==goroutineMode, generate trace for goroutine goid, otherwise whole trace.
 // startTime, endTime determine part of the trace that we are interested in.
 // gset restricts goroutines that are included in the resulting trace.
-func generateTrace(params *traceParams) (ViewerData, error) {
+func generateTrace(params *traceParams, consumer traceConsumer) error {
+       defer consumer.flush()
+
        ctx := &traceContext{traceParams: params}
        ctx.frameTree.children = make(map[uint64]frameNode)
-       ctx.data.Frames = make(map[string]ViewerFrame)
-       ctx.data.TimeUnit = "ns"
+       ctx.consumer = consumer
+
+       ctx.consumer.consumeTimeUnit("ns")
        maxProc := 0
        ginfos := make(map[uint64]*gInfo)
        stacks := params.parsed.Stacks
@@ -459,12 +525,12 @@ func generateTrace(params *traceParams) (ViewerData, error) {
                        newG := ev.Args[0]
                        info := getGInfo(newG)
                        if info.name != "" {
-                               return ctx.data, fmt.Errorf("duplicate go create event for go id=%d detected at offset %d", newG, ev.Off)
+                               return fmt.Errorf("duplicate go create event for go id=%d detected at offset %d", newG, ev.Off)
                        }
 
                        stk, ok := stacks[ev.Args[1]]
                        if !ok || len(stk) == 0 {
-                               return ctx.data, fmt.Errorf("invalid go create event: missing stack information for go id=%d at offset %d", newG, ev.Off)
+                               return fmt.Errorf("invalid go create event: missing stack information for go id=%d at offset %d", newG, ev.Off)
                        }
 
                        fname := stk[0].Fn
@@ -520,10 +586,10 @@ func generateTrace(params *traceParams) (ViewerData, error) {
                        ctx.heapStats.nextGC = ev.Args[0]
                }
                if setGStateErr != nil {
-                       return ctx.data, setGStateErr
+                       return setGStateErr
                }
                if ctx.gstates[gRunnable] < 0 || ctx.gstates[gRunning] < 0 || ctx.threadStats.insyscall < 0 || ctx.threadStats.insyscallRuntime < 0 {
-                       return ctx.data, fmt.Errorf("invalid state after processing %v: runnable=%d running=%d insyscall=%d insyscallRuntime=%d", ev, ctx.gstates[gRunnable], ctx.gstates[gRunning], ctx.threadStats.insyscall, ctx.threadStats.insyscallRuntime)
+                       return fmt.Errorf("invalid state after processing %v: runnable=%d running=%d insyscall=%d insyscallRuntime=%d", ev, ctx.gstates[gRunnable], ctx.gstates[gRunning], ctx.threadStats.insyscall, ctx.threadStats.insyscallRuntime)
                }
 
                // Ignore events that are from uninteresting goroutines
@@ -622,30 +688,29 @@ func generateTrace(params *traceParams) (ViewerData, error) {
                ctx.emitGoroutineCounters(ev)
        }
 
-       ctx.data.footer = len(ctx.data.Events)
-       ctx.emit(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 0, Arg: &NameArg{"PROCS"}})
-       ctx.emit(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 0, Arg: &SortIndexArg{1}})
+       ctx.emitFooter(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 0, Arg: &NameArg{"PROCS"}})
+       ctx.emitFooter(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 0, Arg: &SortIndexArg{1}})
 
-       ctx.emit(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 1, Arg: &NameArg{"STATS"}})
-       ctx.emit(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 1, Arg: &SortIndexArg{0}})
+       ctx.emitFooter(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 1, Arg: &NameArg{"STATS"}})
+       ctx.emitFooter(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 1, Arg: &SortIndexArg{0}})
 
-       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &NameArg{"GC"}})
-       ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &SortIndexArg{-6}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &NameArg{"GC"}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &SortIndexArg{-6}})
 
-       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &NameArg{"Network"}})
-       ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &SortIndexArg{-5}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &NameArg{"Network"}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &SortIndexArg{-5}})
 
-       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &NameArg{"Timers"}})
-       ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &SortIndexArg{-4}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &NameArg{"Timers"}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &SortIndexArg{-4}})
 
-       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &NameArg{"Syscalls"}})
-       ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &SortIndexArg{-3}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &NameArg{"Syscalls"}})
+       ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &SortIndexArg{-3}})
 
        // Display rows for Ps if we are in the default trace view mode.
        if ctx.mode == defaultTraceview {
                for i := 0; i <= maxProc; i++ {
-                       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &NameArg{fmt.Sprintf("Proc %v", i)}})
-                       ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &SortIndexArg{i}})
+                       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &NameArg{fmt.Sprintf("Proc %v", i)}})
+                       ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &SortIndexArg{i}})
                }
        }
 
@@ -681,20 +746,23 @@ func generateTrace(params *traceParams) (ViewerData, error) {
                        if !ctx.gs[k] {
                                continue
                        }
-                       ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: k, Arg: &NameArg{v.name}})
+                       ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: k, Arg: &NameArg{v.name}})
                }
                // Row for the main goroutine (maing)
-               ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: ctx.maing, Arg: &SortIndexArg{-2}})
-
+               ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: ctx.maing, Arg: &SortIndexArg{-2}})
                // Row for GC or global state (specified with G=0)
-               ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: 0, Arg: &SortIndexArg{-1}})
+               ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: 0, Arg: &SortIndexArg{-1}})
        }
 
-       return ctx.data, nil
+       return nil
 }
 
 func (ctx *traceContext) emit(e *ViewerEvent) {
-       ctx.data.Events = append(ctx.data.Events, e)
+       ctx.consumer.consumeViewerEvent(e, false)
+}
+
+func (ctx *traceContext) emitFooter(e *ViewerEvent) {
+       ctx.consumer.consumeViewerEvent(e, true)
 }
 
 func (ctx *traceContext) time(ev *trace.Event) float64 {
@@ -903,7 +971,7 @@ func (ctx *traceContext) buildBranch(parent frameNode, stk []*trace.Frame) int {
                node.id = ctx.frameSeq
                node.children = make(map[uint64]frameNode)
                parent.children[frame.PC] = node
-               ctx.data.Frames[strconv.Itoa(node.id)] = ViewerFrame{fmt.Sprintf("%v:%v", frame.Fn, frame.Line), parent.id}
+               ctx.consumer.consumeViewerFrame(strconv.Itoa(node.id), ViewerFrame{fmt.Sprintf("%v:%v", frame.Fn, frame.Line), parent.id})
        }
        return ctx.buildBranch(node, stk)
 }
@@ -925,3 +993,47 @@ func lastTimestamp() int64 {
        }
        return 0
 }
+
+type jsonWriter struct {
+       w   io.Writer
+       enc *json.Encoder
+}
+
+func viewerDataTraceConsumer(w io.Writer, start, end int64) traceConsumer {
+       frames := make(map[string]ViewerFrame)
+       enc := json.NewEncoder(w)
+       written := 0
+       index := int64(-1)
+
+       io.WriteString(w, "{")
+       return traceConsumer{
+               consumeTimeUnit: func(unit string) {
+                       io.WriteString(w, `"displayTimeUnit":`)
+                       enc.Encode(unit)
+                       io.WriteString(w, ",")
+               },
+               consumeViewerEvent: func(v *ViewerEvent, required bool) {
+                       index++
+                       if !required && (index < start || index > end) {
+                               // not in the range. Skip!
+                               return
+                       }
+                       if written == 0 {
+                               io.WriteString(w, `"traceEvents": [`)
+                       }
+                       if written > 0 {
+                               io.WriteString(w, ",")
+                       }
+                       enc.Encode(v)
+                       written++
+               },
+               consumeViewerFrame: func(k string, v ViewerFrame) {
+                       frames[k] = v
+               },
+               flush: func() {
+                       io.WriteString(w, `], "stackFrames":`)
+                       enc.Encode(frames)
+                       io.WriteString(w, `}`)
+               },
+       }
+}
index aaffda87f082eb438c99c083195f9af19154fdf5..aff38638029a9f8f29f8ad9b4eb18fa2b01c82a6 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "internal/trace"
+       "io/ioutil"
        "strings"
        "testing"
 )
@@ -68,12 +69,10 @@ func TestGoroutineCount(t *testing.T) {
                endTime: int64(1<<63 - 1),
        }
 
-       // If the counts drop below 0, generateTrace will return an error.
-       viewerData, err := generateTrace(params)
-       if err != nil {
-               t.Fatalf("generateTrace failed: %v", err)
-       }
-       for _, ev := range viewerData.Events {
+       // Use the default viewerDataTraceConsumer but replace
+       // consumeViewerEvent to intercept the ViewerEvents for testing.
+       c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1)
+       c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) {
                if ev.Name == "Goroutines" {
                        cnt := ev.Arg.(*goroutineCountersArg)
                        if cnt.Runnable+cnt.Running > 2 {
@@ -82,6 +81,11 @@ func TestGoroutineCount(t *testing.T) {
                        t.Logf("read %+v %+v", ev, cnt)
                }
        }
+
+       // If the counts drop below 0, generateTrace will return an error.
+       if err := generateTrace(params, c); err != nil {
+               t.Fatalf("generateTrace failed: %v", err)
+       }
 }
 
 func TestGoroutineFilter(t *testing.T) {
@@ -120,8 +124,8 @@ func TestGoroutineFilter(t *testing.T) {
                gs:      map[uint64]bool{10: true},
        }
 
-       _, err = generateTrace(params)
-       if err != nil {
+       c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1)
+       if err := generateTrace(params, c); err != nil {
                t.Fatalf("generateTrace failed: %v", err)
        }
 }
@@ -152,17 +156,18 @@ func TestPreemptedMarkAssist(t *testing.T) {
                endTime: int64(1<<63 - 1),
        }
 
-       viewerData, err := generateTrace(params)
-       if err != nil {
-               t.Fatalf("generateTrace failed: %v", err)
-       }
+       c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1)
 
        marks := 0
-       for _, ev := range viewerData.Events {
+       c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) {
                if strings.Contains(ev.Name, "MARK ASSIST") {
                        marks++
                }
        }
+       if err := generateTrace(params, c); err != nil {
+               t.Fatalf("generateTrace failed: %v", err)
+       }
+
        if marks != 2 {
                t.Errorf("Got %v MARK ASSIST events, want %v", marks, 2)
        }
index 1c0d76fa3fd6b5175c304fc5ee377f98ecb05598..19a8af0d4db520211dddcabcbca9fe6cffff2514 100644 (file)
@@ -9,6 +9,7 @@ package main
 import (
        "bytes"
        "internal/trace"
+       "io/ioutil"
        "runtime"
        rtrace "runtime/trace"
        "sync"
@@ -79,14 +80,8 @@ func TestGoroutineInSyscall(t *testing.T) {
 
        // Check only one thread for the pipe read goroutine is
        // considered in-syscall.
-       viewerData, err := generateTrace(&traceParams{
-               parsed:  res,
-               endTime: int64(1<<63 - 1),
-       })
-       if err != nil {
-               t.Fatalf("failed to generate ViewerData: %v", err)
-       }
-       for _, ev := range viewerData.Events {
+       c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1)
+       c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) {
                if ev.Name == "Threads" {
                        arg := ev.Arg.(*threadCountersArg)
                        if arg.InSyscall > 1 {
@@ -94,4 +89,12 @@ func TestGoroutineInSyscall(t *testing.T) {
                        }
                }
        }
+
+       param := &traceParams{
+               parsed:  res,
+               endTime: int64(1<<63 - 1),
+       }
+       if err := generateTrace(param, c); err != nil {
+               t.Fatalf("failed to generate ViewerData: %v", err)
+       }
 }