]> Cypherpunks repositories - gostls13.git/commitdiff
sync: faster Cond
authorDmitriy Vyukov <dvyukov@google.com>
Tue, 13 Aug 2013 10:45:36 +0000 (14:45 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Tue, 13 Aug 2013 10:45:36 +0000 (14:45 +0400)
The new version does not require any memory allocations and is 30-50% faster.
Also detect and painc if Cond is copied after first.

benchmark            old ns/op    new ns/op    delta
BenchmarkCond1             317          195  -38.49%
BenchmarkCond1-2           875          607  -30.63%
BenchmarkCond1-4          1116          548  -50.90%
BenchmarkCond1-8          1013          613  -39.49%
BenchmarkCond1-16          983          450  -54.22%
BenchmarkCond2             559          352  -37.03%
BenchmarkCond2-2          1916         1378  -28.08%
BenchmarkCond2-4          1518         1322  -12.91%
BenchmarkCond2-8          2313         1291  -44.19%
BenchmarkCond2-16         1885         1078  -42.81%
BenchmarkCond4            1070          614  -42.62%
BenchmarkCond4-2          4899         3047  -37.80%
BenchmarkCond4-4          3813         3006  -21.16%
BenchmarkCond4-8          3605         3045  -15.53%
BenchmarkCond4-16         4148         2637  -36.43%
BenchmarkCond8            2086         1264  -39.41%
BenchmarkCond8-2          9961         6736  -32.38%
BenchmarkCond8-4          8135         7689   -5.48%
BenchmarkCond8-8          9623         7517  -21.89%
BenchmarkCond8-16        11661         8093  -30.60%

R=sougou, rsc, bradfitz, r
CC=golang-dev
https://golang.org/cl/11573043

src/pkg/runtime/sema.goc
src/pkg/sync/cond.go
src/pkg/sync/cond_test.go
src/pkg/sync/runtime.go

index 08b47798643273373ace40c27802b39c460a7a27..51a38adafe371736284e4e25c20a5e06025618fa 100644 (file)
@@ -21,22 +21,23 @@ package sync
 #include "runtime.h"
 #include "arch_GOARCH.h"
 
-typedef struct Sema Sema;
-struct Sema
+typedef struct SemaWaiter SemaWaiter;
+struct SemaWaiter
 {
        uint32 volatile*        addr;
        G*      g;
        int64   releasetime;
-       Sema*   prev;
-       Sema*   next;
+       int32   nrelease;       // -1 for acquire
+       SemaWaiter*     prev;
+       SemaWaiter*     next;
 };
 
 typedef struct SemaRoot SemaRoot;
 struct SemaRoot
 {
        Lock;
-       Sema*   head;
-       Sema*   tail;
+       SemaWaiter*     head;
+       SemaWaiter*     tail;
        // Number of waiters. Read w/o the lock.
        uint32 volatile nwait;
 };
@@ -59,7 +60,7 @@ semroot(uint32 *addr)
 }
 
 static void
-semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s)
+semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
 {
        s->g = g;
        s->addr = addr;
@@ -73,7 +74,7 @@ semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s)
 }
 
 static void
