From: Dmitriy Vyukov Date: Mon, 17 Feb 2014 02:29:56 +0000 (+0400) Subject: testing: ease writing parallel benchmarks X-Git-Tag: go1.3beta1~679 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=c3922f0a63b826000e6ab46cadc344782fbae178;p=gostls13.git testing: ease writing parallel benchmarks Add b.RunParallel function that captures parallel benchmark boilerplate: creates worker goroutines, joins worker goroutines, distributes work among them in an efficient way, auto-tunes grain size. Fixes #7090. R=bradfitz, iant, josharian, tracey.brendan, r, rsc, gobot CC=golang-codereviews https://golang.org/cl/57270043 --- diff --git a/doc/go1.3.txt b/doc/go1.3.txt index d2ba78dddd..cdf241ae5a 100644 --- a/doc/go1.3.txt +++ b/doc/go1.3.txt @@ -4,5 +4,6 @@ misc/dist: renamed misc/makerelease (CL 39920043) runtime: output how long goroutines are blocked (CL 50420043) syscall: add NewCallbackCDecl to use for windows callbacks (CL 36180044) testing: diagnose buggy tests that panic(nil) (CL 55780043) +testing: add b.RunParallel function (CL 57270043) misc/benchcmp has been replaced by go tool benchcmp (CL 47980043) cmd/go, go/build: support .m files (CL 60590044) diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go index 3ee7fe465d..782176c883 100644 --- a/src/pkg/runtime/chan_test.go +++ b/src/pkg/runtime/chan_test.go @@ -455,146 +455,93 @@ func BenchmarkChanNonblocking(b *testing.B) { } func BenchmarkSelectUncontended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc1 := make(chan int, 1) - myc2 := make(chan int, 1) - myc1 <- 0 - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - myc2 <- 0 - case <-myc2: - myc1 <- 0 - } - } + b.RunParallel(func(pb *testing.PB) { + myc1 := make(chan int, 1) + myc2 := make(chan int, 1) + myc1 <- 0 + for pb.Next() { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + }) } func BenchmarkSelectContended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) + procs := runtime.GOMAXPROCS(0) myc1 := make(chan int, procs) myc2 := make(chan int, procs) - for p := 0; p < procs; p++ { + b.RunParallel(func(pb *testing.PB) { myc1 <- 0 - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - myc2 <- 0 - case <-myc2: - myc1 <- 0 - } - } + for pb.Next() { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + } + }) } func BenchmarkSelectNonblock(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc1 := make(chan int) - myc2 := make(chan int) - myc3 := make(chan int, 1) - myc4 := make(chan int, 1) - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - select { - case <-myc1: - default: - } - select { - case myc2 <- 0: - default: - } - select { - case <-myc3: - default: - } - select { - case myc4 <- 0: - default: - } - } + b.RunParallel(func(pb *testing.PB) { + myc1 := make(chan int) + myc2 := make(chan int) + myc3 := make(chan int, 1) + myc4 := make(chan int, 1) + for pb.Next() { + select { + case <-myc1: + default: } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + select { + case myc2 <- 0: + default: + } + select { + case <-myc3: + default: + } + select { + case myc4 <- 0: + default: + } + } + }) } func BenchmarkChanUncontended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - myc := make(chan int, CallsPerSched) - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - myc <- 0 - } - for g := 0; g < CallsPerSched; g++ { - <-myc - } + const C = 100 + b.RunParallel(func(pb *testing.PB) { + myc := make(chan int, C) + for pb.Next() { + for i := 0; i < C; i++ { + myc <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + for i := 0; i < C; i++ { + <-myc + } + } + }) } func BenchmarkChanContended(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - myc := make(chan int, procs*CallsPerSched) - for p := 0; p < procs; p++ { - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - myc <- 0 - } - for g := 0; g < CallsPerSched; g++ { - <-myc - } + const C = 100 + myc := make(chan int, C*runtime.GOMAXPROCS(0)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for i := 0; i < C; i++ { + myc <- 0 } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + for i := 0; i < C; i++ { + <-myc + } + } + }) } func BenchmarkChanSync(b *testing.B) { @@ -755,25 +702,13 @@ func BenchmarkSelectProdCons(b *testing.B) { } func BenchmarkChanCreation(b *testing.B) { - const CallsPerSched = 1000 - procs := runtime.GOMAXPROCS(-1) - N := int32(b.N / CallsPerSched) - c := make(chan bool, procs) - for p := 0; p < procs; p++ { - go func() { - for atomic.AddInt32(&N, -1) >= 0 { - for g := 0; g < CallsPerSched; g++ { - myc := make(chan int, 1) - myc <- 0 - <-myc - } - } - c <- true - }() - } - for p := 0; p < procs; p++ { - <-c - } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + myc := make(chan int, 1) + myc <- 0 + <-myc + } + }) } func BenchmarkChanSem(b *testing.B) { diff --git a/src/pkg/testing/benchmark.go b/src/pkg/testing/benchmark.go index cff0774179..e6f3c6d790 100644 --- a/src/pkg/testing/benchmark.go +++ b/src/pkg/testing/benchmark.go @@ -10,6 +10,7 @@ import ( "os" "runtime" "sync" + "sync/atomic" "time" ) @@ -34,12 +35,15 @@ type InternalBenchmark struct { // timing and to specify the number of iterations to run. type B struct { common - N int - benchmark InternalBenchmark - bytes int64 - timerOn bool - showAllocResult bool - result BenchmarkResult + N int + previousN int // number of iterations in the previous run + previousDuration time.Duration // total duration of the previous run + benchmark InternalBenchmark + bytes int64 + timerOn bool + showAllocResult bool + result BenchmarkResult + parallelism int // RunParallel creates parallelism*GOMAXPROCS goroutines // The initial states of memStats.Mallocs and memStats.TotalAlloc. startAllocs uint64 startBytes uint64 @@ -114,10 +118,13 @@ func (b *B) runN(n int) { // by clearing garbage from previous runs. runtime.GC() b.N = n + b.parallelism = 1 b.ResetTimer() b.StartTimer() b.benchmark.F(b) b.StopTimer() + b.previousN = n + b.previousDuration = b.duration } func min(x, y int) int { @@ -343,6 +350,84 @@ func (b *B) trimOutput() { } } +// A PB is used by RunParallel for running parallel benchmarks. +type PB struct { + globalN *uint64 // shared between all worker goroutines iteration counter + grain uint64 // acquire that many iterations from globalN at once + cache uint64 // local cache of acquired iterations + bN uint64 // total number of iterations to execute (b.N) +} + +// Next reports whether there are more iterations to execute. +func (pb *PB) Next() bool { + if pb.cache == 0 { + n := atomic.AddUint64(pb.globalN, pb.grain) + if n <= pb.bN { + pb.cache = pb.grain + } else if n < pb.bN+pb.grain { + pb.cache = pb.bN + pb.grain - n + } else { + return false + } + } + pb.cache-- + return true +} + +// RunParallel runs a benchmark in parallel. +// It creates multiple goroutines and distributes b.N iterations among them. +// The number of goroutines defaults to GOMAXPROCS. To increase parallelism for +// non-CPU-bound benchmarks, call SetParallelism before RunParallel. +// RunParallel is usually used with the go test -cpu flag. +// +// The body function will be run in each goroutine. It should set up any +// goroutine-local state and then iterate until pb.Next returns false. +// It should not use the StartTimer, StopTimer, or ResetTimer functions, +// because they have global effect. +func (b *B) RunParallel(body func(*PB)) { + // Calculate grain size as number of iterations that take ~100µs. + // 100µs is enough to amortize the overhead and provide sufficient + // dynamic load balancing. + grain := uint64(0) + if b.previousN > 0 && b.previousDuration > 0 { + grain = 1e5 * uint64(b.previousN) / uint64(b.previousDuration) + } + if grain < 1 { + grain = 1 + } + // We expect the inner loop and function call to take at least 10ns, + // so do not do more than 100µs/10ns=1e4 iterations. + if grain > 1e4 { + grain = 1e4 + } + + n := uint64(0) + numProcs := b.parallelism * runtime.GOMAXPROCS(0) + var wg sync.WaitGroup + wg.Add(numProcs) + for p := 0; p < numProcs; p++ { + go func() { + defer wg.Done() + pb := &PB{ + globalN: &n, + grain: grain, + bN: uint64(b.N), + } + body(pb) + }() + } + wg.Wait() +} + +// SetParallelism sets the number of goroutines used by RunParallel to p*GOMAXPROCS. +// There is usually no need to call SetParallelism for CPU-bound benchmarks. +// If p is less than 1, this call will have no effect. +func (b *B) SetParallelism(p int) { + if p >= 1 { + b.parallelism = p + } +} + // Benchmark benchmarks a single function. Useful for creating // custom benchmarks that do not use the "go test" command. func Benchmark(f func(b *B)) BenchmarkResult { diff --git a/src/pkg/testing/benchmark_test.go b/src/pkg/testing/benchmark_test.go index 94e994dfae..9997b99204 100644 --- a/src/pkg/testing/benchmark_test.go +++ b/src/pkg/testing/benchmark_test.go @@ -5,7 +5,11 @@ package testing_test import ( + "bytes" + "runtime" + "sync/atomic" "testing" + "text/template" ) var roundDownTests = []struct { @@ -56,3 +60,53 @@ func TestRoundUp(t *testing.T) { } } } + +func TestRunParallel(t *testing.T) { + testing.Benchmark(func(b *testing.B) { + procs := uint32(0) + iters := uint64(0) + b.SetParallelism(3) + b.RunParallel(func(pb *testing.PB) { + atomic.AddUint32(&procs, 1) + for pb.Next() { + atomic.AddUint64(&iters, 1) + } + }) + if want := uint32(3 * runtime.GOMAXPROCS(0)); procs != want { + t.Errorf("got %v procs, want %v", procs, want) + } + if iters != uint64(b.N) { + t.Errorf("got %v iters, want %v", iters, b.N) + } + }) +} + +func TestRunParallelFail(t *testing.T) { + testing.Benchmark(func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + // The function must be able to log/abort + // w/o crashing/deadlocking the whole benchmark. + b.Log("log") + b.Error("error") + b.Fatal("fatal") + }) + }) +} + +func ExampleB_RunParallel() { + // Parallel benchmark for text/template.Template.Execute on a single object. + testing.Benchmark(func(b *testing.B) { + templ := template.Must(template.New("test").Parse("Hello, {{.}}!")) + // RunParallel will create GOMAXPROCS goroutines + // and distribute work among them. + b.RunParallel(func(pb *testing.PB) { + // Each goroutine has its own bytes.Buffer. + var buf bytes.Buffer + for pb.Next() { + // The loop body is executed b.N times total across all goroutines. + buf.Reset() + templ.Execute(&buf, "World") + } + }) + }) +} diff --git a/src/pkg/testing/testing.go b/src/pkg/testing/testing.go index 826d8e0120..855f3a9bbe 100644 --- a/src/pkg/testing/testing.go +++ b/src/pkg/testing/testing.go @@ -43,6 +43,7 @@ // // If a benchmark needs some expensive setup before running, the timer // may be reset: +// // func BenchmarkBigLen(b *testing.B) { // big := NewBig() // b.ResetTimer() @@ -51,6 +52,21 @@ // } // } // +// If a benchmark needs to test performance in a parallel setting, it may use +// the RunParallel helper function; such benchmarks are intended to be used with +// the go test -cpu flag: +// +// func BenchmarkTemplateParallel(b *testing.B) { +// templ := template.Must(template.New("test").Parse("Hello, {{.}}!")) +// b.RunParallel(func(pb *testing.PB) { +// var buf bytes.Buffer +// for pb.Next() { +// buf.Reset() +// templ.Execute(&buf, "World") +// } +// }) +// } +// // Examples // // The package also runs and verifies example code. Example functions may