]> Cypherpunks repositories - gostls13.git/commitdiff
sync: new Cond implementation
authorWedson Almeida Filho <wedsonaf@google.com>
Sun, 24 Jan 2016 18:23:48 +0000 (19:23 +0100)
committerAustin Clements <austin@google.com>
Tue, 15 Mar 2016 22:01:20 +0000 (22:01 +0000)
Change Cond implementation to use a notification list such that waiters
can first register for a notification, release the lock, then actually
wait. Signalers never have to park anymore.

This is intended to address an issue in the previous implementation
where Broadcast could fail to signal all waiters.

Results of the existing benchmark are below.

                                          Original          New  Diff
BenchmarkCond1-48        2000000               745 ns/op    755 +1.3%
BenchmarkCond2-48        1000000              1545 ns/op   1532 -0.8%
BenchmarkCond4-48         300000              3833 ns/op   3896 +1.6%
BenchmarkCond8-48         200000             10049 ns/op  10257 +2.1%
BenchmarkCond16-48        100000             21123 ns/op  21236 +0.5%
BenchmarkCond32-48         30000             40393 ns/op  41097 +1.7%

Fixes #14064

Change-Id: I083466d61593a791a034df61f5305adfb8f1c7f9
Reviewed-on: https://go-review.googlesource.com/18892
Reviewed-by: Dmitry Vyukov <dvyukov@google.com>
Reviewed-by: Austin Clements <austin@google.com>
Run-TryBot: Caleb Spare <cespare@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>

src/cmd/compile/internal/gc/select.go
src/runtime/runtime2.go
src/runtime/sema.go
src/sync/cond.go
src/sync/cond_test.go
src/sync/runtime.go

index 4f637883befe8a5e30f591b01817109a45644609..3ee21b70e8f3145ad7b83b852231845588d3c55c 100644 (file)
@@ -329,7 +329,7 @@ func selecttype(size int32) *Type {
        sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("prev")), typenod(Ptrto(Types[TUINT8]))))
        sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("elem")), typenod(Ptrto(Types[TUINT8]))))
        sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("releasetime")), typenod(Types[TUINT64])))
-       sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("nrelease")), typenod(Types[TINT32])))
+       sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("ticket")), typenod(Types[TUINT32])))
        sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("waitlink")), typenod(Ptrto(Types[TUINT8]))))
        typecheck(&sudog, Etype)
        sudog.Type.Noalg = true
index aa87f312500a0fb7ab3671212f7541df5c2f2164..3ac8f196c566405f2553fc08b00e82b21d509c0f 100644 (file)
@@ -211,7 +211,8 @@ type gobuf struct {
 }
 
 // Known to compiler.
-// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
+// Changes here must also be made in src/cmd/compile/internal/gc/select.go's
+// selecttype.
 type sudog struct {
        g           *g
        selectdone  *uint32
@@ -219,7 +220,7 @@ type sudog struct {
        prev        *sudog
        elem        unsafe.Pointer // data element
        releasetime int64
-       nrelease    int32  // -1 for acquire
+       ticket      uint32
        waitlink    *sudog // g.waiting list
 }
 
index a56758e5bb2d510657e4c6ad07354e3bd724b5c1..45fbbcaa4f1c8f5d2e681ac76cb63b92e43bd1b4 100644 (file)
@@ -62,6 +62,13 @@ func net_runtime_Semrelease(addr *uint32) {
        semrelease(addr)
 }
 
+func readyWithTime(s *sudog, traceskip int) {
+       if s.releasetime != 0 {
+               s.releasetime = cputicks()
+       }
+       goready(s.g, traceskip)
+}
+
 // Called from runtime.
 func semacquire(addr *uint32, profile bool) {
        gp := getg()
@@ -141,10 +148,7 @@ func semrelease(addr *uint32) {
        }
        unlock(&root.lock)
        if s != nil {
-               if s.releasetime != 0 {
-                       s.releasetime = cputicks()
-               }
-               goready(s.g, 4)
+               readyWithTime(s, 5)
        }
 }
 
@@ -193,101 +197,158 @@ func (root *semaRoot) dequeue(s *sudog) {
        s.prev = nil
 }
 
-// Synchronous semaphore for sync.Cond.
-type syncSema struct {
+// notifyList is a ticket-based notification list used to implement sync.Cond.
+//
+// It must be kept in sync with the sync package.
+type notifyList struct {
+       // wait is the ticket number of the next waiter. It is atomically
+       // incremented outside the lock.
+       wait uint32
+
+       // notify is the ticket number of the next waiter to be notified. It can
+       // be read outside the lock, but is only written to with lock held.
+       //
+       // Both wait & notify can wrap around, and such cases will be correctly
+       // handled as long as their "unwrapped" difference is bounded by 2^31.
+       // For this not to be the case, we'd need to have 2^31+ goroutines
+       // blocked on the same condvar, which is currently not possible.
+       notify uint32
+
+       // List of parked waiters.
        lock mutex
        head *sudog
        tail *sudog
 }
 
-// syncsemacquire waits for a pairing syncsemrelease on the same semaphore s.
-//go:linkname syncsemacquire sync.runtime_Syncsemacquire
-func syncsemacquire(s *syncSema) {
-       lock(&s.lock)
-       if s.head != nil && s.head.nrelease > 0 {
-               // Have pending release, consume it.
-               var wake *sudog
-               s.head.nrelease--
-               if s.head.nrelease == 0 {
-                       wake = s.head
-                       s.head = wake.next
-                       if s.head == nil {
-                               s.tail = nil
-                       }
-               }
-               unlock(&s.lock)
-               if wake != nil {
-                       wake.next = nil
-                       goready(wake.g, 4)
-               }
+// less checks if a < b, considering a & b running counts that may overflow the
+// 32-bit range, and that their "unwrapped" difference is always less than 2^31.
+func less(a, b uint32) bool {
+       return int32(a-b) < 0
+}
+
+// notifyListAdd adds the caller to a notify list such that it can receive
+// notifications. The caller must eventually call notifyListWait to wait for
+// such a notification, passing the returned ticket number.
+//go:linkname notifyListAdd sync.runtime_notifyListAdd
+func notifyListAdd(l *notifyList) uint32 {
+       // This may be called concurrently, for example, when called from
+       // sync.Cond.Wait while holding a RWMutex in read mode.
+       return atomic.Xadd(&l.wait, 1) - 1
+}
+
+// notifyListWait waits for a notification. If one has been sent since
+// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
+//go:linkname notifyListWait sync.runtime_notifyListWait
+func notifyListWait(l *notifyList, t uint32) {
+       lock(&l.lock)
+
+       // Return right away if this ticket has already been notified.
+       if less(t, l.notify) {
+               unlock(&l.lock)
+               return
+       }
+
+       // Enqueue itself.
+       s := acquireSudog()
+       s.g = getg()
+       s.ticket = t
+       s.releasetime = 0
+       t0 := int64(0)
+       if blockprofilerate > 0 {
+               t0 = cputicks()
+               s.releasetime = -1
+       }
+       if l.tail == nil {
+               l.head = s
        } else {
-               // Enqueue itself.
-               w := acquireSudog()
-               w.g = getg()
-               w.nrelease = -1
-               w.next = nil
-               w.releasetime = 0
-               t0 := int64(0)
-               if blockprofilerate > 0 {
-                       t0 = cputicks()
-                       w.releasetime = -1
-               }
-               if s.tail == nil {
-                       s.head = w
-               } else {
-                       s.tail.next = w
-               }
-               s.tail = w
-               goparkunlock(&s.lock, "semacquire", traceEvGoBlockCond, 3)
-               if t0 != 0 {
-                       blockevent(w.releasetime-t0, 2)
-               }
-               releaseSudog(w)
+               l.tail.next = s
        }
+       l.tail = s
+       goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
+       if t0 != 0 {
+               blockevent(s.releasetime-t0, 2)
+       }
+       releaseSudog(s)
 }
 
-// syncsemrelease waits for n pairing syncsemacquire on the same semaphore s.
-//go:linkname syncsemrelease sync.runtime_Syncsemrelease
-func syncsemrelease(s *syncSema, n uint32) {
-       lock(&s.lock)
-       for n > 0 && s.head != nil && s.head.nrelease < 0 {
-               // Have pending acquire, satisfy it.
-               wake := s.head
-               s.head = wake.next
-               if s.head == nil {
-                       s.tail = nil
-               }
-               if wake.releasetime != 0 {
-                       wake.releasetime = cputicks()
-               }
-               wake.next = nil
-               goready(wake.g, 4)
-               n--
+// notifyListNotifyAll notifies all entries in the list.
+//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
+func notifyListNotifyAll(l *notifyList) {
+       // Fast-path: if there are no new waiters since the last notification
+       // we don't need to acquire the lock.
+       if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
+               return
+       }
+
+       // Pull the list out into a local variable, waiters will be readied
+       // outside the lock.
+       lock(&l.lock)
+       s := l.head
+       l.head = nil
+       l.tail = nil
+
+       // Update the next ticket to be notified. We can set it to the current
+       // value of wait because any previous waiters are already in the list
+       // or will notice that they have already been notified when trying to
+       // add themselves to the list.
+       atomic.Store(&l.notify, atomic.Load(&l.wait))
+       unlock(&l.lock)
+
+       // Go through the local list and ready all waiters.
+       for s != nil {
+               next := s.next
+               s.next = nil
+               readyWithTime(s, 4)
+               s = next
        }
-       if n > 0 {
-               // enqueue itself
-               w := acquireSudog()
-               w.g = getg()
-               w.nrelease = int32(n)
-               w.next = nil
-               w.releasetime = 0
-               if s.tail == nil {
-                       s.head = w
-               } else {
-                       s.tail.next = w
+}
+
+// notifyListNotifyOne notifies one entry in the list.
+//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
+func notifyListNotifyOne(l *notifyList) {
+       // Fast-path: if there are no new waiters since the last notification
+       // we don't need to acquire the lock at all.
+       if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
+               return
+       }
+
+       lock(&l.lock)
+
+       // Re-check under the lock if we need to do anything.
+       t := l.notify
+       if t == atomic.Load(&l.wait) {
+               unlock(&l.lock)
+               return
+       }
+
+       // Update the next notify ticket number, and try to find the G that
+       // needs to be notified. If it hasn't made it to the list yet we won't
+       // find it, but it won't park itself once it sees the new notify number.
+       atomic.Store(&l.notify, t+1)
+       for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
+               if s.ticket == t {
+                       n := s.next
+                       if p != nil {
+                               p.next = n
+                       } else {
+                               l.head = n
+                       }
+                       if n == nil {
+                               l.tail = p
+                       }
+                       unlock(&l.lock)
+                       s.next = nil
+                       readyWithTime(s, 4)
+                       return
                }
-               s.tail = w
-               goparkunlock(&s.lock, "semarelease", traceEvGoBlockCond, 3)
-               releaseSudog(w)
-       } else {
-               unlock(&s.lock)
        }
+       unlock(&l.lock)
 }
 
-//go:linkname syncsemcheck sync.runtime_Syncsemcheck
-func syncsemcheck(sz uintptr) {
-       if sz != unsafe.Sizeof(syncSema{}) {
-               print("runtime: bad syncSema size - sync=", sz, " runtime=", unsafe.Sizeof(syncSema{}), "\n")
-               throw("bad syncSema size")
+//go:linkname notifyListCheck sync.runtime_notifyListCheck
+func notifyListCheck(sz uintptr) {
+       if sz != unsafe.Sizeof(notifyList{}) {
+               print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
+               throw("bad notifyList size")
        }
 }
index 273884767f7f79ac88f2c43a8143aebf567324fb..f711c39da2d856c580dbb4494c3f031dc2440382 100644 (file)
@@ -5,7 +5,6 @@
 package sync
 
 import (
-       "internal/race"
        "sync/atomic"
        "unsafe"
 )
@@ -24,8 +23,7 @@ type Cond struct {
        // L is held while observing or changing the condition
        L Locker
 
-       sema    syncSema
-       waiters uint32 // number of waiters
+       notify  notifyList
        checker copyChecker
 }
 
@@ -52,15 +50,9 @@ func NewCond(l Locker) *Cond {
 //
 func (c *Cond) Wait() {
        c.checker.check()
-       if race.Enabled {
-               race.Disable()
-       }
-       atomic.AddUint32(&c.waiters, 1)
-       if race.Enabled {
-               race.Enable()
-       }
+       t := runtime_notifyListAdd(&c.notify)
        c.L.Unlock()
-       runtime_Syncsemacquire(&c.sema)
+       runtime_notifyListWait(&c.notify, t)
        c.L.Lock()
 }
 
@@ -69,7 +61,8 @@ func (c *Cond) Wait() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Signal() {
-       c.signalImpl(false)
+       c.checker.check()
+       runtime_notifyListNotifyOne(&c.notify)
 }
 
 // Broadcast wakes all goroutines waiting on c.
@@ -77,34 +70,8 @@ func (c *Cond) Signal() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Broadcast() {
-       c.signalImpl(true)
-}
-
-func (c *Cond) signalImpl(all bool) {
        c.checker.check()
-       if race.Enabled {
-               race.Disable()
-       }
-       for {
-               old := atomic.LoadUint32(&c.waiters)
-               if old == 0 {
-                       if race.Enabled {
-                               race.Enable()
-                       }
-                       return
-               }
-               new := old - 1
-               if all {
-                       new = 0
-               }
-               if atomic.CompareAndSwapUint32(&c.waiters, old, new) {
-                       if race.Enabled {
-                               race.Enable()
-                       }
-                       runtime_Syncsemrelease(&c.sema, old-new)
-                       return
-               }
-       }
+       runtime_notifyListNotifyAll(&c.notify)
 }
 
 // copyChecker holds back pointer to itself to detect object copying.
index 467c80621de2c57a76f129cf151b2d9037d35f1a..7b0729571ce1f05bc028968d32dce825ddb1acdd 100644 (file)
@@ -8,6 +8,7 @@ import (
 
        "runtime"
        "testing"
+       "time"
 )
 
 func TestCondSignal(t *testing.T) {
@@ -183,6 +184,64 @@ func TestRace(t *testing.T) {
        <-done
 }
 
+func TestCondSignalStealing(t *testing.T) {
+       for iters := 0; iters < 1000; iters++ {
+               var m Mutex
+               cond := NewCond(&m)
+
+               // Start a waiter.
+               ch := make(chan struct{})
+               go func() {
+                       m.Lock()
+                       ch <- struct{}{}
+                       cond.Wait()
+                       m.Unlock()
+
+                       ch <- struct{}{}
+               }()
+
+               <-ch
+               m.Lock()
+               m.Unlock()
+
+               // We know that the waiter is in the cond.Wait() call because we
+               // synchronized with it, then acquired/released the mutex it was
+               // holding when we synchronized.
+               //
+               // Start two goroutines that will race: one will broadcast on
+               // the cond var, the other will wait on it.
+               //
+               // The new waiter may or may not get notified, but the first one
+               // has to be notified.
+               done := false
+               go func() {
+                       cond.Broadcast()
+               }()
+
+               go func() {
+                       m.Lock()
+                       for !done {
+                               cond.Wait()
+                       }
+                       m.Unlock()
+               }()
+
+               // Check that the first waiter does get signaled.
+               select {
+               case <-ch:
+               case <-time.After(2 * time.Second):
+                       t.Fatalf("First waiter didn't get broadcast.")
+               }
+
+               // Release the second waiter in case it didn't get the
+               // broadcast.
+               m.Lock()
+               done = true
+               m.Unlock()
+               cond.Broadcast()
+       }
+}
+
 func TestCondCopy(t *testing.T) {
        defer func() {
                err := recover()
index 53e8ae8d2075abe6331a2352b73f5ad548c7827a..96c56c85224c2199563d54b4cb44878865094966 100644 (file)
@@ -19,24 +19,33 @@ func runtime_Semacquire(s *uint32)
 // library and should not be used directly.
 func runtime_Semrelease(s *uint32)
 
-// Approximation of syncSema in runtime/sema.go.
-type syncSema struct {
-       lock uintptr
-       head unsafe.Pointer
-       tail unsafe.Pointer
+// Approximation of notifyList in runtime/sema.go. Size and alignment must
+// agree.
+type notifyList struct {
+       wait   uint32
+       notify uint32
+       lock   uintptr
+       head   unsafe.Pointer
+       tail   unsafe.Pointer
 }
 
-// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
-func runtime_Syncsemacquire(s *syncSema)
+// See runtime/sema.go for documentation.
+func runtime_notifyListAdd(l *notifyList) uint32
 
-// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
-func runtime_Syncsemrelease(s *syncSema, n uint32)
+// See runtime/sema.go for documentation.
+func runtime_notifyListWait(l *notifyList, t uint32)
 
-// Ensure that sync and runtime agree on size of syncSema.
-func runtime_Syncsemcheck(size uintptr)
+// See runtime/sema.go for documentation.
+func runtime_notifyListNotifyAll(l *notifyList)
+
+// See runtime/sema.go for documentation.
+func runtime_notifyListNotifyOne(l *notifyList)
+
+// Ensure that sync and runtime agree on size of notifyList.
+func runtime_notifyListCheck(size uintptr)
 func init() {
-       var s syncSema
-       runtime_Syncsemcheck(unsafe.Sizeof(s))
+       var n notifyList
+       runtime_notifyListCheck(unsafe.Sizeof(n))
 }
 
 // Active spinning runtime support.