return &MMUCurve{series}
}
+// bandsPerSeries is the number of bands to divide each series into.
+// This is only changed by tests.
+var bandsPerSeries = 1000
+
func newMMUSeries(util []MutatorUtil) mmuSeries {
// Compute cumulative sum.
sums := make([]totalUtil, len(util))
// these bands.
//
// Compute the duration of each band.
- numBands := 1000
+ numBands := bandsPerSeries
if numBands > len(util) {
// There's no point in having lots of bands if there
// aren't many events.
return x
}
-// An accumulator collects different MMU-related statistics depending
-// on what's desired.
+// An accumulator takes a windowed mutator utilization function and
+// tracks various statistics for that function.
type accumulator struct {
mmu float64
// Worst N window tracking
nWorst int
wHeap utilHeap
-}
-// addMU records mutator utilization mu over the given window starting
-// at time.
+ // Mutator utilization distribution tracking
+ mud *mud
+ // preciseMass is the distribution mass that must be precise
+ // before accumulation is stopped.
+ preciseMass float64
+ // lastTime and lastMU are the previous point added to the
+ // windowed mutator utilization function.
+ lastTime int64
+ lastMU float64
+}
+
+// resetTime declares a discontinuity in the windowed mutator
+// utilization function by resetting the current time.
+func (acc *accumulator) resetTime() {
+ // This only matters for distribution collection, since that's
+ // the only thing that depends on the progression of the
+ // windowed mutator utilization function.
+ acc.lastTime = math.MaxInt64
+}
+
+// addMU adds a point to the windowed mutator utilization function at
+// (time, mu). This must be called for monotonically increasing values
+// of time.
//
// It returns true if further calls to addMU would be pointless.
func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool {
acc.bound = math.Max(acc.bound, acc.wHeap[0].MutatorUtil)
}
+ if acc.mud != nil {
+ if acc.lastTime != math.MaxInt64 {
+ // Update distribution.
+ acc.mud.add(acc.lastMU, mu, float64(time-acc.lastTime))
+ }
+ acc.lastTime, acc.lastMU = time, mu
+ if _, mudBound, ok := acc.mud.approxInvCumulativeSum(); ok {
+ acc.bound = math.Max(acc.bound, mudBound)
+ } else {
+ // We haven't accumulated enough total precise
+ // mass yet to even reach our goal, so keep
+ // accumulating.
+ acc.bound = 1
+ }
+ // It's not worth checking percentiles every time, so
+ // just keep accumulating this band.
+ return false
+ }
+
// If we've found enough 0 utilizations, we can stop immediately.
return len(acc.wHeap) == acc.nWorst && acc.wHeap[0].MutatorUtil == 0
}
return ([]UtilWindow)(acc.wHeap)
}
+// MUD returns mutator utilization distribution quantiles for the
+// given window size.
+//
+// The mutator utilization distribution is the distribution of mean
+// mutator utilization across all windows of the given window size in
+// the trace.
+//
+// The minimum mutator utilization is the minimum (0th percentile) of
+// this distribution. (However, if only the minimum is desired, it's
+// more efficient to use the MMU method.)
+func (c *MMUCurve) MUD(window time.Duration, quantiles []float64) []float64 {
+ if len(quantiles) == 0 {
+ return []float64{}
+ }
+
+ // Each unrefined band contributes a known total mass to the
+ // distribution (bandDur except at the end), but in an unknown
+ // way. However, we know that all the mass it contributes must
+ // be at or above its worst-case mean mutator utilization.
+ //
+ // Hence, we refine bands until the highest desired
+ // distribution quantile is less than the next worst-case mean
+ // mutator utilization. At this point, all further
+ // contributions to the distribution must be beyond the
+ // desired quantile and hence cannot affect it.
+ //
+ // First, find the highest desired distribution quantile.
+ maxQ := quantiles[0]
+ for _, q := range quantiles {
+ if q > maxQ {
+ maxQ = q
+ }
+ }
+ // The distribution's mass is in units of time (it's not
+ // normalized because this would make it more annoying to
+ // account for future contributions of unrefined bands). The
+ // total final mass will be the duration of the trace itself
+ // minus the window size. Using this, we can compute the mass
+ // corresponding to quantile maxQ.
+ var duration int64
+ for _, s := range c.series {
+ duration1 := s.util[len(s.util)-1].Time - s.util[0].Time
+ if duration1 >= int64(window) {
+ duration += duration1 - int64(window)
+ }
+ }
+ qMass := float64(duration) * maxQ
+
+ // Accumulate the MUD until we have precise information for
+ // everything to the left of qMass.
+ acc := accumulator{mmu: 1.0, bound: 1.0, preciseMass: qMass, mud: new(mud)}
+ acc.mud.setTrackMass(qMass)
+ c.mmu(window, &acc)
+
+ // Evaluate the quantiles on the accumulated MUD.
+ out := make([]float64, len(quantiles))
+ for i := range out {
+ mu, _ := acc.mud.invCumulativeSum(float64(duration) * quantiles[i])
+ if math.IsNaN(mu) {
+ // There are a few legitimate ways this can
+ // happen:
+ //
+ // 1. If the window is the full trace
+ // duration, then the windowed MU function is
+ // only defined at a single point, so the MU
+ // distribution is not well-defined.
+ //
+ // 2. If there are no events, then the MU
+ // distribution has no mass.
+ //
+ // Either way, all of the quantiles will have
+ // converged toward the MMU at this point.
+ mu = acc.mmu
+ }
+ out[i] = mu
+ }
+ return out
+}
+
func (c *MMUCurve) mmu(window time.Duration, acc *accumulator) {
if window <= 0 {
acc.mmu = 0
if utilEnd := util[len(util)-1].Time - int64(window); utilEnd < endTime {
endTime = utilEnd
}
+ acc.resetTime()
for {
// Advance edges to time and time+window.
mu := (right.advance(time+int64(window)) - left.advance(time)).mean(window)
if acc.addMU(time, mu, window) {
break
}
+ if time == endTime {
+ break
+ }
// The maximum slope of the windowed mutator
// utilization function is 1/window, so we can always
time = minTime
}
if time >= endTime {
- break
+ // For MMUs we could stop here, but for MUDs
+ // it's important that we span the entire
+ // band.
+ time = endTime
}
}
}
}
func TestMMUTrace(t *testing.T) {
- t.Parallel()
+ // Can't be t.Parallel() because it modifies the
+ // testingOneBand package variable.
p, err := New("../trace/testdata/stress_1_10_good")
if err != nil {
t.Errorf("want %f, got %f mutator utilization in window %s", want, got, window)
}
}
+
+ // Test MUD with band optimization against MUD without band
+ // optimization. We don't have a simple testing implementation
+ // of MUDs (the simplest implementation is still quite
+ // complex), but this is still a pretty good test.
+ defer func(old int) { bandsPerSeries = old }(bandsPerSeries)
+ bandsPerSeries = 1
+ mmuCurve2 := NewMMUCurve(mu)
+ quantiles := []float64{0, 1 - .999, 1 - .99}
+ for window := time.Microsecond; window < time.Second; window *= 10 {
+ mud1 := mmuCurve.MUD(window, quantiles)
+ mud2 := mmuCurve2.MUD(window, quantiles)
+ for i := range mud1 {
+ if !aeq(mud1[i], mud2[i]) {
+ t.Errorf("for quantiles %v at window %v, want %v, got %v", quantiles, window, mud2, mud1)
+ break
+ }
+ }
+ }
}
func BenchmarkMMU(b *testing.B) {
--- /dev/null
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package traceparser
+
+import (
+ "math"
+ "sort"
+)
+
+// mud is an updatable mutator utilization distribution.
+//
+// This is a continuous distribution of duration over mutator
+// utilization. For example, the integral from mutator utilization a
+// to b is the total duration during which the mutator utilization was
+// in the range [a, b].
+//
+// This distribution is *not* normalized (it is not a probability
+// distribution). This makes it easier to work with as it's being
+// updated.
+//
+// It is represented as the sum of scaled uniform distribution
+// functions and Dirac delta functions (which are treated as
+// degenerate uniform distributions).
+type mud struct {
+ sorted, unsorted []edge
+
+ // trackMass is the inverse cumulative sum to track as the
+ // distribution is updated.
+ trackMass float64
+ // trackBucket is the bucket in which trackMass falls. If the
+ // total mass of the distribution is < trackMass, this is
+ // len(hist).
+ trackBucket int
+ // trackSum is the cumulative sum of hist[:trackBucket]. Once
+ // trackSum >= trackMass, trackBucket must be recomputed.
+ trackSum float64
+
+ // hist is a hierarchical histogram of distribution mass.
+ hist [mudDegree]float64
+}
+
+const (
+ // mudDegree is the number of buckets in the MUD summary
+ // histogram.
+ mudDegree = 1024
+)
+
+type edge struct {
+ // At x, the function increases by y.
+ x, delta float64
+ // Additionally at x is a Dirac delta function with area dirac.
+ dirac float64
+}
+
+// add adds a uniform function over [l, r] scaled so the total weight
+// of the uniform is area. If l==r, this adds a Dirac delta function.
+func (d *mud) add(l, r, area float64) {
+ if area == 0 {
+ return
+ }
+
+ if r < l {
+ l, r = r, l
+ }
+
+ // Add the edges.
+ if l == r {
+ d.unsorted = append(d.unsorted, edge{l, 0, area})
+ } else {
+ delta := area / (r - l)
+ d.unsorted = append(d.unsorted, edge{l, delta, 0}, edge{r, -delta, 0})
+ }
+
+ // Update the histogram.
+ h := &d.hist
+ lbFloat, lf := math.Modf(l * mudDegree)
+ lb := int(lbFloat)
+ if lb >= mudDegree {
+ lb, lf = mudDegree-1, 1
+ }
+ if l == r {
+ h[lb] += area
+ } else {
+ rbFloat, rf := math.Modf(r * mudDegree)
+ rb := int(rbFloat)
+ if rb >= mudDegree {
+ rb, rf = mudDegree-1, 1
+ }
+ if lb == rb {
+ h[lb] += area
+ } else {
+ perBucket := area / (r - l) / mudDegree
+ h[lb] += perBucket * (1 - lf)
+ h[rb] += perBucket * rf
+ for i := lb + 1; i < rb; i++ {
+ h[i] += perBucket
+ }
+ }
+ }
+
+ // Update mass tracking.
+ if thresh := float64(d.trackBucket) / mudDegree; l < thresh {
+ if r < thresh {
+ d.trackSum += area
+ } else {
+ d.trackSum += area * (thresh - l) / (r - l)
+ }
+ if d.trackSum >= d.trackMass {
+ // The tracked mass now falls in a different
+ // bucket. Recompute the inverse cumulative sum.
+ d.setTrackMass(d.trackMass)
+ }
+ }
+}
+
+// setTrackMass sets the mass to track the inverse cumulative sum for.
+//
+// Specifically, mass is a cumulative duration, and the mutator
+// utilization bounds for this duration can be queried using
+// approxInvCumulativeSum.
+func (d *mud) setTrackMass(mass float64) {
+ d.trackMass = mass
+
+ // Find the bucket currently containing trackMass by computing
+ // the cumulative sum.
+ sum := 0.0
+ for i, val := range d.hist[:] {
+ newSum := sum + val
+ if newSum > mass {
+ // mass falls in bucket i.
+ d.trackBucket = i
+ d.trackSum = sum
+ return
+ }
+ sum = newSum
+ }
+ d.trackBucket = len(d.hist)
+ d.trackSum = sum
+}
+
+// approxInvCumulativeSum is like invCumulativeSum, but specifically
+// operates on the tracked mass and returns an upper and lower bound
+// approximation of the inverse cumulative sum.
+//
+// The true inverse cumulative sum will be in the range [lower, upper).
+func (d *mud) approxInvCumulativeSum() (float64, float64, bool) {
+ if d.trackBucket == len(d.hist) {
+ return math.NaN(), math.NaN(), false
+ }
+ return float64(d.trackBucket) / mudDegree, float64(d.trackBucket+1) / mudDegree, true
+}
+
+// invCumulativeSum returns x such that the integral of d from -∞ to x
+// is y. If the total weight of d is less than y, it returns the
+// maximum of the distribution and false.
+//
+// Specifically, y is a cumulative duration, and invCumulativeSum
+// returns the mutator utilization x such that at least y time has
+// been spent with mutator utilization <= x.
+func (d *mud) invCumulativeSum(y float64) (float64, bool) {
+ if len(d.sorted) == 0 && len(d.unsorted) == 0 {
+ return math.NaN(), false
+ }
+
+ // Sort edges.
+ edges := d.unsorted
+ sort.Slice(edges, func(i, j int) bool {
+ return edges[i].x < edges[j].x
+ })
+ // Merge with sorted edges.
+ d.unsorted = nil
+ if d.sorted == nil {
+ d.sorted = edges
+ } else {
+ oldSorted := d.sorted
+ newSorted := make([]edge, len(oldSorted)+len(edges))
+ i, j := 0, 0
+ for o := range newSorted {
+ if i >= len(oldSorted) {
+ copy(newSorted[o:], edges[j:])
+ break
+ } else if j >= len(edges) {
+ copy(newSorted[o:], oldSorted[i:])
+ break
+ } else if oldSorted[i].x < edges[j].x {
+ newSorted[o] = oldSorted[i]
+ i++
+ } else {
+ newSorted[o] = edges[j]
+ j++
+ }
+ }
+ d.sorted = newSorted
+ }
+
+ // Traverse edges in order computing a cumulative sum.
+ csum, rate, prevX := 0.0, 0.0, 0.0
+ for _, e := range d.sorted {
+ newCsum := csum + (e.x-prevX)*rate
+ if newCsum >= y {
+ // y was exceeded between the previous edge
+ // and this one.
+ if rate == 0 {
+ // Anywhere between prevX and
+ // e.x will do. We return e.x
+ // because that takes care of
+ // the y==0 case naturally.
+ return e.x, true
+ }
+ return (y-csum)/rate + prevX, true
+ }
+ newCsum += e.dirac
+ if newCsum >= y {
+ // y was exceeded by the Dirac delta at e.x.
+ return e.x, true
+ }
+ csum, prevX = newCsum, e.x
+ rate += e.delta
+ }
+ return prevX, false
+}
--- /dev/null
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package traceparser
+
+import (
+ "math/rand"
+ "testing"
+)
+
+func TestMUD(t *testing.T) {
+ // Insert random uniforms and check histogram mass and
+ // cumulative sum approximations.
+ rnd := rand.New(rand.NewSource(42))
+ mass := 0.0
+ var mud mud
+ for i := 0; i < 100; i++ {
+ area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64()
+ if rnd.Intn(10) == 0 {
+ r = l
+ }
+ t.Log(l, r, area)
+ mud.add(l, r, area)
+ mass += area
+
+ // Check total histogram weight.
+ hmass := 0.0
+ for _, val := range mud.hist {
+ hmass += val
+ }
+ if !aeq(mass, hmass) {
+ t.Fatalf("want mass %g, got %g", mass, hmass)
+ }
+
+ // Check inverse cumulative sum approximations.
+ for j := 0.0; j < mass; j += mass * 0.099 {
+ mud.setTrackMass(j)
+ l, u, ok := mud.approxInvCumulativeSum()
+ inv, ok2 := mud.invCumulativeSum(j)
+ if !ok || !ok2 {
+ t.Fatalf("inverse cumulative sum failed: approx %v, exact %v", ok, ok2)
+ }
+ if !(l <= inv && inv < u) {
+ t.Fatalf("inverse(%g) = %g, not ∈ [%g, %g)", j, inv, l, u)
+ }
+ }
+ }
+}
+
+func TestMUDTracking(t *testing.T) {
+ // Test that the tracked mass is tracked correctly across
+ // updates.
+ rnd := rand.New(rand.NewSource(42))
+ const uniforms = 100
+ for trackMass := 0.0; trackMass < uniforms; trackMass += uniforms / 50 {
+ var mud mud
+ mass := 0.0
+ mud.setTrackMass(trackMass)
+ for i := 0; i < uniforms; i++ {
+ area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64()
+ mud.add(l, r, area)
+ mass += area
+ l, u, ok := mud.approxInvCumulativeSum()
+ inv, ok2 := mud.invCumulativeSum(trackMass)
+
+ if mass < trackMass {
+ if ok {
+ t.Errorf("approx(%g) = [%g, %g), but mass = %g", trackMass, l, u, mass)
+ }
+ if ok2 {
+ t.Errorf("exact(%g) = %g, but mass = %g", trackMass, inv, mass)
+ }
+ } else {
+ if !ok {
+ t.Errorf("approx(%g) failed, but mass = %g", trackMass, mass)
+ }
+ if !ok2 {
+ t.Errorf("exact(%g) failed, but mass = %g", trackMass, mass)
+ }
+ if ok && ok2 && !(l <= inv && inv < u) {
+ t.Errorf("inverse(%g) = %g, not ∈ [%g, %g)", trackMass, inv, l, u)
+ }
+ }
+ }
+ }
+}