From 5a20b4a6a9dcf26a402edfe352aa1e8564f2fb01 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Tue, 13 Aug 2013 14:45:36 +0400 Subject: [PATCH] sync: faster Cond The new version does not require any memory allocations and is 30-50% faster. Also detect and painc if Cond is copied after first. benchmark old ns/op new ns/op delta BenchmarkCond1 317 195 -38.49% BenchmarkCond1-2 875 607 -30.63% BenchmarkCond1-4 1116 548 -50.90% BenchmarkCond1-8 1013 613 -39.49% BenchmarkCond1-16 983 450 -54.22% BenchmarkCond2 559 352 -37.03% BenchmarkCond2-2 1916 1378 -28.08% BenchmarkCond2-4 1518 1322 -12.91% BenchmarkCond2-8 2313 1291 -44.19% BenchmarkCond2-16 1885 1078 -42.81% BenchmarkCond4 1070 614 -42.62% BenchmarkCond4-2 4899 3047 -37.80% BenchmarkCond4-4 3813 3006 -21.16% BenchmarkCond4-8 3605 3045 -15.53% BenchmarkCond4-16 4148 2637 -36.43% BenchmarkCond8 2086 1264 -39.41% BenchmarkCond8-2 9961 6736 -32.38% BenchmarkCond8-4 8135 7689 -5.48% BenchmarkCond8-8 9623 7517 -21.89% BenchmarkCond8-16 11661 8093 -30.60% R=sougou, rsc, bradfitz, r CC=golang-dev https://golang.org/cl/11573043 --- src/pkg/runtime/sema.goc | 111 +++++++++++++++++++++++++++++--- src/pkg/sync/cond.go | 113 +++++++++++++++------------------ src/pkg/sync/cond_test.go | 129 ++++++++++++++++++++++++++++++++++++++ src/pkg/sync/runtime.go | 18 ++++++ 4 files changed, 297 insertions(+), 74 deletions(-) diff --git a/src/pkg/runtime/sema.goc b/src/pkg/runtime/sema.goc index 08b4779864..51a38adafe 100644 --- a/src/pkg/runtime/sema.goc +++ b/src/pkg/runtime/sema.goc @@ -21,22 +21,23 @@ package sync #include "runtime.h" #include "arch_GOARCH.h" -typedef struct Sema Sema; -struct Sema +typedef struct SemaWaiter SemaWaiter; +struct SemaWaiter { uint32 volatile* addr; G* g; int64 releasetime; - Sema* prev; - Sema* next; + int32 nrelease; // -1 for acquire + SemaWaiter* prev; + SemaWaiter* next; }; typedef struct SemaRoot SemaRoot; struct SemaRoot { Lock; - Sema* head; - Sema* tail; + SemaWaiter* head; + SemaWaiter* tail; // Number of waiters. Read w/o the lock. uint32 volatile nwait; }; @@ -59,7 +60,7 @@ semroot(uint32 *addr) } static void -semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s) +semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s) { s->g = g; s->addr = addr; @@ -73,7 +74,7 @@ semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s) } static void -semdequeue(SemaRoot *root, Sema *s) +semdequeue(SemaRoot *root, SemaWaiter *s) { if(s->next) s->next->prev = s->prev; @@ -101,7 +102,7 @@ cansemacquire(uint32 *addr) void runtime·semacquire(uint32 volatile *addr, bool profile) { - Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it + SemaWaiter s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it SemaRoot *root; int64 t0; @@ -147,7 +148,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile) void runtime·semrelease(uint32 volatile *addr) { - Sema *s; + SemaWaiter *s; SemaRoot *root; root = semroot(addr); @@ -200,3 +201,93 @@ func runtime_Semacquire(addr *uint32) { func runtime_Semrelease(addr *uint32) { runtime·semrelease(addr); } + +typedef struct SyncSema SyncSema; +struct SyncSema +{ + Lock; + SemaWaiter* head; + SemaWaiter* tail; +}; + +func runtime_Syncsemcheck(size uintptr) { + if(size != sizeof(SyncSema)) { + runtime·printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema)); + runtime·throw("bad SyncSema size"); + } +} + +// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s. +func runtime_Syncsemacquire(s *SyncSema) { + SemaWaiter w, *wake; + int64 t0; + + w.g = g; + w.nrelease = -1; + w.next = nil; + w.releasetime = 0; + t0 = 0; + if(runtime·blockprofilerate > 0) { + t0 = runtime·cputicks(); + w.releasetime = -1; + } + + runtime·lock(s); + if(s->head && s->head->nrelease > 0) { + // have pending release, consume it + wake = nil; + s->head->nrelease--; + if(s->head->nrelease == 0) { + wake = s->head; + s->head = wake->next; + if(s->head == nil) + s->tail = nil; + } + runtime·unlock(s); + if(wake) + runtime·ready(wake->g); + } else { + // enqueue itself + if(s->tail == nil) + s->head = &w; + else + s->tail->next = &w; + s->tail = &w; + runtime·park(runtime·unlock, s, "semacquire"); + if(t0) + runtime·blockevent(w.releasetime - t0, 2); + } +} + +// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s. +func runtime_Syncsemrelease(s *SyncSema, n uint32) { + SemaWaiter w, *wake; + + w.g = g; + w.nrelease = (int32)n; + w.next = nil; + w.releasetime = 0; + + runtime·lock(s); + while(w.nrelease > 0 && s->head && s->head->nrelease < 0) { + // have pending acquire, satisfy it + wake = s->head; + s->head = wake->next; + if(s->head == nil) + s->tail = nil; + if(wake->releasetime) + wake->releasetime = runtime·cputicks(); + runtime·ready(wake->g); + w.nrelease--; + } + if(w.nrelease > 0) { + // enqueue itself + if(s->tail == nil) + s->head = &w; + else + s->tail->next = &w; + s->tail = &w; + runtime·park(runtime·unlock, s, "semarelease"); + } else + runtime·unlock(s); +} diff --git a/src/pkg/sync/cond.go b/src/pkg/sync/cond.go index 13547a8a11..9e6bc170f1 100644 --- a/src/pkg/sync/cond.go +++ b/src/pkg/sync/cond.go @@ -4,6 +4,11 @@ package sync +import ( + "sync/atomic" + "unsafe" +) + // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence // of an event. @@ -11,27 +16,16 @@ package sync // Each Cond has an associated Locker L (often a *Mutex or *RWMutex), // which must be held when changing the condition and // when calling the Wait method. +// +// A Cond can be created as part of other structures. +// A Cond must not be copied after first use. type Cond struct { - L Locker // held while observing or changing the condition - m Mutex // held to avoid internal races - - // We must be careful to make sure that when Signal - // releases a semaphore, the corresponding acquire is - // executed by a goroutine that was already waiting at - // the time of the call to Signal, not one that arrived later. - // To ensure this, we segment waiting goroutines into - // generations punctuated by calls to Signal. Each call to - // Signal begins another generation if there are no goroutines - // left in older generations for it to wake. Because of this - // optimization (only begin another generation if there - // are no older goroutines left), we only need to keep track - // of the two most recent generations, which we call old - // and new. - oldWaiters int // number of waiters in old generation... - oldSema *uint32 // ... waiting on this semaphore + // L is held while observing or changing the condition + L Locker - newWaiters int // number of waiters in new generation... - newSema *uint32 // ... waiting on this semaphore + sema syncSema + waiters uint32 // number of waiters + checker copyChecker } // NewCond returns a new Cond with Locker l. @@ -56,22 +50,16 @@ func NewCond(l Locker) *Cond { // c.L.Unlock() // func (c *Cond) Wait() { + c.checker.check() if raceenabled { - _ = c.m.state raceDisable() } - c.m.Lock() - if c.newSema == nil { - c.newSema = new(uint32) - } - s := c.newSema - c.newWaiters++ - c.m.Unlock() + atomic.AddUint32(&c.waiters, 1) if raceenabled { raceEnable() } c.L.Unlock() - runtime_Semacquire(s) + runtime_Syncsemacquire(&c.sema) c.L.Lock() } @@ -80,26 +68,7 @@ func (c *Cond) Wait() { // It is allowed but not required for the caller to hold c.L // during the call. func (c *Cond) Signal() { - if raceenabled { - _ = c.m.state - raceDisable() - } - c.m.Lock() - if c.oldWaiters == 0 && c.newWaiters > 0 { - // Retire old generation; rename new to old. - c.oldWaiters = c.newWaiters - c.oldSema = c.newSema - c.newWaiters = 0 - c.newSema = nil - } - if c.oldWaiters > 0 { - c.oldWaiters-- - runtime_Semrelease(c.oldSema) - } - c.m.Unlock() - if raceenabled { - raceEnable() - } + c.signalImpl(false) } // Broadcast wakes all goroutines waiting on c. @@ -107,27 +76,43 @@ func (c *Cond) Signal() { // It is allowed but not required for the caller to hold c.L // during the call. func (c *Cond) Broadcast() { + c.signalImpl(true) +} + +func (c *Cond) signalImpl(all bool) { + c.checker.check() if raceenabled { - _ = c.m.state raceDisable() } - c.m.Lock() - // Wake both generations. - if c.oldWaiters > 0 { - for i := 0; i < c.oldWaiters; i++ { - runtime_Semrelease(c.oldSema) + for { + old := atomic.LoadUint32(&c.waiters) + if old == 0 { + if raceenabled { + raceEnable() + } + return } - c.oldWaiters = 0 - } - if c.newWaiters > 0 { - for i := 0; i < c.newWaiters; i++ { - runtime_Semrelease(c.newSema) + new := old - 1 + if all { + new = 0 + } + if atomic.CompareAndSwapUint32(&c.waiters, old, new) { + if raceenabled { + raceEnable() + } + runtime_Syncsemrelease(&c.sema, old-new) + return } - c.newWaiters = 0 - c.newSema = nil } - c.m.Unlock() - if raceenabled { - raceEnable() +} + +// copyChecker holds back pointer to itself to detect object copying. +type copyChecker uintptr + +func (c *copyChecker) check() { + if uintptr(*c) != uintptr(unsafe.Pointer(c)) && + !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && + uintptr(*c) != uintptr(unsafe.Pointer(c)) { + panic("sync.Cond is copied") } } diff --git a/src/pkg/sync/cond_test.go b/src/pkg/sync/cond_test.go index cefacb184e..467c80621d 100644 --- a/src/pkg/sync/cond_test.go +++ b/src/pkg/sync/cond_test.go @@ -5,6 +5,8 @@ package sync_test import ( . "sync" + + "runtime" "testing" ) @@ -124,3 +126,130 @@ func TestCondBroadcast(t *testing.T) { } c.Broadcast() } + +func TestRace(t *testing.T) { + x := 0 + c := NewCond(&Mutex{}) + done := make(chan bool) + go func() { + c.L.Lock() + x = 1 + c.Wait() + if x != 2 { + t.Fatal("want 2") + } + x = 3 + c.Signal() + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 1 { + x = 2 + c.Signal() + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + go func() { + c.L.Lock() + for { + if x == 2 { + c.Wait() + if x != 3 { + t.Fatal("want 3") + } + break + } + if x == 3 { + break + } + c.L.Unlock() + runtime.Gosched() + c.L.Lock() + } + c.L.Unlock() + done <- true + }() + <-done + <-done + <-done +} + +func TestCondCopy(t *testing.T) { + defer func() { + err := recover() + if err == nil || err.(string) != "sync.Cond is copied" { + t.Fatalf("got %v, expect sync.Cond is copied", err) + } + }() + c := Cond{L: &Mutex{}} + c.Signal() + c2 := c + c2.Signal() +} + +func BenchmarkCond1(b *testing.B) { + benchmarkCond(b, 1) +} + +func BenchmarkCond2(b *testing.B) { + benchmarkCond(b, 2) +} + +func BenchmarkCond4(b *testing.B) { + benchmarkCond(b, 4) +} + +func BenchmarkCond8(b *testing.B) { + benchmarkCond(b, 8) +} + +func BenchmarkCond16(b *testing.B) { + benchmarkCond(b, 16) +} + +func BenchmarkCond32(b *testing.B) { + benchmarkCond(b, 32) +} + +func benchmarkCond(b *testing.B, waiters int) { + c := NewCond(&Mutex{}) + done := make(chan bool) + id := 0 + + for routine := 0; routine < waiters+1; routine++ { + go func() { + for i := 0; i < b.N; i++ { + c.L.Lock() + if id == -1 { + c.L.Unlock() + break + } + id++ + if id == waiters+1 { + id = 0 + c.Broadcast() + } else { + c.Wait() + } + c.L.Unlock() + } + c.L.Lock() + id = -1 + c.Broadcast() + c.L.Unlock() + done <- true + }() + } + for routine := 0; routine < waiters+1; routine++ { + <-done + } +} diff --git a/src/pkg/sync/runtime.go b/src/pkg/sync/runtime.go index e99599c11a..3bf47ea52a 100644 --- a/src/pkg/sync/runtime.go +++ b/src/pkg/sync/runtime.go @@ -4,6 +4,8 @@ package sync +import "unsafe" + // defined in package runtime // Semacquire waits until *s > 0 and then atomically decrements it. @@ -16,3 +18,19 @@ func runtime_Semacquire(s *uint32) // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. func runtime_Semrelease(s *uint32) + +// Opaque representation of SyncSema in runtime/sema.goc. +type syncSema [3]uintptr + +// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s. +func runtime_Syncsemacquire(s *syncSema) + +// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s. +func runtime_Syncsemrelease(s *syncSema, n uint32) + +// Ensure that sync and runtime agree on size of syncSema. +func runtime_Syncsemcheck(size uintptr) +func init() { + var s syncSema + runtime_Syncsemcheck(unsafe.Sizeof(s)) +} -- 2.48.1