]> Cypherpunks repositories - gostls13.git/commitdiff
sync: add fast paths to WaitGroup
authorDmitriy Vyukov <dvyukov@google.com>
Mon, 18 Jul 2011 16:35:55 +0000 (12:35 -0400)
committerRuss Cox <rsc@golang.org>
Mon, 18 Jul 2011 16:35:55 +0000 (12:35 -0400)
benchmark                                        old ns/op    new ns/op    delta
BenchmarkWaitGroupUncontended                        93.50        33.60  -64.06%
BenchmarkWaitGroupUncontended-2                      44.30        16.90  -61.85%
BenchmarkWaitGroupUncontended-4                      21.80         8.47  -61.15%
BenchmarkWaitGroupUncontended-8                      12.10         4.86  -59.83%
BenchmarkWaitGroupUncontended-16                      7.38         3.35  -54.61%
BenchmarkWaitGroupAddDone                            58.40        33.70  -42.29%
BenchmarkWaitGroupAddDone-2                         293.00        85.80  -70.72%
BenchmarkWaitGroupAddDone-4                         243.00        51.10  -78.97%
BenchmarkWaitGroupAddDone-8                         236.00        52.20  -77.88%
BenchmarkWaitGroupAddDone-16                        215.00        43.30  -79.86%
BenchmarkWaitGroupAddDoneWork                       826.00       794.00   -3.87%
BenchmarkWaitGroupAddDoneWork-2                     450.00       424.00   -5.78%
BenchmarkWaitGroupAddDoneWork-4                     277.00       220.00  -20.58%
BenchmarkWaitGroupAddDoneWork-8                     440.00       116.00  -73.64%
BenchmarkWaitGroupAddDoneWork-16                    569.00        66.50  -88.31%
BenchmarkWaitGroupWait                               29.00         8.04  -72.28%
BenchmarkWaitGroupWait-2                             74.10         4.15  -94.40%
BenchmarkWaitGroupWait-4                            117.00         2.30  -98.03%
BenchmarkWaitGroupWait-8                            111.00         1.31  -98.82%
BenchmarkWaitGroupWait-16                           104.00         1.27  -98.78%
BenchmarkWaitGroupWaitWork                          802.00       792.00   -1.25%
BenchmarkWaitGroupWaitWork-2                        411.00       401.00   -2.43%
BenchmarkWaitGroupWaitWork-4                        210.00       199.00   -5.24%
BenchmarkWaitGroupWaitWork-8                        206.00       105.00  -49.03%
BenchmarkWaitGroupWaitWork-16                       334.00        54.40  -83.71%

R=rsc
CC=golang-dev
https://golang.org/cl/4672050

src/pkg/sync/atomic/asm_386.s
src/pkg/sync/atomic/asm_amd64.s
src/pkg/sync/atomic/asm_linux_arm.s
src/pkg/sync/atomic/atomic_test.go
src/pkg/sync/atomic/doc.go
src/pkg/sync/waitgroup.go
src/pkg/sync/waitgroup_test.go

index a9360efae9997d9759d9d5e650f0aee91db95fe2..914d2feeb49b9dc1b961f6279b780cdfd3f4f093 100644 (file)
@@ -85,3 +85,12 @@ addloop:
        MOVL    BX, retlo+12(FP)
        MOVL    CX, rethi+16(FP)
        RET
+
+TEXT ·LoadInt32(SB),7,$0
+       JMP     ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+       MOVL    addrptr+0(FP), AX
+       MOVL    0(AX), AX
+       MOVL    AX, ret+4(FP)
+       RET
index a260902a7183470bd245825d77b5e0d4e979a6cf..4282950632aa17a5e74c684fb6d1c6a3303de02b 100644 (file)
@@ -57,3 +57,13 @@ TEXT ·AddUint64(SB),7,$0
        ADDQ    AX, CX
        MOVQ    CX, ret+16(FP)
        RET
+
+TEXT ·LoadInt32(SB),7,$0
+       JMP     ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+       MOVQ    addrptr+0(FP), AX
+       MOVL    0(AX), AX
+       MOVL    AX, ret+8(FP)
+       RET
+
index 72f8d746bb269e2aba73eac3fe35f4f34c3624e4..a09e06703aac9d16de3ee36edb86a9bcb608018f 100644 (file)
@@ -83,3 +83,16 @@ TEXT ·AddInt64(SB),7,$0
 
 TEXT ·AddUint64(SB),7,$0
        B       ·armAddUint64(SB)
