From: Dmitriy Vyukov
Date: Mon, 18 Jul 2011 16:35:55 +0000 (-0400)
Subject: sync: add fast paths to WaitGroup
X-Git-Tag: weekly.2011-07-19~23
X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=ee6e1a3ff77a41eff5a606a5aa8c46bf8b571a13;p=gostls13.git

sync: add fast paths to WaitGroup

benchmark                             old ns/op    new ns/op    delta
BenchmarkWaitGroupUncontended             93.50        33.60  -64.06%
BenchmarkWaitGroupUncontended-2           44.30        16.90  -61.85%
BenchmarkWaitGroupUncontended-4           21.80         8.47  -61.15%
BenchmarkWaitGroupUncontended-8           12.10         4.86  -59.83%
BenchmarkWaitGroupUncontended-16           7.38         3.35  -54.61%
BenchmarkWaitGroupAddDone                 58.40        33.70  -42.29%
BenchmarkWaitGroupAddDone-2              293.00        85.80  -70.72%
BenchmarkWaitGroupAddDone-4              243.00        51.10  -78.97%
BenchmarkWaitGroupAddDone-8              236.00        52.20  -77.88%
BenchmarkWaitGroupAddDone-16             215.00        43.30  -79.86%
BenchmarkWaitGroupAddDoneWork            826.00       794.00   -3.87%
BenchmarkWaitGroupAddDoneWork-2          450.00       424.00   -5.78%
BenchmarkWaitGroupAddDoneWork-4          277.00       220.00  -20.58%
BenchmarkWaitGroupAddDoneWork-8          440.00       116.00  -73.64%
BenchmarkWaitGroupAddDoneWork-16         569.00        66.50  -88.31%
BenchmarkWaitGroupWait                    29.00         8.04  -72.28%
BenchmarkWaitGroupWait-2                  74.10         4.15  -94.40%
BenchmarkWaitGroupWait-4                 117.00         2.30  -98.03%
BenchmarkWaitGroupWait-8                 111.00         1.31  -98.82%
BenchmarkWaitGroupWait-16                104.00         1.27  -98.78%
BenchmarkWaitGroupWaitWork               802.00       792.00   -1.25%
BenchmarkWaitGroupWaitWork-2             411.00       401.00   -2.43%
BenchmarkWaitGroupWaitWork-4             210.00       199.00   -5.24%
BenchmarkWaitGroupWaitWork-8             206.00       105.00  -49.03%
BenchmarkWaitGroupWaitWork-16            334.00        54.40  -83.71%

R=rsc
CC=golang-dev
https://golang.org/cl/4672050
---

diff --git a/src/pkg/sync/atomic/asm_386.s b/src/pkg/sync/atomic/asm_386.s
index a9360efae9..914d2feeb4 100644
--- a/src/pkg/sync/atomic/asm_386.s
+++ b/src/pkg/sync/atomic/asm_386.s
@@ -85,3 +85,12 @@ addloop:
 	MOVL	BX, retlo+12(FP)
 	MOVL	CX, rethi+16(FP)
 	RET
+
+TEXT ·LoadInt32(SB),7,$0
+	JMP	·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+	MOVL	addrptr+0(FP), AX
+	MOVL	0(AX), AX
+	MOVL	AX, ret+4(FP)
+	RET
diff --git a/src/pkg/sync/atomic/asm_amd64.s b/src/pkg/sync/atomic/asm_amd64.s
index a260902a71..4282950632 100644
--- a/src/pkg/sync/atomic/asm_amd64.s
+++ b/src/pkg/sync/atomic/asm_amd64.s
@@ -57,3 +57,13 @@ TEXT ·AddUint64(SB),7,$0
 	ADDQ	AX, CX
 	MOVQ	CX, ret+16(FP)
 	RET
+
+TEXT ·LoadInt32(SB),7,$0
+	JMP	·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+	MOVQ	addrptr+0(FP), AX
+	MOVL	0(AX), AX
+	MOVL	AX, ret+8(FP)
+	RET
+
diff --git a/src/pkg/sync/atomic/asm_linux_arm.s b/src/pkg/sync/atomic/asm_linux_arm.s
index 72f8d746bb..a09e06703a 100644
--- a/src/pkg/sync/atomic/asm_linux_arm.s
+++ b/src/pkg/sync/atomic/asm_linux_arm.s
@@ -83,3 +83,16 @@ TEXT ·AddInt64(SB),7,$0
 
 TEXT ·AddUint64(SB),7,$0
 	B	·armAddUint64(SB)
+
+TEXT ·LoadInt32(SB),7,$0
+	B	·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+	MOVW	addrptr+0(FP), R2
+loadloop1:
+	MOVW	0(R2), R0
+	MOVW	R0, R1
+	BL	cas<>(SB)
+	BCC	loadloop1
+	MOVW	R0, val+4(FP)
+	RET
diff --git a/src/pkg/sync/atomic/atomic_test.go b/src/pkg/sync/atomic/atomic_test.go
index 119ad0036f..2229e58d0c 100644
--- a/src/pkg/sync/atomic/atomic_test.go
+++ b/src/pkg/sync/atomic/atomic_test.go
@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
 	}
 }
 
+func TestLoadInt32(t *testing.T) {
+	var x struct {
+		before int32
+		i      int32
+		after  int32
+	}
+	x.before = magic32
+	x.after = magic32
+	for delta := int32(1); delta+delta > delta; delta += delta {
+		k := LoadInt32(&x.i)
+		if k != x.i {
+			t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+		}
+		x.i += delta
+	}
+	if x.before != magic32 || x.after != magic32 {
+		t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+	}
+}
+
+func TestLoadUint32(t *testing.T) {
+	var x struct {
+		before uint32
+		i      uint32
+		after  uint32
+	}
+	x.before = magic32
+	x.after = magic32
+	for delta := uint32(1); delta+delta > delta; delta += delta {
+		k := LoadUint32(&x.i)
+		if k != x.i {
+			t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+		}
+		x.i += delta
+	}
+	if x.before != magic32 || x.after != magic32 {
+		t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+	}
+}
+
 // Tests of correct behavior, with contention.
 // (Is the function atomic?)
 //
@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
 		}
 	}
 }
+
+func hammerLoadInt32(t *testing.T, uval *uint32) {
+	val := (*int32)(unsafe.Pointer(uval))
+	for {
+		v := LoadInt32(val)
+		vlo := v & ((1 << 16) - 1)
+		vhi := v >> 16
+		if vlo != vhi {
+			t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
+		}
+		new := v + 1 + 1<<16
+		if vlo == 1e4 {
+			new = 0
+		}
+		if CompareAndSwapInt32(val, v, new) {
+			break
+		}
+	}
+}
+
+func hammerLoadUint32(t *testing.T, val *uint32) {
+	for {
+		v := LoadUint32(val)
+		vlo := v & ((1 << 16) - 1)
+		vhi := v >> 16
+		if vlo != vhi {
+			t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
+		}
+		new := v + 1 + 1<<16
+		if vlo == 1e4 {
+			new = 0
+		}
+		if CompareAndSwapUint32(val, v, new) {
+			break
+		}
+	}
+}
+
+func TestHammerLoad(t *testing.T) {
+	tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
+	n := 100000
+	if testing.Short() {
+		n = 10000
+	}
+	const procs = 8
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
+	for _, tt := range tests {
+		c := make(chan int)
+		var val uint32
+		for p := 0; p < procs; p++ {
+			go func() {
+				for i := 0; i < n; i++ {
+					tt(t, &val)
+				}
+				c <- 1
+			}()
+		}
+		for p := 0; p < procs; p++ {
+			<-c
+		}
+	}
+}
diff --git a/src/pkg/sync/atomic/doc.go b/src/pkg/sync/atomic/doc.go
index ec5a0d33af..b35eb539c0 100644
--- a/src/pkg/sync/atomic/doc.go
+++ b/src/pkg/sync/atomic/doc.go
@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
 // AddUintptr atomically adds delta to *val and returns the new value.
 func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
 
+// LoadInt32 atomically loads *addr.
+func LoadInt32(addr *int32) (val int32)
+
+// LoadUint32 atomically loads *addr.
+func LoadUint32(addr *uint32) (val uint32)
+
 // Helper for ARM.  Linker will discard on other systems
 func panic64() {
 	panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
diff --git a/src/pkg/sync/waitgroup.go b/src/pkg/sync/waitgroup.go
index 05478c6306..a4c9b7e43c 100644
--- a/src/pkg/sync/waitgroup.go
+++ b/src/pkg/sync/waitgroup.go
@@ -4,7 +4,10 @@
 
 package sync
 
-import "runtime"
+import (
+	"runtime"
+	"sync/atomic"
+)
 
 // A WaitGroup waits for a collection of goroutines to finish.
 // The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@ import "runtime"
 //
 type WaitGroup struct {
 	m       Mutex
-	counter int
-	waiters int
+	counter int32
+	waiters int32
 	sema    *uint32
 }
 
@@ -48,19 +51,19 @@ type WaitGroup struct {
 
 // Add adds delta, which may be negative, to the WaitGroup counter.
 // If the counter becomes zero, all goroutines blocked on Wait() are released.
 func (wg *WaitGroup) Add(delta int) {
-	wg.m.Lock()
-	if delta < -wg.counter {
-		wg.m.Unlock()
+	v := atomic.AddInt32(&wg.counter, int32(delta))
+	if v < 0 {
 		panic("sync: negative WaitGroup count")
 	}
-	wg.counter += delta
-	if wg.counter == 0 && wg.waiters > 0 {
-		for i := 0; i < wg.waiters; i++ {
-			runtime.Semrelease(wg.sema)
-		}
-		wg.waiters = 0
-		wg.sema = nil
+	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+		return
 	}
+	wg.m.Lock()
+	for i := int32(0); i < wg.waiters; i++ {
+		runtime.Semrelease(wg.sema)
+	}
+	wg.waiters = 0
+	wg.sema = nil
 	wg.m.Unlock()
 }
 
@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
 
 // Wait blocks until the WaitGroup counter is zero.
 func (wg *WaitGroup) Wait() {
+	if atomic.LoadInt32(&wg.counter) == 0 {
+		return
+	}
 	wg.m.Lock()
-	if wg.counter == 0 {
+	atomic.AddInt32(&wg.waiters, 1)
+	// This code is racing with the unlocked path in Add above.
+	// The code above modifies counter and then reads waiters.
+	// We must modify waiters and then read counter (the opposite order)
+	// to avoid missing an Add.
+	if atomic.LoadInt32(&wg.counter) == 0 {
+		atomic.AddInt32(&wg.waiters, -1)
 		wg.m.Unlock()
 		return
 	}
-	wg.waiters++
 	if wg.sema == nil {
 		wg.sema = new(uint32)
 	}
diff --git a/src/pkg/sync/waitgroup_test.go b/src/pkg/sync/waitgroup_test.go
index fe35732e7a..34430fc215 100644
--- a/src/pkg/sync/waitgroup_test.go
+++ b/src/pkg/sync/waitgroup_test.go
@@ -5,7 +5,9 @@
 package sync_test
 
 import (
+	"runtime"
 	. "sync"
+	"sync/atomic"
 	"testing"
 )
 
@@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) {
 	wg.Done()
 	t.Fatal("Should panic")
 }
+
+func BenchmarkWaitGroupUncontended(b *testing.B) {
+	type PaddedWaitGroup struct {
+		WaitGroup
+		pad [128]uint8
+	}
+	const CallsPerSched = 1000
+	procs := runtime.GOMAXPROCS(-1)
+	N := int32(b.N / CallsPerSched)
+	c := make(chan bool, procs)
+	for p := 0; p < procs; p++ {
+		go func() {
+			var wg PaddedWaitGroup
+			for atomic.AddInt32(&N, -1) >= 0 {
+				runtime.Gosched()
+				for g := 0; g < CallsPerSched; g++ {
+					wg.Add(1)
+					wg.Done()
+					wg.Wait()
+				}
+			}
+			c <- true
+		}()
+	}
+	for p := 0; p < procs; p++ {
+		<-c
+	}
+}
+
+func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
+	const CallsPerSched = 1000
+	procs := runtime.GOMAXPROCS(-1)
+	N := int32(b.N / CallsPerSched)
+	c := make(chan bool, procs)
+	var wg WaitGroup
+	for p := 0; p < procs; p++ {
+		go func() {
+			foo := 0
+			for atomic.AddInt32(&N, -1) >= 0 {
+				runtime.Gosched()
+				for g := 0; g < CallsPerSched; g++ {
+					wg.Add(1)
+					for i := 0; i < localWork; i++ {
+						foo *= 2
+						foo /= 2
+					}
+					wg.Done()
+				}
+			}
+			c <- foo == 42
+		}()
+	}
+	for p := 0; p < procs; p++ {
+		<-c
+	}
+}
+
+func BenchmarkWaitGroupAddDone(b *testing.B) {
+	benchmarkWaitGroupAddDone(b, 0)
+}
+
+func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
+	benchmarkWaitGroupAddDone(b, 100)
+}
+
+func benchmarkWaitGroupWait(b *testing.B, localWork int) {
+	const CallsPerSched = 1000
+	procs := runtime.GOMAXPROCS(-1)
+	N := int32(b.N / CallsPerSched)
+	c := make(chan bool, procs)
+	var wg WaitGroup
+	wg.Add(procs)
+	for p := 0; p < procs; p++ {
+		go wg.Done()
+	}
+	for p := 0; p < procs; p++ {
+		go func() {
+			foo := 0
+			for atomic.AddInt32(&N, -1) >= 0 {
+				runtime.Gosched()
+				for g := 0; g < CallsPerSched; g++ {
+					wg.Wait()
+					for i := 0; i < localWork; i++ {
+						foo *= 2
+						foo /= 2
+					}
+				}
+			}
+			c <- foo == 42
+		}()
+	}
+	for p := 0; p < procs; p++ {
+		<-c
+	}
+}
+
+func BenchmarkWaitGroupWait(b *testing.B) {
+	benchmarkWaitGroupWait(b, 0)
+}
+
+func BenchmarkWaitGroupWaitWork(b *testing.B) {
+	benchmarkWaitGroupWait(b, 100)
+}
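
For illustration only (this sketch is not part of the change): the fast paths above can be traced through ordinary WaitGroup usage. Add and Done now touch only the atomic counter when no waiter is blocked, and Wait returns after a single atomic load when the counter is already zero; the mutex and semaphore are reached only on the slow paths.

	package main

	import "sync"

	func main() {
		var wg sync.WaitGroup
		wg.Add(1) // one atomic add; counter > 0, so the release path and mutex are skipped
		go func() {
			// Done is Add(-1). If the counter drops to zero and the atomic
			// load of waiters sees none, Done returns without locking.
			wg.Done()
		}()
		// If Done has already run, Wait sees counter == 0 and returns after a
		// single atomic load; otherwise it registers itself as a waiter under
		// the mutex and blocks on the semaphore.
		wg.Wait()
	}

The ordering comment in Wait is what keeps the unlocked paths safe: Add modifies counter and then loads waiters, while Wait increments waiters and then re-loads counter. Because the two sides use opposite orders, at least one of them is guaranteed to observe the other's update, so a waiter cannot register itself after the final Done has already decided there is nobody to wake.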
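The new Load functions exist because a plain read of a shared int32 that is concurrently updated is racy; Add and CompareAndSwap cover the write side, and Load completes the read side (on ARM it is built from the CAS loop above). A minimal sketch of the intended use, again illustrative only: the stop-flag protocol here is hypothetical, and the writer uses AddInt32 since the package has no Store function at this point.

	package main

	import (
		"fmt"
		"runtime"
		"sync/atomic"
	)

	func main() {
		var stop int32
		done := make(chan bool)
		go func() {
			// Poll the flag with an atomic load; a plain read of stop
			// would race with the AddInt32 below.
			for atomic.LoadInt32(&stop) == 0 {
				runtime.Gosched()
			}
			done <- true
		}()
		atomic.AddInt32(&stop, 1) // acts as an atomic store of 1, since stop starts at 0
		<-done
		fmt.Println("worker observed the flag")
	}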