// rlock locks rw for reading.
func (rw *rwmutex) rlock() {
+ // The reader must not be allowed to lose its P or else other
+ // things blocking on the lock may consume all of the Ps and
+ // deadlock (issue #20903). Alternatively, we could drop the P
+ // while sleeping.
+ acquirem()
if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 {
// A writer is pending. Park on the reader queue.
systemstack(func() {
@@ ... @@ func (rw *rwmutex) runlock() {
			unlock(&rw.rLock)
		}
	}
+	releasem(getg().m)
}

// lock locks rw for writing.
func (rw *rwmutex) lock() {
- // Resolve competition with other writers.
+ // Resolve competition with other writers and stick to our P.
lock(&rw.wLock)
m := getg().m
// Announce that there is a pending writer.
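
The readerCount check in rlock above relies on a sign convention: the write-lock path announces a pending writer by subtracting the runtime's rwmutexMaxReaders bound, driving the count negative, so an Xadd result below zero tells an incoming reader it must park. The standalone sketch below illustrates only that encoding; sketchRWLock, maxReaders, readerFastPath, and announceWriter are illustrative names, not the runtime's API.

package main

import (
	"fmt"
	"sync/atomic"
)

// maxReaders is an assumed bound, standing in for the runtime's
// rwmutexMaxReaders constant.
const maxReaders = 1 << 30

// sketchRWLock only models the sign convention; it is not a usable lock.
type sketchRWLock struct {
	readerCount int32 // >= 0: readers only; < 0: a writer is pending
}

// readerFastPath mirrors rlock's check: a non-negative result after the
// increment means no writer is pending and the reader may proceed.
func (l *sketchRWLock) readerFastPath() bool {
	return atomic.AddInt32(&l.readerCount, 1) >= 0
}

// announceWriter mirrors the "pending writer" announcement in lock:
// subtracting maxReaders drives the count negative so later readers take the
// slow path; the return value is how many active readers remain to drain.
func (l *sketchRWLock) announceWriter() int32 {
	return atomic.AddInt32(&l.readerCount, -maxReaders) + maxReaders
}

func main() {
	var l sketchRWLock
	fmt.Println(l.readerFastPath()) // true: no writer pending yet
	fmt.Println(l.announceWriter()) // 1: one reader still holds the lock
	fmt.Println(l.readerFastPath()) // false: a writer is pending, reader must park
}

With that encoding in place, the acquirem added in rlock, paired with the releasem at the end of runlock, keeps the reader's M, and therefore its P, pinned for as long as the read lock is held, which is what prevents the deadlock described in the new comment.
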
"testing"
)
-func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) {
+func parallelReader(m *RWMutex, clocked chan bool, cunlock *uint32, cdone chan bool) {
m.RLock()
clocked <- true
- <-cunlock
+ for atomic.LoadUint32(cunlock) == 0 {
+ }
m.RUnlock()
cdone <- true
}
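
parallelReader now spins on an atomic word instead of receiving from the old cunlock channel. The distinction matters for the scenario behind issue #20903: a channel receive parks the goroutine and hands its P back to the scheduler, while a spin loop keeps the goroutine runnable, so every reader appears to keep occupying a P while it holds the read lock. A minimal side-by-side sketch of the two wait styles (a standalone program, not part of the test; the function names are made up):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitOnChannel blocks in a channel receive: the goroutine is parked and its
// P is free for the scheduler to give to other work.
func waitOnChannel(release <-chan struct{}, done chan<- struct{}) {
	<-release
	fmt.Println("channel waiter released")
	done <- struct{}{}
}

// waitOnFlag spins on an atomic word: the goroutine stays runnable and keeps
// a P busy the whole time, which is the behaviour the updated helper relies on.
func waitOnFlag(release *uint32, done chan<- struct{}) {
	for atomic.LoadUint32(release) == 0 {
	}
	fmt.Println("flag waiter released")
	done <- struct{}{}
}

func main() {
	releaseCh := make(chan struct{})
	var releaseFlag uint32
	done := make(chan struct{})

	go waitOnChannel(releaseCh, done)
	go waitOnFlag(&releaseFlag, done)

	time.Sleep(10 * time.Millisecond) // crude: let both waiters reach their wait
	releaseCh <- struct{}{}
	atomic.StoreUint32(&releaseFlag, 1)
	<-done
	<-done
}
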
-func doTestParallelReaders(numReaders, gomaxprocs int) {
- GOMAXPROCS(gomaxprocs)
+func doTestParallelReaders(numReaders int) {
+ GOMAXPROCS(numReaders + 1)
var m RWMutex
- clocked := make(chan bool)
- cunlock := make(chan bool)
+ clocked := make(chan bool, numReaders)
+ var cunlock uint32
cdone := make(chan bool)
for i := 0; i < numReaders; i++ {
- go parallelReader(&m, clocked, cunlock, cdone)
+ go parallelReader(&m, clocked, &cunlock, cdone)
}
// Wait for all parallel RLock()s to succeed.
for i := 0; i < numReaders; i++ {
<-clocked
}
- for i := 0; i < numReaders; i++ {
- cunlock <- true
- }
+ atomic.StoreUint32(&cunlock, 1)
// Wait for the goroutines to finish.
for i := 0; i < numReaders; i++ {
		<-cdone
	}
}

func TestParallelRWMutexReaders(t *testing.T) {
defer GOMAXPROCS(GOMAXPROCS(-1))
- doTestParallelReaders(1, 4)
- doTestParallelReaders(3, 4)
- doTestParallelReaders(4, 2)
+ doTestParallelReaders(1)
+ doTestParallelReaders(3)
+ doTestParallelReaders(4)
}
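
For experimenting outside the runtime tree, the updated test translates almost directly into a standalone program against the exported sync.RWMutex. The sketch below is that analogue, not the runtime test itself; it keeps the same shape (buffered clocked channel, atomic release flag, GOMAXPROCS of numReaders+1), but sync.RWMutex is not affected by the bug, so it only demonstrates the structure.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// parallelReader mirrors the test helper above, but against sync.RWMutex.
func parallelReader(m *sync.RWMutex, clocked chan bool, cunlock *uint32, cdone chan bool) {
	m.RLock()
	clocked <- true
	for atomic.LoadUint32(cunlock) == 0 {
		// Spin while holding the read lock so this goroutine keeps running.
	}
	m.RUnlock()
	cdone <- true
}

func main() {
	const numReaders = 4
	runtime.GOMAXPROCS(numReaders + 1)

	var m sync.RWMutex
	clocked := make(chan bool, numReaders) // buffered: readers never block on this send
	var cunlock uint32
	cdone := make(chan bool)

	for i := 0; i < numReaders; i++ {
		go parallelReader(&m, clocked, &cunlock, cdone)
	}
	// Wait until every reader holds the read lock at the same time.
	for i := 0; i < numReaders; i++ {
		<-clocked
	}
	// A single store releases all the spinning readers.
	atomic.StoreUint32(&cunlock, 1)
	for i := 0; i < numReaders; i++ {
		<-cdone
	}
	fmt.Println("all", numReaders, "readers held the lock in parallel")
}
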
func reader(rwm *RWMutex, num_iterations int, activity *int32, cdone chan bool) {