]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: make rwmutex work on Ms instead of Gs
authorAustin Clements <austin@google.com>
Wed, 28 Jun 2017 19:54:46 +0000 (15:54 -0400)
committerAustin Clements <austin@google.com>
Wed, 28 Jun 2017 22:08:57 +0000 (22:08 +0000)
Currently runtime.rwmutex is written to block the calling goroutine
rather than the calling thread. However, rwmutex was intended to be
used in the scheduler, which means it needs to be a thread-level
synchronization primitive.

Hence, this modifies rwmutex to synchronize threads instead of
goroutines. This has the consequence of making it write-barrier-free,
which is also important for using it in the scheduler.

The implementation makes three changes: it replaces the "w" semaphore
with a mutex, since this was all it was being used for anyway; it
replaces "writerSem" with a single pending M that parks on its note;
and it replaces "readerSem" with a list of Ms that park on their notes
plus a pass count that together emulate a counting semaphore. I
model-checked the safety and liveness of this implementation through
>1 billion schedules.

For #20738.

Change-Id: I3cf5a18c266a96a3f38165083812803510217787
Reviewed-on: https://go-review.googlesource.com/47071
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
src/runtime/HACKING.md
src/runtime/export_test.go
src/runtime/rwmutex.go
src/runtime/rwmutex_test.go

index 883559c690d5647b2f5431bb7f2bd064ffa4b4aa..0b390c34d96df79773ec021e4b3813563bdb829f 100644 (file)
@@ -105,7 +105,7 @@ The simplest is `mutex`, which is manipulated using `lock` and
 periods. Blocking on a `mutex` directly blocks the M, without
 interacting with the Go scheduler. This means it is safe to use from
 the lowest levels of the runtime, but also prevents any associated G
-and P from being rescheduled.
+and P from being rescheduled. `rwmutex` is similar.
 
 For one-shot notifications, use `note`, which provides `notesleep` and
 `notewakeup`. Unlike traditional UNIX `sleep`/`wakeup`, `note`s are
@@ -130,7 +130,7 @@ In summary,
 <table>
 <tr><th></th><th colspan="3">Blocks</th></tr>
 <tr><th>Interface</th><th>G</th><th>M</th><th>P</th></tr>
-<tr><td>mutex</td><td>Y</td><td>Y</td><td>Y</td></tr>
+<tr><td>(rw)mutex</td><td>Y</td><td>Y</td><td>Y</td></tr>
 <tr><td>note</td><td>Y</td><td>Y</td><td>Y/N</td></tr>
 <tr><td>park</td><td>Y</td><td>N</td><td>N</td></tr>
 </table>
index d83afcef2d54d2f14da793ef8c78f2f5d418a440..c929bd46187a89687f761638bc858832242a83b8 100644 (file)
@@ -354,10 +354,6 @@ type RWMutex struct {
        rw rwmutex
 }
 
-func (rw *RWMutex) Init() {
-       rw.rw.init()
-}
-
 func (rw *RWMutex) RLock() {
        rw.rw.rlock()
 }
index 7b32769915249914659ed9bf9cf538e6816eba23..bca29d15d04223db58ea25519ecd68c28e863880 100644 (file)
@@ -13,28 +13,43 @@ import (
 // An rwmutex is a reader/writer mutual exclusion lock.
 // The lock can be held by an arbitrary number of readers or a single writer.
 // This is a variant of sync.RWMutex, for the runtime package.
-// This is less convenient than sync.RWMutex, because it must be
-// initialized before use. Sorry.
+// Like mutex, rwmutex blocks the calling M.
+// It does not interact with the goroutine scheduler.
 type rwmutex struct {
-       w           uint32 // semaphore for pending writers
-       writerSem   uint32 // semaphore for writers to wait for completing readers
-       readerSem   uint32 // semaphore for readers to wait for completing writers
+       rLock      mutex    // protects readers, readerPass, writer
+       readers    muintptr // list of pending readers
+       readerPass uint32   // number of pending readers to skip readers list
+
+       wLock  mutex    // serializes writers
+       writer muintptr // pending writer waiting for completing readers
+
        readerCount uint32 // number of pending readers
        readerWait  uint32 // number of departing readers
 }
 
 const rwmutexMaxReaders = 1 << 30
 
-// init initializes rw. This must be called before any other methods.
-func (rw *rwmutex) init() {
-       rw.w = 1
-}
-
 // rlock locks rw for reading.
 func (rw *rwmutex) rlock() {
        if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 {
-               // A writer is pending.
-               semacquire(&rw.readerSem)
+               // A writer is pending. Park on the reader queue.
+               systemstack(func() {
+                       lock(&rw.rLock)
+                       if rw.readerPass > 0 {
+                               // Writer finished.
+                               rw.readerPass -= 1
+                               unlock(&rw.rLock)
+                       } else {
+                               // Queue this reader to be woken by
+                               // the writer.
+                               m := getg().m
+                               m.schedlink = rw.readers
+                               rw.readers.set(m)
+                               unlock(&rw.rLock)
+                               notesleep(&m.park)
+                               noteclear(&m.park)
+                       }
+               })
        }
 }
 
@@ -47,7 +62,12 @@ func (rw *rwmutex) runlock() {
                // A writer is pending.
                if atomic.Xadd(&rw.readerWait, -1) == 0 {
                        // The last reader unblocks the writer.
-                       semrelease(&rw.writerSem)
+                       lock(&rw.rLock)
+                       w := rw.writer.ptr()
+                       if w != nil {
+                               notewakeup(&w.park)
+                       }
+                       unlock(&rw.rLock)
                }
        }
 }
