From 8fc6ed4c8901d13fe1a5aa176b0ba808e2855af5 Mon Sep 17 00:00:00 2001
From: Dmitriy Vyukov
Date: Mon, 14 Apr 2014 21:13:32 +0400
Subject: [PATCH] sync: less aggressive local caching in Pool

Currently Pool can cache up to 15 elements per P, and these elements
are not accessible to other Ps. If a Pool caches large objects, say 2MB,
and GOMAXPROCS is set to a large value, say 32, then the Pool can
waste up to 960MB.
The new caching policy caches at most 1 per-P element; the rest is
shared between Ps.
Get/Put performance is unchanged. Nested Get/Put performance is 57%
worse. However, overall scalability of nested Get/Put is significantly
improved, so the new policy starts winning under contention.

benchmark                    old ns/op    new ns/op    delta
BenchmarkPool                27.4         26.7         -2.55%
BenchmarkPool-4              6.63         6.59         -0.60%
BenchmarkPool-16             1.98         1.87         -5.56%
BenchmarkPool-64             1.93         1.86         -3.63%
BenchmarkPoolOverlflow       3970         6235         +57.05%
BenchmarkPoolOverlflow-4     10935        1668         -84.75%
BenchmarkPoolOverlflow-16    13419        520          -96.12%
BenchmarkPoolOverlflow-64    10295        380          -96.31%

LGTM=rsc
R=rsc
CC=golang-codereviews, khr
https://golang.org/cl/86020043
---
 src/pkg/runtime/mgc0.c    |  28 ++--
 src/pkg/sync/pool.go      | 175 ++++++++++++++++++--------------------
 src/pkg/sync/pool_test.go |   6 +-
 3 files changed, 92 insertions(+), 117 deletions(-)

diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c
index 232ac7cd85..26a18d36c7 100644
--- a/src/pkg/runtime/mgc0.c
+++ b/src/pkg/runtime/mgc0.c
@@ -91,42 +91,24 @@ enum {
 // Initialized from $GOGC. GOGC=off means no gc.
 static int32 gcpercent = GcpercentUnknown;
 
-static struct
-{
-	Lock;
-	void*	head;
-} pools;
+static FuncVal* poolcleanup;
 
 void
-sync·runtime_registerPool(void **p)
+sync·runtime_registerPoolCleanup(FuncVal *f)
 {
-	runtime·lock(&pools);
-	p[0] = pools.head;
-	pools.head = p;
-	runtime·unlock(&pools);
+	poolcleanup = f;
 }
 
 static void
 clearpools(void)
 {
-	void **pool, **next;
 	P *p, **pp;
 	MCache *c;
-	uintptr off;
 	int32 i;
 
 	// clear sync.Pool's
-	for(pool = pools.head; pool != nil; pool = next) {
-		next = pool[0];
-		pool[0] = nil; // next
-		pool[1] = nil; // local
-		pool[2] = nil; // localSize
-		off = (uintptr)pool[3] / sizeof(void*);
-		pool[off+0] = nil; // global slice
-		pool[off+1] = nil;
-		pool[off+2] = nil;
-	}
-	pools.head = nil;
+	if(poolcleanup != nil)
+		reflect·call(poolcleanup, nil, 0, 0);
 
 	for(pp=runtime·allp; p=*pp; pp++) {
 		// clear tinyalloc pool
diff --git a/src/pkg/sync/pool.go b/src/pkg/sync/pool.go
index a078cdc920..1f08707cd4 100644
--- a/src/pkg/sync/pool.go
+++ b/src/pkg/sync/pool.go
@@ -10,12 +10,6 @@ import (
 	"unsafe"
 )
 
-const (
-	cacheLineSize = 128
-	poolLocalSize = 2 * cacheLineSize
-	poolLocalCap  = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
-)
-
 // A Pool is a set of temporary objects that may be individually saved and
 // retrieved.
 //
@@ -46,36 +40,21 @@ const (
 // free list.
 //
 type Pool struct {
-	// The following fields are known to runtime.
-	next         *Pool      // for use by runtime
-	local        *poolLocal // local fixed-size per-P pool, actually an array
-	localSize    uintptr    // size of the local array
-	globalOffset uintptr    // offset of global
-	// The rest is not known to runtime.
+	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
+	localSize uintptr        // size of the local array
 
 	// New optionally specifies a function to generate
 	// a value when Get would otherwise return nil.
 	// It may not be changed concurrently with calls to Get.
 	New func() interface{}
-
-	pad [cacheLineSize]byte
-	// Read-mostly date above this point, mutable data follows.
-	mu     Mutex
-	global []interface{} // global fallback pool
 }
 
 // Local per-P Pool appendix.
 type poolLocal struct {
-	tail   int
-	unused int
-	buf    [poolLocalCap]interface{}
-}
-
-func init() {
-	var v poolLocal
-	if unsafe.Sizeof(v) != poolLocalSize {
-		panic("sync: incorrect pool size")
-	}
+	private interface{}   // Can be used only by the respective P.
+	shared  []interface{} // Can be used by any P.
+	Mutex                 // Protects shared.
+	pad     [128]byte     // Prevents false sharing.
 }
 
 // Put adds x to the pool.
@@ -90,14 +69,17 @@ func (p *Pool) Put(x interface{}) {
 		return
 	}
 	l := p.pin()
-	t := l.tail
-	if t < int(poolLocalCap) {
-		l.buf[t] = x
-		l.tail = t + 1
-		runtime_procUnpin()
+	if l.private == nil {
+		l.private = x
+		x = nil
+	}
+	runtime_procUnpin()
+	if x == nil {
 		return
 	}
-	p.putSlow(l, x)
+	l.Lock()
+	l.shared = append(l.shared, x)
+	l.Unlock()
 }
 
 // Get selects an arbitrary item from the Pool, removes it from the
@@ -116,69 +98,49 @@ func (p *Pool) Get() interface{} {
 		return nil
 	}
 	l := p.pin()
-	t := l.tail
-	if t > 0 {
-		t -= 1
-		x := l.buf[t]
-		l.tail = t
-		runtime_procUnpin()
+	x := l.private
+	l.private = nil
+	runtime_procUnpin()
+	if x != nil {
 		return x
 	}
-	return p.getSlow()
-}
-
-func (p *Pool) putSlow(l *poolLocal, x interface{}) {
-	// Grab half of items from local pool and put to global pool.
-	// Can not lock the mutex while pinned.
-	const N = int(poolLocalCap/2 + 1)
-	var buf [N]interface{}
-	buf[0] = x
-	for i := 1; i < N; i++ {
-		l.tail--
-		buf[i] = l.buf[l.tail]
+	l.Lock()
+	last := len(l.shared) - 1
+	if last >= 0 {
+		x = l.shared[last]
+		l.shared = l.shared[:last]
 	}
-	runtime_procUnpin()
-
-	p.mu.Lock()
-	p.global = append(p.global, buf[:]...)
-	p.mu.Unlock()
+	l.Unlock()
+	if x != nil {
+		return x
+	}
+	return p.getSlow()
 }
 
 func (p *Pool) getSlow() (x interface{}) {
-	// Grab a batch of items from global pool and put to local pool.
-	// Can not lock the mutex while pinned.
-	runtime_procUnpin()
-	p.mu.Lock()
+	// See the comment in pin regarding ordering of the loads.
+	size := atomic.LoadUintptr(&p.localSize) // load-acquire
+	local := p.local                         // load-consume
+	// Try to steal one element from other procs.
 	pid := runtime_procPin()
-	s := p.localSize
-	l := p.local
-	if uintptr(pid) < s {
-		l = indexLocal(l, pid)
-		// Get the item to return.
-		last := len(p.global) - 1
+	runtime_procUnpin()
+	for i := 0; i < int(size); i++ {
+		l := indexLocal(local, (pid+i+1)%int(size))
+		l.Lock()
+		last := len(l.shared) - 1
 		if last >= 0 {
-			x = p.global[last]
-			p.global = p.global[:last]
-		}
-		// Try to refill local pool, we may have been rescheduled to another P.
-		if last > 0 && l.tail == 0 {
-			n := int(poolLocalCap / 2)
-			gl := len(p.global)
-			if n > gl {
-				n = gl
-			}
-			copy(l.buf[:], p.global[gl-n:])
-			p.global = p.global[:gl-n]
-			l.tail = n
+			x = l.shared[last]
+			l.shared = l.shared[:last]
+			l.Unlock()
+			break
 		}
+		l.Unlock()
 	}
-	runtime_procUnpin()
-	p.mu.Unlock()
 	if x == nil && p.New != nil {
 		x = p.New()
 	}
-	return
+	return x
 }
 
 // pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
@@ -199,32 +161,63 @@ func (p *Pool) pin() *poolLocal {
 
 func (p *Pool) pinSlow() *poolLocal {
 	// Retry under the mutex.
+	// Can not lock the mutex while pinned.
 	runtime_procUnpin()
-	p.mu.Lock()
-	defer p.mu.Unlock()
+	allPoolsMu.Lock()
+	defer allPoolsMu.Unlock()
 	pid := runtime_procPin()
+	// poolCleanup won't be called while we are pinned.
 	s := p.localSize
 	l := p.local
 	if uintptr(pid) < s {
 		return indexLocal(l, pid)
 	}
 	if p.local == nil {
-		p.globalOffset = unsafe.Offsetof(p.global)
-		runtime_registerPool(p)
+		allPools = append(allPools, p)
 	}
 	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
 	size := runtime.GOMAXPROCS(0)
 	local := make([]poolLocal, size)
-	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
-	atomic.StoreUintptr(&p.localSize, uintptr(size))                                            // store-release
+	atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release
+	atomic.StoreUintptr(&p.localSize, uintptr(size))                            // store-release
 	return &local[pid]
 }
 
-func indexLocal(l *poolLocal, i int) *poolLocal {
-	return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i)))
+func poolCleanup() {
+	// This function is called with the world stopped, at the beginning of a garbage collection.
+	// It must not allocate and probably should not call any runtime functions.
+	// Defensively zero out everything, 2 reasons:
+	// 1. To prevent false retention of whole Pools.
+	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
+	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
+	for i, p := range allPools {
+		allPools[i] = nil
+		for i := 0; i < int(p.localSize); i++ {
+			l := indexLocal(p.local, i)
+			l.private = nil
+			for j := range l.shared {
+				l.shared[j] = nil
+			}
+			l.shared = nil
+		}
+	}
+	allPools = []*Pool{}
+}
+
+var (
+	allPoolsMu Mutex
+	allPools   []*Pool
+)
+
+func init() {
+	runtime_registerPoolCleanup(poolCleanup)
+}
+
+func indexLocal(l unsafe.Pointer, i int) *poolLocal {
+	return &(*[1000000]poolLocal)(l)[i]
 }
 
 // Implemented in runtime.
-func runtime_registerPool(*Pool)
+func runtime_registerPoolCleanup(cleanup func())
 func runtime_procPin() int
 func runtime_procUnpin()
diff --git a/src/pkg/sync/pool_test.go b/src/pkg/sync/pool_test.go
index e444e50e5e..509448b620 100644
--- a/src/pkg/sync/pool_test.go
+++ b/src/pkg/sync/pool_test.go
@@ -25,12 +25,12 @@ func TestPool(t *testing.T) {
 	}
 	p.Put("a")
 	p.Put("b")
-	if g := p.Get(); g != "b" {
-		t.Fatalf("got %#v; want b", g)
-	}
 	if g := p.Get(); g != "a" {
 		t.Fatalf("got %#v; want a", g)
 	}
+	if g := p.Get(); g != "b" {
+		t.Fatalf("got %#v; want b", g)
+	}
 	if g := p.Get(); g != nil {
 		t.Fatalf("got %#v; want nil", g)
 	}
-- 
2.50.0
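
A minimal usage sketch, not part of the patch, illustrating the waste
scenario the commit message describes: a Pool handing out 2MB buffers.
It uses only the public sync.Pool API (New, Get, Put); the pool name
and the specific sizes merely echo the message's example.

	package main

	import (
		"fmt"
		"runtime"
		"sync"
	)

	// bufPool hands out 2MB scratch buffers. Under the old policy each P
	// could privately cache up to 15 of them, so at GOMAXPROCS=32 up to
	// 32*15*2MB = 960MB could sit idle in per-P caches; under the new
	// policy each P keeps at most one private element, and the rest goes
	// to per-P shared lists that other Ps may steal from.
	var bufPool = sync.Pool{
		New: func() interface{} { return make([]byte, 2<<20) },
	}

	func main() {
		fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
		buf := bufPool.Get().([]byte) // reuse a cached buffer, or allocate via New
		_ = buf                       // ... use buf as scratch space ...
		bufPool.Put(buf)              // return it for reuse by any P
	}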
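The BenchmarkPoolOverlflow numbers above measure nested Get/Put, where
each iteration Puts far more elements than the per-P fast path can hold
and then Gets them all back. A hypothetical benchmark in that spirit
follows; the name, the constant 100, and the use of RunParallel are
illustrative assumptions, not the code from pool_test.go.

	package sync_test

	import (
		"sync"
		"testing"
	)

	// Each parallel iteration overflows the per-P cache with 100 Puts,
	// then drains it with 100 Gets, so most operations hit the shared
	// overflow path whose scalability this patch improves.
	func BenchmarkPoolNestedOverflow(b *testing.B) {
		var p sync.Pool
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				for i := 0; i < 100; i++ {
					p.Put(i)
				}
				for i := 0; i < 100; i++ {
					p.Get()
				}
			}
		})
	}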