// Local per-P Pool appendix.
type poolLocalInternal struct {
- private interface{} // Can be used only by the respective P.
- shared []interface{} // Can be used by any P.
- Mutex // Protects shared.
+ private interface{} // Can be used only by the respective P.
+ shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
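
The shard type change above is the heart of the CL: a Mutex-guarded slice becomes poolChain, a lock-free structure. The sketch below is a deliberately simplified, mutex-based stand-in (toyDeque is an illustrative name, not the real poolChain or poolDequeue); it only makes the access contract in the new comment concrete: the owning P pushes and pops at the head, while stealing Ps pop at the tail.

package main

import (
	"fmt"
	"sync"
)

// toyDeque stands in for poolChain; the real structure is
// lock-free, this one only mirrors its contract.
type toyDeque struct {
	mu    sync.Mutex
	items []interface{}
}

// pushHead adds x at the head. Only the owner P calls this.
func (d *toyDeque) pushHead(x interface{}) {
	d.mu.Lock()
	d.items = append(d.items, x)
	d.mu.Unlock()
}

// popHead removes the newest item. Only the owner P calls this;
// taking the head first reuses the most recently freed,
// cache-hot object.
func (d *toyDeque) popHead() (interface{}, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if n := len(d.items); n > 0 {
		x := d.items[n-1]
		d.items = d.items[:n-1]
		return x, true
	}
	return nil, false
}

// popTail removes the oldest item. Any stealing P may call this.
func (d *toyDeque) popTail() (interface{}, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.items) > 0 {
		x := d.items[0]
		d.items = d.items[1:]
		return x, true
	}
	return nil, false
}

func main() {
	var d toyDeque
	d.pushHead("old")
	d.pushHead("new")
	x, _ := d.popTail()
	fmt.Println(x) // old: stealers drain the cold end
	x, _ = d.popHead()
	fmt.Println(x) // new: the owner reuses the hot end
}
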
type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

@@ ... @@ func (p *Pool) Put(x interface{}) {
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
- l := p.pin()
+ l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
- runtime_procUnpin()
if x != nil {
- l.Lock()
- l.shared = append(l.shared, x)
- l.Unlock()
+ l.shared.pushHead(x)
}
+ runtime_procUnpin()
if race.Enabled {
race.Enable()
}

@@ ... @@ func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
- l := p.pin()
+ l, pid := p.pin()
x := l.private
l.private = nil
- runtime_procUnpin()
if x == nil {
- l.Lock()
- last := len(l.shared) - 1
- if last >= 0 {
- x = l.shared[last]
- l.shared = l.shared[:last]
- }
- l.Unlock()
+ // Try to pop the head of the local shard. We prefer
+ // the head over the tail for temporal locality of
+ // reuse.
+ x, _ = l.shared.popHead()
if x == nil {
- x = p.getSlow()
+ x = p.getSlow(pid)
}
}
+ runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
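
For orientation, the exported behavior these fast paths serve is unchanged by the CL; ordinary sync.Pool usage still looks like this (bufPool is an illustrative name):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	// New runs only when Get finds nothing pooled.
	New: func() interface{} { return new(bytes.Buffer) },
}

func main() {
	b := bufPool.Get().(*bytes.Buffer)
	b.WriteString("hello")
	fmt.Println(b.String())
	b.Reset()      // callers reset state before returning an object
	bufPool.Put(b) // fast path: the current P's private slot
}
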
-func (p *Pool) getSlow() (x interface{}) {
+func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
- pid := runtime_procPin()
- runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
- l.Lock()
- last := len(l.shared) - 1
- if last >= 0 {
- x = l.shared[last]
- l.shared = l.shared[:last]
- l.Unlock()
- break
+ if x, _ := l.shared.popTail(); x != nil {
+ return x
}
- l.Unlock()
}
- return x
+ return nil
}
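
The steal order in getSlow deserves a note: starting at pid+1 makes each P begin its search at a different neighbor, spreading contention, and the loop's final iteration lands back on the P's own shard, so its own tail is tried as a last resort. A standalone check of the index arithmetic (size and pid are made-up values):

package main

import "fmt"

func main() {
	const size = 4 // pretend runtime.GOMAXPROCS(0) == 4
	pid := 1
	for i := 0; i < size; i++ {
		fmt.Print((pid+i+1)%size, " ")
	}
	fmt.Println()
	// Output: 2 3 0 1 (neighbors first, own shard last)
}
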
-// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
+// pin pins the current goroutine to P, disables preemption and
+// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
-func (p *Pool) pin() *poolLocal {
+func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
- return indexLocal(l, pid)
+ return indexLocal(l, pid), pid
}
return p.pinSlow()
}
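
pin returns the shard by way of indexLocal, which treats p.local as a raw base pointer and does the array indexing by hand. A minimal sketch of that pattern (slot and indexSlot are illustrative names; the real element type is poolLocal, padded to avoid false sharing):

package main

import (
	"fmt"
	"unsafe"
)

type slot struct{ v int } // stand-in for poolLocal

// indexSlot mirrors the indexLocal pattern: base points at element
// 0, and element i lives i*Sizeof(slot) bytes past it.
func indexSlot(base unsafe.Pointer, i int) *slot {
	return (*slot)(unsafe.Pointer(uintptr(base) + uintptr(i)*unsafe.Sizeof(slot{})))
}

func main() {
	locals := make([]slot, 4)
	base := unsafe.Pointer(&locals[0])
	indexSlot(base, 2).v = 42
	fmt.Println(locals[2].v) // 42
}
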
-func (p *Pool) pinSlow() *poolLocal {
+func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
- return indexLocal(l, pid)
+ return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
- return &local[pid]
+ return &local[pid], pid
}
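
The "store-release" and "load-acquire" comments in pin and pinSlow describe one publication pattern: pinSlow stores the array pointer before the size, and pin loads the size before the pointer, so any reader whose pid is covered by the size it read is guaranteed to see a valid array. A self-contained sketch under those assumptions (names are illustrative; the real code reads p.local with a plain load, which the pinned, non-preemptible section makes safe):

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

var (
	local     unsafe.Pointer // first element of the published array
	localSize uintptr
)

// publish mimics pinSlow: fill the array, publish the pointer,
// then publish the size last.
func publish(n int) {
	arr := make([]int, n)
	for i := range arr {
		arr[i] = i * i
	}
	atomic.StorePointer(&local, unsafe.Pointer(&arr[0])) // store-release
	atomic.StoreUintptr(&localSize, uintptr(n))          // store-release
}

// read mimics pin: load the size first, the mirror image of the
// store order above, then the pointer.
func read(i int) (int, bool) {
	s := atomic.LoadUintptr(&localSize) // load-acquire
	l := atomic.LoadPointer(&local)     // load-consume
	if l == nil || uintptr(i) >= s {
		return 0, false
	}
	p := (*int)(unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(int(0))))
	return *p, true
}

func main() {
	publish(4)
	fmt.Println(read(3)) // 9 true
}
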
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
- // Defensively zero out everything, 2 reasons:
- // 1. To prevent false retention of whole Pools.
- // 2. If GC happens while a goroutine works with l.shared in Put/Get,
- // it will retain whole Pool. So next cycle memory consumption would be doubled.
+
+ // Because the world is stopped, no pool user can be in a
+ // pinned section (in effect, this has all Ps pinned).
for i, p := range allPools {
allPools[i] = nil
- for i := 0; i < int(p.localSize); i++ {
- l := indexLocal(p.local, i)
- l.private = nil
- for j := range l.shared {
- l.shared[j] = nil
- }
- l.shared = nil
- }
p.local = nil
p.localSize = 0
}
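
The user-visible consequence of poolCleanup is worth demonstrating: pooled objects do not survive garbage collection, so a Pool is a GC-bounded cache, not storage. A small program showing the effect (observed behavior, not a spec guarantee; Go versions after this CL added a victim cache, so two GC cycles are needed there where this CL's version needs one):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var p sync.Pool
	p.Put("cached")
	fmt.Println(p.Get()) // cached

	p.Put("cached")
	runtime.GC()
	runtime.GC()         // second cycle for the later victim cache
	fmt.Println(p.Get()) // <nil>: poolCleanup dropped the contents
}
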