// A spanSet is a set of *mspans.
//
-// spanSet is safe for concurrent push operations *or* concurrent
-// pop operations, but not both simultaneously.
+// spanSet is safe for concurrent push and pop operations.
type spanSet struct {
// A spanSet is a two-level data structure consisting of a
// growable spine that points to fixed-sized blocks. The spine
spineLen uintptr // Spine array length, accessed atomically
spineCap uintptr // Spine array cap, accessed under lock
- // index is the first unused slot in the logical concatenation
- // of all blocks. It is accessed atomically.
- index uint32
+ // index is the head and tail of the spanSet in a single field.
+ // The head and the tail both represent an index into the logical
+ // concatenation of all blocks, with the head always behind or
+ // equal to the tail; when the two are equal, the set is empty.
+ // This field is always accessed atomically.
+ //
+ // The head and the tail are only 32 bits wide, which means we
+ // can only support up to 2^32 pushes before a reset. If every
+ // span in the heap were stored in this set, and each span were
+ // the minimum size (1 runtime page, 8 KiB), then the smallest heap
+ // that would be unrepresentable is roughly 32 TiB in size.
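+ // (As a sanity check: 2^32 pushes * 8 KiB per span = 2^45 bytes = 32 TiB.)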
+ index headTailIndex
}
const (
}
// push adds span s to buffer b. push is safe to call concurrently
-// with other push operations, but NOT to call concurrently with pop.
+// with other push and pop operations.
func (b *spanSet) push(s *mspan) {
// Obtain our slot.
- cursor := uintptr(atomic.Xadd(&b.index, +1) - 1)
+ cursor := uintptr(b.index.incTail().tail() - 1)
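+ // incTail returns the updated index, so the slot we just claimed is
+ // the new tail minus one.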
top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
// Do we need to add a block?
}
// pop removes and returns a span from buffer b, or nil if b is empty.
-// pop is safe to call concurrently with other pop operations, but NOT
-// to call concurrently with push.
+// pop is safe to call concurrently with other pop and push operations.
func (b *spanSet) pop() *mspan {
- cursor := atomic.Xadd(&b.index, -1)
- if int32(cursor) < 0 {
- atomic.Xadd(&b.index, +1)
- return nil
+ var head, tail uint32
+claimLoop:
+ for {
+ headtail := b.index.load()
+ head, tail = headtail.split()
+ if head >= tail {
+ // The buf is empty, as far as we can tell.
+ return nil
+ }
+ // Check if the head position we want to claim is actually
+ // backed by a block.
+ spineLen := atomic.Loaduintptr(&b.spineLen)
+ if spineLen <= uintptr(head)/spanSetBlockEntries {
+ // We're racing with a spine growth and the allocation of
+ // a new block (and maybe a new spine!), and trying to grab
+ // the span at the index which is currently being pushed.
+ // Instead of spinning, let's just notify the caller that
+ // there's nothing currently here. Spinning on this is
+ // almost definitely not worth it.
+ return nil
+ }
+ // Try to claim the current head by CASing in an updated head.
+ // This may fail transiently due to a push which modifies the
+ // tail, so keep trying while the head isn't changing.
+ want := head
+ for want == head {
+ if b.index.cas(headtail, makeHeadTailIndex(want+1, tail)) {
+ break claimLoop
+ }
+ headtail = b.index.load()
+ head, tail = headtail.split()
+ }
+ // We failed to claim the spot we were after and the head changed,
+ // meaning a popper got ahead of us. Try again from the top because
+ // the buf may not be empty.
}
+ top, bottom := head/spanSetBlockEntries, head%spanSetBlockEntries
- // There are no concurrent spine or block modifications during
- // pop, so we can omit the atomics.
- top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
- blockp := (**spanSetBlock)(add(b.spine, sys.PtrSize*uintptr(top)))
- block := *blockp
- s := block.spans[bottom]
- // Clear the pointer for block(i).
- block.spans[bottom] = nil
-
- // If we're the last popper in the block, free the block.
- if used := atomic.Xadd(&block.used, -1); used == 0 {
- // Decrement spine length and clear the block's pointer.
- atomic.Xadduintptr(&b.spineLen, ^uintptr(0) /* -1 */)
- atomic.StorepNoWB(add(b.spine, sys.PtrSize*uintptr(top)), nil)
+ // We may be reading a stale spine pointer, but because the length
+ // grows monotonically and we've already verified it, we'll definitely
+ // be reading from a valid block.
+ spine := atomic.Loadp(unsafe.Pointer(&b.spine))
+ blockp := add(spine, sys.PtrSize*uintptr(top))
+
+ // Given that the spine length is correct, we know we will never
+ // see a nil block here, since the length is always updated after
+ // the block is set.
+ block := (*spanSetBlock)(atomic.Loadp(blockp))
+ s := (*mspan)(atomic.Loadp(unsafe.Pointer(&block.spans[bottom])))
+ for s == nil {
+ // We raced with the span actually being set, but given that we
+ // know a block for this span exists, the race window here is
+ // extremely small. Try again.
+ s = (*mspan)(atomic.Loadp(unsafe.Pointer(&block.spans[bottom])))
+ }
+ // Clear the pointer. This isn't strictly necessary, but defensively
+ // avoids accidentally re-using blocks which could lead to memory
+ // corruption. This way, we'll get a nil pointer access instead.
+ atomic.StorepNoWB(unsafe.Pointer(&block.spans[bottom]), nil)
+
+ // If we're the last possible popper in the block, free the block. We
+ // know we're last because we claimed the block's final slot and its
+ // used count just dropped to zero, so no push or pop can touch it again.
+ if used := atomic.Xadd(&block.used, -1); used == 0 && bottom == spanSetBlockEntries-1 {
+ // Clear the block's pointer.
+ atomic.StorepNoWB(blockp, nil)
// Return the block to the block pool.
spanSetBlockPool.free(block)
return s
}
-// numBlocks returns the number of blocks in buffer b. numBlocks is
-// safe to call concurrently with any other operation. Spans that have
-// been pushed prior to the call to numBlocks are guaranteed to appear
-// in some block in the range [0, numBlocks()), assuming there are no
-// intervening pops. Spans that are pushed after the call may also
-// appear in these blocks.
-func (b *spanSet) numBlocks() int {
- return int((atomic.Load(&b.index) + spanSetBlockEntries - 1) / spanSetBlockEntries)
-}
-
-// block returns the spans in the i'th block of buffer b. block is
-// safe to call concurrently with push. The block may contain nil
-// pointers that must be ignored, and each entry in the block must be
-// loaded atomically.
-func (b *spanSet) block(i int) []*mspan {
- // Perform bounds check before loading spine address since
- // push ensures the allocated length is at least spineLen.
- if i < 0 || uintptr(i) >= atomic.Loaduintptr(&b.spineLen) {
- throw("block index out of range")
+// reset resets an empty spanSet. It also cleans up any
+// leftover blocks.
+//
+// Throws if the buf is not empty.
+//
+// reset may not be called concurrently with any other operations
+// on the span set.
+func (b *spanSet) reset() {
+ head, tail := b.index.load().split()
+ if head < tail {
+ print("head = ", head, ", tail = ", tail, "\n")
+ throw("attempt to clear non-empty span set")
}
+ top := head / spanSetBlockEntries
+ if uintptr(top) < b.spineLen {
+ // If the head catches up to the tail and the set is empty,
+ // the block containing the head and tail may not have been
+ // cleaned up by pop, since it could still be pushed into again.
+ // Because we're about to reset the head and tail, clean up such
+ // a block now, if it exists, to avoid leaking its memory.
+ blockp := (**spanSetBlock)(add(b.spine, sys.PtrSize*uintptr(top)))
+ block := *blockp
+ if block != nil {
+ // Sanity check the used value.
+ if block.used != 0 {
+ throw("found used block in empty span set")
+ }
+ // Clear the pointer to the block.
+ atomic.StorepNoWB(unsafe.Pointer(blockp), nil)
- // Get block i.
- spine := atomic.Loadp(unsafe.Pointer(&b.spine))
- blockp := add(spine, sys.PtrSize*uintptr(i))
- block := (*spanSetBlock)(atomic.Loadp(blockp))
-
- // Slice the block if necessary.
- cursor := uintptr(atomic.Load(&b.index))
- top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
- var spans []*mspan
- if uintptr(i) < top {
- spans = block.spans[:]
- } else {
- spans = block.spans[:bottom]
+ // Return the block to the block pool.
+ spanSetBlockPool.free(block)
+ }
}
- return spans
+ b.index.reset()
+ atomic.Storeuintptr(&b.spineLen, 0)
}
// spanSetBlockPool is a global pool of spanSetBlocks.
func (p *spanSetBlockAlloc) free(block *spanSetBlock) {
p.stack.push(&block.lfnode)
}
+
+// headTailIndex represents a combined 32-bit head and 32-bit tail
+// of a queue, packed into a single 64-bit value.
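+// The head occupies the upper 32 bits and the tail the lower 32 bits.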
+type headTailIndex uint64
+
+// makeHeadTailIndex creates a headTailIndex value from a separate
+// head and tail.
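+// For example, makeHeadTailIndex(1, 2) yields 0x0000000100000002.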
+func makeHeadTailIndex(head, tail uint32) headTailIndex {
+ return headTailIndex(uint64(head)<<32 | uint64(tail))
+}
+
+// head returns the head of a headTailIndex value.
+func (h headTailIndex) head() uint32 {
+ return uint32(h >> 32)
+}
+
+// tail returns the tail of a headTailIndex value.
+func (h headTailIndex) tail() uint32 {
+ return uint32(h)
+}
+
+// split splits the headTailIndex value into its parts.
+func (h headTailIndex) split() (head uint32, tail uint32) {
+ return h.head(), h.tail()
+}
+
+// load atomically reads a headTailIndex value.
+func (h *headTailIndex) load() headTailIndex {
+ return headTailIndex(atomic.Load64((*uint64)(h)))
+}
+
+// cas atomically compares-and-swaps a headTailIndex value.
+func (h *headTailIndex) cas(old, new headTailIndex) bool {
+ return atomic.Cas64((*uint64)(h), uint64(old), uint64(new))
+}
+
+// incHead atomically increments the head of a headTailIndex.
+func (h *headTailIndex) incHead() headTailIndex {
+ return headTailIndex(atomic.Xadd64((*uint64)(h), (1 << 32)))
+}
+
+// decHead atomically decrements the head of a headTailIndex.
+func (h *headTailIndex) decHead() headTailIndex {
+ return headTailIndex(atomic.Xadd64((*uint64)(h), -(1 << 32)))
+}
+
+// incTail atomically increments the tail of a headTailIndex.
+func (h *headTailIndex) incTail() headTailIndex {
+ ht := headTailIndex(atomic.Xadd64((*uint64)(h), +1))
+ // Check for overflow.
+ if ht.tail() == 0 {
+ print("runtime: head = ", ht.head(), ", tail = ", ht.tail(), "\n")
+ throw("headTailIndex overflow")
+ }
+ return ht
+}
+
+// reset clears the headTailIndex to (0, 0).
+func (h *headTailIndex) reset() {
+ atomic.Store64((*uint64)(h), 0)
+}
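+
+// The following is an illustrative sketch (not part of the runtime) of how a
+// popper uses headTailIndex to claim a slot: load the packed value, give up
+// if the head has caught up to the tail, and otherwise CAS the head forward
+// by one. The helper name tryClaimHead is hypothetical and exists only for
+// this sketch; pop above implements the same idea with extra checks for the
+// spine and block state.
+//
+//	func tryClaimHead(idx *headTailIndex) (slot uint32, ok bool) {
+//		for {
+//			old := idx.load()
+//			head, tail := old.split()
+//			if head >= tail {
+//				return 0, false // set is empty; nothing to claim
+//			}
+//			if idx.cas(old, makeHeadTailIndex(head+1, tail)) {
+//				return head, true // we now own the slot at index head
+//			}
+//			// The CAS failed because a concurrent push or pop moved the
+//			// index; reload and retry.
+//		}
+//	}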