]> Cypherpunks repositories - gostls13.git/commitdiff
sync: yield to the waiter when unlocking a starving mutex
authorCarlo Alberto Ferraris <cafxx@strayorange.com>
Wed, 2 Oct 2019 10:15:53 +0000 (19:15 +0900)
committerIan Lance Taylor <iant@golang.org>
Thu, 7 Nov 2019 05:59:33 +0000 (05:59 +0000)
When we have already assigned the semaphore ticket to a specific
waiter, we want to get the waiter running as fast as possible since
no other G waiting on the semaphore can acquire it optimistically.

The net effect is that, when a sync.Mutex is contented, the code in
the critical section guarded by the Mutex gets a priority boost.

Fixes #33747

Change-Id: I9967f0f763c25504010651bdd7f944ee0189cd45
Reviewed-on: https://go-review.googlesource.com/c/go/+/200577
Reviewed-by: Rhys Hiltner <rhys@justin.tv>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/runtime/export_test.go
src/runtime/proc.go
src/runtime/sema.go
src/runtime/sema_test.go [new file with mode: 0644]
src/sync/mutex.go

index 3c1b4db7504b3f1fb5fd7d9d05dadd366f030ecd..bd73ac9fee1def737588f0928a5c4f00e5055962 100644 (file)
@@ -730,3 +730,11 @@ func RunGetgThreadSwitchTest() {
                panic("g1 != g3")
        }
 }
+
+var Semacquire = semacquire
+var Semrelease1 = semrelease1
+
+func SemNwait(addr *uint32) uint32 {
+       root := semroot(addr)
+       return atomic.Load(&root.nwait)
+}
index b0ac4c44212ce922cd0716cc69e641602cba6cfa..57c79b9b2afe5a1a39bb9742492badf3df7b31ad 100644 (file)
@@ -2753,7 +2753,22 @@ func preemptPark(gp *g) {
        casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
        dropg()
        casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
+       schedule()
+}
 
+// goyield is like Gosched, but it:
+// - does not emit a GoSched trace event
+// - puts the current G on the runq of the current P instead of the globrunq
+func goyield() {
+       checkTimeouts()
+       mcall(goyield_m)
+}
+
+func goyield_m(gp *g) {
+       pp := gp.m.p.ptr()
+       casgstatus(gp, _Grunning, _Grunnable)
+       dropg()
+       runqput(pp, gp, false)
        schedule()
 }
 
index 530af5baa6b647e68babc82e7cc4806e835a7b63..1b80faa8f7ef5bb1842483c430847ac64a70a8e4 100644 (file)
@@ -180,7 +180,7 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
                atomic.Xadd(&root.nwait, -1)
        }
        unlock(&root.lock)
-       if s != nil { // May be slow, so unlock first
+       if s != nil { // May be slow or even yield, so unlock first
                acquiretime := s.acquiretime
                if acquiretime != 0 {
                        mutexevent(t0-acquiretime, 3+skipframes)
@@ -192,6 +192,25 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
                        s.ticket = 1
                }
                readyWithTime(s, 5+skipframes)
+               if s.ticket == 1 {
+                       // Direct G handoff
+                       // readyWithTime has added the waiter G as runnext in the
+                       // current P; we now call the scheduler so that we start running
+                       // the waiter G immediately.
+                       // Note that waiter inherits our time slice: this is desirable
+                       // to avoid having a highly contended semaphore hog the P
+                       // indefinitely. goyield is like Gosched, but it does not emit a
+                       // GoSched trace event and, more importantly, puts the current G
+                       // on the local runq instead of the global one.
+                       // We only do this in the starving regime (handoff=true), as in
+                       // the non-starving case it is possible for a different waiter
+                       // to acquire the semaphore while we are yielding/scheduling,
+                       // and this would be wasteful. We wait instead to enter starving
+                       // regime, and then we start to do direct handoffs of ticket and
+                       // P.
+                       // See issue 33747 for discussion.
+                       goyield()
+               }
        }
 }
 
diff --git a/src/runtime/sema_test.go b/src/runtime/sema_test.go
new file mode 100644 (file)
index 0000000..5cd2317
--- /dev/null
@@ -0,0 +1,80 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime_test
+
+import (
+       . "runtime"
+       "sync/atomic"
+       "testing"
+)
+
+// TestSemaHandoff checks that when semrelease+handoff is
+// requested, the G that releases the semaphore yields its
+// P directly to the first waiter in line.
+// See issue 33747 for discussion.
+func TestSemaHandoff(t *testing.T) {
+       const iter = 10000
+       ok := 0
+       for i := 0; i < iter; i++ {
+               if testSemaHandoff() {
+                       ok++
+               }
+       }
+       // As long as two thirds of handoffs are direct, we
+       // consider the test successful. The scheduler is
+       // nondeterministic, so this test checks that we get the
+       // desired outcome in a significant majority of cases.
+       // The actual ratio of direct handoffs is much higher
+       // (>90%) but we use a lower threshold to minimize the
+       // chances that unrelated changes in the runtime will
+       // cause the test to fail or become flaky.
+       if ok < iter*2/3 {
+               t.Fatal("direct handoff < 2/3:", ok, iter)
+       }
+}
+
+func TestSemaHandoff1(t *testing.T) {
+       if GOMAXPROCS(-1) <= 1 {
+               t.Skip("GOMAXPROCS <= 1")
+       }
+       defer GOMAXPROCS(GOMAXPROCS(-1))
+       GOMAXPROCS(1)
+       TestSemaHandoff(t)
+}
+
+func TestSemaHandoff2(t *testing.T) {
+       if GOMAXPROCS(-1) <= 2 {
+               t.Skip("GOMAXPROCS <= 2")
+       }
+       defer GOMAXPROCS(GOMAXPROCS(-1))
+       GOMAXPROCS(2)
+       TestSemaHandoff(t)
+}
+
+func testSemaHandoff() bool {
+       var sema, res uint32
+       done := make(chan struct{})
+
+       go func() {
+               Semacquire(&sema)
+               atomic.CompareAndSwapUint32(&res, 0, 1)
+
+               Semrelease1(&sema, true, 0)
+               close(done)
+       }()
+       for SemNwait(&sema) == 0 {
+               Gosched() // wait for goroutine to block in Semacquire
+       }
+
+       // The crux of the test: we release the semaphore with handoff
+       // and immediately perform a CAS both here and in the waiter; we
+       // want the CAS in the waiter to execute first.
+       Semrelease1(&sema, true, 0)
+       atomic.CompareAndSwapUint32(&res, 0, 2)
+
+       <-done // wait for goroutines to finish to avoid data races
+
+       return res == 1 // did the waiter run first?
+}
index 11ad20c9757346026a48eeeb07c4326239793ceb..3028552f743d1fce5e916244b1e141c83079ee0f 100644 (file)
@@ -216,7 +216,8 @@ func (m *Mutex) unlockSlow(new int32) {
                        old = m.state
                }
        } else {
-               // Starving mode: handoff mutex ownership to the next waiter.
+               // Starving mode: handoff mutex ownership to the next waiter, and yield
+               // our time slice so that the next waiter can start to run immediately.
                // Note: mutexLocked is not set, the waiter will set it after wakeup.
                // But mutex is still considered locked if mutexStarving is set,
                // so new coming goroutines won't acquire it.