From: Gustavo Niemeyer Date: Thu, 3 Feb 2011 20:39:11 +0000 (-0800) Subject: sync: Proposal for barrier implementation X-Git-Tag: weekly.2011-02-15~102 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=63457d089ea4aebe35223dc6b70feae804a6a926;p=gostls13.git sync: Proposal for barrier implementation As discussed in the mailing list, this adds a simple barrier implementation to the sync package which enables one or more goroutines to wait for a counter to go down to zero. R=rsc, rog, r CC=golang-dev https://golang.org/cl/3770045 --- diff --git a/src/pkg/sync/Makefile b/src/pkg/sync/Makefile index f843795b0f..fd8e5d9987 100644 --- a/src/pkg/sync/Makefile +++ b/src/pkg/sync/Makefile @@ -9,6 +9,7 @@ GOFILES=\ mutex.go\ once.go \ rwmutex.go\ + waitgroup.go\ # 386-specific object files OFILES_386=\ diff --git a/src/pkg/sync/mutex.go b/src/pkg/sync/mutex.go index c4d82af00c..6c8a5d51d4 100644 --- a/src/pkg/sync/mutex.go +++ b/src/pkg/sync/mutex.go @@ -3,10 +3,10 @@ // license that can be found in the LICENSE file. // The sync package provides basic synchronization primitives -// such as mutual exclusion locks. Other than the Once type, -// most are intended for use by low-level library routines. -// Higher-level synchronization is better done via channels -// and communication. +// such as mutual exclusion locks. Other than the Once and +// WaitGroup types, most are intended for use by low-level +// library routines. Higher-level synchronization is better +// done via channels and communication. package sync import "runtime" diff --git a/src/pkg/sync/waitgroup.go b/src/pkg/sync/waitgroup.go new file mode 100644 index 0000000000..68e1d509f4 --- /dev/null +++ b/src/pkg/sync/waitgroup.go @@ -0,0 +1,86 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import "runtime" + +// A WaitGroup waits for a collection of goroutines to finish. +// The main goroutine calls Add to set the number of +// goroutines to wait for. Then each of the goroutines +// runs and calls Done when finished. At the same time, +// Wait can be used to block until all goroutines have finished. +// +// For example: +// +// for i := 0; i < n; i++ { +// if !condition(i) { +// continue +// } +// wg.Add(1) +// go func() { +// // Do something. +// wg.Done() +// } +// } +// wg.Wait() +// +type WaitGroup struct { + m Mutex + counter int + waiters int + sema *uint32 +} + +// WaitGroup creates a new semaphore each time the old semaphore +// is released. This is to avoid the following race: +// +// G1: Add(1) +// G1: go G2() +// G1: Wait() // Context switch after Unlock() and before Semacquire(). +// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet. +// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block. +// G3: Add(1) // Makes counter == 1, waiters == 0. +// G3: go G4() +// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug. + +// Add adds delta, which may be negative, to the WaitGroup counter. +// If the counter becomes zero, all goroutines blocked on Wait() are released. +func (wg *WaitGroup) Add(delta int) { + wg.m.Lock() + if delta < -wg.counter { + wg.m.Unlock() + panic("sync: negative WaitGroup count") + } + wg.counter += delta + if wg.counter == 0 && wg.waiters > 0 { + for i := 0; i < wg.waiters; i++ { + runtime.Semrelease(wg.sema) + } + wg.waiters = 0 + wg.sema = nil + } + wg.m.Unlock() +} + +// Done decrements the WaitGroup counter. +func (wg *WaitGroup) Done() { + wg.Add(-1) +} + +// Wait blocks until the WaitGroup counter is zero. +func (wg *WaitGroup) Wait() { + wg.m.Lock() + if wg.counter == 0 { + wg.m.Unlock() + return + } + wg.waiters++ + if wg.sema == nil { + wg.sema = new(uint32) + } + s := wg.sema + wg.m.Unlock() + runtime.Semacquire(s) +} diff --git a/src/pkg/sync/waitgroup_test.go b/src/pkg/sync/waitgroup_test.go new file mode 100644 index 0000000000..fe35732e7a --- /dev/null +++ b/src/pkg/sync/waitgroup_test.go @@ -0,0 +1,60 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + . "sync" + "testing" +) + +func testWaitGroup(t *testing.T, wg1 *WaitGroup, wg2 *WaitGroup) { + n := 16 + wg1.Add(n) + wg2.Add(n) + exited := make(chan bool, n) + for i := 0; i != n; i++ { + go func(i int) { + wg1.Done() + wg2.Wait() + exited <- true + }(i) + } + wg1.Wait() + for i := 0; i != n; i++ { + select { + case <-exited: + t.Fatal("WaitGroup released group too soon") + default: + } + wg2.Done() + } + for i := 0; i != n; i++ { + <-exited // Will block if barrier fails to unlock someone. + } +} + +func TestWaitGroup(t *testing.T) { + wg1 := &WaitGroup{} + wg2 := &WaitGroup{} + + // Run the same test a few times to ensure barrier is in a proper state. + for i := 0; i != 8; i++ { + testWaitGroup(t, wg1, wg2) + } +} + +func TestWaitGroupMisuse(t *testing.T) { + defer func() { + err := recover() + if err != "sync: negative WaitGroup count" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + wg := &WaitGroup{} + wg.Add(1) + wg.Done() + wg.Done() + t.Fatal("Should panic") +}