From f8e0057bb71cded5bb2d0b09c6292b13c59b5748 Mon Sep 17 00:00:00 2001
From: Dmitriy Vyukov
Date: Fri, 24 Jan 2014 22:29:53 +0400
Subject: [PATCH] sync: scalable Pool

Introduce fixed-size P-local caches.
When local caches overflow/underflow, a batch of items
is transferred to/from the global mutex-protected cache.

benchmark                          old ns/op    new ns/op    delta
BenchmarkPool                          50554        22423  -55.65%
BenchmarkPool-4                       400359         5904  -98.53%
BenchmarkPool-16                      403311         1598  -99.60%
BenchmarkPool-32                      367310         1526  -99.58%
BenchmarkPoolOverlflow                  5214         3633  -30.32%
BenchmarkPoolOverlflow-4               42663         9539  -77.64%
BenchmarkPoolOverlflow-8               46919        11385  -75.73%
BenchmarkPoolOverlflow-16              39454        13048  -66.93%
BenchmarkSprintfEmpty                     84           63  -25.68%
BenchmarkSprintfEmpty-2                  371           32  -91.13%
BenchmarkSprintfEmpty-4                  465           22  -95.25%
BenchmarkSprintfEmpty-8                  565           12  -97.77%
BenchmarkSprintfEmpty-16                 498            5  -98.87%
BenchmarkSprintfEmpty-32                 492            4  -99.04%
BenchmarkSprintfString                   259          229  -11.58%
BenchmarkSprintfString-2                 574          144  -74.91%
BenchmarkSprintfString-4                 651           77  -88.05%
BenchmarkSprintfString-8                 868           47  -94.48%
BenchmarkSprintfString-16                825           33  -95.96%
BenchmarkSprintfString-32                825           30  -96.28%
BenchmarkSprintfInt                      213          188  -11.74%
BenchmarkSprintfInt-2                    448          138  -69.20%
BenchmarkSprintfInt-4                    624           52  -91.63%
BenchmarkSprintfInt-8                    691           31  -95.43%
BenchmarkSprintfInt-16                   724           18  -97.46%
BenchmarkSprintfInt-32                   718           16  -97.70%
BenchmarkSprintfIntInt                   311          282   -9.32%
BenchmarkSprintfIntInt-2                 333          145  -56.46%
BenchmarkSprintfIntInt-4                 642          110  -82.87%
BenchmarkSprintfIntInt-8                 832           42  -94.90%
BenchmarkSprintfIntInt-16                817           24  -97.00%
BenchmarkSprintfIntInt-32                805           22  -97.17%
BenchmarkSprintfPrefixedInt              309          269  -12.94%
BenchmarkSprintfPrefixedInt-2            245          168  -31.43%
BenchmarkSprintfPrefixedInt-4            598           99  -83.36%
BenchmarkSprintfPrefixedInt-8            770           67  -91.23%
BenchmarkSprintfPrefixedInt-16           829           54  -93.49%
BenchmarkSprintfPrefixedInt-32           824           50  -93.83%
BenchmarkSprintfFloat                    418          398   -4.78%
BenchmarkSprintfFloat-2                  295          203  -31.19%
BenchmarkSprintfFloat-4                  585          128  -78.12%
BenchmarkSprintfFloat-8                  873           60  -93.13%
BenchmarkSprintfFloat-16                 884           33  -96.24%
BenchmarkSprintfFloat-32                 881           29  -96.62%
BenchmarkManyArgs                       1097         1069   -2.55%
BenchmarkManyArgs-2                      705          567  -19.57%
BenchmarkManyArgs-4                      792          319  -59.72%
BenchmarkManyArgs-8                      963          172  -82.14%
BenchmarkManyArgs-16                    1115          103  -90.76%
BenchmarkManyArgs-32                    1133           90  -92.03%

LGTM=rsc
R=golang-codereviews, bradfitz, minux.ma, gobot, rsc
CC=golang-codereviews
https://golang.org/cl/46010043
---
 src/pkg/go/build/deps_test.go |   2 +-
 src/pkg/runtime/mgc0.c        |  10 ++-
 src/pkg/runtime/proc.c        |  20 +++++
 src/pkg/sync/pool.go          | 165 ++++++++++++++++++++++++++++++----
 src/pkg/sync/pool_test.go     |  46 ++++++----
 5 files changed, 207 insertions(+), 36 deletions(-)
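
The core of the scheme, apart from the runtime plumbing: each P owns a
small fixed-size cache that Put and Get use without locking, and only
cache overflow/underflow touches the mutex-protected global list, moving
items in batches of about half the cache at a time. A minimal runnable
sketch of the overflow half (miniPool, localCap and put are illustrative
names, not identifiers from this patch; the real code uses a fixed
per-P array rather than a slice):

	// Sketch only: a fixed-size local cache overflowing into a shared,
	// mutex-protected global list.
	package main

	import (
		"fmt"
		"sync"
	)

	const localCap = 4 // the patch sizes the real cache to two cache lines

	type miniPool struct {
		local  []interface{} // stands in for the current P's cache
		mu     sync.Mutex
		global []interface{} // fallback shared by all Ps
	}

	func (p *miniPool) put(x interface{}) {
		if len(p.local) < localCap {
			p.local = append(p.local, x) // fast path: no lock
			return
		}
		// Overflow: move x plus half of the local items to the global list.
		n := localCap / 2
		p.mu.Lock()
		p.global = append(p.global, x)
		p.global = append(p.global, p.local[localCap-n:]...)
		p.mu.Unlock()
		p.local = p.local[:localCap-n]
	}

	func main() {
		p := new(miniPool)
		for i := 0; i < 6; i++ {
			p.put(i)
		}
		fmt.Println(len(p.local), len(p.global)) // 3 3
	}

Batching matters because it amortizes one mutex acquisition over
poolLocalCap/2 items instead of paying for the lock on every operation.
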
diff --git a/src/pkg/go/build/deps_test.go b/src/pkg/go/build/deps_test.go
index dd068d4558..ab56b6554c 100644
--- a/src/pkg/go/build/deps_test.go
+++ b/src/pkg/go/build/deps_test.go
@@ -29,7 +29,7 @@ var pkgDeps = map[string][]string{
 	"errors":      {},
 	"io":          {"errors", "sync"},
 	"runtime":     {"unsafe"},
-	"sync":        {"sync/atomic", "unsafe"},
+	"sync":        {"runtime", "sync/atomic", "unsafe"},
 	"sync/atomic": {"unsafe"},
 	"unsafe":      {},
 
diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c
index aa93bfbeda..8b6eeab105 100644
--- a/src/pkg/runtime/mgc0.c
+++ b/src/pkg/runtime/mgc0.c
@@ -68,15 +68,19 @@ clearpools(void)
 {
 	void **pool, **next;
 	P *p, **pp;
+	uintptr off;
 	int32 i;
 
 	// clear sync.Pool's
 	for(pool = pools.head; pool != nil; pool = next) {
 		next = pool[0];
 		pool[0] = nil; // next
-		pool[1] = nil; // slice
-		pool[2] = nil;
-		pool[3] = nil;
+		pool[1] = nil; // local
+		pool[2] = nil; // localSize
+		off = (uintptr)pool[3] / sizeof(void*);
+		pool[off+0] = nil; // global slice
+		pool[off+1] = nil;
+		pool[off+2] = nil;
 	}
 	pools.head = nil;
 
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index 24feda4183..afe71ef69e 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -3046,3 +3046,23 @@ haveexperiment(int8 *name)
 	}
 	return 0;
 }
+
+// func runtime_procPin() int
+void
+sync·runtime_procPin(intgo p)
+{
+	M *mp;
+
+	mp = m;
+	// Disable preemption.
+	mp->locks++;
+	p = mp->p->id;
+	FLUSH(&p);
+}
+
+// func runtime_procUnpin()
+void
+sync·runtime_procUnpin(void)
+{
+	m->locks--;
+}
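
The two runtime helpers above are what let sync own a per-P slot:
procPin disables preemption by incrementing m->locks and reports the
current P's id, and procUnpin re-enables it. While pinned, the goroutine
cannot migrate to another P, so the slot can be read and written without
atomics or locks. A runnable mock of that discipline (procPin and
procUnpin here are stand-ins; the real functions are runtime-internal
and not callable from user code):

	// Mock of the pin/unpin discipline; not the runtime implementation.
	package main

	import "fmt"

	var locks int // stands in for m->locks; preemption is off while > 0

	func procPin() int {
		locks++
		return 0 // the real version returns the current P's id
	}

	func procUnpin() {
		locks--
	}

	func main() {
		perP := [][]int{{1, 2, 3}} // one slot per P; the mock has one P
		pid := procPin()
		// Between pin and unpin no other goroutine can run on this P,
		// so perP[pid] can be popped without synchronization.
		l := perP[pid]
		x := l[len(l)-1]
		perP[pid] = l[:len(l)-1]
		procUnpin()
		fmt.Println(x, len(perP[pid])) // 3 2
	}
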
diff --git a/src/pkg/sync/pool.go b/src/pkg/sync/pool.go
index 9eb07c3a03..1a38887546 100644
--- a/src/pkg/sync/pool.go
+++ b/src/pkg/sync/pool.go
@@ -4,6 +4,18 @@
 
 package sync
 
+import (
+	"runtime"
+	"sync/atomic"
+	"unsafe"
+)
+
+const (
+	cacheLineSize = 128
+	poolLocalSize = 2 * cacheLineSize
+	poolLocalCap  = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
+)
+
 // A Pool is a set of temporary objects that may be individually saved
 // and retrieved.
 //
@@ -26,29 +38,52 @@ package sync
 //
 // This is an experimental type and might not be released.
 type Pool struct {
-	next *Pool         // for use by runtime. must be first.
-	list []interface{} // offset known to runtime
-	mu   Mutex         // guards list
+	// The following fields are known to runtime.
+	next         *Pool      // for use by runtime
+	local        *poolLocal // local fixed-size per-P pool, actually an array
+	localSize    uintptr    // size of the local array
+	globalOffset uintptr    // offset of global
+	// The rest is not known to runtime.
 
 	// New optionally specifies a function to generate
 	// a value when Get would otherwise return nil.
 	// It may not be changed concurrently with calls to Get.
 	New func() interface{}
+
+	pad [cacheLineSize]byte
+	// Read-mostly data above this point, mutable data follows.
+	mu     Mutex
+	global []interface{} // global fallback pool
 }
 
-func runtime_registerPool(*Pool)
+// Local per-P Pool appendix.
+type poolLocal struct {
+	tail   int
+	unused int
+	buf    [poolLocalCap]interface{}
+}
+
+func init() {
+	var v poolLocal
+	if unsafe.Sizeof(v) != poolLocalSize {
+		panic("sync: incorrect pool size")
+	}
+}
 
 // Put adds x to the pool.
 func (p *Pool) Put(x interface{}) {
 	if x == nil {
 		return
 	}
-	p.mu.Lock()
-	if p.list == nil {
-		runtime_registerPool(p)
+	l := p.pin()
+	t := l.tail
+	if t < int(poolLocalCap) {
+		l.buf[t] = x
+		l.tail = t + 1
+		runtime_procUnpin()
+		return
 	}
-	p.list = append(p.list, x)
-	p.mu.Unlock()
+	p.putSlow(l, x)
 }
 
 // Get selects an arbitrary item from the Pool, removes it from the
@@ -60,16 +95,116 @@ func (p *Pool) Put(x interface{}) {
 // If Get would otherwise return nil and p.New is non-nil, Get returns
 // the result of calling p.New.
 func (p *Pool) Get() interface{} {
+	l := p.pin()
+	t := l.tail
+	if t > 0 {
+		t -= 1
+		x := l.buf[t]
+		l.tail = t
+		runtime_procUnpin()
+		return x
+	}
+	return p.getSlow()
+}
+
+func (p *Pool) putSlow(l *poolLocal, x interface{}) {
+	// Grab half of the items from the local pool and put them into the global pool.
+	// Cannot lock the mutex while pinned.
+	const N = int(poolLocalCap/2 + 1)
+	var buf [N]interface{}
+	buf[0] = x
+	for i := 1; i < N; i++ {
+		l.tail--
+		buf[i] = l.buf[l.tail]
+	}
+	runtime_procUnpin()
+
 	p.mu.Lock()
-	var x interface{}
-	if n := len(p.list); n > 0 {
-		x = p.list[n-1]
-		p.list[n-1] = nil // Just to be safe
-		p.list = p.list[:n-1]
+	p.global = append(p.global, buf[:]...)
+	p.mu.Unlock()
+}
+
+func (p *Pool) getSlow() (x interface{}) {
+	// Grab a batch of items from the global pool and put them into the local pool.
+	// Cannot lock the mutex while pinned.
+	runtime_procUnpin()
+	p.mu.Lock()
+	pid := runtime_procPin()
+	s := p.localSize
+	l := p.local
+	if uintptr(pid) < s {
+		l = indexLocal(l, pid)
+		// Get the item to return.
+		last := len(p.global) - 1
+		if last >= 0 {
+			x = p.global[last]
+			p.global = p.global[:last]
+		}
+		// Try to refill the local pool; we may have been rescheduled to another P.
+		if last > 0 && l.tail == 0 {
+			n := int(poolLocalCap / 2)
+			gl := len(p.global)
+			if n > gl {
+				n = gl
+			}
+			copy(l.buf[:], p.global[gl-n:])
+			p.global = p.global[:gl-n]
+			l.tail = n
+		}
 	}
+	runtime_procUnpin()
 	p.mu.Unlock()
+
 	if x == nil && p.New != nil {
 		x = p.New()
 	}
-	return x
+	return
 }
+
+// pin pins the current goroutine to P, disables preemption and returns the poolLocal for the P.
+// Caller must call runtime_procUnpin() when done with the pool.
+func (p *Pool) pin() *poolLocal {
+	pid := runtime_procPin()
+	// In pinSlow we store to localSize and then to local, here we load in opposite order.
+	// Since we've disabled preemption, GC can not happen in between.
+	// Thus here we must observe local at least as large as localSize.
+	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
+	s := atomic.LoadUintptr(&p.localSize) // load-acquire
+	l := p.local                          // load-consume
+	if uintptr(pid) < s {
+		return indexLocal(l, pid)
+	}
+	return p.pinSlow()
+}
+
+func (p *Pool) pinSlow() *poolLocal {
+	// Retry under the mutex.
+	runtime_procUnpin()
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	pid := runtime_procPin()
+	s := p.localSize
+	l := p.local
+	if uintptr(pid) < s {
+		return indexLocal(l, pid)
+	}
+	if p.local == nil {
+		p.globalOffset = unsafe.Offsetof(p.global)
+		runtime_registerPool(p)
+	}
+	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
+	size := runtime.GOMAXPROCS(0)
+	local := make([]poolLocal, size)
+	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
+	atomic.StoreUintptr(&p.localSize, uintptr(size))                                            // store-release
+	return &local[pid]
+}
+
+func indexLocal(l *poolLocal, i int) *poolLocal {
+	return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // pointer arithmetic into the per-P array
+}
+
+// Implemented in runtime.
+func runtime_registerPool(*Pool)
+func runtime_procPin() int
+func runtime_procUnpin()
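
The delicate point in pin/pinSlow above is the lock-free fast path:
pinSlow publishes the array pointer before the size (two atomic
stores), and pin loads the size before the pointer, so any pid that
passes the size check is backed by array memory that is at least that
large. The same publication pattern in isolation (illustrative names;
pool.go itself reads p.local with a plain load, which is safe there
only because preemption, and hence GC, is disabled between the loads):

	// Publish-then-index pattern, reduced to its core. Illustrative,
	// self-contained version; names differ from pool.go.
	package main

	import (
		"fmt"
		"sync/atomic"
		"unsafe"
	)

	type slot struct{ v int }

	var (
		arr  unsafe.Pointer // base of a []slot, published first
		size uintptr        // element count, published second
	)

	func publish(n int) {
		a := make([]slot, n)
		atomic.StorePointer(&arr, unsafe.Pointer(&a[0])) // data first...
		atomic.StoreUintptr(&size, uintptr(n))           // ...then size
	}

	func index(i int) *slot {
		n := atomic.LoadUintptr(&size) // size first
		if uintptr(i) >= n {
			return nil // array may be absent or smaller than i
		}
		l := atomic.LoadPointer(&arr)
		// Same pointer arithmetic as indexLocal in the patch.
		return (*slot)(unsafe.Pointer(uintptr(l) + unsafe.Sizeof(slot{})*uintptr(i)))
	}

	func main() {
		publish(4)
		fmt.Println(index(2) != nil, index(9) == nil) // true true
	}
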
diff --git a/src/pkg/sync/pool_test.go b/src/pkg/sync/pool_test.go
index e4aeda4be4..3bf5131ea0 100644
--- a/src/pkg/sync/pool_test.go
+++ b/src/pkg/sync/pool_test.go
@@ -11,7 +11,6 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
-	"unsafe"
 )
 
 func TestPool(t *testing.T) {
@@ -125,28 +124,41 @@ func TestPoolStress(t *testing.T) {
 }
 
 func BenchmarkPool(b *testing.B) {
-	procs := runtime.GOMAXPROCS(-1)
-	var dec func() bool
-	if unsafe.Sizeof(b.N) == 8 {
-		n := int64(b.N)
-		dec = func() bool {
-			return atomic.AddInt64(&n, -1) >= 0
-		}
-	} else {
-		n := int32(b.N)
-		dec = func() bool {
-			return atomic.AddInt32(&n, -1) >= 0
-		}
+	var p Pool
+	var wg WaitGroup
+	n0 := uintptr(b.N)
+	n := n0
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+				for b := 0; b < 100; b++ {
+					p.Put(1)
+					p.Get()
+				}
+			}
+		}()
 	}
+	wg.Wait()
+}
+
+func BenchmarkPoolOverlflow(b *testing.B) {
 	var p Pool
 	var wg WaitGroup
-	for i := 0; i < procs; i++ {
+	n0 := uintptr(b.N)
+	n := n0
+	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			for dec() {
-				p.Put(1)
-				p.Get()
+			for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+				for b := 0; b < 100; b++ {
+					p.Put(1)
+				}
+				for b := 0; b < 100; b++ {
+					p.Get()
+				}
 			}
 		}()
 	}
-- 
2.48.1
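
For scale on the fmt numbers above: fmt keeps its print state in a
pool, so every Sprintf performs one Get/Put pair and hits the per-P
fast path almost every time once the caches are warm. A representative
use of the API this patch optimizes (bufPool and format are
illustrative, not fmt's actual implementation):

	// Representative sync.Pool usage: reusing buffers across calls.
	package main

	import (
		"bytes"
		"fmt"
		"sync"
	)

	var bufPool = sync.Pool{
		New: func() interface{} { return new(bytes.Buffer) },
	}

	func format(x int) string {
		b := bufPool.Get().(*bytes.Buffer) // usually served from this P's cache
		b.Reset()
		fmt.Fprintf(b, "<%d>", x)
		s := b.String()
		bufPool.Put(b) // normally returns to the local cache, no lock taken
		return s
	}

	func main() {
		fmt.Println(format(42)) // <42>
	}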