From 33563d1cfc7939d99d18e58cea7eedd6fb1c6ed6 Mon Sep 17 00:00:00 2001 From: Austin Clements Date: Thu, 17 Aug 2017 11:31:03 -0400 Subject: [PATCH] internal/trace: support for mutator utilization distributions This adds support for computing the quantiles of a mutator utilization distribution. Change-Id: Ia8b3ed14bf415c234e2f567360fd1b361d28bd40 Reviewed-on: https://go-review.googlesource.com/c/60799 Run-TryBot: Austin Clements TryBot-Result: Gobot Gobot Reviewed-by: Hyang-Ah Hana Kim --- src/internal/traceparser/gc.go | 143 ++++++++++++++++- src/internal/traceparser/gc_test.go | 22 ++- src/internal/traceparser/mud.go | 223 +++++++++++++++++++++++++++ src/internal/traceparser/mud_test.go | 87 +++++++++++ 4 files changed, 467 insertions(+), 8 deletions(-) create mode 100644 src/internal/traceparser/mud.go create mode 100644 src/internal/traceparser/mud_test.go diff --git a/src/internal/traceparser/gc.go b/src/internal/traceparser/gc.go index 569ab86b82..a5b29112b3 100644 --- a/src/internal/traceparser/gc.go +++ b/src/internal/traceparser/gc.go @@ -273,6 +273,10 @@ func NewMMUCurve(utils [][]MutatorUtil) *MMUCurve { return &MMUCurve{series} } +// bandsPerSeries is the number of bands to divide each series into. +// This is only changed by tests. +var bandsPerSeries = 1000 + func newMMUSeries(util []MutatorUtil) mmuSeries { // Compute cumulative sum. sums := make([]totalUtil, len(util)) @@ -289,7 +293,7 @@ func newMMUSeries(util []MutatorUtil) mmuSeries { // these bands. // // Compute the duration of each band. - numBands := 1000 + numBands := bandsPerSeries if numBands > len(util) { // There's no point in having lots of bands if there // aren't many events. @@ -393,8 +397,8 @@ func (h *utilHeap) Pop() interface{} { return x } -// An accumulator collects different MMU-related statistics depending -// on what's desired. +// An accumulator takes a windowed mutator utilization function and +// tracks various statistics for that function. type accumulator struct { mmu float64 @@ -406,10 +410,30 @@ type accumulator struct { // Worst N window tracking nWorst int wHeap utilHeap -} -// addMU records mutator utilization mu over the given window starting -// at time. + // Mutator utilization distribution tracking + mud *mud + // preciseMass is the distribution mass that must be precise + // before accumulation is stopped. + preciseMass float64 + // lastTime and lastMU are the previous point added to the + // windowed mutator utilization function. + lastTime int64 + lastMU float64 +} + +// resetTime declares a discontinuity in the windowed mutator +// utilization function by resetting the current time. +func (acc *accumulator) resetTime() { + // This only matters for distribution collection, since that's + // the only thing that depends on the progression of the + // windowed mutator utilization function. + acc.lastTime = math.MaxInt64 +} + +// addMU adds a point to the windowed mutator utilization function at +// (time, mu). This must be called for monotonically increasing values +// of time. // // It returns true if further calls to addMU would be pointless. func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool { @@ -458,6 +482,25 @@ func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool acc.bound = math.Max(acc.bound, acc.wHeap[0].MutatorUtil) } + if acc.mud != nil { + if acc.lastTime != math.MaxInt64 { + // Update distribution. + acc.mud.add(acc.lastMU, mu, float64(time-acc.lastTime)) + } + acc.lastTime, acc.lastMU = time, mu + if _, mudBound, ok := acc.mud.approxInvCumulativeSum(); ok { + acc.bound = math.Max(acc.bound, mudBound) + } else { + // We haven't accumulated enough total precise + // mass yet to even reach our goal, so keep + // accumulating. + acc.bound = 1 + } + // It's not worth checking percentiles every time, so + // just keep accumulating this band. + return false + } + // If we've found enough 0 utilizations, we can stop immediately. return len(acc.wHeap) == acc.nWorst && acc.wHeap[0].MutatorUtil == 0 } @@ -484,6 +527,85 @@ func (c *MMUCurve) Examples(window time.Duration, n int) (worst []UtilWindow) { return ([]UtilWindow)(acc.wHeap) } +// MUD returns mutator utilization distribution quantiles for the +// given window size. +// +// The mutator utilization distribution is the distribution of mean +// mutator utilization across all windows of the given window size in +// the trace. +// +// The minimum mutator utilization is the minimum (0th percentile) of +// this distribution. (However, if only the minimum is desired, it's +// more efficient to use the MMU method.) +func (c *MMUCurve) MUD(window time.Duration, quantiles []float64) []float64 { + if len(quantiles) == 0 { + return []float64{} + } + + // Each unrefined band contributes a known total mass to the + // distribution (bandDur except at the end), but in an unknown + // way. However, we know that all the mass it contributes must + // be at or above its worst-case mean mutator utilization. + // + // Hence, we refine bands until the highest desired + // distribution quantile is less than the next worst-case mean + // mutator utilization. At this point, all further + // contributions to the distribution must be beyond the + // desired quantile and hence cannot affect it. + // + // First, find the highest desired distribution quantile. + maxQ := quantiles[0] + for _, q := range quantiles { + if q > maxQ { + maxQ = q + } + } + // The distribution's mass is in units of time (it's not + // normalized because this would make it more annoying to + // account for future contributions of unrefined bands). The + // total final mass will be the duration of the trace itself + // minus the window size. Using this, we can compute the mass + // corresponding to quantile maxQ. + var duration int64 + for _, s := range c.series { + duration1 := s.util[len(s.util)-1].Time - s.util[0].Time + if duration1 >= int64(window) { + duration += duration1 - int64(window) + } + } + qMass := float64(duration) * maxQ + + // Accumulate the MUD until we have precise information for + // everything to the left of qMass. + acc := accumulator{mmu: 1.0, bound: 1.0, preciseMass: qMass, mud: new(mud)} + acc.mud.setTrackMass(qMass) + c.mmu(window, &acc) + + // Evaluate the quantiles on the accumulated MUD. + out := make([]float64, len(quantiles)) + for i := range out { + mu, _ := acc.mud.invCumulativeSum(float64(duration) * quantiles[i]) + if math.IsNaN(mu) { + // There are a few legitimate ways this can + // happen: + // + // 1. If the window is the full trace + // duration, then the windowed MU function is + // only defined at a single point, so the MU + // distribution is not well-defined. + // + // 2. If there are no events, then the MU + // distribution has no mass. + // + // Either way, all of the quantiles will have + // converged toward the MMU at this point. + mu = acc.mmu + } + out[i] = mu + } + return out +} + func (c *MMUCurve) mmu(window time.Duration, acc *accumulator) { if window <= 0 { acc.mmu = 0 @@ -607,12 +729,16 @@ func (c *mmuSeries) bandMMU(bandIdx int, window time.Duration, acc *accumulator) if utilEnd := util[len(util)-1].Time - int64(window); utilEnd < endTime { endTime = utilEnd } + acc.resetTime() for { // Advance edges to time and time+window. mu := (right.advance(time+int64(window)) - left.advance(time)).mean(window) if acc.addMU(time, mu, window) { break } + if time == endTime { + break + } // The maximum slope of the windowed mutator // utilization function is 1/window, so we can always @@ -632,7 +758,10 @@ func (c *mmuSeries) bandMMU(bandIdx int, window time.Duration, acc *accumulator) time = minTime } if time >= endTime { - break + // For MMUs we could stop here, but for MUDs + // it's important that we span the entire + // band. + time = endTime } } } diff --git a/src/internal/traceparser/gc_test.go b/src/internal/traceparser/gc_test.go index b438a2931f..1cd8fb6f78 100644 --- a/src/internal/traceparser/gc_test.go +++ b/src/internal/traceparser/gc_test.go @@ -75,7 +75,8 @@ func TestMMU(t *testing.T) { } func TestMMUTrace(t *testing.T) { - t.Parallel() + // Can't be t.Parallel() because it modifies the + // testingOneBand package variable. p, err := New("../trace/testdata/stress_1_10_good") if err != nil { @@ -96,6 +97,25 @@ func TestMMUTrace(t *testing.T) { t.Errorf("want %f, got %f mutator utilization in window %s", want, got, window) } } + + // Test MUD with band optimization against MUD without band + // optimization. We don't have a simple testing implementation + // of MUDs (the simplest implementation is still quite + // complex), but this is still a pretty good test. + defer func(old int) { bandsPerSeries = old }(bandsPerSeries) + bandsPerSeries = 1 + mmuCurve2 := NewMMUCurve(mu) + quantiles := []float64{0, 1 - .999, 1 - .99} + for window := time.Microsecond; window < time.Second; window *= 10 { + mud1 := mmuCurve.MUD(window, quantiles) + mud2 := mmuCurve2.MUD(window, quantiles) + for i := range mud1 { + if !aeq(mud1[i], mud2[i]) { + t.Errorf("for quantiles %v at window %v, want %v, got %v", quantiles, window, mud2, mud1) + break + } + } + } } func BenchmarkMMU(b *testing.B) { diff --git a/src/internal/traceparser/mud.go b/src/internal/traceparser/mud.go new file mode 100644 index 0000000000..8eed89ff36 --- /dev/null +++ b/src/internal/traceparser/mud.go @@ -0,0 +1,223 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package traceparser + +import ( + "math" + "sort" +) + +// mud is an updatable mutator utilization distribution. +// +// This is a continuous distribution of duration over mutator +// utilization. For example, the integral from mutator utilization a +// to b is the total duration during which the mutator utilization was +// in the range [a, b]. +// +// This distribution is *not* normalized (it is not a probability +// distribution). This makes it easier to work with as it's being +// updated. +// +// It is represented as the sum of scaled uniform distribution +// functions and Dirac delta functions (which are treated as +// degenerate uniform distributions). +type mud struct { + sorted, unsorted []edge + + // trackMass is the inverse cumulative sum to track as the + // distribution is updated. + trackMass float64 + // trackBucket is the bucket in which trackMass falls. If the + // total mass of the distribution is < trackMass, this is + // len(hist). + trackBucket int + // trackSum is the cumulative sum of hist[:trackBucket]. Once + // trackSum >= trackMass, trackBucket must be recomputed. + trackSum float64 + + // hist is a hierarchical histogram of distribution mass. + hist [mudDegree]float64 +} + +const ( + // mudDegree is the number of buckets in the MUD summary + // histogram. + mudDegree = 1024 +) + +type edge struct { + // At x, the function increases by y. + x, delta float64 + // Additionally at x is a Dirac delta function with area dirac. + dirac float64 +} + +// add adds a uniform function over [l, r] scaled so the total weight +// of the uniform is area. If l==r, this adds a Dirac delta function. +func (d *mud) add(l, r, area float64) { + if area == 0 { + return + } + + if r < l { + l, r = r, l + } + + // Add the edges. + if l == r { + d.unsorted = append(d.unsorted, edge{l, 0, area}) + } else { + delta := area / (r - l) + d.unsorted = append(d.unsorted, edge{l, delta, 0}, edge{r, -delta, 0}) + } + + // Update the histogram. + h := &d.hist + lbFloat, lf := math.Modf(l * mudDegree) + lb := int(lbFloat) + if lb >= mudDegree { + lb, lf = mudDegree-1, 1 + } + if l == r { + h[lb] += area + } else { + rbFloat, rf := math.Modf(r * mudDegree) + rb := int(rbFloat) + if rb >= mudDegree { + rb, rf = mudDegree-1, 1 + } + if lb == rb { + h[lb] += area + } else { + perBucket := area / (r - l) / mudDegree + h[lb] += perBucket * (1 - lf) + h[rb] += perBucket * rf + for i := lb + 1; i < rb; i++ { + h[i] += perBucket + } + } + } + + // Update mass tracking. + if thresh := float64(d.trackBucket) / mudDegree; l < thresh { + if r < thresh { + d.trackSum += area + } else { + d.trackSum += area * (thresh - l) / (r - l) + } + if d.trackSum >= d.trackMass { + // The tracked mass now falls in a different + // bucket. Recompute the inverse cumulative sum. + d.setTrackMass(d.trackMass) + } + } +} + +// setTrackMass sets the mass to track the inverse cumulative sum for. +// +// Specifically, mass is a cumulative duration, and the mutator +// utilization bounds for this duration can be queried using +// approxInvCumulativeSum. +func (d *mud) setTrackMass(mass float64) { + d.trackMass = mass + + // Find the bucket currently containing trackMass by computing + // the cumulative sum. + sum := 0.0 + for i, val := range d.hist[:] { + newSum := sum + val + if newSum > mass { + // mass falls in bucket i. + d.trackBucket = i + d.trackSum = sum + return + } + sum = newSum + } + d.trackBucket = len(d.hist) + d.trackSum = sum +} + +// approxInvCumulativeSum is like invCumulativeSum, but specifically +// operates on the tracked mass and returns an upper and lower bound +// approximation of the inverse cumulative sum. +// +// The true inverse cumulative sum will be in the range [lower, upper). +func (d *mud) approxInvCumulativeSum() (float64, float64, bool) { + if d.trackBucket == len(d.hist) { + return math.NaN(), math.NaN(), false + } + return float64(d.trackBucket) / mudDegree, float64(d.trackBucket+1) / mudDegree, true +} + +// invCumulativeSum returns x such that the integral of d from -∞ to x +// is y. If the total weight of d is less than y, it returns the +// maximum of the distribution and false. +// +// Specifically, y is a cumulative duration, and invCumulativeSum +// returns the mutator utilization x such that at least y time has +// been spent with mutator utilization <= x. +func (d *mud) invCumulativeSum(y float64) (float64, bool) { + if len(d.sorted) == 0 && len(d.unsorted) == 0 { + return math.NaN(), false + } + + // Sort edges. + edges := d.unsorted + sort.Slice(edges, func(i, j int) bool { + return edges[i].x < edges[j].x + }) + // Merge with sorted edges. + d.unsorted = nil + if d.sorted == nil { + d.sorted = edges + } else { + oldSorted := d.sorted + newSorted := make([]edge, len(oldSorted)+len(edges)) + i, j := 0, 0 + for o := range newSorted { + if i >= len(oldSorted) { + copy(newSorted[o:], edges[j:]) + break + } else if j >= len(edges) { + copy(newSorted[o:], oldSorted[i:]) + break + } else if oldSorted[i].x < edges[j].x { + newSorted[o] = oldSorted[i] + i++ + } else { + newSorted[o] = edges[j] + j++ + } + } + d.sorted = newSorted + } + + // Traverse edges in order computing a cumulative sum. + csum, rate, prevX := 0.0, 0.0, 0.0 + for _, e := range d.sorted { + newCsum := csum + (e.x-prevX)*rate + if newCsum >= y { + // y was exceeded between the previous edge + // and this one. + if rate == 0 { + // Anywhere between prevX and + // e.x will do. We return e.x + // because that takes care of + // the y==0 case naturally. + return e.x, true + } + return (y-csum)/rate + prevX, true + } + newCsum += e.dirac + if newCsum >= y { + // y was exceeded by the Dirac delta at e.x. + return e.x, true + } + csum, prevX = newCsum, e.x + rate += e.delta + } + return prevX, false +} diff --git a/src/internal/traceparser/mud_test.go b/src/internal/traceparser/mud_test.go new file mode 100644 index 0000000000..6e048fcf19 --- /dev/null +++ b/src/internal/traceparser/mud_test.go @@ -0,0 +1,87 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package traceparser + +import ( + "math/rand" + "testing" +) + +func TestMUD(t *testing.T) { + // Insert random uniforms and check histogram mass and + // cumulative sum approximations. + rnd := rand.New(rand.NewSource(42)) + mass := 0.0 + var mud mud + for i := 0; i < 100; i++ { + area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64() + if rnd.Intn(10) == 0 { + r = l + } + t.Log(l, r, area) + mud.add(l, r, area) + mass += area + + // Check total histogram weight. + hmass := 0.0 + for _, val := range mud.hist { + hmass += val + } + if !aeq(mass, hmass) { + t.Fatalf("want mass %g, got %g", mass, hmass) + } + + // Check inverse cumulative sum approximations. + for j := 0.0; j < mass; j += mass * 0.099 { + mud.setTrackMass(j) + l, u, ok := mud.approxInvCumulativeSum() + inv, ok2 := mud.invCumulativeSum(j) + if !ok || !ok2 { + t.Fatalf("inverse cumulative sum failed: approx %v, exact %v", ok, ok2) + } + if !(l <= inv && inv < u) { + t.Fatalf("inverse(%g) = %g, not ∈ [%g, %g)", j, inv, l, u) + } + } + } +} + +func TestMUDTracking(t *testing.T) { + // Test that the tracked mass is tracked correctly across + // updates. + rnd := rand.New(rand.NewSource(42)) + const uniforms = 100 + for trackMass := 0.0; trackMass < uniforms; trackMass += uniforms / 50 { + var mud mud + mass := 0.0 + mud.setTrackMass(trackMass) + for i := 0; i < uniforms; i++ { + area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64() + mud.add(l, r, area) + mass += area + l, u, ok := mud.approxInvCumulativeSum() + inv, ok2 := mud.invCumulativeSum(trackMass) + + if mass < trackMass { + if ok { + t.Errorf("approx(%g) = [%g, %g), but mass = %g", trackMass, l, u, mass) + } + if ok2 { + t.Errorf("exact(%g) = %g, but mass = %g", trackMass, inv, mass) + } + } else { + if !ok { + t.Errorf("approx(%g) failed, but mass = %g", trackMass, mass) + } + if !ok2 { + t.Errorf("exact(%g) failed, but mass = %g", trackMass, mass) + } + if ok && ok2 && !(l <= inv && inv < u) { + t.Errorf("inverse(%g) = %g, not ∈ [%g, %g)", trackMass, inv, l, u) + } + } + } + } +} -- 2.48.1