+
+TEXT ·LoadInt32(SB),7,$0
+       B       ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+       MOVW    addrptr+0(FP), R2
+loadloop1:
+       MOVW    0(R2), R0
+       MOVW    R0, R1
+       BL      cas<>(SB)
+       BCC     loadloop1
+       MOVW    R0, val+4(FP)
+       RET
index 119ad0036fdb5eea8efd079ee38fbeedd0f86df6..2229e58d0c72566c85942804a0a4b0a74d4fe064 100644 (file)
@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
        }
 }
 
+func TestLoadInt32(t *testing.T) {
+       var x struct {
+               before int32
+               i      int32
+               after  int32
+       }
+       x.before = magic32
+       x.after = magic32
+       for delta := int32(1); delta+delta > delta; delta += delta {
+               k := LoadInt32(&x.i)
+               if k != x.i {
+                       t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+               }
+               x.i += delta
+       }
+       if x.before != magic32 || x.after != magic32 {
+               t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+       }
+}
+
+func TestLoadUint32(t *testing.T) {
+       var x struct {
+               before uint32
+               i      uint32
+               after  uint32
+       }
+       x.before = magic32
+       x.after = magic32
+       for delta := uint32(1); delta+delta > delta; delta += delta {
+               k := LoadUint32(&x.i)
+               if k != x.i {
+                       t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+               }
+               x.i += delta
+       }
+       if x.before != magic32 || x.after != magic32 {
+               t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+       }
+}
+
 // Tests of correct behavior, with contention.
 // (Is the function atomic?)
 //
@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
                }
        }
 }
+
+func hammerLoadInt32(t *testing.T, uval *uint32) {
+       val := (*int32)(unsafe.Pointer(uval))
+       for {
+               v := LoadInt32(val)
+               vlo := v & ((1 << 16) - 1)
+               vhi := v >> 16
+               if vlo != vhi {
+                       t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
+               }
+               new := v + 1 + 1<<16
+               if vlo == 1e4 {
+                       new = 0
+               }
+               if CompareAndSwapInt32(val, v, new) {
+                       break
+               }
+       }
+}
+
+func hammerLoadUint32(t *testing.T, val *uint32) {
+       for {
+               v := LoadUint32(val)
+               vlo := v & ((1 << 16) - 1)
+               vhi := v >> 16
+               if vlo != vhi {
+                       t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
+               }
+               new := v + 1 + 1<<16
+               if vlo == 1e4 {
+                       new = 0
+               }
+               if CompareAndSwapUint32(val, v, new) {
+                       break
+               }
+       }
+}
+
+func TestHammerLoad(t *testing.T) {
+       tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
+       n := 100000
+       if testing.Short() {
+               n = 10000
+       }
+       const procs = 8
+       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
+       for _, tt := range tests {
+               c := make(chan int)
+               var val uint32
+               for p := 0; p < procs; p++ {
+                       go func() {
+                               for i := 0; i < n; i++ {
+                                       tt(t, &val)
+                               }
+                               c <- 1
+                       }()
+               }
+               for p := 0; p < procs; p++ {
+                       <-c
+               }
+       }
+}
index ec5a0d33af123c2a8920384a0fb964cb923ac570..b35eb539c054ff5bd89e86f9f9ab89f3c4f3d8ac 100644 (file)
@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
 // AddUintptr atomically adds delta to *val and returns the new value.
 func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
 