-semdequeue(SemaRoot *root, Sema *s)
+semdequeue(SemaRoot *root, SemaWaiter *s)
 {
        if(s->next)
                s->next->prev = s->prev;
@@ -101,7 +102,7 @@ cansemacquire(uint32 *addr)
 void
 runtime·semacquire(uint32 volatile *addr, bool profile)
 {
-       Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
+       SemaWaiter s;   // Needs to be allocated on stack, otherwise garbage collector could deallocate it
        SemaRoot *root;
        int64 t0;
        
@@ -147,7 +148,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile)
 void
 runtime·semrelease(uint32 volatile *addr)
 {
-       Sema *s;
+       SemaWaiter *s;
        SemaRoot *root;
 
        root = semroot(addr);
@@ -200,3 +201,93 @@ func runtime_Semacquire(addr *uint32) {
 func runtime_Semrelease(addr *uint32) {
        runtime·semrelease(addr);
 }
+
+typedef struct SyncSema SyncSema;
+struct SyncSema
+{
+       Lock;
+       SemaWaiter*     head;
+       SemaWaiter*     tail;
+};
+
+func runtime_Syncsemcheck(size uintptr) {
+       if(size != sizeof(SyncSema)) {
+               runtime·printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
+               runtime·throw("bad SyncSema size");
+       }
+}
+
+// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
+func runtime_Syncsemacquire(s *SyncSema) {
+       SemaWaiter w, *wake;
+       int64 t0;
+
+       w.g = g;
+       w.nrelease = -1;
+       w.next = nil;
+       w.releasetime = 0;
+       t0 = 0;
+       if(runtime·blockprofilerate > 0) {
+               t0 = runtime·cputicks();
+               w.releasetime = -1;
+       }
+
+       runtime·lock(s);
+       if(s->head && s->head->nrelease > 0) {
+               // have pending release, consume it
+               wake = nil;
+               s->head->nrelease--;
+               if(s->head->nrelease == 0) {
+                       wake = s->head;
+                       s->head = wake->next;
+                       if(s->head == nil)
+                               s->tail = nil;
+               }
+               runtime·unlock(s);
+               if(wake)
+                       runtime·ready(wake->g);
+       } else {
+               // enqueue itself
+               if(s->tail == nil)
+                       s->head = &w;
+               else
+                       s->tail->next = &w;
+               s->tail = &w;
+               runtime·park(runtime·unlock, s, "semacquire");
+               if(t0)
+                       runtime·blockevent(w.releasetime - t0, 2);
+       }
+}
+
+// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
+func runtime_Syncsemrelease(s *SyncSema, n uint32) {
+       SemaWaiter w, *wake;
+
+       w.g = g;
+       w.nrelease = (int32)n;
+       w.next = nil;
+       w.releasetime = 0;
+
+       runtime·lock(s);
+       while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
+               // have pending acquire, satisfy it
+               wake = s->head;
+               s->head = wake->next;
+               if(s->head == nil)
+                       s->tail = nil;
+               if(wake->releasetime)
+                       wake->releasetime = runtime·cputicks();
+               runtime·ready(wake->g);
+               w.nrelease--;
+       }
+       if(w.nrelease > 0) {
+               // enqueue itself
+               if(s->tail == nil)
+                       s->head = &w;
+               else
+                       s->tail->next = &w;
+               s->tail = &w;
+               runtime·park(runtime·unlock, s, "semarelease");
+       } else
+               runtime·unlock(s);
+}
index 13547a8a11b694635bda62380c25f4a0bcebd69a..9e6bc170f19b853a66111c418037678e9e7ed650 100644 (file)
@@ -4,6 +4,11 @@
 
 package sync
 
+import (
+       "sync/atomic"
+       "unsafe"
+)
+
 // Cond implements a condition variable, a rendezvous point
 // for goroutines waiting for or announcing the occurrence
 // of an event.
@@ -11,27 +16,16 @@ package sync
 // Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
 // which must be held when changing the condition and
 // when calling the Wait method.
+//
+// A Cond can be created as part of other structures.
+// A Cond must not be copied after first use.
 type Cond struct {
-       L Locker // held while observing or changing the condition
-       m Mutex  // held to avoid internal races
-
-       // We must be careful to make sure that when Signal
-       // releases a semaphore, the corresponding acquire is
-       // executed by a goroutine that was already waiting at
-       // the time of the call to Signal, not one that arrived later.
-       // To ensure this, we segment waiting goroutines into
-       // generations punctuated by calls to Signal.  Each call to
-       // Signal begins another generation if there are no goroutines
-       // left in older generations for it to wake.  Because of this
-       // optimization (only begin another generation if there
-       // are no older goroutines left), we only need to keep track
-       // of the two most recent generations, which we call old
-       // and new.
-       oldWaiters int     // number of waiters in old generation...
-       oldSema    *uint32 // ... waiting on this semaphore
+       // L is held while observing or changing the condition
+       L Locker
 
-       newWaiters int     // number of waiters in new generation...
-       newSema    *uint32 // ... waiting on this semaphore
+       sema    syncSema
+       waiters uint32 // number of waiters
+       checker copyChecker
 }
 
 // NewCond returns a new Cond with Locker l.
@@ -56,22 +50,16 @@ func NewCond(l Locker) *Cond {
 //    c.L.Unlock()
 //
 func (c *Cond) Wait() {
+       c.checker.check()
        if raceenabled {
-               _ = c.m.state
                raceDisable()
        }
-       c.m.Lock()
-       if c.newSema == nil {
-               c.newSema = new(uint32)
-       }
-       s := c.newSema
-       c.newWaiters++
-       c.m.Unlock()
+       atomic.AddUint32(&c.waiters, 1)
        if raceenabled {
                raceEnable()
        }
        c.L.Unlock()
-       runtime_Semacquire(s)
+       runtime_Syncsemacquire(&c.sema)
        c.L.Lock()
 }
 
@@ -80,26 +68,7 @@ func (c *Cond) Wait() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Signal() {
-       if raceenabled {
-               _ = c.m.state
-               raceDisable()
-       }
-       c.m.Lock()
-       if c.oldWaiters == 0 && c.newWaiters > 0 {
-               // Retire old generation; rename new to old.
-               c.oldWaiters = c.newWaiters
-               c.oldSema = c.newSema
-               c.newWaiters = 0
-               c.newSema = nil
-       }
-       if c.oldWaiters > 0 {
-               c.oldWaiters--
-               runtime_Semrelease(c.oldSema)
-       }
-       c.m.Unlock()
-       if raceenabled {
-               raceEnable()
-       }
+       c.signalImpl(false)
 }
 
 // Broadcast wakes all goroutines waiting on c.
@@ -107,27 +76,43 @@ func (c *Cond) Signal() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Broadcast() {
+       c.signalImpl(true)
+}
+
+func (c *Cond) signalImpl(all bool) {
+       c.checker.check()
        if raceenabled {
-               _ = c.m.state
                raceDisable()
        }
-       c.m.Lock()
-       // Wake both generations.
-       if c.oldWaiters > 0 {
-               for i := 0; i < c.oldWaiters; i++ {
-                       runtime_Semrelease(c.oldSema)
+       for {
+               old := atomic.LoadUint32(&c.waiters)
+               if old == 0 {
+                       if raceenabled {
+                               raceEnable()
+                       }
+                       return
                }
-               c.oldWaiters = 0
-       }
-       if c.newWaiters > 0 {
-               for i := 0; i < c.newWaiters; i++ {
-                       runtime_Semrelease(c.newSema)
+               new := old - 1
+               if all {
+                       new = 0
+               }
+               if atomic.CompareAndSwapUint32(&c.waiters, old, new) {
+                       if raceenabled {
+                               raceEnable()
+                       }
+                       runtime_Syncsemrelease(&c.sema, old-new)
+                       return
                }
-               c.newWaiters = 0
-               c.newSema = nil
        }
-       c.m.Unlock()
-       if raceenabled {
-               raceEnable()
+}
+
+// copyChecker holds back pointer to itself to detect object copying.
+type copyChecker uintptr
+
+func (c *copyChecker) check() {
+       if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
+               !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
+               uintptr(*c) != uintptr(unsafe.Pointer(c)) {
+               panic("sync.Cond is copied")
        }
 }
index cefacb184e10b582554bae872bd8101535034b6e..467c80621de2c57a76f129cf151b2d9037d35f1a 100644 (file)
@@ -5,6 +5,8 @@ package sync_test
 
 import (
        . "sync"
+
+       "runtime"
        "testing"
 )
 
@@ -124,3 +126,130 @@ func TestCondBroadcast(t *testing.T) {
        }
        c.Broadcast()
 }
+
+func TestRace(t *testing.T) {
+       x := 0
+       c := NewCond(&Mutex{})
+       done := make(chan bool)
+       go func() {
+               c.L.Lock()
+               x = 1
+               c.Wait()
+               if x != 2 {
+                       t.Fatal("want 2")
+               }
+               x = 3
+               c.Signal()
+               c.L.Unlock()
+               done <- true
+       }()
+       go func() {
+               c.L.Lock()
+               for {
+                       if x == 1 {
+                               x = 2
+                               c.Signal()
+                               break
+                       }
+                       c.L.Unlock()
+                       runtime.Gosched()
+                       c.L.Lock()
+               }
+               c.L.Unlock()
+               done <- true
+       }()
+       go func() {
+               c.L.Lock()
+               for {
+                       if x == 2 {
+                               c.Wait()
+                               if x != 3 {
+                                       t.Fatal("want 3")
+                               }
+                               break
+                       }
+                       if x == 3 {
+                               break
+                       }
+                       c.L.Unlock()
+                       runtime.Gosched()
+                       c.L.Lock()
+               }
+               c.L.Unlock()
+               done <- true
+       }()
+       <-done
+       <-done
+       <-done
+}
+
+func TestCondCopy(t *testing.T) {
+       defer func() {
+               err := recover()
+               if err == nil || err.(string) != "sync.Cond is copied" {
+                       t.Fatalf("got %v, expect sync.Cond is copied", err)
+               }
+       }()
+       c := Cond{L: &Mutex{}}
+       c.Signal()
+       c2 := c
+       c2.Signal()
+}
+
+func BenchmarkCond1(b *testing.B) {
+       benchmarkCond(b, 1)
+}
+
+func BenchmarkCond2(b *testing.B) {
+       benchmarkCond(b, 2)
+}
+
+func BenchmarkCond4(b *testing.B) {
+       benchmarkCond(b, 4)
+}
+
+func BenchmarkCond8(b *testing.B) {
+       benchmarkCond(b, 8)
+}
+
+func BenchmarkCond16(b *testing.B) {
+       benchmarkCond(b, 16)
+}
+
+func BenchmarkCond32(b *testing.B) {
+       benchmarkCond(b, 32)
+}
+
+func benchmarkCond(b *testing.B, waiters int) {
+       c := NewCond(&Mutex{})
+       done := make(chan bool)
+       id := 0
+
+       for routine := 0; routine < waiters+1; routine++ {
+               go func() {
+                       for i := 0; i < b.N; i++ {
+                               c.L.Lock()
+                               if id == -1 {
+                                       c.L.Unlock()
+                                       break
+                               }
+                               id++
+                               if id == waiters+1 {
+                                       id = 0
+                                       c.Broadcast()
+                               } else {
+                                       c.Wait()
+                               }
+                               c.L.Unlock()
+                       }
+                       c.L.Lock()
+                       id = -1
+                       c.Broadcast()
+                       c.L.Unlock()
+                       done <- true
+               }()
+       }
+       for routine := 0; routine < waiters+1; routine++ {
+               <-done
+       }
+}
index e99599c11ae36f6d8d8134a83ae568647aa3d148..3bf47ea52aad3e3e98a25b2ab200d876618b35df 100644 (file)
@@ -4,6 +4,8 @@
 
 package sync
 
+import "unsafe"
+
 // defined in package runtime
 
 // Semacquire waits until *s > 0 and then atomically decrements it.
@@ -16,3 +18,19 @@ func runtime_Semacquire(s *uint32)
 // It is intended as a simple wakeup primitive for use by the synchronization
 // library and should not be used directly.
 func runtime_Semrelease(s *uint32)
+
+// Opaque representation of SyncSema in runtime/sema.goc.
+type syncSema [3]uintptr
+
+// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
+func runtime_Syncsemacquire(s *syncSema)
+
+// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
+func runtime_Syncsemrelease(s *syncSema, n uint32)
+
+// Ensure that sync and runtime agree on size of syncSema.
+func runtime_Syncsemcheck(size uintptr)
+func init() {
+       var s syncSema
+       runtime_Syncsemcheck(unsafe.Sizeof(s))
+}