periods. Blocking on a `mutex` directly blocks the M, without
interacting with the Go scheduler. This means it is safe to use from
the lowest levels of the runtime, but also prevents any associated G
-and P from being rescheduled.
+and P from being rescheduled. `rwmutex` is similar.
For one-shot notifications, use `note`, which provides `notesleep` and
`notewakeup`. Unlike traditional UNIX `sleep`/`wakeup`, `note`s are
<table>
<tr><th></th><th colspan="3">Blocks</th></tr>
<tr><th>Interface</th><th>G</th><th>M</th><th>P</th></tr>
-<tr><td>mutex</td><td>Y</td><td>Y</td><td>Y</td></tr>
+<tr><td>(rw)mutex</td><td>Y</td><td>Y</td><td>Y</td></tr>
<tr><td>note</td><td>Y</td><td>Y</td><td>Y/N</td></tr>
<tr><td>park</td><td>Y</td><td>N</td><td>N</td></tr>
</table>
// An rwmutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// This is a variant of sync.RWMutex, for the runtime package.
-// This is less convenient than sync.RWMutex, because it must be
-// initialized before use. Sorry.
+// Like mutex, rwmutex blocks the calling M.
+// It does not interact with the goroutine scheduler.
type rwmutex struct {
- w uint32 // semaphore for pending writers
- writerSem uint32 // semaphore for writers to wait for completing readers
- readerSem uint32 // semaphore for readers to wait for completing writers
+ rLock mutex // protects readers, readerPass, writer
+ readers muintptr // list of pending readers
+ readerPass uint32 // number of pending readers to skip readers list
+
+ wLock mutex // serializes writers
+ writer muintptr // pending writer waiting for completing readers
+
readerCount uint32 // number of pending readers
readerWait uint32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30
-// init initializes rw. This must be called before any other methods.
-func (rw *rwmutex) init() {
- rw.w = 1
-}
-
// rlock locks rw for reading.
func (rw *rwmutex) rlock() {
if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 {
- // A writer is pending.
- semacquire(&rw.readerSem)
+ // A writer is pending. Park on the reader queue.
+ systemstack(func() {
+ lock(&rw.rLock)
+ if rw.readerPass > 0 {
+ // Writer finished.
+ rw.readerPass -= 1
+ unlock(&rw.rLock)
+ } else {
+ // Queue this reader to be woken by
+ // the writer.
+ m := getg().m
+ m.schedlink = rw.readers
+ rw.readers.set(m)
+ unlock(&rw.rLock)
+ notesleep(&m.park)
+ noteclear(&m.park)
+ }
+ })
}
}
// A writer is pending.
if atomic.Xadd(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
- semrelease(&rw.writerSem)
+ lock(&rw.rLock)
+ w := rw.writer.ptr()
+ if w != nil {
+ notewakeup(&w.park)
+ }
+ unlock(&rw.rLock)
}
}
}
// lock locks rw for writing.
func (rw *rwmutex) lock() {
// Resolve competition with other writers.
- semacquire(&rw.w)
+ lock(&rw.wLock)
+ m := getg().m
// Announce that there is a pending writer.
r := int32(atomic.Xadd(&rw.readerCount, -rwmutexMaxReaders)) + rwmutexMaxReaders
// Wait for any active readers to complete.
+ lock(&rw.rLock)
if r != 0 && atomic.Xadd(&rw.readerWait, r) != 0 {
- semacquire(&rw.writerSem)
+ // Wait for reader to wake us up.
+ systemstack(func() {
+ rw.writer.set(m)
+ unlock(&rw.rLock)
+ notesleep(&m.park)
+ noteclear(&m.park)
+ })
+ } else {
+ unlock(&rw.rLock)
}
}
if r >= rwmutexMaxReaders {
throw("unlock of unlocked rwmutex")
}
- // Unblock blocked readers, if any.
- for i := int32(0); i < r; i++ {
- semrelease(&rw.readerSem)
+ // Unblock blocked readers.
+ lock(&rw.rLock)
+ for rw.readers.ptr() != nil {
+ reader := rw.readers.ptr()
+ rw.readers = reader.schedlink
+ reader.schedlink.set(nil)
+ notewakeup(&reader.park)
+ r -= 1
}
+ // If r > 0, there are pending readers that aren't on the
+ // queue. Tell them to skip waiting.
+ rw.readerPass += uint32(r)
+ unlock(&rw.rLock)
// Allow other writers to proceed.
- semrelease(&rw.w)
+ unlock(&rw.wLock)
}