From 0556e26273f704db73df9e7c4c3d2e8434dec7be Mon Sep 17 00:00:00 2001
From: Dmitry Vyukov
Date: Tue, 13 Dec 2016 16:45:55 +0100
Subject: [PATCH] sync: make Mutex more fair
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Add new starvation mode for Mutex.

In starvation mode ownership is directly handed off from the unlocking
goroutine to the next waiter. New arriving goroutines don't compete for
ownership. Unfair wait time is now limited to 1ms.

Also fix a long-standing bug where goroutines were requeued at the tail
of the wait queue. That led to even more unfair acquisition times with
multiple waiters.

Performance of normal mode is not considerably affected. Illustrative
sketches of both modes follow the diff below.

Fixes #13086

On the lockskew program provided in the issue:

done in 1.207853ms
done in 1.177451ms
done in 1.184168ms
done in 1.198633ms
done in 1.185797ms
done in 1.182502ms
done in 1.316485ms
done in 1.211611ms
done in 1.182418ms

name  old time/op  new time/op  delta
MutexUncontended-48  0.65ns ± 0%  0.65ns ± 1%  ~  (p=0.087 n=10+10)
Mutex-48  112ns ± 1%  114ns ± 1%  +1.69%  (p=0.000 n=10+10)
MutexSlack-48  113ns ± 0%  87ns ± 1%  -22.65%  (p=0.000 n=8+10)
MutexWork-48  149ns ± 0%  145ns ± 0%  -2.48%  (p=0.000 n=9+10)
MutexWorkSlack-48  149ns ± 0%  122ns ± 3%  -18.26%  (p=0.000 n=6+10)
MutexNoSpin-48  103ns ± 4%  105ns ± 3%  ~  (p=0.089 n=10+10)
MutexSpin-48  490ns ± 4%  515ns ± 6%  +5.08%  (p=0.006 n=10+10)
Cond32-48  13.4µs ± 6%  13.1µs ± 5%  -2.75%  (p=0.023 n=10+10)
RWMutexWrite100-48  53.2ns ± 3%  41.2ns ± 3%  -22.57%  (p=0.000 n=10+10)
RWMutexWrite10-48  45.9ns ± 2%  43.9ns ± 2%  -4.38%  (p=0.000 n=10+10)
RWMutexWorkWrite100-48  122ns ± 2%  134ns ± 1%  +9.92%  (p=0.000 n=10+10)
RWMutexWorkWrite10-48  206ns ± 1%  188ns ± 1%  -8.52%  (p=0.000 n=8+10)
Cond32-24  12.1µs ± 3%  12.4µs ± 3%  +1.98%  (p=0.043 n=10+9)
MutexUncontended-24  0.74ns ± 1%  0.75ns ± 1%  ~  (p=0.650 n=10+10)
Mutex-24  122ns ± 2%  124ns ± 1%  +1.31%  (p=0.007 n=10+10)
MutexSlack-24  96.9ns ± 2%  102.8ns ± 2%  +6.11%  (p=0.000 n=10+10)
MutexWork-24  146ns ± 1%  135ns ± 2%  -7.70%  (p=0.000 n=10+9)
MutexWorkSlack-24  135ns ± 1%  128ns ± 2%  -5.01%  (p=0.000 n=10+9)
MutexNoSpin-24  114ns ± 3%  110ns ± 4%  -3.84%  (p=0.000 n=10+10)
MutexSpin-24  482ns ± 4%  475ns ± 8%  ~  (p=0.286 n=10+10)
RWMutexWrite100-24  43.0ns ± 3%  43.1ns ± 2%  ~  (p=0.956 n=10+10)
RWMutexWrite10-24  43.4ns ± 1%  43.2ns ± 1%  ~  (p=0.085 n=10+9)
RWMutexWorkWrite100-24  130ns ± 3%  131ns ± 3%  ~  (p=0.747 n=10+10)
RWMutexWorkWrite10-24  191ns ± 1%  192ns ± 1%  ~  (p=0.210 n=10+10)
Cond32-12  11.5µs ± 2%  11.7µs ± 2%  +1.98%  (p=0.002 n=10+10)
MutexUncontended-12  1.48ns ± 0%  1.50ns ± 1%  +1.08%  (p=0.004 n=10+10)
Mutex-12  141ns ± 1%  143ns ± 1%  +1.63%  (p=0.000 n=10+10)
MutexSlack-12  121ns ± 0%  119ns ± 0%  -1.65%  (p=0.001 n=8+9)
MutexWork-12  141ns ± 2%  150ns ± 3%  +6.36%  (p=0.000 n=9+10)
MutexWorkSlack-12  131ns ± 0%  138ns ± 0%  +5.73%  (p=0.000 n=9+10)
MutexNoSpin-12  87.0ns ± 1%  83.7ns ± 1%  -3.80%  (p=0.000 n=10+10)
MutexSpin-12  364ns ± 1%  377ns ± 1%  +3.77%  (p=0.000 n=10+10)
RWMutexWrite100-12  42.8ns ± 1%  43.9ns ± 1%  +2.41%  (p=0.000 n=8+10)
RWMutexWrite10-12  39.8ns ± 4%  39.3ns ± 1%  ~  (p=0.433 n=10+9)
RWMutexWorkWrite100-12  131ns ± 1%  131ns ± 0%  ~  (p=0.591 n=10+9)
RWMutexWorkWrite10-12  173ns ± 1%  174ns ± 0%  ~  (p=0.059 n=10+8)
Cond32-6  10.9µs ± 2%  10.9µs ± 2%  ~  (p=0.739 n=10+10)
MutexUncontended-6  2.97ns ± 0%  2.97ns ± 0%  ~  (all samples are equal)
Mutex-6  122ns ± 6%  122ns ± 2%  ~  (p=0.668 n=10+10)
MutexSlack-6  149ns ± 3%  142ns ± 3%  -4.63%  (p=0.000 n=10+10)
MutexWork-6  136ns ± 3%  140ns ± 5%  ~  (p=0.077 n=10+10)
MutexWorkSlack-6  152ns ± 0%  138ns ± 2%  -9.21%  (p=0.000 n=6+10)
MutexNoSpin-6  150ns ± 1%  152ns ± 0%  +1.50%  (p=0.000 n=8+10)
MutexSpin-6  726ns ± 0%  730ns ± 1%  ~  (p=0.069 n=10+10)
RWMutexWrite100-6  40.6ns ± 1%  40.9ns ± 1%  +0.91%  (p=0.001 n=8+10)
RWMutexWrite10-6  37.1ns ± 0%  37.0ns ± 1%  ~  (p=0.386 n=9+10)
RWMutexWorkWrite100-6  133ns ± 1%  134ns ± 1%  +1.01%  (p=0.005 n=9+10)
RWMutexWorkWrite10-6  152ns ± 0%  152ns ± 0%  ~  (all samples are equal)
Cond32-2  7.86µs ± 2%  7.95µs ± 2%  +1.10%  (p=0.023 n=10+10)
MutexUncontended-2  8.10ns ± 0%  9.11ns ± 4%  +12.44%  (p=0.000 n=9+10)
Mutex-2  32.9ns ± 9%  38.4ns ± 6%  +16.58%  (p=0.000 n=10+10)
MutexSlack-2  93.4ns ± 1%  98.5ns ± 2%  +5.39%  (p=0.000 n=10+9)
MutexWork-2  40.8ns ± 3%  43.8ns ± 7%  +7.38%  (p=0.000 n=10+9)
MutexWorkSlack-2  98.6ns ± 5%  108.2ns ± 2%  +9.80%  (p=0.000 n=10+8)
MutexNoSpin-2  399ns ± 1%  398ns ± 2%  ~  (p=0.463 n=8+9)
MutexSpin-2  1.99µs ± 3%  1.97µs ± 1%  -0.81%  (p=0.003 n=9+8)
RWMutexWrite100-2  37.6ns ± 5%  46.0ns ± 4%  +22.17%  (p=0.000 n=10+8)
RWMutexWrite10-2  50.1ns ± 6%  36.8ns ±12%  -26.46%  (p=0.000 n=9+10)
RWMutexWorkWrite100-2  136ns ± 0%  134ns ± 2%  -1.80%  (p=0.001 n=7+9)
RWMutexWorkWrite10-2  140ns ± 1%  138ns ± 1%  -1.50%  (p=0.000 n=10+10)
Cond32  5.93µs ± 1%  5.91µs ± 0%  ~  (p=0.411 n=9+10)
MutexUncontended  15.9ns ± 0%  15.8ns ± 0%  -0.63%  (p=0.000 n=8+8)
Mutex  15.9ns ± 0%  15.8ns ± 0%  -0.44%  (p=0.003 n=10+10)
MutexSlack  26.9ns ± 3%  26.7ns ± 2%  ~  (p=0.084 n=10+10)
MutexWork  47.8ns ± 0%  47.9ns ± 0%  +0.21%  (p=0.014 n=9+8)
MutexWorkSlack  54.9ns ± 3%  54.5ns ± 3%  ~  (p=0.254 n=10+10)
MutexNoSpin  786ns ± 2%  765ns ± 1%  -2.66%  (p=0.000 n=10+10)
MutexSpin  3.87µs ± 1%  3.83µs ± 0%  -0.85%  (p=0.005 n=9+8)
RWMutexWrite100  21.2ns ± 2%  21.0ns ± 1%  -0.88%  (p=0.018 n=10+9)
RWMutexWrite10  22.6ns ± 1%  22.6ns ± 0%  ~  (p=0.471 n=9+9)
RWMutexWorkWrite100  132ns ± 0%  132ns ± 0%  ~  (all samples are equal)
RWMutexWorkWrite10  124ns ± 0%  123ns ± 0%  ~  (p=0.656 n=10+10)

Change-Id: I66412a3a0980df1233ad7a5a0cd9723b4274528b
Reviewed-on: https://go-review.googlesource.com/34310
Run-TryBot: Russ Cox
TryBot-Result: Gobot Gobot
Reviewed-by: Russ Cox
---
 src/runtime/mgc.go | 6 +-
 src/runtime/proc.go | 2 +-
 src/runtime/sema.go | 80 ++++++++++++++----
 src/runtime/trace.go | 2 +-
 src/sync/mutex.go | 149 ++++++++++++++++++++++++++--------
 src/sync/mutex_test.go | 35 +++++++-
 src/sync/runtime.go | 8 +-
 src/sync/runtime_sema_test.go | 6 +-
 src/sync/rwmutex.go | 4 +-
 src/sync/waitgroup.go | 2 +-
 10 files changed, 231 insertions(+), 63 deletions(-)

diff --git a/src/runtime/mgc.go b/src/runtime/mgc.go
index f1112a6ae3..94adef46cb 100644
--- a/src/runtime/mgc.go
+++ b/src/runtime/mgc.go
@@ -953,7 +953,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
     // another thread.
     useStartSema := mode == gcBackgroundMode
     if useStartSema {
-        semacquire(&work.startSema, 0)
+        semacquire(&work.startSema)
         // Re-check transition condition under transition lock.
         if !gcShouldStart(forceTrigger) {
             semrelease(&work.startSema)
@@ -977,7 +977,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
     }

     // Ok, we're doing it! Stop everybody else
-    semacquire(&worldsema, 0)
+    semacquire(&worldsema)

     if trace.enabled {
         traceGCStart()
@@ -1087,7 +1087,7 @@
 // by mark termination.
 func gcMarkDone() {
 top:
-    semacquire(&work.markDoneSema, 0)
+    semacquire(&work.markDoneSema)

     // Re-check transition condition under transition lock.
     if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 6562eaa8a0..89244cfa7d 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -928,7 +928,7 @@ func restartg(gp *g) {
 // in panic or being exited, this may not reliably stop all
 // goroutines.
 func stopTheWorld(reason string) {
-    semacquire(&worldsema, 0)
+    semacquire(&worldsema)
     getg().m.preemptoff = reason
     systemstack(stopTheWorldWithSema)
 }
diff --git a/src/runtime/sema.go b/src/runtime/sema.go
index d8d8710501..860765cd91 100644
--- a/src/runtime/sema.go
+++ b/src/runtime/sema.go
@@ -53,22 +53,22 @@ var semtable [semTabSize]struct {

 //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
 func sync_runtime_Semacquire(addr *uint32) {
-    semacquire(addr, semaBlockProfile)
+    semacquire1(addr, false, semaBlockProfile)
 }

 //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
 func poll_runtime_Semacquire(addr *uint32) {
-    semacquire(addr, semaBlockProfile)
+    semacquire1(addr, false, semaBlockProfile)
 }

 //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
-func sync_runtime_Semrelease(addr *uint32) {
-    semrelease(addr)
+func sync_runtime_Semrelease(addr *uint32, handoff bool) {
+    semrelease1(addr, handoff)
 }

 //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
-func sync_runtime_SemacquireMutex(addr *uint32) {
-    semacquire(addr, semaBlockProfile|semaMutexProfile)
+func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
+    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
 }

 //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
@@ -91,7 +91,11 @@ const (
 )

 // Called from runtime.
-func semacquire(addr *uint32, profile semaProfileFlags) {
+func semacquire(addr *uint32) {
+    semacquire1(addr, false, 0)
+}
+
+func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
     gp := getg()
     if gp != gp.m.curg {
         throw("semacquire not on the G stack")
@@ -113,6 +117,7 @@ func semacquire(addr *uint32, profile semaProfileFlags) {
     t0 := int64(0)
     s.releasetime = 0
     s.acquiretime = 0
+    s.ticket = 0
     if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
         t0 = cputicks()
         s.releasetime = -1
@@ -135,9 +140,9 @@ func semacquire(addr *uint32, profile semaProfileFlags) {
         }
         // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
-        root.queue(addr, s)
+        root.queue(addr, s, lifo)
         goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
-        if cansemacquire(addr) {
+        if s.ticket != 0 || cansemacquire(addr) {
             break
         }
     }
@@ -148,6 +153,10 @@
 }

 func semrelease(addr *uint32) {
+    semrelease1(addr, false)
+}
+
+func semrelease1(addr *uint32, handoff bool) {
     root := semroot(addr)
     atomic.Xadd(addr, 1)
@@ -173,6 +182,12 @@ func semrelease(addr *uint32) {
     unlock(&root.lock)
     if s != nil { // May be slow, so unlock first
         acquiretime := s.acquiretime
+        if s.ticket != 0 {
+            throw("corrupted semaphore ticket")
+        }
+        if handoff && cansemacquire(addr) {
+            s.ticket = 1
+        }
         readyWithTime(s, 5)
         if acquiretime != 0 {
             mutexevent(t0-acquiretime, 3)
@@ -197,7 +212,7 @@ func cansemacquire(addr *uint32) bool {
 }

 // queue adds s to the blocked goroutines in semaRoot.
-func (root *semaRoot) queue(addr *uint32, s *sudog) {
+func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
     s.g = getg()
     s.elem = unsafe.Pointer(addr)
     s.next = nil
@@ -207,14 +222,41 @@
     pt := &root.treap
     for t := *pt; t != nil; t = *pt {
         if t.elem == unsafe.Pointer(addr) {
-            // Already have addr in list; add s to end of per-addr list.
-            if t.waittail == nil {
-                t.waitlink = s
+            // Already have addr in list.
+            if lifo {
+                // Substitute s in t's place in treap.
+                *pt = s
+                s.ticket = t.ticket
+                s.acquiretime = t.acquiretime
+                s.parent = t.parent
+                s.prev = t.prev
+                s.next = t.next
+                if s.prev != nil {
+                    s.prev.parent = s
+                }
+                if s.next != nil {
+                    s.next.parent = s
+                }
+                // Add t first in s's wait list.
+                s.waitlink = t
+                s.waittail = t.waittail
+                if s.waittail == nil {
+                    s.waittail = t
+                }
+                t.parent = nil
+                t.prev = nil
+                t.next = nil
+                t.waittail = nil
             } else {
-                t.waittail.waitlink = s
+                // Add s to end of t's wait list.
+                if t.waittail == nil {
+                    t.waitlink = s
+                } else {
+                    t.waittail.waitlink = s
+                }
+                t.waittail = s
+                s.waitlink = nil
             }
-            t.waittail = s
-            s.waitlink = nil
             return
         }
         last = t
@@ -319,6 +361,7 @@
 Found:
     s.elem = nil
     s.next = nil
     s.prev = nil
+    s.ticket = 0
     return s, now
 }
@@ -561,3 +604,8 @@ func notifyListCheck(sz uintptr) {
         throw("bad notifyList size")
     }
 }
+
+//go:linkname sync_nanotime sync.runtime_nanotime
+func sync_nanotime() int64 {
+    return nanotime()
+}
diff --git a/src/runtime/trace.go b/src/runtime/trace.go
index fa5e422b0c..9f319cd570 100644
--- a/src/runtime/trace.go
+++ b/src/runtime/trace.go
@@ -313,7 +313,7 @@ func StopTrace() {

     // The world is started but we've set trace.shutdown, so new tracing can't start.
     // Wait for the trace reader to flush pending buffers and stop.
-    semacquire(&trace.shutdownSema, 0)
+    semacquire(&trace.shutdownSema)
     if raceenabled {
         raceacquire(unsafe.Pointer(&trace.shutdownSema))
     }
diff --git a/src/sync/mutex.go b/src/sync/mutex.go
index 8c9366f4fe..506b23f6ff 100644
--- a/src/sync/mutex.go
+++ b/src/sync/mutex.go
@@ -37,7 +37,34 @@ type Locker interface {
 const (
     mutexLocked = 1 << iota // mutex is locked
     mutexWoken
+    mutexStarving
     mutexWaiterShift = iota
+
+    // Mutex fairness.
+    //
+    // Mutex can be in 2 modes of operations: normal and starvation.
+    // In normal mode waiters are queued in FIFO order, but a woken up waiter
+    // does not own the mutex and competes with new arriving goroutines over
+    // the ownership. New arriving goroutines have an advantage -- they are
+    // already running on CPU and there can be lots of them, so a woken up
+    // waiter has good chances of losing. In such case it is queued at front
+    // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
+    // it switches mutex to the starvation mode.
+    //
+    // In starvation mode ownership of the mutex is directly handed off from
+    // the unlocking goroutine to the waiter at the front of the queue.
+    // New arriving goroutines don't try to acquire the mutex even if it appears
+    // to be unlocked, and don't try to spin. Instead they queue themselves at
+    // the tail of the wait queue.
+    //
+    // If a waiter receives ownership of the mutex and sees that either
+    // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
+    // it switches mutex back to normal operation mode.
+    //
+    // Normal mode has considerably better performance as a goroutine can acquire
+    // a mutex several times in a row even if there are blocked waiters.
+    // Starvation mode is important to prevent pathological cases of tail latency.
+    starvationThresholdNs = 1e6
 )

 // Lock locks m.
@@ -52,41 +79,86 @@ func (m *Mutex) Lock() {
         return
     }

+    var waitStartTime int64
+    starving := false
     awoke := false
     iter := 0
+    old := m.state
     for {
-        old := m.state
-        new := old | mutexLocked
-        if old&mutexLocked != 0 {
-            if runtime_canSpin(iter) {
-                // Active spinning makes sense.
-                // Try to set mutexWoken flag to inform Unlock
-                // to not wake other blocked goroutines.
-                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
-                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
-                    awoke = true
-                }
-                runtime_doSpin()
-                iter++
-                continue
+        // Don't spin in starvation mode, ownership is handed off to waiters
+        // so we won't be able to acquire the mutex anyway.
+        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
+            // Active spinning makes sense.
+            // Try to set mutexWoken flag to inform Unlock
+            // to not wake other blocked goroutines.
+            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
+                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
+                awoke = true
             }
-            new = old + 1< starvationThresholdNs
+            old = m.state
+            if old&mutexStarving != 0 {
+                // If this goroutine was woken and mutex is in starvation mode,
+                // ownership was handed off to us but mutex is in somewhat
+                // inconsistent state: mutexLocked is not set and we are still
+                // accounted as waiter. Fix that.
+                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
+                    panic("sync: inconsistent mutex state")
+                }
+                delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 {
+                    // Exit starvation mode.
+                    // Critical to do it here and consider wait time.
+                    // Starvation mode is so inefficient, that two goroutines
+                    // can go lock-step infinitely once they switch mutex
+                    // to starvation mode.
+                    delta -= mutexStarving
+                }
+                atomic.AddInt32(&m.state, delta)
                break
             }
-            runtime_SemacquireMutex(&m.sema)
             awoke = true
             iter = 0
+        } else {
+            old = m.state
         }
     }
@@ -110,22 +182,33 @@ func (m *Mutex) Unlock() {
     // Fast path: drop lock bit.
     new := atomic.AddInt32(&m.state, -mutexLocked)
     if (new+mutexLocked)&mutexLocked == 0 {
-        throw("sync: unlock of unlocked mutex")
+        panic("sync: unlock of unlocked mutex")
     }
-
-    old := new
-    for {
-        // If there are no waiters or a goroutine has already
-        // been woken or grabbed the lock, no need to wake anyone.
-        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
-            return
-        }
-        // Grab the right to wake someone.
-        new = (old - 1<>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
+            return
+        }
+        // Grab the right to wake someone.
+        new = (old - 1<
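
The notes and sketches below are editorial illustrations; they are not part of the patch. First, the state-word layout introduced by the mutex.go hunk above. The constants are copied from the diff; describeState and the sample value are invented for illustration only.

package main

import "fmt"

// Constants mirroring src/sync/mutex.go after this patch:
// bit 0 = locked, bit 1 = woken, bit 2 = starving,
// remaining bits = number of blocked waiters.
const (
	mutexLocked           = 1 << iota // mutex is locked
	mutexWoken                        // a woken waiter is retrying
	mutexStarving                     // mutex is in starvation (handoff) mode
	mutexWaiterShift      = iota      // shift to reach the waiter count
	starvationThresholdNs = 1e6       // 1ms: waiting longer switches the mutex to starvation mode
)

// describeState is a hypothetical helper that unpacks a state word.
func describeState(state int32) string {
	return fmt.Sprintf("locked=%v woken=%v starving=%v waiters=%d",
		state&mutexLocked != 0,
		state&mutexWoken != 0,
		state&mutexStarving != 0,
		state>>mutexWaiterShift)
}

func main() {
	// A locked mutex with two blocked waiters that has switched to starvation mode.
	state := int32(mutexLocked | mutexStarving | 2<<mutexWaiterShift)
	fmt.Println(describeState(state))
	// Output: locked=true woken=false starving=true waiters=2
}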
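Second, the handoff idea itself. The toy lock below, built from a channel queue, behaves the way the starvation mode described in the mutex.go comment behaves: Unlock hands ownership directly to the longest-waiting goroutine, and new arrivals cannot barge past the queue. It only illustrates the idea; the real sync.Mutex implements handoff through the runtime semaphore ticket shown in the sema.go hunks, and it only enters this mode after a waiter has been blocked for more than 1ms.

package main

import "sync"

// handoffLock is a toy illustration of "starvation mode": Unlock hands
// ownership directly to the oldest waiter, so a goroutine that just
// arrived can never steal the lock from a queued waiter.
type handoffLock struct {
	mu      sync.Mutex      // protects the fields below
	locked  bool            // whether the lock is held
	waiters []chan struct{} // FIFO queue of parked waiters
}

func (l *handoffLock) Lock() {
	l.mu.Lock()
	if !l.locked && len(l.waiters) == 0 {
		l.locked = true
		l.mu.Unlock()
		return
	}
	// Queue at the tail and wait for ownership to be handed to us.
	ch := make(chan struct{})
	l.waiters = append(l.waiters, ch)
	l.mu.Unlock()
	<-ch // woken only when we already own the lock; no re-competition
}

func (l *handoffLock) Unlock() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if len(l.waiters) > 0 {
		// Direct handoff: the lock never becomes free, so the waiter at
		// the head of the queue is guaranteed to get it next.
		ch := l.waiters[0]
		l.waiters = l.waiters[1:]
		close(ch)
		return
	}
	l.locked = false
}

func main() {
	var l handoffLock
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Lock()
			l.Unlock()
		}()
	}
	wg.Wait()
}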
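Finally, the lockskew reproducer from issue #13086 is referenced by the commit message but is not included in the patch. The program below is a hypothetical stand-in in the same spirit (all names and parameters are invented): several goroutines repeatedly acquire one mutex while keeping the holder busy on CPU, and each prints how long it took to finish its share, which makes acquisition skew of the kind shown in the "done in ..." lines above visible.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Each goroutine performs `acquisitions` lock/unlock pairs while holding
// the lock briefly; with an unfair mutex some goroutines finish far later
// than others.
func main() {
	const (
		goroutines   = 8
		acquisitions = 1000
		holdTime     = 20 * time.Microsecond
	)

	var mu sync.Mutex
	var wg sync.WaitGroup
	start := time.Now()

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < acquisitions; j++ {
				mu.Lock()
				busyWait(holdTime) // keep the lock holder on CPU
				mu.Unlock()
			}
			fmt.Printf("goroutine %d done in %v\n", id, time.Since(start))
		}(i)
	}
	wg.Wait()
}

// busyWait spins instead of sleeping so the lock holder stays running,
// which is what makes barging by already-running goroutines so effective.
func busyWait(d time.Duration) {
	for begin := time.Now(); time.Since(begin) < d; {
	}
}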