]> Cypherpunks repositories - gostls13.git/commitdiff
sync: internal fixed size lock-free queue for sync.Pool
authorAustin Clements <austin@google.com>
Fri, 1 Mar 2019 18:16:37 +0000 (13:16 -0500)
committerAustin Clements <austin@google.com>
Fri, 5 Apr 2019 18:49:03 +0000 (18:49 +0000)
This is the first step toward fixing multiple issues with sync.Pool.
This adds a fixed size, lock-free, single-producer, multi-consumer
queue that will be used in the new Pool stealing implementation.

For #22950, #22331.

Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5
Reviewed-on: https://go-review.googlesource.com/c/go/+/166957
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
src/sync/export_test.go
src/sync/pool_test.go
src/sync/poolqueue.go [new file with mode: 0644]

index 669076efad3f12c5f27c8d408da923c58099eb28..0252b64f58919a5760b0ed527375c031921e0ef4 100644 (file)
@@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire
 var Runtime_Semrelease = runtime_Semrelease
 var Runtime_procPin = runtime_procPin
 var Runtime_procUnpin = runtime_procUnpin
+
+// poolDequeue testing.
+type PoolDequeue interface {
+       PushHead(val interface{}) bool
+       PopHead() (interface{}, bool)
+       PopTail() (interface{}, bool)
+}
+
+func NewPoolDequeue(n int) PoolDequeue {
+       return &poolDequeue{
+               vals: make([]eface, n),
+       }
+}
+
+func (d *poolDequeue) PushHead(val interface{}) bool {
+       return d.pushHead(val)
+}
+
+func (d *poolDequeue) PopHead() (interface{}, bool) {
+       return d.popHead()
+}
+
+func (d *poolDequeue) PopTail() (interface{}, bool) {
+       return d.popTail()
+}
index 9e5132bb18e168dfdb4dee96238f6150b5178202..6e9f9f3463c253f0e111c3ae4c8ac43761fea49b 100644 (file)
@@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) {
        }
 }
 
