From 80832974ac9306f992c797f8394e44d7f63f307e Mon Sep 17 00:00:00 2001 From: Austin Clements Date: Wed, 28 Jun 2017 15:54:46 -0400 Subject: [PATCH] runtime: make rwmutex work on Ms instead of Gs Currently runtime.rwmutex is written to block the calling goroutine rather than the calling thread. However, rwmutex was intended to be used in the scheduler, which means it needs to be a thread-level synchronization primitive. Hence, this modifies rwmutex to synchronize threads instead of goroutines. This has the consequence of making it write-barrier-free, which is also important for using it in the scheduler. The implementation makes three changes: it replaces the "w" semaphore with a mutex, since this was all it was being used for anyway; it replaces "writerSem" with a single pending M that parks on its note; and it replaces "readerSem" with a list of Ms that park on their notes plus a pass count that together emulate a counting semaphore. I model-checked the safety and liveness of this implementation through >1 billion schedules. For #20738. Change-Id: I3cf5a18c266a96a3f38165083812803510217787 Reviewed-on: https://go-review.googlesource.com/47071 Run-TryBot: Austin Clements TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- src/runtime/HACKING.md | 4 +- src/runtime/export_test.go | 4 -- src/runtime/rwmutex.go | 77 ++++++++++++++++++++++++++++--------- src/runtime/rwmutex_test.go | 4 -- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/runtime/HACKING.md b/src/runtime/HACKING.md index 883559c690..0b390c34d9 100644 --- a/src/runtime/HACKING.md +++ b/src/runtime/HACKING.md @@ -105,7 +105,7 @@ The simplest is `mutex`, which is manipulated using `lock` and periods. Blocking on a `mutex` directly blocks the M, without interacting with the Go scheduler. This means it is safe to use from the lowest levels of the runtime, but also prevents any associated G -and P from being rescheduled. +and P from being rescheduled. `rwmutex` is similar. For one-shot notifications, use `note`, which provides `notesleep` and `notewakeup`. Unlike traditional UNIX `sleep`/`wakeup`, `note`s are @@ -130,7 +130,7 @@ In summary, - +
Blocks
InterfaceGMP
mutexYYY
(rw)mutexYYY
noteYYY/N
parkYNN
diff --git a/src/runtime/export_test.go b/src/runtime/export_test.go index d83afcef2d..c929bd4618 100644 --- a/src/runtime/export_test.go +++ b/src/runtime/export_test.go @@ -354,10 +354,6 @@ type RWMutex struct { rw rwmutex } -func (rw *RWMutex) Init() { - rw.rw.init() -} - func (rw *RWMutex) RLock() { rw.rw.rlock() } diff --git a/src/runtime/rwmutex.go b/src/runtime/rwmutex.go index 7b32769915..bca29d15d0 100644 --- a/src/runtime/rwmutex.go +++ b/src/runtime/rwmutex.go @@ -13,28 +13,43 @@ import ( // An rwmutex is a reader/writer mutual exclusion lock. // The lock can be held by an arbitrary number of readers or a single writer. // This is a variant of sync.RWMutex, for the runtime package. -// This is less convenient than sync.RWMutex, because it must be -// initialized before use. Sorry. +// Like mutex, rwmutex blocks the calling M. +// It does not interact with the goroutine scheduler. type rwmutex struct { - w uint32 // semaphore for pending writers - writerSem uint32 // semaphore for writers to wait for completing readers - readerSem uint32 // semaphore for readers to wait for completing writers + rLock mutex // protects readers, readerPass, writer + readers muintptr // list of pending readers + readerPass uint32 // number of pending readers to skip readers list + + wLock mutex // serializes writers + writer muintptr // pending writer waiting for completing readers + readerCount uint32 // number of pending readers readerWait uint32 // number of departing readers } const rwmutexMaxReaders = 1 << 30 -// init initializes rw. This must be called before any other methods. -func (rw *rwmutex) init() { - rw.w = 1 -} - // rlock locks rw for reading. func (rw *rwmutex) rlock() { if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 { - // A writer is pending. - semacquire(&rw.readerSem) + // A writer is pending. Park on the reader queue. + systemstack(func() { + lock(&rw.rLock) + if rw.readerPass > 0 { + // Writer finished. + rw.readerPass -= 1 + unlock(&rw.rLock) + } else { + // Queue this reader to be woken by + // the writer. + m := getg().m + m.schedlink = rw.readers + rw.readers.set(m) + unlock(&rw.rLock) + notesleep(&m.park) + noteclear(&m.park) + } + }) } } @@ -47,7 +62,12 @@ func (rw *rwmutex) runlock() { // A writer is pending. if atomic.Xadd(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. - semrelease(&rw.writerSem) + lock(&rw.rLock) + w := rw.writer.ptr() + if w != nil { + notewakeup(&w.park) + } + unlock(&rw.rLock) } } } @@ -55,12 +75,22 @@ func (rw *rwmutex) runlock() { // lock locks rw for writing. func (rw *rwmutex) lock() { // Resolve competition with other writers. - semacquire(&rw.w) + lock(&rw.wLock) + m := getg().m // Announce that there is a pending writer. r := int32(atomic.Xadd(&rw.readerCount, -rwmutexMaxReaders)) + rwmutexMaxReaders // Wait for any active readers to complete. + lock(&rw.rLock) if r != 0 && atomic.Xadd(&rw.readerWait, r) != 0 { - semacquire(&rw.writerSem) + // Wait for reader to wake us up. + systemstack(func() { + rw.writer.set(m) + unlock(&rw.rLock) + notesleep(&m.park) + noteclear(&m.park) + }) + } else { + unlock(&rw.rLock) } } @@ -71,10 +101,19 @@ func (rw *rwmutex) unlock() { if r >= rwmutexMaxReaders { throw("unlock of unlocked rwmutex") } - // Unblock blocked readers, if any. - for i := int32(0); i < r; i++ { - semrelease(&rw.readerSem) + // Unblock blocked readers. + lock(&rw.rLock) + for rw.readers.ptr() != nil { + reader := rw.readers.ptr() + rw.readers = reader.schedlink + reader.schedlink.set(nil) + notewakeup(&reader.park) + r -= 1 } + // If r > 0, there are pending readers that aren't on the + // queue. Tell them to skip waiting. + rw.readerPass += uint32(r) + unlock(&rw.rLock) // Allow other writers to proceed. - semrelease(&rw.w) + unlock(&rw.wLock) } diff --git a/src/runtime/rwmutex_test.go b/src/runtime/rwmutex_test.go index f21a531256..b78a8e7987 100644 --- a/src/runtime/rwmutex_test.go +++ b/src/runtime/rwmutex_test.go @@ -27,7 +27,6 @@ func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) { func doTestParallelReaders(numReaders, gomaxprocs int) { GOMAXPROCS(gomaxprocs) var m RWMutex - m.Init() clocked := make(chan bool) cunlock := make(chan bool) cdone := make(chan bool) @@ -89,7 +88,6 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) { // Number of active readers + 10000 * number of active writers. var activity int32 var rwm RWMutex - rwm.Init() cdone := make(chan bool) go writer(&rwm, num_iterations, &activity, cdone) var i int @@ -131,7 +129,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) { } b.RunParallel(func(pb *testing.PB) { var rwm PaddedRWMutex - rwm.RWMutex.Init() for pb.Next() { rwm.RLock() rwm.RLock() @@ -145,7 +142,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) { func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { var rwm RWMutex - rwm.Init() b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { -- 2.50.0