The new version does not require any memory allocations and is 30-50% faster.
Also detect and painc if Cond is copied after first.
benchmark old ns/op new ns/op delta
BenchmarkCond1 317 195 -38.49%
BenchmarkCond1-2 875 607 -30.63%
BenchmarkCond1-4 1116 548 -50.90%
BenchmarkCond1-8 1013 613 -39.49%
BenchmarkCond1-16 983 450 -54.22%
BenchmarkCond2 559 352 -37.03%
BenchmarkCond2-2 1916 1378 -28.08%
BenchmarkCond2-4 1518 1322 -12.91%
BenchmarkCond2-8 2313 1291 -44.19%
BenchmarkCond2-16 1885 1078 -42.81%
BenchmarkCond4 1070 614 -42.62%
BenchmarkCond4-2 4899 3047 -37.80%
BenchmarkCond4-4 3813 3006 -21.16%
BenchmarkCond4-8 3605 3045 -15.53%
BenchmarkCond4-16 4148 2637 -36.43%
BenchmarkCond8 2086 1264 -39.41%
BenchmarkCond8-2 9961 6736 -32.38%
BenchmarkCond8-4 8135 7689 -5.48%
BenchmarkCond8-8 9623 7517 -21.89%
BenchmarkCond8-16 11661 8093 -30.60%
R=sougou, rsc, bradfitz, r
CC=golang-dev
https://golang.org/cl/
11573043
#include "runtime.h"
#include "arch_GOARCH.h"
-typedef struct Sema Sema;
-struct Sema
+typedef struct SemaWaiter SemaWaiter;
+struct SemaWaiter
{
uint32 volatile* addr;
G* g;
int64 releasetime;
- Sema* prev;
- Sema* next;
+ int32 nrelease; // -1 for acquire
+ SemaWaiter* prev;
+ SemaWaiter* next;
};
typedef struct SemaRoot SemaRoot;
struct SemaRoot
{
Lock;
- Sema* head;
- Sema* tail;
+ SemaWaiter* head;
+ SemaWaiter* tail;
// Number of waiters. Read w/o the lock.
uint32 volatile nwait;
};
}
static void
-semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s)
+semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
s->g = g;
s->addr = addr;
}
static void
-semdequeue(SemaRoot *root, Sema *s)
+semdequeue(SemaRoot *root, SemaWaiter *s)
{
if(s->next)
s->next->prev = s->prev;
void
runtime·semacquire(uint32 volatile *addr, bool profile)
{
- Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
+ SemaWaiter s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaRoot *root;
int64 t0;
void
runtime·semrelease(uint32 volatile *addr)
{
- Sema *s;
+ SemaWaiter *s;
SemaRoot *root;
root = semroot(addr);
func runtime_Semrelease(addr *uint32) {
runtime·semrelease(addr);
}
+
+typedef struct SyncSema SyncSema;
+struct SyncSema
+{
+ Lock;
+ SemaWaiter* head;
+ SemaWaiter* tail;
+};
+
+func runtime_Syncsemcheck(size uintptr) {
+ if(size != sizeof(SyncSema)) {
+ runtime·printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
+ runtime·throw("bad SyncSema size");
+ }
+}
+
+// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
+func runtime_Syncsemacquire(s *SyncSema) {
+ SemaWaiter w, *wake;
+ int64 t0;
+
+ w.g = g;
+ w.nrelease = -1;
+ w.next = nil;
+ w.releasetime = 0;
+ t0 = 0;
+ if(runtime·blockprofilerate > 0) {
+ t0 = runtime·cputicks();
+ w.releasetime = -1;
+ }
+
+ runtime·lock(s);
+ if(s->head && s->head->nrelease > 0) {
+ // have pending release, consume it
+ wake = nil;
+ s->head->nrelease--;
+ if(s->head->nrelease == 0) {
+ wake = s->head;
+ s->head = wake->next;
+ if(s->head == nil)
+ s->tail = nil;
+ }
+ runtime·unlock(s);
+ if(wake)
+ runtime·ready(wake->g);
+ } else {
+ // enqueue itself
+ if(s->tail == nil)
+ s->head = &w;
+ else
+ s->tail->next = &w;
+ s->tail = &w;
+ runtime·park(runtime·unlock, s, "semacquire");
+ if(t0)
+ runtime·blockevent(w.releasetime - t0, 2);
+ }
+}
+
+// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
+func runtime_Syncsemrelease(s *SyncSema, n uint32) {
+ SemaWaiter w, *wake;
+
+ w.g = g;
+ w.nrelease = (int32)n;
+ w.next = nil;
+ w.releasetime = 0;
+
+ runtime·lock(s);
+ while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
+ // have pending acquire, satisfy it
+ wake = s->head;
+ s->head = wake->next;
+ if(s->head == nil)
+ s->tail = nil;
+ if(wake->releasetime)
+ wake->releasetime = runtime·cputicks();
+ runtime·ready(wake->g);
+ w.nrelease--;
+ }
+ if(w.nrelease > 0) {
+ // enqueue itself
+ if(s->tail == nil)
+ s->head = &w;
+ else
+ s->tail->next = &w;
+ s->tail = &w;
+ runtime·park(runtime·unlock, s, "semarelease");
+ } else
+ runtime·unlock(s);
+}
package sync
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
+//
+// A Cond can be created as part of other structures.
+// A Cond must not be copied after first use.
type Cond struct {
- L Locker // held while observing or changing the condition
- m Mutex // held to avoid internal races
-
- // We must be careful to make sure that when Signal
- // releases a semaphore, the corresponding acquire is
- // executed by a goroutine that was already waiting at
- // the time of the call to Signal, not one that arrived later.
- // To ensure this, we segment waiting goroutines into
- // generations punctuated by calls to Signal. Each call to
- // Signal begins another generation if there are no goroutines
- // left in older generations for it to wake. Because of this
- // optimization (only begin another generation if there
- // are no older goroutines left), we only need to keep track
- // of the two most recent generations, which we call old
- // and new.
- oldWaiters int // number of waiters in old generation...
- oldSema *uint32 // ... waiting on this semaphore
+ // L is held while observing or changing the condition
+ L Locker
- newWaiters int // number of waiters in new generation...
- newSema *uint32 // ... waiting on this semaphore
+ sema syncSema
+ waiters uint32 // number of waiters
+ checker copyChecker
}
// NewCond returns a new Cond with Locker l.
// c.L.Unlock()
//
func (c *Cond) Wait() {
+ c.checker.check()
if raceenabled {
- _ = c.m.state
raceDisable()
}
- c.m.Lock()
- if c.newSema == nil {
- c.newSema = new(uint32)
- }
- s := c.newSema
- c.newWaiters++
- c.m.Unlock()
+ atomic.AddUint32(&c.waiters, 1)
if raceenabled {
raceEnable()
}
c.L.Unlock()
- runtime_Semacquire(s)
+ runtime_Syncsemacquire(&c.sema)
c.L.Lock()
}
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
- if raceenabled {
- _ = c.m.state
- raceDisable()
- }
- c.m.Lock()
- if c.oldWaiters == 0 && c.newWaiters > 0 {
- // Retire old generation; rename new to old.
- c.oldWaiters = c.newWaiters
- c.oldSema = c.newSema
- c.newWaiters = 0
- c.newSema = nil
- }
- if c.oldWaiters > 0 {
- c.oldWaiters--
- runtime_Semrelease(c.oldSema)
- }
- c.m.Unlock()
- if raceenabled {
- raceEnable()
- }
+ c.signalImpl(false)
}
// Broadcast wakes all goroutines waiting on c.
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
+ c.signalImpl(true)
+}
+
+func (c *Cond) signalImpl(all bool) {
+ c.checker.check()
if raceenabled {
- _ = c.m.state
raceDisable()
}
- c.m.Lock()
- // Wake both generations.
- if c.oldWaiters > 0 {
- for i := 0; i < c.oldWaiters; i++ {
- runtime_Semrelease(c.oldSema)
+ for {
+ old := atomic.LoadUint32(&c.waiters)
+ if old == 0 {
+ if raceenabled {
+ raceEnable()
+ }
+ return
}
- c.oldWaiters = 0
- }
- if c.newWaiters > 0 {
- for i := 0; i < c.newWaiters; i++ {
- runtime_Semrelease(c.newSema)
+ new := old - 1
+ if all {
+ new = 0
+ }
+ if atomic.CompareAndSwapUint32(&c.waiters, old, new) {
+ if raceenabled {
+ raceEnable()
+ }
+ runtime_Syncsemrelease(&c.sema, old-new)
+ return
}
- c.newWaiters = 0
- c.newSema = nil
}
- c.m.Unlock()
- if raceenabled {
- raceEnable()
+}
+
+// copyChecker holds back pointer to itself to detect object copying.
+type copyChecker uintptr
+
+func (c *copyChecker) check() {
+ if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
+ !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
+ uintptr(*c) != uintptr(unsafe.Pointer(c)) {
+ panic("sync.Cond is copied")
}
}
import (
. "sync"
+
+ "runtime"
"testing"
)
}
c.Broadcast()
}
+
+func TestRace(t *testing.T) {
+ x := 0
+ c := NewCond(&Mutex{})
+ done := make(chan bool)
+ go func() {
+ c.L.Lock()
+ x = 1
+ c.Wait()
+ if x != 2 {
+ t.Fatal("want 2")
+ }
+ x = 3
+ c.Signal()
+ c.L.Unlock()
+ done <- true
+ }()
+ go func() {
+ c.L.Lock()
+ for {
+ if x == 1 {
+ x = 2
+ c.Signal()
+ break
+ }
+ c.L.Unlock()
+ runtime.Gosched()
+ c.L.Lock()
+ }
+ c.L.Unlock()
+ done <- true
+ }()
+ go func() {
+ c.L.Lock()
+ for {
+ if x == 2 {
+ c.Wait()
+ if x != 3 {
+ t.Fatal("want 3")
+ }
+ break
+ }
+ if x == 3 {
+ break
+ }
+ c.L.Unlock()
+ runtime.Gosched()
+ c.L.Lock()
+ }
+ c.L.Unlock()
+ done <- true
+ }()
+ <-done
+ <-done
+ <-done
+}
+
+func TestCondCopy(t *testing.T) {
+ defer func() {
+ err := recover()
+ if err == nil || err.(string) != "sync.Cond is copied" {
+ t.Fatalf("got %v, expect sync.Cond is copied", err)
+ }
+ }()
+ c := Cond{L: &Mutex{}}
+ c.Signal()
+ c2 := c
+ c2.Signal()
+}
+
+func BenchmarkCond1(b *testing.B) {
+ benchmarkCond(b, 1)
+}
+
+func BenchmarkCond2(b *testing.B) {
+ benchmarkCond(b, 2)
+}
+
+func BenchmarkCond4(b *testing.B) {
+ benchmarkCond(b, 4)
+}
+
+func BenchmarkCond8(b *testing.B) {
+ benchmarkCond(b, 8)
+}
+
+func BenchmarkCond16(b *testing.B) {
+ benchmarkCond(b, 16)
+}
+
+func BenchmarkCond32(b *testing.B) {
+ benchmarkCond(b, 32)
+}
+
+func benchmarkCond(b *testing.B, waiters int) {
+ c := NewCond(&Mutex{})
+ done := make(chan bool)
+ id := 0
+
+ for routine := 0; routine < waiters+1; routine++ {
+ go func() {
+ for i := 0; i < b.N; i++ {
+ c.L.Lock()
+ if id == -1 {
+ c.L.Unlock()
+ break
+ }
+ id++
+ if id == waiters+1 {
+ id = 0
+ c.Broadcast()
+ } else {
+ c.Wait()
+ }
+ c.L.Unlock()
+ }
+ c.L.Lock()
+ id = -1
+ c.Broadcast()
+ c.L.Unlock()
+ done <- true
+ }()
+ }
+ for routine := 0; routine < waiters+1; routine++ {
+ <-done
+ }
+}
package sync
+import "unsafe"
+
// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semrelease(s *uint32)
+
+// Opaque representation of SyncSema in runtime/sema.goc.
+type syncSema [3]uintptr
+
+// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
+func runtime_Syncsemacquire(s *syncSema)
+
+// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
+func runtime_Syncsemrelease(s *syncSema, n uint32)
+
+// Ensure that sync and runtime agree on size of syncSema.
+func runtime_Syncsemcheck(size uintptr)
+func init() {
+ var s syncSema
+ runtime_Syncsemcheck(unsafe.Sizeof(s))
+}