+func TestPoolDequeue(t *testing.T) {
+       const P = 10
+       // In long mode, do enough pushes to wrap around the 21-bit
+       // indexes.
+       N := 1<<21 + 1000
+       if testing.Short() {
+               N = 1e3
+       }
+       d := NewPoolDequeue(16)
+       have := make([]int32, N)
+       var stop int32
+       var wg WaitGroup
+
+       // Start P-1 consumers.
+       for i := 1; i < P; i++ {
+               wg.Add(1)
+               go func() {
+                       fail := 0
+                       for atomic.LoadInt32(&stop) == 0 {
+                               val, ok := d.PopTail()
+                               if ok {
+                                       fail = 0
+                                       atomic.AddInt32(&have[val.(int)], 1)
+                                       if val.(int) == N-1 {
+                                               atomic.StoreInt32(&stop, 1)
+                                       }
+                               } else {
+                                       // Speed up the test by
+                                       // allowing the pusher to run.
+                                       if fail++; fail%100 == 0 {
+                                               runtime.Gosched()
+                                       }
+                               }
+                       }
+                       wg.Done()
+               }()
+       }
+
+       // Start 1 producer.
+       nPopHead := 0
+       wg.Add(1)
+       go func() {
+               for j := 0; j < N; j++ {
+                       for !d.PushHead(j) {
+                               // Allow a popper to run.
+                               runtime.Gosched()
+                       }
+                       if j%10 == 0 {
+                               val, ok := d.PopHead()
+                               if ok {
+                                       nPopHead++
+                                       atomic.AddInt32(&have[val.(int)], 1)
+                               }
+                       }
+               }
+               wg.Done()
+       }()
+       wg.Wait()
+
+       // Check results.
+       for i, count := range have {
+               if count != 1 {
+                       t.Errorf("expected have[%d] = 1, got %d", i, count)
+               }
+       }
+       if nPopHead == 0 {
+               // In theory it's possible in a valid schedule for
+               // popHead to never succeed, but in practice it almost
+               // always succeeds, so this is unlikely to flake.
+               t.Errorf("popHead never succeeded")
+       }
+}
+
 func BenchmarkPool(b *testing.B) {
        var p Pool
        b.RunParallel(func(pb *testing.PB) {
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go
new file mode 100644 (file)
index 0000000..bc2ab64
--- /dev/null
@@ -0,0 +1,185 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+       "sync/atomic"
+       "unsafe"
+)
+
+// poolDequeue is a lock-free fixed-size single-producer,
+// multi-consumer queue. The single producer can both push and pop
+// from the head, and consumers can pop from the tail.
+//
+// It has the added feature that it nils out unused slots to avoid
+// unnecessary retention of objects. This is important for sync.Pool,
+// but not typically a property considered in the literature.
+type poolDequeue struct {
+       // headTail packs together a 32-bit head index and a 32-bit
+       // tail index. Both are indexes into vals modulo len(vals)-1.
+       //
+       // tail = index of oldest data in queue
+       // head = index of next slot to fill
+       //
+       // Slots in the range [tail, head) are owned by consumers.
+       // A consumer continues to own a slot outside this range until
+       // it nils the slot, at which point ownership passes to the
+       // producer.
+       //
+       // The head index is stored in the most-significant bits so
+       // that we can atomically add to it and the overflow is
+       // harmless.
+       headTail uint64
+
+       // vals is a ring buffer of interface{} values stored in this
+       // dequeue. The size of this must be a power of 2.
+       //
+       // vals[i].typ is nil if the slot is empty and non-nil
+       // otherwise. A slot is still in use until *both* the tail
+       // index has moved beyond it and typ has been set to nil. This
+       // is set to nil atomically by the consumer and read
+       // atomically by the producer.
+       vals []eface
+}
+
+type eface struct {
+       typ, val unsafe.Pointer
+}
+
+const dequeueBits = 32
+
+// dequeueLimit is the maximum size of a poolDequeue.
+//
+// This is half of 1<<dequeueBits because detecting fullness depends
+// on wrapping around the ring buffer without wrapping around the
+// index.
+const dequeueLimit = (1 << dequeueBits) / 2
+
+// dequeueNil is used in poolDeqeue to represent interface{}(nil).
+// Since we use nil to represent empty slots, we need a sentinel value
+// to represent nil.
+type dequeueNil *struct{}
+
+func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
+       const mask = 1<<dequeueBits - 1
+       head = uint32((ptrs >> dequeueBits) & mask)
+       tail = uint32(ptrs & mask)
+       return
+}
+
+func (d *poolDequeue) pack(head, tail uint32) uint64 {
+       const mask = 1<<dequeueBits - 1
+       return (uint64(head) << dequeueBits) |
+               uint64(tail&mask)
+}
+
+// pushHead adds val at the head of the queue. It returns false if the
+// queue is full. It must only be called by a single producer.
+func (d *poolDequeue) pushHead(val interface{}) bool {
+       ptrs := atomic.LoadUint64(&d.headTail)
+       head, tail := d.unpack(ptrs)
+       if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
+               // Queue is full.
+               return false
+       }
+       slot := &d.vals[head&uint32(len(d.vals)-1)]
+
+       // Check if the head slot has been released by popTail.
+       typ := atomic.LoadPointer(&slot.typ)
+       if typ != nil {
+               // Another goroutine is still cleaning up the tail, so
+               // the queue is actually still full.
+               return false
+       }
+
+       // The head slot is free, so we own it.
+       if val == nil {
+               val = dequeueNil(nil)
+       }
+       *(*interface{})(unsafe.Pointer(slot)) = val
+
+       // Increment head. This passes ownership of slot to popTail
+       // and acts as a store barrier for writing the slot.
+       atomic.AddUint64(&d.headTail, 1<<dequeueBits)
+       return true
+}
+
+// popHead removes and returns the element at the head of the queue.
+// It returns false if the queue is empty. It must only be called by a
+// single producer.
+func (d *poolDequeue) popHead() (interface{}, bool) {
+       var slot *eface
+       for {
+               ptrs := atomic.LoadUint64(&d.headTail)
+               head, tail := d.unpack(ptrs)
+               if tail == head {
+                       // Queue is empty.
+                       return nil, false
+               }
+
+               // Confirm tail and decrement head. We do this before
+               // reading the value to take back ownership of this
+               // slot.
+               head--
+               ptrs2 := d.pack(head, tail)
+               if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+                       // We successfully took back slot.
+                       slot = &d.vals[head&uint32(len(d.vals)-1)]
+                       break
+               }
+       }
+
+       val := *(*interface{})(unsafe.Pointer(slot))
+       if val == dequeueNil(nil) {
+               val = nil
+       }
+       // Zero the slot. Unlike popTail, this isn't racing with
+       // pushHead, so we don't need to be careful here.
+       *slot = eface{}
+       return val, true
+}
+
+// popTail removes and returns the element at the tail of the queue.
+// It returns false if the queue is empty. It may be called by any
+// number of consumers.
+func (d *poolDequeue) popTail() (interface{}, bool) {
+       var slot *eface
+       for {
+               ptrs := atomic.LoadUint64(&d.headTail)
+               head, tail := d.unpack(ptrs)
+               if tail == head {
+                       // Queue is empty.
+                       return nil, false
+               }
+
+               // Confirm head and tail (for our speculative check
+               // above) and increment tail. If this succeeds, then
+               // we own the slot at tail.
+               ptrs2 := d.pack(head, tail+1)
+               if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+                       // Success.
+                       slot = &d.vals[tail&uint32(len(d.vals)-1)]
+                       break
+               }
+       }
+
+       // We now own slot.
+       val := *(*interface{})(unsafe.Pointer(slot))
+       if val == dequeueNil(nil) {
+               val = nil
+       }
+
+       // Tell pushHead that we're done with this slot. Zeroing the
+       // slot is also important so we don't leave behind references
+       // that could keep this object live longer than necessary.
+       //
+       // We write to val first and then publish that we're done with
+       // this slot by atomically writing to typ.
+       slot.val = nil
+       atomic.StorePointer(&slot.typ, nil)
+       // At this point pushHead owns the slot.
+
+       return val, true
+}