}
}
-func TestMSpanQueue(t *testing.T) {
- expectSize := func(t *testing.T, q *runtime.MSpanQueue, want int) {
- t.Helper()
- if got := q.Size(); got != want {
- t.Errorf("expected size %d, got %d", want, got)
- }
- }
- expectMSpan := func(t *testing.T, got, want *runtime.MSpan, op string) {
- t.Helper()
- if got != want {
- t.Errorf("expected mspan %p from %s, got %p", want, op, got)
- }
- }
- makeSpans := func(t *testing.T, n int) ([]*runtime.MSpan, func()) {
- t.Helper()
- spans := make([]*runtime.MSpan, 0, n)
- for range cap(spans) {
- spans = append(spans, runtime.AllocMSpan())
- }
- return spans, func() {
- for i, s := range spans {
- runtime.FreeMSpan(s)
- spans[i] = nil
- }
- }
- }
- t.Run("Empty", func(t *testing.T) {
- var q runtime.MSpanQueue
- expectSize(t, &q, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- })
- t.Run("PushPop", func(t *testing.T) {
- s := runtime.AllocMSpan()
- defer runtime.FreeMSpan(s)
-
- var q runtime.MSpanQueue
- q.Push(s)
- expectSize(t, &q, 1)
- expectMSpan(t, q.Pop(), s, "pop")
- expectMSpan(t, q.Pop(), nil, "pop")
- })
- t.Run("PushPopPushPop", func(t *testing.T) {
- s0 := runtime.AllocMSpan()
- defer runtime.FreeMSpan(s0)
- s1 := runtime.AllocMSpan()
- defer runtime.FreeMSpan(s1)
-
- var q runtime.MSpanQueue
-
- // Push and pop s0.
- q.Push(s0)
- expectSize(t, &q, 1)
- expectMSpan(t, q.Pop(), s0, "pop")
- expectMSpan(t, q.Pop(), nil, "pop")
-
- // Push and pop s1.
- q.Push(s1)
- expectSize(t, &q, 1)
- expectMSpan(t, q.Pop(), s1, "pop")
- expectMSpan(t, q.Pop(), nil, "pop")
- })
- t.Run("PushPushPopPop", func(t *testing.T) {
- s0 := runtime.AllocMSpan()
- defer runtime.FreeMSpan(s0)
- s1 := runtime.AllocMSpan()
- defer runtime.FreeMSpan(s1)
-
- var q runtime.MSpanQueue
- q.Push(s0)
- expectSize(t, &q, 1)
- q.Push(s1)
- expectSize(t, &q, 2)
- expectMSpan(t, q.Pop(), s0, "pop")
- expectMSpan(t, q.Pop(), s1, "pop")
- expectMSpan(t, q.Pop(), nil, "pop")
- })
- t.Run("EmptyTakeAll", func(t *testing.T) {
- var q runtime.MSpanQueue
- var p runtime.MSpanQueue
- expectSize(t, &p, 0)
- expectSize(t, &q, 0)
- p.TakeAll(&q)
- expectSize(t, &p, 0)
- expectSize(t, &q, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
- t.Run("Push4TakeAll", func(t *testing.T) {
- spans, free := makeSpans(t, 4)
- defer free()
-
- var q runtime.MSpanQueue
- for i, s := range spans {
- expectSize(t, &q, i)
- q.Push(s)
- expectSize(t, &q, i+1)
- }
-
- var p runtime.MSpanQueue
- p.TakeAll(&q)
- expectSize(t, &p, 4)
- for i := range p.Size() {
- expectMSpan(t, p.Pop(), spans[i], "pop")
- }
- expectSize(t, &p, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
- t.Run("Push4Pop3", func(t *testing.T) {
- spans, free := makeSpans(t, 4)
- defer free()
-
- var q runtime.MSpanQueue
- for i, s := range spans {
- expectSize(t, &q, i)
- q.Push(s)
- expectSize(t, &q, i+1)
- }
- p := q.PopN(3)
- expectSize(t, &p, 3)
- expectSize(t, &q, 1)
- for i := range p.Size() {
- expectMSpan(t, p.Pop(), spans[i], "pop")
- }
- expectMSpan(t, q.Pop(), spans[len(spans)-1], "pop")
- expectSize(t, &p, 0)
- expectSize(t, &q, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
- t.Run("Push4Pop0", func(t *testing.T) {
- spans, free := makeSpans(t, 4)
- defer free()
-
- var q runtime.MSpanQueue
- for i, s := range spans {
- expectSize(t, &q, i)
- q.Push(s)
- expectSize(t, &q, i+1)
- }
- p := q.PopN(0)
- expectSize(t, &p, 0)
- expectSize(t, &q, 4)
- for i := range q.Size() {
- expectMSpan(t, q.Pop(), spans[i], "pop")
- }
- expectSize(t, &p, 0)
- expectSize(t, &q, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
- t.Run("Push4Pop4", func(t *testing.T) {
- spans, free := makeSpans(t, 4)
- defer free()
-
- var q runtime.MSpanQueue
- for i, s := range spans {
- expectSize(t, &q, i)
- q.Push(s)
- expectSize(t, &q, i+1)
- }
- p := q.PopN(4)
- expectSize(t, &p, 4)
- expectSize(t, &q, 0)
- for i := range p.Size() {
- expectMSpan(t, p.Pop(), spans[i], "pop")
- }
- expectSize(t, &p, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
- t.Run("Push4Pop5", func(t *testing.T) {
- spans, free := makeSpans(t, 4)
- defer free()
-
- var q runtime.MSpanQueue
- for i, s := range spans {
- expectSize(t, &q, i)
- q.Push(s)
- expectSize(t, &q, i+1)
- }
- p := q.PopN(5)
- expectSize(t, &p, 4)
- expectSize(t, &q, 0)
- for i := range p.Size() {
- expectMSpan(t, p.Pop(), spans[i], "pop")
- }
- expectSize(t, &p, 0)
- expectMSpan(t, q.Pop(), nil, "pop")
- expectMSpan(t, p.Pop(), nil, "pop")
- })
-}
-
func TestDetectFinalizerAndCleanupLeaks(t *testing.T) {
got := runTestProg(t, "testprog", "DetectFinalizerAndCleanupLeaks", "GODEBUG=checkfinalizers=1")
sp := strings.SplitN(got, "detected possible issues with cleanups and/or finalizers", 2)
package runtime
import (
- "internal/cpu"
"internal/goarch"
"internal/runtime/atomic"
"internal/runtime/gc"
if q.tryAcquire() {
if gcw.spanq.put(makeObjPtr(base, objIndex)) {
if gcphase == _GCmark {
+ // This is intentionally racy; the bit set here might get
+ // stomped on by a stealing P. See the comment in tryStealSpan
+ // for an explanation as to why this is OK.
+ if !work.spanqMask.read(uint32(gcw.id)) {
+ work.spanqMask.set(gcw.id)
+ }
gcw.mayNeedWorker = true
}
gcw.flushedWork = true
return true
}
+// tryGetSpanFast attempts to get an entire span to scan from the P-local ring only,
+// without checking the spmc chain.
+func (w *gcWork) tryGetSpanFast() objptr {
+ return w.spanq.tryGetFast()
+}
+
// tryGetSpan attempts to get an entire span to scan.
-func (w *gcWork) tryGetSpan(slow bool) objptr {
- if s := w.spanq.get(); s != 0 {
+func (w *gcWork) tryGetSpan() objptr {
+ if s := w.spanq.tryGetFast(); s != 0 {
return s
}
-
- if slow {
- // Check the global span queue.
- if s := work.spanq.get(w); s != 0 {
- return s
- }
-
- // Attempt to steal spans to scan from other Ps.
- return spanQueueSteal(w)
+ // "Steal" from ourselves.
+ if s := w.spanq.steal(&w.spanq); s != 0 {
+ return s
+ }
+ // We failed to get any local work, so we're fresh out.
+ // Nobody else is going to add work for us. Clear our bit.
+ if work.spanqMask.read(uint32(w.id)) {
+ work.spanqMask.clear(w.id)
}
return 0
}
-// spanQueue is a concurrent safe queue of mspans. Each mspan is represented
-// as an objptr whose spanBase is the base address of the span.
+// spanQueue is a P-local stealable span queue.
type spanQueue struct {
- avail atomic.Bool // optimization to check emptiness w/o the lock
- _ cpu.CacheLinePad // prevents false-sharing between lock and avail
- lock mutex
- q mSpanQueue
-}
-
-func (q *spanQueue) empty() bool {
- return !q.avail.Load()
-}
+ // head, tail, and ring represent a local non-thread-safe ring buffer.
+ head, tail uint32
+ ring [256]objptr
-func (q *spanQueue) size() int {
- return q.q.n
-}
+ // putsSinceDrain counts the number of put calls since the last drain.
+ putsSinceDrain int
-// putBatch adds a whole batch of spans to the queue.
-func (q *spanQueue) putBatch(batch []objptr) {
- var list mSpanQueue
- for _, p := range batch {
- s := spanOfUnchecked(p.spanBase())
- s.scanIdx = p.objIndex()
- list.push(s)
+ // chain contains state visible to other Ps.
+ //
+ // In particular, that means a linked chain of single-producer multi-consumer
+ // ring buffers where the single producer is this P only.
+ //
+ // This linked chain structure is based on the sync.Pool dequeue.
+ chain struct {
+ // head is the spanSPMC to put to. This is only accessed
+ // by the producer, so doesn't need to be synchronized.
+ head *spanSPMC
+
+ // tail is the spanSPMC to steal from. This is accessed
+ // by consumers, so reads and writes must be atomic.
+ tail atomic.UnsafePointer // *spanSPMC
}
+}
- lock(&q.lock)
- if q.q.n == 0 {
- q.avail.Store(true)
+// putFast tries to put s onto the queue, but may fail if it's full.
+func (q *spanQueue) putFast(s objptr) (ok bool) {
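+ // tail-head is the number of buffered spans; the ring is full once it
+ // reaches len(q.ring).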
+ if q.tail-q.head == uint32(len(q.ring)) {
+ return false
}
- q.q.takeAll(&list)
- unlock(&q.lock)
+ q.ring[q.tail%uint32(len(q.ring))] = s
+ q.tail++
+ return true
}
-// get tries to take a span off the queue.
+// put puts s onto the queue.
//
-// Returns a non-zero objptr on success. Also, moves additional
-// spans to gcw's local span queue.
-func (q *spanQueue) get(gcw *gcWork) objptr {
- if q.empty() {
- return 0
- }
- lock(&q.lock)
- if q.q.n == 0 {
- unlock(&q.lock)
- return 0
- }
- n := q.q.n/int(gomaxprocs) + 1
- if n > q.q.n {
- n = q.q.n
- }
- if max := len(gcw.spanq.ring) / 2; n > max {
- n = max
+// Returns whether the caller should spin up a new worker.
+func (q *spanQueue) put(s objptr) bool {
+ // The constants below define the period and volume of the
+ // spans we spill to the spmc chain when the local queue is
+ // not full.
+ //
+ // spillPeriod must be > spillMax, otherwise spillMax sets the
+ // effective maximum size of our local span queue. Even with a
+ // span ring of size N, if we flush K spans every K puts, then
+ // K becomes our effective maximum length. Keeping
+ // spillPeriod > spillMax means we always spill spans at a
+ // slower rate than we accumulate them.
+ const (
+ // spillPeriod defines how often to check if we should
+ // spill some spans, counted in the number of calls to put.
+ spillPeriod = 64
+
+ // spillMax defines, at most, how many spans to drain with
+ // each spill.
+ spillMax = 16
+ )
+
+ if q.putFast(s) {
+ // Occasionally try to spill some work to generate parallelism.
+ q.putsSinceDrain++
+ if q.putsSinceDrain >= spillPeriod {
+ // Reset even if we don't drain, so we don't check every time.
+ q.putsSinceDrain = 0
+
+ // Try to drain some spans. Don't bother if there are very
+ // few of them or there are already spans in the spmc chain.
+ n := min((q.tail-q.head)/2, spillMax)
+ if n > 4 && q.chainEmpty() {
+ q.drain(n)
+ return true
+ }
+ }
+ return false
}
- newQ := q.q.popN(n)
- if q.q.n == 0 {
- q.avail.Store(false)
+
+ // We're out of space. Drain out our local spans.
+ q.drain(uint32(len(q.ring)) / 2)
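+ // Draining half the ring guarantees free space, so this put must succeed.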
+ if !q.putFast(s) {
+ throw("failed putFast after drain")
}
- unlock(&q.lock)
+ return true
+}
- s := newQ.pop()
- for newQ.n > 0 {
- s := newQ.pop()
- gcw.spanq.put(makeObjPtr(s.base(), s.scanIdx))
+// flush publishes all spans in the local queue to the spmc chain.
+func (q *spanQueue) flush() {
+ n := q.tail - q.head
+ if n == 0 {
+ return
}
- return makeObjPtr(s.base(), s.scanIdx)
+ q.drain(n)
}
-// localSpanQueue is a P-local ring buffer of objptrs that represent spans.
-// Accessed without a lock.
+// empty returns true if there's no more work on the queue.
//
-// Multi-consumer, single-producer. The only producer is the P that owns this
-// queue, but any other P may consume from it.
-//
-// This is based on the scheduler runqueues. If making changes there, consider
-// also making them here.
-type localSpanQueue struct {
- head atomic.Uint32
- tail atomic.Uint32
- ring [256]objptr
+// Not thread-safe. Must only be called by the owner of q.
+func (q *spanQueue) empty() bool {
+ // Check the local queue for work.
+ if q.tail-q.head > 0 {
+ return false
+ }
+ return q.chainEmpty()
}
-// put adds s to the queue. Returns true if put flushed to the global queue
-// because it was full.
-func (q *localSpanQueue) put(s objptr) (flushed bool) {
- for {
- h := q.head.Load() // synchronize with consumers
- t := q.tail.Load()
- if t-h < uint32(len(q.ring)) {
- q.ring[t%uint32(len(q.ring))] = s
- q.tail.Store(t + 1) // Makes the item avail for consumption.
+// chainEmpty returns true if the spmc chain is empty.
+//
+// Thread-safe.
+func (q *spanQueue) chainEmpty() bool {
+ // Check the rest of the rings for work.
+ r := (*spanSPMC)(q.chain.tail.Load())
+ for r != nil {
+ if !r.empty() {
return false
}
- if q.putSlow(s, h, t) {
- return true
- }
- // The queue is not full, now the put above must succeed.
+ r = (*spanSPMC)(r.prev.Load())
}
+ return true
}
-// putSlow is a helper for put to move spans to the global queue.
-// Returns true on success, false on failure (nothing moved).
-func (q *localSpanQueue) putSlow(s objptr, h, t uint32) bool {
- var batch [len(q.ring)/2 + 1]objptr
+// drain publishes n spans from the local queue to the spmc chain.
+func (q *spanQueue) drain(n uint32) {
+ q.putsSinceDrain = 0
+
+ if q.chain.head == nil {
+ // N.B. We target 1024, but this may be bigger if the physical
+ // page size is bigger, or if we can fit more uintptrs into a
+ // physical page. See newSpanSPMC docs.
+ r := newSpanSPMC(1024)
+ q.chain.head = r
+ q.chain.tail.StoreNoWB(unsafe.Pointer(r))
+ }
+
+ // Try to drain some of the queue to the head spmc.
+ if q.tryDrain(q.chain.head, n) {
+ return
+ }
+ // No space. Create a bigger spmc and add it to the chain.
- // First, grab a batch from local queue.
- n := t - h
- n = n / 2
- if n != uint32(len(q.ring)/2) {
- throw("localSpanQueue.putSlow: queue is not full")
+ // Double the size of the next one, up to a maximum.
+ //
+ // We double each time so we can avoid taking this slow path
+ // in the future, which involves a global lock. Ideally we want
+ // to hit a steady-state where the deepest any queue goes during
+ // a mark phase can fit in the ring.
+ //
+ // However, we still set a maximum on this. We set the maximum
+ // to something large to amortize the cost of lock acquisition, but
+ // still at a reasonable size for big heaps and/or a lot of Ps (which
+ // tend to be correlated).
+ //
+ // It's not too bad to burn relatively large-but-fixed amounts of per-P
+ // memory if we need to deal with really, really deep queues, since the
+ // constants of proportionality are small. Simultaneously, we want to
+ // avoid a situation where a single worker ends up queuing O(heap)
+ // work and then forever retains a queue of that size.
+ const maxCap = 1 << 20 / goarch.PtrSize
+ newCap := q.chain.head.cap * 2
+ if newCap > maxCap {
+ newCap = maxCap
}
- for i := uint32(0); i < n; i++ {
- batch[i] = q.ring[(h+i)%uint32(len(q.ring))]
+ newHead := newSpanSPMC(newCap)
+ if !q.tryDrain(newHead, n) {
+ throw("failed to put span on newly-allocated spanSPMC")
}
- if !q.head.CompareAndSwap(h, h+n) { // Commits consume.
- return false
+ q.chain.head.prev.StoreNoWB(unsafe.Pointer(newHead))
+ q.chain.head = newHead
+}
+
+// tryDrain attempts to drain n spans from q's local queue to the ring r.
+//
+// Returns whether it succeeded.
+func (q *spanQueue) tryDrain(r *spanSPMC, n uint32) bool {
+ if q.head+n > q.tail {
+ throw("attempt to drain too many elements")
}
- batch[n] = s
+ h := r.head.Load() // synchronize with consumers
+ t := r.tail.Load()
+ rn := t - h
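+ // rn spans are already buffered in r; there must be room for n more.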
+ if rn+n <= r.cap {
+ for i := uint32(0); i < n; i++ {
+ *r.slot(t + i) = q.ring[(q.head+i)%uint32(len(q.ring))]
+ }
+ r.tail.Store(t + n) // Makes the items avail for consumption.
+ q.head += n
+ return true
+ }
+ return false
+}
- work.spanq.putBatch(batch[:])
- return true
+// tryGetFast attempts to get a span from the local queue, but may fail if it's empty,
+// returning a zero objptr.
+func (q *spanQueue) tryGetFast() objptr {
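+ // Owner-only fast path: head and tail need no synchronization here.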
+ if q.tail-q.head == 0 {
+ return 0
+ }
+ s := q.ring[q.head%uint32(len(q.ring))]
+ q.head++
+ return s
}
-// get attempts to take a span off the queue. Might fail if the
-// queue is empty. May be called by multiple threads, but callers
-// are better off using stealFrom to amortize the cost of stealing.
-// This method is intended for use by the owner of this queue.
-func (q *localSpanQueue) get() objptr {
+// steal takes some spans from the ring chain of another span queue.
+//
+// q == q2 is OK.
+func (q *spanQueue) steal(q2 *spanQueue) objptr {
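+ // Start from the tail of q2's chain: the oldest ring, and the one
+ // consumers drain first.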
+ r := (*spanSPMC)(q2.chain.tail.Load())
+ if r == nil {
+ return 0
+ }
for {
- h := q.head.Load()
- t := q.tail.Load()
- if t == h {
+ // It's important that we load r's prev pointer
+ // *before* trying to refill from r. In general, r may
+ // be transiently empty, but if prev is non-nil before
+ // the refill and the refill fails, then r is permanently
+ // empty, which is the only condition under which it's
+ // safe to drop r from the chain.
+ r2 := (*spanSPMC)(r.prev.Load())
+
+ // Try to refill the local queue from this ring.
+ if s := q.refill(r); s != 0 {
+ return s
+ }
+
+ if r2 == nil {
+ // This is the only ring. It's empty right
+ // now, but could be pushed to in the future.
return 0
}
- s := q.ring[h%uint32(len(q.ring))]
- if q.head.CompareAndSwap(h, h+1) {
- return s
+
+ // The tail of the chain has been drained, so move on
+ // to the next ring. Try to drop it from the chain
+ // so the next consumer doesn't have to look at the empty
+ // ring again.
+ if q2.chain.tail.CompareAndSwapNoWB(unsafe.Pointer(r), unsafe.Pointer(r2)) {
+ r.dead.Store(true)
}
- }
-}
-func (q *localSpanQueue) empty() bool {
- h := q.head.Load()
- t := q.tail.Load()
- return t == h
+ r = r2
+ }
}
-// stealFrom takes spans from q2 and puts them into q1. One span is removed
-// from the stolen spans and returned on success. Failure to steal returns a
-// zero objptr.
-func (q1 *localSpanQueue) stealFrom(q2 *localSpanQueue) objptr {
- writeHead := q1.tail.Load()
+// refill takes some spans from r and puts them into q's local queue.
+//
+// One span is removed from the stolen spans and returned on success.
+// Failure to steal returns a zero objptr.
+//
+// refill is thread-safe with respect to r.
+func (q *spanQueue) refill(r *spanSPMC) objptr {
+ if q.tail-q.head != 0 {
+ throw("steal with local work available")
+ }
+ // Steal some spans.
var n uint32
for {
- h := q2.head.Load() // load-acquire, synchronize with other consumers
- t := q2.tail.Load() // load-acquire, synchronize with the producer
+ h := r.head.Load() // load-acquire, synchronize with other consumers
+ t := r.tail.Load() // load-acquire, synchronize with the producer
n = t - h
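+ // Steal half of what's available, rounding up.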
n = n - n/2
if n == 0 {
return 0
}
- if n > uint32(len(q2.ring)/2) { // read inconsistent h and t
+ if n > r.cap { // read inconsistent h and t
continue
}
+ n = min(n, uint32(len(q.ring)/2))
for i := uint32(0); i < n; i++ {
- c := q2.ring[(h+i)%uint32(len(q2.ring))]
- q1.ring[(writeHead+i)%uint32(len(q1.ring))] = c
+ q.ring[i] = *r.slot(h + i)
}
- if q2.head.CompareAndSwap(h, h+n) {
+ if r.head.CompareAndSwap(h, h+n) {
break
}
}
- n--
- c := q1.ring[(writeHead+n)%uint32(len(q1.ring))]
- if n == 0 {
- return c
+
+ // Update local queue head and tail to reflect new buffered values.
+ q.head = 0
+ q.tail = n
+
+ // Pop off the head of the queue and return it.
+ return q.tryGetFast()
+}
+
+// spanSPMC is a ring buffer of objptrs that represent spans.
+// Accessed without a lock.
+//
+// Single-producer, multi-consumer. The only producer is the P that owns this
+// queue, but any other P may consume from it.
+//
+// ## Invariants for memory management
+//
+// 1. All spanSPMCs are allocated from mheap_.spanSPMCAlloc.
+// 2. All allocated spanSPMCs must be on the work.spanSPMCs list.
+// 3. spanSPMCs may only be allocated if gcphase != _GCoff.
+// 4. spanSPMCs may only be deallocated if gcphase == _GCoff.
+//
+// Invariants (3) and (4) ensure that we do not need to concern ourselves with
+// tricky reuse issues that stem from not knowing when a thread is truly done
+// with a spanSPMC. For example, two threads could load the same spanSPMC from
+// the tail of the chain. One thread is then paused while the other steals the
+// last few elements off of it. It's not safe to free at that point since the
+// other thread will still inspect that spanSPMC, and we have no way of knowing
+// without more complex and/or heavyweight synchronization.
+//
+// Instead, we rely on the global synchronization inherent to GC phases, and
+// the fact that spanSPMCs are only ever used during the mark phase, to ensure
+// memory safety. This means we temporarily waste some memory, but it's only
+// until the end of the mark phase.
+type spanSPMC struct {
+ _ sys.NotInHeap
+
+ // allnext is the link to the next spanSPMC on the work.spanSPMCs list.
+ // This is used to find and free dead spanSPMCs. Protected by
+ // work.spanSPMCs.lock.
+ allnext *spanSPMC
+
+ // dead indicates whether the spanSPMC is no longer in use.
+ // Protected by the CAS on chain.tail that unlinks this spanSPMC
+ // from its spanQueue's chain. That is, whoever wins that CAS takes
+ // ownership of marking this spanSPMC as dead. See spanQueue.steal
+ // for details.
+ dead atomic.Bool
+
+ // prev is the next link up a spanQueue's SPMC chain, from tail to head,
+ // hence the name "prev." Set by the spanQueue's producer when it adds
+ // a new head; reset only when the spanSPMC is deinitialized.
+ prev atomic.UnsafePointer // *spanSPMC
+
+ // head, tail, cap, and ring together represent a fixed-size SPMC lock-free
+ // ring buffer of size cap. The ring buffer contains objptr values.
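+ // The ring array is allocated off-heap with sysAlloc in newSpanSPMC.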
+ head atomic.Uint32
+ tail atomic.Uint32
+ cap uint32 // cap(ring)
+ ring *objptr
+}
+
+// newSpanSPMC allocates and initializes a new spmc with the provided capacity.
+//
+// newSpanSPMC may override the capacity with a larger one if the provided one would
+// waste memory.
+func newSpanSPMC(cap uint32) *spanSPMC {
+ lock(&work.spanSPMCs.lock)
+ r := (*spanSPMC)(mheap_.spanSPMCAlloc.alloc())
+ r.allnext = work.spanSPMCs.all
+ work.spanSPMCs.all = r
+ unlock(&work.spanSPMCs.lock)
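+ // r is now on the all list, but it can't be freed out from under us:
+ // spanSPMCs are only freed during _GCoff, and we only allocate them
+ // when gcphase != _GCoff.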
+
+ // If cap < the capacity of a single physical page, round up.
+ pageCap := uint32(physPageSize / goarch.PtrSize) // capacity of a single page
+ if cap < pageCap {
+ cap = pageCap
}
- h := q1.head.Load()
- if writeHead-h+n >= uint32(len(q1.ring)) {
- throw("localSpanQueue.stealFrom: queue overflow")
+ if cap&(cap-1) != 0 {
+ throw("spmc capacity must be a power of 2")
}
- q1.tail.Store(writeHead + n)
- return c
+
+ r.cap = cap
+ ring := sysAlloc(uintptr(cap)*unsafe.Sizeof(objptr(0)), &memstats.gcMiscSys, "GC span queue")
+ atomic.StorepNoWB(unsafe.Pointer(&r.ring), ring)
+ return r
}
-// drain moves all spans in the queue to the global queue.
+// empty returns true if the spmc is empty.
//
-// Returns true if anything was moved.
-func (q *localSpanQueue) drain() bool {
- var batch [len(q.ring)]objptr
+// empty is thread-safe.
+func (r *spanSPMC) empty() bool {
+ h := r.head.Load()
+ t := r.tail.Load()
+ return t == h
+}
- var n uint32
- for {
- var h uint32
- for {
- h = q.head.Load()
- t := q.tail.Load()
- n = t - h
- if n == 0 {
- return false
- }
- if n <= uint32(len(q.ring)) {
- break
- }
- // Read inconsistent h and t.
- }
- for i := uint32(0); i < n; i++ {
- batch[i] = q.ring[(h+i)%uint32(len(q.ring))]
- }
- if q.head.CompareAndSwap(h, h+n) { // Commits consume.
+// deinit frees any resources the spanSPMC is holding onto and zeroes it.
+func (r *spanSPMC) deinit() {
+ sysFree(unsafe.Pointer(r.ring), uintptr(r.cap)*unsafe.Sizeof(objptr(0)), &memstats.gcMiscSys)
+ r.ring = nil
+ r.dead.Store(false)
+ r.prev.StoreNoWB(nil)
+ r.head.Store(0)
+ r.tail.Store(0)
+ r.cap = 0
+}
+
+// slot returns a pointer to slot i%r.cap.
+func (r *spanSPMC) slot(i uint32) *objptr {
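+ // cap is always a power of two (enforced in newSpanSPMC), so the
+ // mask below is equivalent to i%cap.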
+ idx := uintptr(i & (r.cap - 1))
+ return (*objptr)(unsafe.Add(unsafe.Pointer(r.ring), idx*unsafe.Sizeof(objptr(0))))
+}
+
+// freeSomeSpanSPMCs frees some spanSPMCs back to the OS and returns
+// true if it should be called again to free more.
+func freeSomeSpanSPMCs(preemptible bool) bool {
+ // TODO(mknyszek): This is arbitrary, but some kind of limit is necessary
+ // to help bound delays to cooperatively preempt ourselves.
+ const batchSize = 64
+
+ // According to the SPMC memory management invariants, we can only free
+ // spanSPMCs outside of the mark phase. We ensure we do this in two ways.
+ //
+ // 1. We take the work.spanSPMCs lock, which we need anyway. This ensures
+ // that we are non-preemptible. If this path becomes lock-free, we will
+ // need to become non-preemptible in some other way.
+ // 2. Once we are non-preemptible, we check the gcphase, and back out if
+ // it's not safe.
+ //
+ // This way, we ensure that we don't start freeing if we're in the wrong
+ // phase, and the phase can't change on us while we're freeing.
+ lock(&work.spanSPMCs.lock)
+ if gcphase != _GCoff || work.spanSPMCs.all == nil {
+ unlock(&work.spanSPMCs.lock)
+ return false
+ }
+ rp := &work.spanSPMCs.all
+ gp := getg()
+ more := true
+ for i := 0; i < batchSize && !(preemptible && gp.preempt); i++ {
+ r := *rp
+ if r == nil {
+ more = false
break
}
+ if r.dead.Load() {
+ // It's dead. Deinitialize and free it.
+ *rp = r.allnext
+ r.deinit()
+ mheap_.spanSPMCAlloc.free(unsafe.Pointer(r))
+ } else {
+ // Still alive, likely in some P's chain.
+ // Skip it.
+ rp = &r.allnext
+ }
}
- if !q.empty() {
- throw("drained local span queue, but not empty")
- }
-
- work.spanq.putBatch(batch[:n])
- return true
+ unlock(&work.spanSPMCs.lock)
+ return more
}
-// spanQueueSteal attempts to steal a span from another P's local queue.
+// tryStealSpan attempts to steal a span from another P's local queue.
//
// Returns a non-zero objptr on success.
-func spanQueueSteal(gcw *gcWork) objptr {
+func (w *gcWork) tryStealSpan() objptr {
pp := getg().m.p.ptr()
for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
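+ // Consult the hint mask and skip Ps that advertise no stealable span work.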
+ if !work.spanqMask.read(enum.position()) {
+ continue
+ }
p2 := allp[enum.position()]
if pp == p2 {
continue
}
- if s := gcw.spanq.stealFrom(&p2.gcw.spanq); s != 0 {
+ if s := w.spanq.steal(&p2.gcw.spanq); s != 0 {
return s
}
+ // N.B. This is intentionally racy. We may stomp on a mask bit set by
+ // a P that just put a bunch of work into its local queue.
+ //
+ // This is OK because the ragged barrier in gcMarkDone will set
+ // the bit on each P if there's local work we missed. This race
+ // should also be rare, since the window between failing to steal
+ // from a P and clearing its bit here is quite small.
+ work.spanqMask.clear(int32(enum.position()))
}
return 0
}
clear(w.stats[:])
}
+// gcMarkWorkAvailable reports whether there's any non-local work available to do.
+//
+// This is a heavyweight check and must only be used for correctness, not
+// as a hint.
+func gcMarkWorkAvailable() bool {
+ if !work.full.empty() {
+ return true // global work available
+ }
+ if work.markrootNext < work.markrootJobs {
+ return true // root scan work available
+ }
+ if work.spanqMask.any() {
+ return true // stealable local work available
+ }
+ return false
+}
+
// scanObject scans the object starting at b, adding pointers to gcw.
// b must point to the beginning of a heap object or an oblet.
// scanObject consults the GC bitmap for the pointer mask and the