Cypherpunks repositories - gostls13.git/commitdiff
sync: scalable Pool
author: Dmitriy Vyukov <dvyukov@google.com>
Fri, 24 Jan 2014 18:29:53 +0000 (22:29 +0400)
committer: Dmitriy Vyukov <dvyukov@google.com>
Fri, 24 Jan 2014 18:29:53 +0000 (22:29 +0400)
Introduce fixed-size P-local caches.
When local caches overflow/underflow a batch of items
is transferred to/from global mutex-protected cache.

benchmark                    old ns/op    new ns/op    delta
BenchmarkPool                    50554        22423  -55.65%
BenchmarkPool-4                 400359         5904  -98.53%
BenchmarkPool-16                403311         1598  -99.60%
BenchmarkPool-32                367310         1526  -99.58%

BenchmarkPoolOverlflow            5214         3633  -30.32%
BenchmarkPoolOverlflow-4         42663         9539  -77.64%
BenchmarkPoolOverlflow-8         46919        11385  -75.73%
BenchmarkPoolOverlflow-16        39454        13048  -66.93%

BenchmarkSprintfEmpty                    84           63  -25.68%
BenchmarkSprintfEmpty-2                 371           32  -91.13%
BenchmarkSprintfEmpty-4                 465           22  -95.25%
BenchmarkSprintfEmpty-8                 565           12  -97.77%
BenchmarkSprintfEmpty-16                498            5  -98.87%
BenchmarkSprintfEmpty-32                492            4  -99.04%

BenchmarkSprintfString                  259          229  -11.58%
BenchmarkSprintfString-2                574          144  -74.91%
BenchmarkSprintfString-4                651           77  -88.05%
BenchmarkSprintfString-8                868           47  -94.48%
BenchmarkSprintfString-16               825           33  -95.96%
BenchmarkSprintfString-32               825           30  -96.28%

BenchmarkSprintfInt                     213          188  -11.74%
BenchmarkSprintfInt-2                   448          138  -69.20%
BenchmarkSprintfInt-4                   624           52  -91.63%
BenchmarkSprintfInt-8                   691           31  -95.43%
BenchmarkSprintfInt-16                  724           18  -97.46%
BenchmarkSprintfInt-32                  718           16  -97.70%

BenchmarkSprintfIntInt                  311          282   -9.32%
BenchmarkSprintfIntInt-2                333          145  -56.46%
BenchmarkSprintfIntInt-4                642          110  -82.87%
BenchmarkSprintfIntInt-8                832           42  -94.90%
BenchmarkSprintfIntInt-16               817           24  -97.00%
BenchmarkSprintfIntInt-32               805           22  -97.17%

BenchmarkSprintfPrefixedInt             309          269  -12.94%
BenchmarkSprintfPrefixedInt-2           245          168  -31.43%
BenchmarkSprintfPrefixedInt-4           598           99  -83.36%
BenchmarkSprintfPrefixedInt-8           770           67  -91.23%
BenchmarkSprintfPrefixedInt-16          829           54  -93.49%
BenchmarkSprintfPrefixedInt-32          824           50  -93.83%

BenchmarkSprintfFloat                   418          398   -4.78%
BenchmarkSprintfFloat-2                 295          203  -31.19%
BenchmarkSprintfFloat-4                 585          128  -78.12%
BenchmarkSprintfFloat-8                 873           60  -93.13%
BenchmarkSprintfFloat-16                884           33  -96.24%
BenchmarkSprintfFloat-32                881           29  -96.62%

BenchmarkManyArgs                      1097         1069   -2.55%
BenchmarkManyArgs-2                     705          567  -19.57%
BenchmarkManyArgs-4                     792          319  -59.72%
BenchmarkManyArgs-8                     963          172  -82.14%
BenchmarkManyArgs-16                   1115          103  -90.76%
BenchmarkManyArgs-32                   1133           90  -92.03%

LGTM=rsc
R=golang-codereviews, bradfitz, minux.ma, gobot, rsc
CC=golang-codereviews
https://golang.org/cl/46010043

src/pkg/go/build/deps_test.go
src/pkg/runtime/mgc0.c
src/pkg/runtime/proc.c
src/pkg/sync/pool.go
src/pkg/sync/pool_test.go

