// which must be held when changing the condition and
// when calling the Wait method.
type Cond struct {
- L Locker // held while observing or changing the condition
- m Mutex // held to avoid internal races
- waiters int // number of goroutines blocked on Wait
- sema *uint32
+ L Locker // held while observing or changing the condition
+ m Mutex // held to avoid internal races
+
+ // We must be careful to make sure that when Signal
+ // releases a semaphore, the corresponding acquire is
+ // executed by a goroutine that was already waiting at
+ // the time of the call to Signal, not one that arrived later.
+ // To ensure this, we segment waiting goroutines into
+ // generations punctuated by calls to Signal. Each call to
+ // Signal begins another generation if there are no goroutines
+ // left in older generations for it to wake. Because of this
+ // optimization (only begin another generation if there
+ // are no older goroutines left), we only need to keep track
+ // of the two most recent generations, which we call old
+ // and new.
+ oldWaiters int // number of waiters in old generation...
+ oldSema *uint32 // ... waiting on this semaphore
+
+ newWaiters int // number of waiters in new generation...
+ newSema *uint32 // ... waiting on this semaphore
}
// NewCond returns a new Cond with Locker l.
//
func (c *Cond) Wait() {
c.m.Lock()
- if c.sema == nil {
- c.sema = new(uint32)
+ if c.newSema == nil {
+ c.newSema = new(uint32)
}
- s := c.sema
- c.waiters++
+ s := c.newSema
+ c.newWaiters++
c.m.Unlock()
c.L.Unlock()
runtime.Semacquire(s)
// during the call.
func (c *Cond) Signal() {
c.m.Lock()
- if c.waiters > 0 {
- c.waiters--
- runtime.Semrelease(c.sema)
+ if c.oldWaiters == 0 && c.newWaiters > 0 {
+ // Retire old generation; rename new to old.
+ c.oldWaiters = c.newWaiters
+ c.oldSema = c.newSema
+ c.newWaiters = 0
+ c.newSema = nil
+ }
+ if c.oldWaiters > 0 {
+ c.oldWaiters--
+ runtime.Semrelease(c.oldSema)
}
c.m.Unlock()
}
// during the call.
func (c *Cond) Broadcast() {
c.m.Lock()
- if c.waiters > 0 {
- s := c.sema
- n := c.waiters
- for i := 0; i < n; i++ {
- runtime.Semrelease(s)
+ // Wake both generations.
+ if c.oldWaiters > 0 {
+ for i := 0; i < c.oldWaiters; i++ {
+ runtime.Semrelease(c.oldSema)
+ }
+ c.oldWaiters = 0
+ }
+ if c.newWaiters > 0 {
+ for i := 0; i < c.newWaiters; i++ {
+ runtime.Semrelease(c.newSema)
}
- // We just issued n wakeups via the semaphore s.
- // To ensure that they wake up the existing waiters
- // and not waiters that arrive after Broadcast returns,
- // clear c.sema. The next operation will allocate
- // a new one.
- c.sema = nil
- c.waiters = 0
+ c.newWaiters = 0
+ c.newSema = nil
}
c.m.Unlock()
}
c.Signal()
}
+func TestCondSignalGenerations(t *testing.T) {
+ var m Mutex
+ c := NewCond(&m)
+ n := 100
+ running := make(chan bool, n)
+ awake := make(chan int, n)
+ for i := 0; i < n; i++ {
+ go func(i int) {
+ m.Lock()
+ running <- true
+ c.Wait()
+ awake <- i
+ m.Unlock()
+ }(i)
+ if i > 0 {
+ a := <-awake
+ if a != i-1 {
+ t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a)
+ }
+ }
+ <-running
+ m.Lock()
+ c.Signal()
+ m.Unlock()
+ }
+}
+
func TestCondBroadcast(t *testing.T) {
var m Mutex
c := NewCond(&m)