]> Cypherpunks repositories - gostls13.git/commitdiff
sync: Proposal for barrier implementation
authorGustavo Niemeyer <gustavo@niemeyer.net>
Thu, 3 Feb 2011 20:39:11 +0000 (12:39 -0800)
committerRob Pike <r@golang.org>
Thu, 3 Feb 2011 20:39:11 +0000 (12:39 -0800)
As discussed in the mailing list, this adds a simple barrier
implementation to the sync package which enables one or more
goroutines to wait for a counter to go down to zero.

R=rsc, rog, r
CC=golang-dev
https://golang.org/cl/3770045

src/pkg/sync/Makefile
src/pkg/sync/mutex.go
src/pkg/sync/waitgroup.go [new file with mode: 0644]
src/pkg/sync/waitgroup_test.go [new file with mode: 0644]

index f843795b0f8ac1194819f4aaebff97d9a2255a37..fd8e5d9987e6e5157000eb96602c0d2afd511148 100644 (file)
@@ -9,6 +9,7 @@ GOFILES=\
        mutex.go\
        once.go \
        rwmutex.go\
+       waitgroup.go\
 
 # 386-specific object files
 OFILES_386=\
index c4d82af00c87876f21bc424f6bfb079b1818b7c4..6c8a5d51d42c3db298f859ba8140a3b0a1d42bed 100644 (file)
@@ -3,10 +3,10 @@
 // license that can be found in the LICENSE file.
 
 // The sync package provides basic synchronization primitives
-// such as mutual exclusion locks.  Other than the Once type,
-// most are intended for use by low-level library routines.
-// Higher-level synchronization is better done via channels
-// and communication.
+// such as mutual exclusion locks.  Other than the Once and
+// WaitGroup types, most are intended for use by low-level
+// library routines.  Higher-level synchronization is better
+// done via channels and communication.
 package sync
 
 import "runtime"
diff --git a/src/pkg/sync/waitgroup.go b/src/pkg/sync/waitgroup.go
new file mode 100644 (file)
index 0000000..68e1d50
--- /dev/null
@@ -0,0 +1,86 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import "runtime"
+
+// A WaitGroup waits for a collection of goroutines to finish.
+// The main goroutine calls Add to set the number of
+// goroutines to wait for.  Then each of the goroutines
+// runs and calls Done when finished.  At the same time,
+// Wait can be used to block until all goroutines have finished.
+//
+// For example:
+//
+//   for i := 0; i < n; i++ {
+//       if !condition(i) {
+//           continue
+//       }
+//       wg.Add(1)
+//       go func() {
+//           // Do something.
+//           wg.Done()
+//       }
+//   }
+//   wg.Wait()
+// 
+type WaitGroup struct {
+       m       Mutex
+       counter int
+       waiters int
+       sema    *uint32
+}
+
+// WaitGroup creates a new semaphore each time the old semaphore
+// is released. This is to avoid the following race:
+//
+// G1: Add(1)
+// G1: go G2()
+// G1: Wait() // Context switch after Unlock() and before Semacquire().
+// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
+// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
+// G3: Add(1) // Makes counter == 1, waiters == 0.
+// G3: go G4()
+// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
+
+// Add adds delta, which may be negative, to the WaitGroup counter.
+// If the counter becomes zero, all goroutines blocked on Wait() are released.
+func (wg *WaitGroup) Add(delta int) {
+       wg.m.Lock()
+       if delta < -wg.counter {
+               wg.m.Unlock()
+               panic("sync: negative WaitGroup count")
+       }
+       wg.counter += delta
+       if wg.counter == 0 && wg.waiters > 0 {
+               for i := 0; i < wg.waiters; i++ {
+                       runtime.Semrelease(wg.sema)
+               }
+               wg.waiters = 0
+               wg.sema = nil
+       }
+       wg.m.Unlock()
+}
+
+// Done decrements the WaitGroup counter.
+func (wg *WaitGroup) Done() {
+       wg.Add(-1)
+}
+
+// Wait blocks until the WaitGroup counter is zero.
+func (wg *WaitGroup) Wait() {
+       wg.m.Lock()
+       if wg.counter == 0 {
+               wg.m.Unlock()
+               return
+       }
+       wg.waiters++
+       if wg.sema == nil {
+               wg.sema = new(uint32)
+       }
+       s := wg.sema
+       wg.m.Unlock()
+       runtime.Semacquire(s)
+}
diff --git a/src/pkg/sync/waitgroup_test.go b/src/pkg/sync/waitgroup_test.go
new file mode 100644 (file)
index 0000000..fe35732
--- /dev/null
@@ -0,0 +1,60 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync_test
+
+import (
+       . "sync"
+       "testing"
+)
+
+func testWaitGroup(t *testing.T, wg1 *WaitGroup, wg2 *WaitGroup) {
+       n := 16
+       wg1.Add(n)
+       wg2.Add(n)
+       exited := make(chan bool, n)
+       for i := 0; i != n; i++ {
+               go func(i int) {
+                       wg1.Done()
+                       wg2.Wait()
+                       exited <- true
+               }(i)
+       }
+       wg1.Wait()
+       for i := 0; i != n; i++ {
+               select {
+               case <-exited:
+                       t.Fatal("WaitGroup released group too soon")
+               default:
+               }
+               wg2.Done()
+       }
+       for i := 0; i != n; i++ {
+               <-exited // Will block if barrier fails to unlock someone.
+       }
+}
+
+func TestWaitGroup(t *testing.T) {
+       wg1 := &WaitGroup{}
+       wg2 := &WaitGroup{}
+
+       // Run the same test a few times to ensure barrier is in a proper state.
+       for i := 0; i != 8; i++ {
+               testWaitGroup(t, wg1, wg2)
+       }
+}
+
+func TestWaitGroupMisuse(t *testing.T) {
+       defer func() {
+               err := recover()
+               if err != "sync: negative WaitGroup count" {
+                       t.Fatalf("Unexpected panic: %#v", err)
+               }
+       }()
+       wg := &WaitGroup{}
+       wg.Add(1)
+       wg.Done()
+       wg.Done()
+       t.Fatal("Should panic")
+}