index dd068d45583552df4b63ffa1d5b31cb44aa787ee..ab56b6554ca5a794b9bbe0a889d8ceb411a90c5b 100644 (file)
@@ -29,7 +29,7 @@ var pkgDeps = map[string][]string{
        "errors":      {},
        "io":          {"errors", "sync"},
        "runtime":     {"unsafe"},
-       "sync":        {"sync/atomic", "unsafe"},
+       "sync":        {"runtime", "sync/atomic", "unsafe"},
        "sync/atomic": {"unsafe"},
        "unsafe":      {},
 
index aa93bfbeda4673668aea7c11df9fa441ee4cc635..8b6eeab1050659e60ebf074ec0a0dd71694f0832 100644 (file)
@@ -68,15 +68,19 @@ clearpools(void)
 {
        void **pool, **next;
        P *p, **pp;
+       uintptr off;
        int32 i;
 
        // clear sync.Pool's
        for(pool = pools.head; pool != nil; pool = next) {
                next = pool[0];
                pool[0] = nil; // next
-               pool[1] = nil; // slice
-               pool[2] = nil;
-               pool[3] = nil;
+               pool[1] = nil; // local
+               pool[2] = nil; // localSize
+               off = (uintptr)pool[3] / sizeof(void*);
+               pool[off+0] = nil; // global slice
+               pool[off+1] = nil;
+               pool[off+2] = nil;
        }
        pools.head = nil;
 
index 24feda4183477ae61a98b65b393d13c59ef8ea5f..afe71ef69e8ae571280e05a64b68cece273d6df2 100644 (file)
@@ -3046,3 +3046,23 @@ haveexperiment(int8 *name)
        }
        return 0;
 }
+
+// func runtime_procPin() int
+void
+sync·runtime_procPin(intgo p)
+{
+       M *mp;
+
+       mp = m;
+       // Disable preemption.
+       mp->locks++;
+       p = mp->p->id;
+       FLUSH(&p);
+}
+
+// func runtime_procUnpin()
+void
+sync·runtime_procUnpin(void)
+{
+       m->locks--;
+}
index 9eb07c3a03f26aebee9b403d0c92d7ccd44dd815..1a388875464a84f264fe19e5778854c699342c3d 100644 (file)
@@ -4,6 +4,18 @@
 
 package sync
 
+import (
+       "runtime"
+       "sync/atomic"
+       "unsafe"
+)
+
+const (
+       cacheLineSize = 128
+       poolLocalSize = 2 * cacheLineSize
+       poolLocalCap  = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
+)
+
 // A Pool is a set of temporary objects that may be individually saved
 // and retrieved.
 //
@@ -26,29 +38,52 @@ package sync
 //
 // This is an experimental type and might not be released.
 type Pool struct {
-       next *Pool         // for use by runtime. must be first.
-       list []interface{} // offset known to runtime
-       mu   Mutex         // guards list
+       // The following fields are known to runtime.
+       next         *Pool      // for use by runtime
+       local        *poolLocal // local fixed-size per-P pool, actually an array
+       localSize    uintptr    // size of the local array
+       globalOffset uintptr    // offset of global
+       // The rest is not known to runtime.
 
        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() interface{}
+
+       pad [cacheLineSize]byte
+       // Read-mostly date above this point, mutable data follows.
+       mu     Mutex
+       global []interface{} // global fallback pool
 }
 
