From: Hana Kim Date: Tue, 6 Feb 2018 19:34:32 +0000 (-0500) Subject: cmd/trace: generate jsontrace data in a streaming fashion X-Git-Tag: go1.11beta1~1312 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=ee465831eccef9d8380a0cbfbb526684399d35eb;p=gostls13.git cmd/trace: generate jsontrace data in a streaming fashion Update #21870 The Sys went down to 4.25G from 6.2G. $ DEBUG_MEMORY_USAGE=1 go tool trace trace.out 2018/03/07 08:49:01 Parsing trace... after parsing trace Alloc: 3385757184 Bytes Sys: 3661195896 Bytes HeapReleased: 0 Bytes HeapSys: 3488841728 Bytes HeapInUse: 3426516992 Bytes HeapAlloc: 3385757184 Bytes Enter to continue... 2018/03/07 08:49:18 Splitting trace... after spliting trace Alloc: 2352071904 Bytes Sys: 4243825464 Bytes HeapReleased: 0 Bytes HeapSys: 4025712640 Bytes HeapInUse: 2377703424 Bytes HeapAlloc: 2352071904 Bytes Enter to continue... after httpJsonTrace Alloc: 3228697832 Bytes Sys: 4250379064 Bytes HeapReleased: 0 Bytes HeapSys: 4025647104 Bytes HeapInUse: 3260014592 Bytes HeapAlloc: 3228697832 Bytes Change-Id: I546f26bdbc68b1e58f1af1235a0e299dc0ff115e Reviewed-on: https://go-review.googlesource.com/92375 Run-TryBot: Hyang-Ah Hana Kim Reviewed-by: Peter Weinberger TryBot-Result: Gobot Gobot --- diff --git a/src/cmd/trace/main.go b/src/cmd/trace/main.go index 856d411f86..c7b6a647e3 100644 --- a/src/cmd/trace/main.go +++ b/src/cmd/trace/main.go @@ -120,19 +120,8 @@ func main() { } reportMemoryUsage("after parsing trace") - log.Print("Serializing trace...") - params := &traceParams{ - parsed: res, - endTime: int64(1<<63 - 1), - } - data, err := generateTrace(params) - if err != nil { - dief("%v\n", err) - } - reportMemoryUsage("after generating trace") - log.Print("Splitting trace...") - ranges = splitTrace(data) + ranges = splitTrace(res) reportMemoryUsage("after spliting trace") addr := "http://" + ln.Addr().String() diff --git a/src/cmd/trace/trace.go b/src/cmd/trace/trace.go index 9a675aecec..fb2d3058de 100644 --- a/src/cmd/trace/trace.go +++ b/src/cmd/trace/trace.go @@ -8,7 +8,9 @@ import ( "encoding/json" "fmt" "internal/trace" + "io" "log" + "math" "net/http" "path/filepath" "runtime" @@ -173,7 +175,7 @@ func httpJsonTrace(w http.ResponseWriter, r *http.Request) { params := &traceParams{ parsed: res, - endTime: int64(1<<63 - 1), + endTime: math.MaxInt64, } if goids := r.FormValue("goid"); goids != "" { @@ -218,33 +220,25 @@ func httpJsonTrace(w http.ResponseWriter, r *http.Request) { params.gs = gs } - data, err := generateTrace(params) - if err != nil { - log.Printf("failed to generate trace: %v", err) - return - } - + start := int64(0) + end := int64(math.MaxInt64) if startStr, endStr := r.FormValue("start"), r.FormValue("end"); startStr != "" && endStr != "" { // If start/end arguments are present, we are rendering a range of the trace. - start, err := strconv.ParseUint(startStr, 10, 64) + start, err = strconv.ParseInt(startStr, 10, 64) if err != nil { log.Printf("failed to parse start parameter '%v': %v", startStr, err) return } - end, err := strconv.ParseUint(endStr, 10, 64) + end, err = strconv.ParseInt(endStr, 10, 64) if err != nil { log.Printf("failed to parse end parameter '%v': %v", endStr, err) return } - if start >= uint64(len(data.Events)) || end <= start || end > uint64(len(data.Events)) { - log.Printf("bogus start/end parameters: %v/%v, trace size %v", start, end, len(data.Events)) - return - } - data.Events = append(data.Events[start:end], data.Events[data.footer:]...) } - err = json.NewEncoder(w).Encode(data) - if err != nil { - log.Printf("failed to serialize trace: %v", err) + + c := viewerDataTraceConsumer(w, start, end) + if err := generateTrace(params, c); err != nil { + log.Printf("failed to generate trace: %v", err) return } } @@ -256,36 +250,98 @@ type Range struct { } // splitTrace splits the trace into a number of ranges, -// each resulting in approx 100MB of json output (trace viewer can hardly handle more). -func splitTrace(data ViewerData) []Range { - const rangeSize = 100 << 20 - var ranges []Range - cw := new(countingWriter) - enc := json.NewEncoder(cw) - // First calculate size of the mandatory part of the trace. - // This includes stack traces and thread names. - data1 := data - data1.Events = data.Events[data.footer:] - enc.Encode(data1) - auxSize := cw.size - cw.size = 0 - // Then calculate size of each individual event and group them into ranges. - for i, start := 0, 0; i < data.footer; i++ { - enc.Encode(data.Events[i]) - if cw.size+auxSize > rangeSize || i == data.footer-1 { - ranges = append(ranges, Range{ - Name: fmt.Sprintf("%v-%v", time.Duration(data.Events[start].Time*1000), time.Duration(data.Events[i].Time*1000)), - Start: start, - End: i + 1, - }) - start = i + 1 - cw.size = 0 - } +// each resulting in approx 100MB of json output +// (trace viewer can hardly handle more). +func splitTrace(res trace.ParseResult) []Range { + params := &traceParams{ + parsed: res, + endTime: math.MaxInt64, } - if len(ranges) == 1 { - ranges = nil + s, c := splittingTraceConsumer(100 << 20) // 100M + if err := generateTrace(params, c); err != nil { + dief("%v\n", err) + } + return s.Ranges +} + +type splitter struct { + Ranges []Range +} + +func splittingTraceConsumer(max int) (*splitter, traceConsumer) { + type eventSz struct { + Time float64 + Sz int + } + + var ( + data = ViewerData{Frames: make(map[string]ViewerFrame)} + + sizes []eventSz + cw countingWriter + ) + + s := new(splitter) + + return s, traceConsumer{ + consumeTimeUnit: func(unit string) { + data.TimeUnit = unit + }, + consumeViewerEvent: func(v *ViewerEvent, required bool) { + if required { + // Store required events inside data + // so flush can include them in the required + // part of the trace. + data.Events = append(data.Events, v) + } + enc := json.NewEncoder(&cw) + enc.Encode(v) + sizes = append(sizes, eventSz{v.Time, cw.size + 1}) // +1 for ",". + cw.size = 0 + }, + consumeViewerFrame: func(k string, v ViewerFrame) { + data.Frames[k] = v + }, + flush: func() { + // Calculate size of the mandatory part of the trace. + // This includes stack traces and thread names. + cw.size = 0 + enc := json.NewEncoder(&cw) + enc.Encode(data) + minSize := cw.size + + // Then calculate size of each individual event + // and group them into ranges. + sum := minSize + start := 0 + for i, ev := range sizes { + if sum+ev.Sz > max { + ranges = append(ranges, Range{ + Name: fmt.Sprintf("%v-%v", time.Duration(sizes[start].Time*1000), time.Duration(ev.Time*1000)), + Start: start, + End: i + 1, + }) + start = i + 1 + sum = minSize + } else { + sum += ev.Sz + 1 + } + } + if len(ranges) <= 1 { + s.Ranges = nil + return + } + + if end := len(sizes) - 1; start < end { + ranges = append(ranges, Range{ + Name: fmt.Sprintf("%v-%v", time.Duration(sizes[start].Time*1000), time.Duration(sizes[end].Time*1000)), + Start: start, + End: end, + }) + } + s.Ranges = ranges + }, } - return ranges } type countingWriter struct { @@ -317,7 +373,7 @@ const ( type traceContext struct { *traceParams - data ViewerData + consumer traceConsumer frameTree frameNode frameSeq int arrowSeq uint64 @@ -402,6 +458,13 @@ type SortIndexArg struct { Index int `json:"sort_index"` } +type traceConsumer struct { + consumeTimeUnit func(unit string) + consumeViewerEvent func(v *ViewerEvent, required bool) + consumeViewerFrame func(key string, f ViewerFrame) + flush func() +} + // generateTrace generates json trace for trace-viewer: // https://github.com/google/trace-viewer // Trace format is described at: @@ -409,11 +472,14 @@ type SortIndexArg struct { // If mode==goroutineMode, generate trace for goroutine goid, otherwise whole trace. // startTime, endTime determine part of the trace that we are interested in. // gset restricts goroutines that are included in the resulting trace. -func generateTrace(params *traceParams) (ViewerData, error) { +func generateTrace(params *traceParams, consumer traceConsumer) error { + defer consumer.flush() + ctx := &traceContext{traceParams: params} ctx.frameTree.children = make(map[uint64]frameNode) - ctx.data.Frames = make(map[string]ViewerFrame) - ctx.data.TimeUnit = "ns" + ctx.consumer = consumer + + ctx.consumer.consumeTimeUnit("ns") maxProc := 0 ginfos := make(map[uint64]*gInfo) stacks := params.parsed.Stacks @@ -459,12 +525,12 @@ func generateTrace(params *traceParams) (ViewerData, error) { newG := ev.Args[0] info := getGInfo(newG) if info.name != "" { - return ctx.data, fmt.Errorf("duplicate go create event for go id=%d detected at offset %d", newG, ev.Off) + return fmt.Errorf("duplicate go create event for go id=%d detected at offset %d", newG, ev.Off) } stk, ok := stacks[ev.Args[1]] if !ok || len(stk) == 0 { - return ctx.data, fmt.Errorf("invalid go create event: missing stack information for go id=%d at offset %d", newG, ev.Off) + return fmt.Errorf("invalid go create event: missing stack information for go id=%d at offset %d", newG, ev.Off) } fname := stk[0].Fn @@ -520,10 +586,10 @@ func generateTrace(params *traceParams) (ViewerData, error) { ctx.heapStats.nextGC = ev.Args[0] } if setGStateErr != nil { - return ctx.data, setGStateErr + return setGStateErr } if ctx.gstates[gRunnable] < 0 || ctx.gstates[gRunning] < 0 || ctx.threadStats.insyscall < 0 || ctx.threadStats.insyscallRuntime < 0 { - return ctx.data, fmt.Errorf("invalid state after processing %v: runnable=%d running=%d insyscall=%d insyscallRuntime=%d", ev, ctx.gstates[gRunnable], ctx.gstates[gRunning], ctx.threadStats.insyscall, ctx.threadStats.insyscallRuntime) + return fmt.Errorf("invalid state after processing %v: runnable=%d running=%d insyscall=%d insyscallRuntime=%d", ev, ctx.gstates[gRunnable], ctx.gstates[gRunning], ctx.threadStats.insyscall, ctx.threadStats.insyscallRuntime) } // Ignore events that are from uninteresting goroutines @@ -622,30 +688,29 @@ func generateTrace(params *traceParams) (ViewerData, error) { ctx.emitGoroutineCounters(ev) } - ctx.data.footer = len(ctx.data.Events) - ctx.emit(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 0, Arg: &NameArg{"PROCS"}}) - ctx.emit(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 0, Arg: &SortIndexArg{1}}) + ctx.emitFooter(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 0, Arg: &NameArg{"PROCS"}}) + ctx.emitFooter(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 0, Arg: &SortIndexArg{1}}) - ctx.emit(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 1, Arg: &NameArg{"STATS"}}) - ctx.emit(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 1, Arg: &SortIndexArg{0}}) + ctx.emitFooter(&ViewerEvent{Name: "process_name", Phase: "M", Pid: 1, Arg: &NameArg{"STATS"}}) + ctx.emitFooter(&ViewerEvent{Name: "process_sort_index", Phase: "M", Pid: 1, Arg: &SortIndexArg{0}}) - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &NameArg{"GC"}}) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &SortIndexArg{-6}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &NameArg{"GC"}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.GCP, Arg: &SortIndexArg{-6}}) - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &NameArg{"Network"}}) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &SortIndexArg{-5}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &NameArg{"Network"}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.NetpollP, Arg: &SortIndexArg{-5}}) - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &NameArg{"Timers"}}) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &SortIndexArg{-4}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &NameArg{"Timers"}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.TimerP, Arg: &SortIndexArg{-4}}) - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &NameArg{"Syscalls"}}) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &SortIndexArg{-3}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &NameArg{"Syscalls"}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: trace.SyscallP, Arg: &SortIndexArg{-3}}) // Display rows for Ps if we are in the default trace view mode. if ctx.mode == defaultTraceview { for i := 0; i <= maxProc; i++ { - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &NameArg{fmt.Sprintf("Proc %v", i)}}) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &SortIndexArg{i}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &NameArg{fmt.Sprintf("Proc %v", i)}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: uint64(i), Arg: &SortIndexArg{i}}) } } @@ -681,20 +746,23 @@ func generateTrace(params *traceParams) (ViewerData, error) { if !ctx.gs[k] { continue } - ctx.emit(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: k, Arg: &NameArg{v.name}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_name", Phase: "M", Pid: 0, Tid: k, Arg: &NameArg{v.name}}) } // Row for the main goroutine (maing) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: ctx.maing, Arg: &SortIndexArg{-2}}) - + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: ctx.maing, Arg: &SortIndexArg{-2}}) // Row for GC or global state (specified with G=0) - ctx.emit(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: 0, Arg: &SortIndexArg{-1}}) + ctx.emitFooter(&ViewerEvent{Name: "thread_sort_index", Phase: "M", Pid: 0, Tid: 0, Arg: &SortIndexArg{-1}}) } - return ctx.data, nil + return nil } func (ctx *traceContext) emit(e *ViewerEvent) { - ctx.data.Events = append(ctx.data.Events, e) + ctx.consumer.consumeViewerEvent(e, false) +} + +func (ctx *traceContext) emitFooter(e *ViewerEvent) { + ctx.consumer.consumeViewerEvent(e, true) } func (ctx *traceContext) time(ev *trace.Event) float64 { @@ -903,7 +971,7 @@ func (ctx *traceContext) buildBranch(parent frameNode, stk []*trace.Frame) int { node.id = ctx.frameSeq node.children = make(map[uint64]frameNode) parent.children[frame.PC] = node - ctx.data.Frames[strconv.Itoa(node.id)] = ViewerFrame{fmt.Sprintf("%v:%v", frame.Fn, frame.Line), parent.id} + ctx.consumer.consumeViewerFrame(strconv.Itoa(node.id), ViewerFrame{fmt.Sprintf("%v:%v", frame.Fn, frame.Line), parent.id}) } return ctx.buildBranch(node, stk) } @@ -925,3 +993,47 @@ func lastTimestamp() int64 { } return 0 } + +type jsonWriter struct { + w io.Writer + enc *json.Encoder +} + +func viewerDataTraceConsumer(w io.Writer, start, end int64) traceConsumer { + frames := make(map[string]ViewerFrame) + enc := json.NewEncoder(w) + written := 0 + index := int64(-1) + + io.WriteString(w, "{") + return traceConsumer{ + consumeTimeUnit: func(unit string) { + io.WriteString(w, `"displayTimeUnit":`) + enc.Encode(unit) + io.WriteString(w, ",") + }, + consumeViewerEvent: func(v *ViewerEvent, required bool) { + index++ + if !required && (index < start || index > end) { + // not in the range. Skip! + return + } + if written == 0 { + io.WriteString(w, `"traceEvents": [`) + } + if written > 0 { + io.WriteString(w, ",") + } + enc.Encode(v) + written++ + }, + consumeViewerFrame: func(k string, v ViewerFrame) { + frames[k] = v + }, + flush: func() { + io.WriteString(w, `], "stackFrames":`) + enc.Encode(frames) + io.WriteString(w, `}`) + }, + } +} diff --git a/src/cmd/trace/trace_test.go b/src/cmd/trace/trace_test.go index aaffda87f0..aff3863802 100644 --- a/src/cmd/trace/trace_test.go +++ b/src/cmd/trace/trace_test.go @@ -6,6 +6,7 @@ package main import ( "internal/trace" + "io/ioutil" "strings" "testing" ) @@ -68,12 +69,10 @@ func TestGoroutineCount(t *testing.T) { endTime: int64(1<<63 - 1), } - // If the counts drop below 0, generateTrace will return an error. - viewerData, err := generateTrace(params) - if err != nil { - t.Fatalf("generateTrace failed: %v", err) - } - for _, ev := range viewerData.Events { + // Use the default viewerDataTraceConsumer but replace + // consumeViewerEvent to intercept the ViewerEvents for testing. + c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1) + c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) { if ev.Name == "Goroutines" { cnt := ev.Arg.(*goroutineCountersArg) if cnt.Runnable+cnt.Running > 2 { @@ -82,6 +81,11 @@ func TestGoroutineCount(t *testing.T) { t.Logf("read %+v %+v", ev, cnt) } } + + // If the counts drop below 0, generateTrace will return an error. + if err := generateTrace(params, c); err != nil { + t.Fatalf("generateTrace failed: %v", err) + } } func TestGoroutineFilter(t *testing.T) { @@ -120,8 +124,8 @@ func TestGoroutineFilter(t *testing.T) { gs: map[uint64]bool{10: true}, } - _, err = generateTrace(params) - if err != nil { + c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1) + if err := generateTrace(params, c); err != nil { t.Fatalf("generateTrace failed: %v", err) } } @@ -152,17 +156,18 @@ func TestPreemptedMarkAssist(t *testing.T) { endTime: int64(1<<63 - 1), } - viewerData, err := generateTrace(params) - if err != nil { - t.Fatalf("generateTrace failed: %v", err) - } + c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1) marks := 0 - for _, ev := range viewerData.Events { + c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) { if strings.Contains(ev.Name, "MARK ASSIST") { marks++ } } + if err := generateTrace(params, c); err != nil { + t.Fatalf("generateTrace failed: %v", err) + } + if marks != 2 { t.Errorf("Got %v MARK ASSIST events, want %v", marks, 2) } diff --git a/src/cmd/trace/trace_unix_test.go b/src/cmd/trace/trace_unix_test.go index 1c0d76fa3f..19a8af0d4d 100644 --- a/src/cmd/trace/trace_unix_test.go +++ b/src/cmd/trace/trace_unix_test.go @@ -9,6 +9,7 @@ package main import ( "bytes" "internal/trace" + "io/ioutil" "runtime" rtrace "runtime/trace" "sync" @@ -79,14 +80,8 @@ func TestGoroutineInSyscall(t *testing.T) { // Check only one thread for the pipe read goroutine is // considered in-syscall. - viewerData, err := generateTrace(&traceParams{ - parsed: res, - endTime: int64(1<<63 - 1), - }) - if err != nil { - t.Fatalf("failed to generate ViewerData: %v", err) - } - for _, ev := range viewerData.Events { + c := viewerDataTraceConsumer(ioutil.Discard, 0, 1<<63-1) + c.consumeViewerEvent = func(ev *ViewerEvent, _ bool) { if ev.Name == "Threads" { arg := ev.Arg.(*threadCountersArg) if arg.InSyscall > 1 { @@ -94,4 +89,12 @@ func TestGoroutineInSyscall(t *testing.T) { } } } + + param := &traceParams{ + parsed: res, + endTime: int64(1<<63 - 1), + } + if err := generateTrace(param, c); err != nil { + t.Fatalf("failed to generate ViewerData: %v", err) + } }