@@ -55,12 +75,22 @@ func (rw *rwmutex) runlock() {
 // lock locks rw for writing.
 func (rw *rwmutex) lock() {
        // Resolve competition with other writers.
-       semacquire(&rw.w)
+       lock(&rw.wLock)
+       m := getg().m
        // Announce that there is a pending writer.
        r := int32(atomic.Xadd(&rw.readerCount, -rwmutexMaxReaders)) + rwmutexMaxReaders
        // Wait for any active readers to complete.
+       lock(&rw.rLock)
        if r != 0 && atomic.Xadd(&rw.readerWait, r) != 0 {
-               semacquire(&rw.writerSem)
+               // Wait for reader to wake us up.
+               systemstack(func() {
+                       rw.writer.set(m)
+                       unlock(&rw.rLock)
+                       notesleep(&m.park)
+                       noteclear(&m.park)
+               })
+       } else {
+               unlock(&rw.rLock)
        }
 }
 
@@ -71,10 +101,19 @@ func (rw *rwmutex) unlock() {
        if r >= rwmutexMaxReaders {
                throw("unlock of unlocked rwmutex")
        }
-       // Unblock blocked readers, if any.
-       for i := int32(0); i < r; i++ {
-               semrelease(&rw.readerSem)
+       // Unblock blocked readers.
+       lock(&rw.rLock)
+       for rw.readers.ptr() != nil {
+               reader := rw.readers.ptr()
+               rw.readers = reader.schedlink
+               reader.schedlink.set(nil)
+               notewakeup(&reader.park)
+               r -= 1
        }
+       // If r > 0, there are pending readers that aren't on the
+       // queue. Tell them to skip waiting.
+       rw.readerPass += uint32(r)
+       unlock(&rw.rLock)
        // Allow other writers to proceed.
-       semrelease(&rw.w)
+       unlock(&rw.wLock)
 }
index f21a5312561726b36651a7ce56b8e5c31014284f..b78a8e79870c4bad54c09b08e465174dfd5933cf 100644 (file)
@@ -27,7 +27,6 @@ func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) {
 func doTestParallelReaders(numReaders, gomaxprocs int) {
        GOMAXPROCS(gomaxprocs)
        var m RWMutex
-       m.Init()
        clocked := make(chan bool)
        cunlock := make(chan bool)
        cdone := make(chan bool)
@@ -89,7 +88,6 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) {
        // Number of active readers + 10000 * number of active writers.
        var activity int32
        var rwm RWMutex
-       rwm.Init()
        cdone := make(chan bool)
        go writer(&rwm, num_iterations, &activity, cdone)
        var i int
@@ -131,7 +129,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) {
        }
        b.RunParallel(func(pb *testing.PB) {
                var rwm PaddedRWMutex
-               rwm.RWMutex.Init()
                for pb.Next() {
                        rwm.RLock()
                        rwm.RLock()
@@ -145,7 +142,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) {
 
 func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
        var rwm RWMutex
-       rwm.Init()
        b.RunParallel(func(pb *testing.PB) {
                foo := 0
                for pb.Next() {