-func runtime_registerPool(*Pool)
+// Local per-P Pool appendix.
+type poolLocal struct {
+       tail   int
+       unused int
+       buf    [poolLocalCap]interface{}
+}
+
+func init() {
+       var v poolLocal
+       if unsafe.Sizeof(v) != poolLocalSize {
+               panic("sync: incorrect pool size")
+       }
+}
 
 // Put adds x to the pool.
 func (p *Pool) Put(x interface{}) {
        if x == nil {
                return
        }
-       p.mu.Lock()
-       if p.list == nil {
-               runtime_registerPool(p)
+       l := p.pin()
+       t := l.tail
+       if t < int(poolLocalCap) {
+               l.buf[t] = x
+               l.tail = t + 1
+               runtime_procUnpin()
+               return
        }
-       p.list = append(p.list, x)
-       p.mu.Unlock()
+       p.putSlow(l, x)
 }
 
 // Get selects an arbitrary item from the Pool, removes it from the
@@ -60,16 +95,116 @@ func (p *Pool) Put(x interface{}) {
 // If Get would otherwise return nil and p.New is non-nil, Get returns
 // the result of calling p.New.
 func (p *Pool) Get() interface{} {
+       l := p.pin()
+       t := l.tail
+       if t > 0 {
+               t -= 1
+               x := l.buf[t]
+               l.tail = t
+               runtime_procUnpin()
+               return x
+       }
+       return p.getSlow()
+}
+
+func (p *Pool) putSlow(l *poolLocal, x interface{}) {
+       // Grab half of items from local pool and put to global pool.
+       // Can not lock the mutex while pinned.
+       const N = int(poolLocalCap/2 + 1)
+       var buf [N]interface{}
+       buf[0] = x
+       for i := 1; i < N; i++ {
+               l.tail--
+               buf[i] = l.buf[l.tail]
+       }
+       runtime_procUnpin()
+
        p.mu.Lock()
-       var x interface{}
-       if n := len(p.list); n > 0 {
-               x = p.list[n-1]
-               p.list[n-1] = nil // Just to be safe
-               p.list = p.list[:n-1]
+       p.global = append(p.global, buf[:]...)
+       p.mu.Unlock()
+}
+
+func (p *Pool) getSlow() (x interface{}) {
+       // Grab a batch of items from global pool and put to local pool.
+       // Can not lock the mutex while pinned.
+       runtime_procUnpin()
+       p.mu.Lock()
+       pid := runtime_procPin()
+       s := p.localSize
+       l := p.local
+       if uintptr(pid) < s {
+               l = indexLocal(l, pid)
+               // Get the item to return.
+               last := len(p.global) - 1
+               if last >= 0 {
+                       x = p.global[last]
+                       p.global = p.global[:last]
+               }
+               // Try to refill local pool, we may have been rescheduled to another P.
+               if last > 0 && l.tail == 0 {
+                       n := int(poolLocalCap / 2)
+                       gl := len(p.global)
+                       if n > gl {
+                               n = gl
+                       }
+                       copy(l.buf[:], p.global[gl-n:])
+                       p.global = p.global[:gl-n]
+                       l.tail = n
+               }
        }
+       runtime_procUnpin()
        p.mu.Unlock()
+
        if x == nil && p.New != nil {
                x = p.New()
        }
-       return x
+       return
 }
+
+// pin pins current goroutine to P, disables preemption and returns poolLocal pool for the P.
+// Caller must call runtime_procUnpin() when done with the pool.
+func (p *Pool) pin() *poolLocal {
+       pid := runtime_procPin()
+       // In pinSlow we store to localSize and then to local, here we load in opposite order.
+       // Since we've disabled preemption, GC can not happen in between.
+       // Thus here we must observe local at least as large localSize.
+       // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
+       s := atomic.LoadUintptr(&p.localSize) // load-acquire
+       l := p.local                          // load-consume
+       if uintptr(pid) < s {
+               return indexLocal(l, pid)
+       }
+       return p.pinSlow()
+}
+
+func (p *Pool) pinSlow() *poolLocal {
+       // Retry under the mutex.
+       runtime_procUnpin()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       pid := runtime_procPin()
+       s := p.localSize
+       l := p.local
+       if uintptr(pid) < s {
+               return indexLocal(l, pid)
+       }
+       if p.local == nil {
+               p.globalOffset = unsafe.Offsetof(p.global)
+               runtime_registerPool(p)
+       }
+       // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
+       size := runtime.GOMAXPROCS(0)
+       local := make([]poolLocal, size)
+       atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
+       atomic.StoreUintptr(&p.localSize, uintptr(size))                                            // store-release
+       return &local[pid]
+}
+
+func indexLocal(l *poolLocal, i int) *poolLocal {
+       return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // uh...
+}
+
+// Implemented in runtime.
+func runtime_registerPool(*Pool)
+func runtime_procPin() int
+func runtime_procUnpin()
index e4aeda4be4ac312e04cc3f0f6ceb675f8d89bb86..3bf5131ea05d8835108283be90274901d2d5b178 100644 (file)
@@ -11,7 +11,6 @@ import (
        "sync/atomic"
        "testing"
        "time"
-       "unsafe"
 )
 
 func TestPool(t *testing.T) {
@@ -125,28 +124,41 @@ func TestPoolStress(t *testing.T) {
 }
 
 func BenchmarkPool(b *testing.B) {
-       procs := runtime.GOMAXPROCS(-1)
-       var dec func() bool
-       if unsafe.Sizeof(b.N) == 8 {
-               n := int64(b.N)
-               dec = func() bool {
-                       return atomic.AddInt64(&n, -1) >= 0
-               }
-       } else {
-               n := int32(b.N)
-               dec = func() bool {
-                       return atomic.AddInt32(&n, -1) >= 0
-               }
+       var p Pool
+       var wg WaitGroup
+       n0 := uintptr(b.N)
+       n := n0
+       for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+                               for b := 0; b < 100; b++ {
+                                       p.Put(1)
+                                       p.Get()
+                               }
+                       }
+               }()
        }
+       wg.Wait()
+}
+
+func BenchmarkPoolOverlflow(b *testing.B) {
        var p Pool
        var wg WaitGroup
-       for i := 0; i < procs; i++ {
+       n0 := uintptr(b.N)
+       n := n0
+       for i := 0; i < runtime.GOMAXPROCS(0); i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
-                       for dec() {
-                               p.Put(1)
-                               p.Get()
+                       for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+                               for b := 0; b < 100; b++ {
+                                       p.Put(1)
+                               }
+                               for b := 0; b < 100; b++ {
+                                       p.Get()
+                               }
                        }
                }()
        }