+// LoadInt32 atomically loads *addr.
+func LoadInt32(addr *int32) (val int32)
+
+// LoadUint32 atomically loads *addr.
+func LoadUint32(addr *uint32) (val uint32)
+
 // Helper for ARM.  Linker will discard on other systems
 func panic64() {
        panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
index 05478c630667eb705515e91305b331a850d49c45..a4c9b7e43cd852d3e778795d3c401f800e60f223 100644 (file)
@@ -4,7 +4,10 @@
 
 package sync
 
-import "runtime"
+import (
+       "runtime"
+       "sync/atomic"
+)
 
 // A WaitGroup waits for a collection of goroutines to finish.
 // The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@ import "runtime"
 // 
 type WaitGroup struct {
        m       Mutex
-       counter int
-       waiters int
+       counter int32
+       waiters int32
        sema    *uint32
 }
 
@@ -48,19 +51,19 @@ type WaitGroup struct {
 // Add adds delta, which may be negative, to the WaitGroup counter.
 // If the counter becomes zero, all goroutines blocked on Wait() are released.
 func (wg *WaitGroup) Add(delta int) {
-       wg.m.Lock()
-       if delta < -wg.counter {
-               wg.m.Unlock()
+       v := atomic.AddInt32(&wg.counter, int32(delta))
+       if v < 0 {
                panic("sync: negative WaitGroup count")
        }
-       wg.counter += delta
-       if wg.counter == 0 && wg.waiters > 0 {
-               for i := 0; i < wg.waiters; i++ {
-                       runtime.Semrelease(wg.sema)
-               }
-               wg.waiters = 0
-               wg.sema = nil
+       if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+               return
        }
+       wg.m.Lock()
+       for i := int32(0); i < wg.waiters; i++ {
+               runtime.Semrelease(wg.sema)
+       }
+       wg.waiters = 0
+       wg.sema = nil
        wg.m.Unlock()
 }
 
@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
 
 // Wait blocks until the WaitGroup counter is zero.
 func (wg *WaitGroup) Wait() {
+       if atomic.LoadInt32(&wg.counter) == 0 {
+               return
+       }
        wg.m.Lock()
-       if wg.counter == 0 {
+       atomic.AddInt32(&wg.waiters, 1)
+       // This code is racing with the unlocked path in Add above.
+       // The code above modifies counter and then reads waiters.
+       // We must modify waiters and then read counter (the opposite order)
+       // to avoid missing an Add.
+       if atomic.LoadInt32(&wg.counter) == 0 {
+               atomic.AddInt32(&wg.waiters, -1)
                wg.m.Unlock()
                return
        }
-       wg.waiters++
        if wg.sema == nil {
                wg.sema = new(uint32)
        }
index fe35732e7a613d1e7b85af4c942f6766ed9dd681..34430fc2158f2f1eb37ec857aa01b36370d13ae4 100644 (file)
@@ -5,7 +5,9 @@
 package sync_test
 
 import (
+       "runtime"
        . "sync"
+       "sync/atomic"
        "testing"
 )
 
@@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) {
        wg.Done()
        t.Fatal("Should panic")
 }
+
+func BenchmarkWaitGroupUncontended(b *testing.B) {
+       type PaddedWaitGroup struct {
+               WaitGroup
+               pad [128]uint8
+       }
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       var wg PaddedWaitGroup
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               runtime.Gosched()
+                               for g := 0; g < CallsPerSched; g++ {
+                                       wg.Add(1)
+                                       wg.Done()
+                                       wg.Wait()
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       var wg WaitGroup
+       for p := 0; p < procs; p++ {
+               go func() {
+                       foo := 0
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               runtime.Gosched()
+                               for g := 0; g < CallsPerSched; g++ {
+                                       wg.Add(1)
+                                       for i := 0; i < localWork; i++ {
+                                               foo *= 2
+                                               foo /= 2
+                                       }
+                                       wg.Done()
+                               }
+                       }
+                       c <- foo == 42
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkWaitGroupAddDone(b *testing.B) {
+       benchmarkWaitGroupAddDone(b, 0)
+}
+
+func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
+       benchmarkWaitGroupAddDone(b, 100)
+}
+
+func benchmarkWaitGroupWait(b *testing.B, localWork int) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       var wg WaitGroup
+       wg.Add(procs)
+       for p := 0; p < procs; p++ {
+               go wg.Done()
+       }
+       for p := 0; p < procs; p++ {
+               go func() {
+                       foo := 0
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               runtime.Gosched()
+                               for g := 0; g < CallsPerSched; g++ {
+                                       wg.Wait()
+                                       for i := 0; i < localWork; i++ {
+                                               foo *= 2
+                                               foo /= 2
+                                       }
+                               }
+                       }
+                       c <- foo == 42
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkWaitGroupWait(b *testing.B) {
+       benchmarkWaitGroupWait(b, 0)
+}
+
+func BenchmarkWaitGroupWaitWork(b *testing.B) {
+       benchmarkWaitGroupWait(b, 100)
+}