Cypherpunks repositories - gostls13.git/commitdiff
testing: ease writing parallel benchmarks
author Dmitriy Vyukov <dvyukov@google.com>
Mon, 17 Feb 2014 02:29:56 +0000 (06:29 +0400)
committer Dmitriy Vyukov <dvyukov@google.com>
Mon, 17 Feb 2014 02:29:56 +0000 (06:29 +0400)
Add the b.RunParallel function, which captures the parallel benchmark boilerplate:
it creates the worker goroutines, joins them, distributes work among them
efficiently, and auto-tunes the grain size.
Fixes #7090.

R=bradfitz, iant, josharian, tracey.brendan, r, rsc, gobot
CC=golang-codereviews
https://golang.org/cl/57270043
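
In practice the new API reduces a parallel benchmark to a body function that
sets up per-goroutine state and then drains iterations via pb.Next. A minimal
sketch (BenchmarkFoo and its loop body are illustrative, not part of this CL,
and assume the usual testing import; the real conversions are in the diffs below):

    func BenchmarkFoo(b *testing.B) {
        b.RunParallel(func(pb *testing.PB) {
            // Per-goroutine state is created here, once per worker.
            buf := make([]byte, 0, 64)
            for pb.Next() {
                // The body runs b.N times in total,
                // spread across the worker goroutines.
                buf = append(buf[:0], 'x')
            }
        })
    }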

doc/go1.3.txt
src/pkg/runtime/chan_test.go
src/pkg/testing/benchmark.go
src/pkg/testing/benchmark_test.go
src/pkg/testing/testing.go

index d2ba78ddddf7682fd0da890d8a3e47a73c0f5454..cdf241ae5a0a848a1c7e43d62442176fd4569963 100644 (file)
@@ -4,5 +4,6 @@ misc/dist: renamed misc/makerelease (CL 39920043)
 runtime: output how long goroutines are blocked (CL 50420043)
 syscall: add NewCallbackCDecl to use for windows callbacks (CL 36180044)
 testing: diagnose buggy tests that panic(nil) (CL 55780043)
+testing: add b.RunParallel function (CL 57270043)
 misc/benchcmp has been replaced by go tool benchcmp (CL 47980043)
 cmd/go, go/build: support .m files (CL 60590044)
index 3ee7fe465d18be627ddd00292468970e3069b4da..782176c883639152035cbfe0a8da8c180043c935 100644 (file)
@@ -455,146 +455,93 @@ func BenchmarkChanNonblocking(b *testing.B) {
 }
 
 func BenchmarkSelectUncontended(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
-       for p := 0; p < procs; p++ {
-               go func() {
-                       myc1 := make(chan int, 1)
-                       myc2 := make(chan int, 1)
-                       myc1 <- 0
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       select {
-                                       case <-myc1:
-                                               myc2 <- 0
-                                       case <-myc2:
-                                               myc1 <- 0
-                                       }
-                               }
+       b.RunParallel(func(pb *testing.PB) {
+               myc1 := make(chan int, 1)
+               myc2 := make(chan int, 1)
+               myc1 <- 0
+               for pb.Next() {
+                       select {
+                       case <-myc1:
+                               myc2 <- 0
+                       case <-myc2:
+                               myc1 <- 0
                        }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+               }
+       })
 }
 
 func BenchmarkSelectContended(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
+       procs := runtime.GOMAXPROCS(0)
        myc1 := make(chan int, procs)
        myc2 := make(chan int, procs)
-       for p := 0; p < procs; p++ {
+       b.RunParallel(func(pb *testing.PB) {
                myc1 <- 0
-               go func() {
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       select {
-                                       case <-myc1:
-                                               myc2 <- 0
-                                       case <-myc2:
-                                               myc1 <- 0
-                                       }
-                               }
+               for pb.Next() {
+                       select {
+                       case <-myc1:
+                               myc2 <- 0
+                       case <-myc2:
+                               myc1 <- 0
                        }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+               }
+       })
 }
 
 func BenchmarkSelectNonblock(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
-       for p := 0; p < procs; p++ {
-               go func() {
-                       myc1 := make(chan int)
-                       myc2 := make(chan int)
-                       myc3 := make(chan int, 1)
-                       myc4 := make(chan int, 1)
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       select {
-                                       case <-myc1:
-                                       default:
-                                       }
-                                       select {
-                                       case myc2 <- 0:
-                                       default:
-                                       }
-                                       select {
-                                       case <-myc3:
-                                       default:
-                                       }
-                                       select {
-                                       case myc4 <- 0:
-                                       default:
-                                       }
-                               }
+       b.RunParallel(func(pb *testing.PB) {
+               myc1 := make(chan int)
+               myc2 := make(chan int)
+               myc3 := make(chan int, 1)
+               myc4 := make(chan int, 1)
+               for pb.Next() {
+                       select {
+                       case <-myc1:
+                       default:
                        }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+                       select {
+                       case myc2 <- 0:
+                       default:
+                       }
+                       select {
+                       case <-myc3:
+                       default:
+                       }
+                       select {
+                       case myc4 <- 0:
+                       default:
+                       }
+               }
+       })
 }
 
 func BenchmarkChanUncontended(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
-       for p := 0; p < procs; p++ {
-               go func() {
-                       myc := make(chan int, CallsPerSched)
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       myc <- 0
-                               }
-                               for g := 0; g < CallsPerSched; g++ {
-                                       <-myc
-                               }
+       const C = 100
+       b.RunParallel(func(pb *testing.PB) {
+               myc := make(chan int, C)
+               for pb.Next() {
+                       for i := 0; i < C; i++ {
+                               myc <- 0
                        }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+                       for i := 0; i < C; i++ {
+                               <-myc
+                       }
+               }
+       })
 }
 
 func BenchmarkChanContended(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
-       myc := make(chan int, procs*CallsPerSched)
-       for p := 0; p < procs; p++ {
-               go func() {
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       myc <- 0
-                               }
-                               for g := 0; g < CallsPerSched; g++ {
-                                       <-myc
-                               }
+       const C = 100
+       myc := make(chan int, C*runtime.GOMAXPROCS(0))
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       for i := 0; i < C; i++ {
+                               myc <- 0
                        }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+                       for i := 0; i < C; i++ {
+                               <-myc
+                       }
+               }
+       })
 }
 
 func BenchmarkChanSync(b *testing.B) {
@@ -755,25 +702,13 @@ func BenchmarkSelectProdCons(b *testing.B) {
 }
 
 func BenchmarkChanCreation(b *testing.B) {
-       const CallsPerSched = 1000
-       procs := runtime.GOMAXPROCS(-1)
-       N := int32(b.N / CallsPerSched)
-       c := make(chan bool, procs)
-       for p := 0; p < procs; p++ {
-               go func() {
-                       for atomic.AddInt32(&N, -1) >= 0 {
-                               for g := 0; g < CallsPerSched; g++ {
-                                       myc := make(chan int, 1)
-                                       myc <- 0
-                                       <-myc
-                               }
-                       }
-                       c <- true
-               }()
-       }
-       for p := 0; p < procs; p++ {
-               <-c
-       }
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       myc := make(chan int, 1)
+                       myc <- 0
+                       <-myc
+               }
+       })
 }
 
 func BenchmarkChanSem(b *testing.B) {
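
The chan_test.go conversions above follow one rule: state owned by a single
worker (myc1 and myc2 in BenchmarkSelectUncontended) moves inside the body
function, while intentionally shared state (myc in BenchmarkChanContended)
stays outside RunParallel and is sized for GOMAXPROCS workers. A hedged sketch
of the same split, with hypothetical channel names and capacities, assuming
the usual runtime and testing imports:

    func BenchmarkSharedVsLocal(b *testing.B) {
        // Shared by every worker: capacity covers all workers' outstanding
        // sends so no one blocks on channel capacity alone.
        shared := make(chan int, 8*runtime.GOMAXPROCS(0))
        b.RunParallel(func(pb *testing.PB) {
            // Owned by this worker only, so it lives inside the body.
            local := make(chan int, 8)
            for pb.Next() {
                shared <- 0
                local <- 0
                <-local
                <-shared
            }
        })
    }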
index cff0774179fae376dc871bda261395f4a87fe27d..e6f3c6d7903214c776d08afa983a56b7252e520d 100644 (file)
@@ -10,6 +10,7 @@ import (
        "os"
        "runtime"
        "sync"
+       "sync/atomic"
        "time"
 )
 
@@ -34,12 +35,15 @@ type InternalBenchmark struct {
 // timing and to specify the number of iterations to run.
 type B struct {
        common
-       N               int
-       benchmark       InternalBenchmark
-       bytes           int64
-       timerOn         bool
-       showAllocResult bool
-       result          BenchmarkResult
+       N                int
+       previousN        int           // number of iterations in the previous run
+       previousDuration time.Duration // total duration of the previous run
+       benchmark        InternalBenchmark
+       bytes            int64
+       timerOn          bool
+       showAllocResult  bool
+       result           BenchmarkResult
+       parallelism      int // RunParallel creates parallelism*GOMAXPROCS goroutines
        // The initial states of memStats.Mallocs and memStats.TotalAlloc.
        startAllocs uint64
        startBytes  uint64
@@ -114,10 +118,13 @@ func (b *B) runN(n int) {
        // by clearing garbage from previous runs.
        runtime.GC()
        b.N = n
+       b.parallelism = 1
        b.ResetTimer()
        b.StartTimer()
        b.benchmark.F(b)
        b.StopTimer()
+       b.previousN = n
+       b.previousDuration = b.duration
 }
 
 func min(x, y int) int {
@@ -343,6 +350,84 @@ func (b *B) trimOutput() {
        }
 }
 
+// A PB is used by RunParallel for running parallel benchmarks.
+type PB struct {
+       globalN *uint64 // iteration counter shared by all worker goroutines
+       grain   uint64  // acquire that many iterations from globalN at once
+       cache   uint64  // local cache of acquired iterations
+       bN      uint64  // total number of iterations to execute (b.N)
+}
+
+// Next reports whether there are more iterations to execute.
+func (pb *PB) Next() bool {
+       if pb.cache == 0 {
+               n := atomic.AddUint64(pb.globalN, pb.grain)
+               if n <= pb.bN {
+                       pb.cache = pb.grain
+               } else if n < pb.bN+pb.grain {
+                       pb.cache = pb.bN + pb.grain - n
+               } else {
+                       return false
+               }
+       }
+       pb.cache--
+       return true
+}
+
+// RunParallel runs a benchmark in parallel.
+// It creates multiple goroutines and distributes b.N iterations among them.
+// The number of goroutines defaults to GOMAXPROCS. To increase parallelism for
+// non-CPU-bound benchmarks, call SetParallelism before RunParallel.
+// RunParallel is usually used with the go test -cpu flag.
+//
+// The body function will be run in each goroutine. It should set up any
+// goroutine-local state and then iterate until pb.Next returns false.
+// It should not use the StartTimer, StopTimer, or ResetTimer functions,
+// because they have global effect.
+func (b *B) RunParallel(body func(*PB)) {
+       // Calculate grain size as number of iterations that take ~100µs.
+       // 100µs is enough to amortize the overhead and provide sufficient
+       // dynamic load balancing.
+       grain := uint64(0)
+       if b.previousN > 0 && b.previousDuration > 0 {
+               grain = 1e5 * uint64(b.previousN) / uint64(b.previousDuration)
+       }
+       if grain < 1 {
+               grain = 1
+       }
+       // We expect the inner loop and function call to take at least 10ns,
+       // so do not do more than 100µs/10ns=1e4 iterations.
+       if grain > 1e4 {
+               grain = 1e4
+       }
+
+       n := uint64(0)
+       numProcs := b.parallelism * runtime.GOMAXPROCS(0)
+       var wg sync.WaitGroup
+       wg.Add(numProcs)
+       for p := 0; p < numProcs; p++ {
+               go func() {
+                       defer wg.Done()
+                       pb := &PB{
+                               globalN: &n,
+                               grain:   grain,
+                               bN:      uint64(b.N),
+                       }
+                       body(pb)
+               }()
+       }
+       wg.Wait()
+}
+
+// SetParallelism sets the number of goroutines used by RunParallel to p*GOMAXPROCS.
+// There is usually no need to call SetParallelism for CPU-bound benchmarks.
+// If p is less than 1, this call will have no effect.
+func (b *B) SetParallelism(p int) {
+       if p >= 1 {
+               b.parallelism = p
+       }
+}
+
 // Benchmark benchmarks a single function. Useful for creating
 // custom benchmarks that do not use the "go test" command.
 func Benchmark(f func(b *B)) BenchmarkResult {
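
The grain-size arithmetic above targets batches of roughly 100µs of work: if
the previous run executed previousN = 1000 iterations in previousDuration = 2ms,
then grain = 1e5 * 1000 / 2e6 = 50 iterations (durations in nanoseconds),
clamped to the range [1, 1e4]. Each worker then claims grain iterations at a
time with a single atomic.AddUint64; at most one worker receives a shorter
final batch, so exactly b.N iterations are handed out. A standalone sketch that
re-creates the claim loop outside the testing package (worker count and
constants are illustrative):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        const bN = 1000  // total iterations to distribute (b.N)
        const grain = 64 // iterations claimed per atomic add
        var globalN, executed uint64
        var wg sync.WaitGroup
        for w := 0; w < 4; w++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                cache := uint64(0)
                for {
                    if cache == 0 {
                        n := atomic.AddUint64(&globalN, grain)
                        switch {
                        case n <= bN:
                            cache = grain
                        case n < bN+grain:
                            cache = bN + grain - n // shorter final batch
                        default:
                            return
                        }
                    }
                    cache--
                    atomic.AddUint64(&executed, 1) // one "iteration" of work
                }
            }()
        }
        wg.Wait()
        fmt.Println(executed) // prints 1000: every iteration ran exactly once
    }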
index 94e994dfae08b6af86d09c94ba7dd62ddfa111c0..9997b9920424509670b61166e94fa20406f37e72 100644 (file)
@@ -5,7 +5,11 @@
 package testing_test
 
 import (
+       "bytes"
+       "runtime"
+       "sync/atomic"
        "testing"
+       "text/template"
 )
 
 var roundDownTests = []struct {
@@ -56,3 +60,53 @@ func TestRoundUp(t *testing.T) {
                }
        }
 }
+
+func TestRunParallel(t *testing.T) {
+       testing.Benchmark(func(b *testing.B) {
+               procs := uint32(0)
+               iters := uint64(0)
+               b.SetParallelism(3)
+               b.RunParallel(func(pb *testing.PB) {
+                       atomic.AddUint32(&procs, 1)
+                       for pb.Next() {
+                               atomic.AddUint64(&iters, 1)
+                       }
+               })
+               if want := uint32(3 * runtime.GOMAXPROCS(0)); procs != want {
+                       t.Errorf("got %v procs, want %v", procs, want)
+               }
+               if iters != uint64(b.N) {
+                       t.Errorf("got %v iters, want %v", iters, b.N)
+               }
+       })
+}
+
+func TestRunParallelFail(t *testing.T) {
+       testing.Benchmark(func(b *testing.B) {
+               b.RunParallel(func(pb *testing.PB) {
+                       // The function must be able to log/abort
+                       // w/o crashing/deadlocking the whole benchmark.
+                       b.Log("log")
+                       b.Error("error")
+                       b.Fatal("fatal")
+               })
+       })
+}
+
+func ExampleB_RunParallel() {
+       // Parallel benchmark for text/template.Template.Execute on a single object.
+       testing.Benchmark(func(b *testing.B) {
+               templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
+               // RunParallel will create GOMAXPROCS goroutines
+               // and distribute work among them.
+               b.RunParallel(func(pb *testing.PB) {
+                       // Each goroutine has its own bytes.Buffer.
+                       var buf bytes.Buffer
+                       for pb.Next() {
+                               // The loop body is executed b.N times total across all goroutines.
+                               buf.Reset()
+                               templ.Execute(&buf, "World")
+                       }
+               })
+       })
+}
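
TestRunParallel above also exercises SetParallelism(3), which only matters when
the body blocks instead of burning CPU. A hedged sketch of that use case (the
sleep stands in for a hypothetical blocking call, the factor 16 is arbitrary,
and the usual testing and time imports are assumed):

    func BenchmarkBlockingCall(b *testing.B) {
        // Oversubscribe: 16*GOMAXPROCS goroutines keep the CPUs busy
        // while most workers are parked in the blocking call.
        b.SetParallelism(16)
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                time.Sleep(time.Millisecond) // stand-in for I/O or an RPC
            }
        })
    }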
index 826d8e012063897c6b50a1ac464d99f43f788a0d..855f3a9bbe9145e1fd69d2111405adf36dbd47f5 100644 (file)
@@ -43,6 +43,7 @@
 //
 // If a benchmark needs some expensive setup before running, the timer
 // may be reset:
+//
 //     func BenchmarkBigLen(b *testing.B) {
 //         big := NewBig()
//         b.ResetTimer()
//         for i := 0; i < b.N; i++ {
//             big.Len()
//         }
 //     }
 //
+// If a benchmark needs to test performance in a parallel setting, it may use
+// the RunParallel helper function; such benchmarks are intended to be used with
+// the go test -cpu flag:
+//
+//     func BenchmarkTemplateParallel(b *testing.B) {
+//         templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
+//         b.RunParallel(func(pb *testing.PB) {
+//             var buf bytes.Buffer
+//             for pb.Next() {
+//                 buf.Reset()
+//                 templ.Execute(&buf, "World")
+//             }
+//         })
+//     }
+//
 // Examples
 //
 // The package also runs and verifies example code. Example functions may
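
As the new documentation notes, such benchmarks pair naturally with the go test
-cpu flag, which reruns them at several GOMAXPROCS values, for example:

    go test -bench=TemplateParallel -cpu=1,2,4

go test then reports one result line per value, suffixing the GOMAXPROCS count
to the benchmark name (e.g. BenchmarkTemplateParallel-4).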