]> Cypherpunks repositories - gostls13.git/commitdiff
sync: internal dynamically sized lock-free queue for sync.Pool
authorAustin Clements <austin@google.com>
Fri, 1 Mar 2019 19:54:00 +0000 (14:54 -0500)
committerAustin Clements <austin@google.com>
Fri, 5 Apr 2019 18:49:04 +0000 (18:49 +0000)
This adds a dynamically sized, lock-free, single-producer,
multi-consumer queue that will be used in the new Pool stealing
implementation. It's built on top of the fixed-size queue added in the
previous CL.

For #22950, #22331.

Change-Id: Ifc0ca3895bec7e7f9289ba9fb7dd0332bf96ba5a
Reviewed-on: https://go-review.googlesource.com/c/go/+/166958
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
src/sync/export_test.go
src/sync/pool_test.go
src/sync/poolqueue.go

index 0252b64f58919a5760b0ed527375c031921e0ef4..10d3599f476550c7a7240a3f20036f87b09dd392 100644 (file)
@@ -34,3 +34,20 @@ func (d *poolDequeue) PopHead() (interface{}, bool) {
 func (d *poolDequeue) PopTail() (interface{}, bool) {
        return d.popTail()
 }
+
+func NewPoolChain() PoolDequeue {
+       return new(poolChain)
+}
+
+func (c *poolChain) PushHead(val interface{}) bool {
+       c.pushHead(val)
+       return true
+}
+
+func (c *poolChain) PopHead() (interface{}, bool) {
+       return c.popHead()
+}
+
+func (c *poolChain) PopTail() (interface{}, bool) {
+       return c.popTail()
+}
index 6e9f9f3463c253f0e111c3ae4c8ac43761fea49b..62085b5c966050fad28a2a1013167db4282a818f 100644 (file)
@@ -151,6 +151,14 @@ func TestPoolStress(t *testing.T) {
 }
 
 func TestPoolDequeue(t *testing.T) {
+       testPoolDequeue(t, NewPoolDequeue(16))
+}
+
+func TestPoolChain(t *testing.T) {
+       testPoolDequeue(t, NewPoolChain())
+}
+
+func testPoolDequeue(t *testing.T, d PoolDequeue) {
        const P = 10
        // In long mode, do enough pushes to wrap around the 21-bit
        // indexes.
@@ -158,7 +166,6 @@ func TestPoolDequeue(t *testing.T) {
        if testing.Short() {
                N = 1e3
        }
-       d := NewPoolDequeue(16)
        have := make([]int32, N)
        var stop int32
        var wg WaitGroup
index bc2ab647ffbc70777ba53c58044042b3b023aa53..22f74969d969ead7a1da9d9d9af630652dce7a32 100644 (file)
@@ -52,10 +52,10 @@ const dequeueBits = 32
 
 // dequeueLimit is the maximum size of a poolDequeue.
 //
-// This is half of 1<<dequeueBits because detecting fullness depends
-// on wrapping around the ring buffer without wrapping around the
-// index.
-const dequeueLimit = (1 << dequeueBits) / 2
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
 
 // dequeueNil is used in poolDeqeue to represent interface{}(nil).
 // Since we use nil to represent empty slots, we need a sentinel value
@@ -183,3 +183,127 @@ func (d *poolDequeue) popTail() (interface{}, bool) {
 
        return val, true
 }
+
+// poolChain is a dynamically-sized version of poolDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type poolChain struct {
+       // head is the poolDequeue to push to. This is only accessed
+       // by the producer, so doesn't need to be synchronized.
+       head *poolChainElt
+
+       // tail is the poolDequeue to popTail from. This is accessed
+       // by consumers, so reads and writes must be atomic.
+       tail *poolChainElt
+}
+
+type poolChainElt struct {
+       poolDequeue
+
+       // next and prev link to the adjacent poolChainElts in this
+       // poolChain.
+       //
+       // next is written atomically by the producer and read
+       // atomically by the consumer. It only transitions from nil to
+       // non-nil.
+       //
+       // prev is written atomically by the consumer and read
+       // atomically by the producer. It only transitions from
+       // non-nil to nil.
+       next, prev *poolChainElt
+}
+
+func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
+       atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
+       return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *poolChain) pushHead(val interface{}) {
+       d := c.head
+       if d == nil {
+               // Initialize the chain.
+               const initSize = 8 // Must be a power of 2
+               d = new(poolChainElt)
+               d.vals = make([]eface, initSize)
+               c.head = d
+               storePoolChainElt(&c.tail, d)
+       }
+
+       if d.pushHead(val) {
+               return
+       }
+
+       // The current dequeue is full. Allocate a new one of twice
+       // the size.
+       newSize := len(d.vals) * 2
+       if newSize >= dequeueLimit {
+               // Can't make it any bigger.
+               newSize = dequeueLimit
+       }
+
+       d2 := &poolChainElt{prev: d}
+       d2.vals = make([]eface, newSize)
+       c.head = d2
+       storePoolChainElt(&d.next, d2)
+       d2.pushHead(val)
+}
+
+func (c *poolChain) popHead() (interface{}, bool) {
+       d := c.head
+       for d != nil {
+               if val, ok := d.popHead(); ok {
+                       return val, ok
+               }
+               // There may still be unconsumed elements in the
+               // previous dequeue, so try backing up.
+               d = loadPoolChainElt(&d.prev)
+       }
+       return nil, false
+}
+
+func (c *poolChain) popTail() (interface{}, bool) {
+       d := loadPoolChainElt(&c.tail)
+       if d == nil {
+               return nil, false
+       }
+
+       for {
+               // It's important that we load the next pointer
+               // *before* popping the tail. In general, d may be
+               // transiently empty, but if next is non-nil before
+               // the pop and the pop fails, then d is permanently
+               // empty, which is the only condition under which it's
+               // safe to drop d from the chain.
+               d2 := loadPoolChainElt(&d.next)
+
+               if val, ok := d.popTail(); ok {
+                       return val, ok
+               }
+
+               if d2 == nil {
+                       // This is the only dequeue. It's empty right
+                       // now, but could be pushed to in the future.
+                       return nil, false
+               }
+
+               // The tail of the chain has been drained, so move on
+               // to the next dequeue. Try to drop it from the chain
+               // so the next pop doesn't have to look at the empty
+               // dequeue again.
+               if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+                       // We won the race. Clear the prev pointer so
+                       // the garbage collector can collect the empty
+                       // dequeue and so popHead doesn't back up
+                       // further than necessary.
+                       storePoolChainElt(&d2.prev, nil)
+               }
+               d = d2
+       }
+}