From ee833ed72e8ccfdd2193b0e6c0223ee8eb99b380 Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Fri, 19 Aug 2022 00:31:07 +0700 Subject: [PATCH] sync: use atomic.Uint64 for WaitGroup state MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit So it's guaranteed to have 64-bit alignment, simplify the code without losing any performance: name old time/op new time/op delta WaitGroupUncontended-8 3.84ns ± 2% 3.82ns ± 1% ~ (p=0.159 n=10+10) WaitGroupAddDone-8 33.2ns ± 3% 33.0ns ± 3% ~ (p=0.564 n=9+10) WaitGroupAddDoneWork-8 39.3ns ± 1% 39.3ns ± 1% ~ (p=1.000 n=8+9) WaitGroupWait-8 0.70ns ± 3% 0.70ns ± 2% ~ (p=0.720 n=9+10) WaitGroupWaitWork-8 7.93ns ± 1% 7.99ns ± 3% ~ (p=0.271 n=10+10) WaitGroupActuallyWait-8 135ns ± 2% 135ns ± 1% ~ (p=0.897 n=10+10) Change-Id: I446b53fa92873419aadd592f45e51398f8ad8652 Reviewed-on: https://go-review.googlesource.com/c/go/+/424835 TryBot-Result: Gopher Robot Reviewed-by: Keith Randall Reviewed-by: Cherry Mui Run-TryBot: Cuong Manh Le --- src/sync/waitgroup.go | 47 +++++++++++-------------------------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index 9f26ae106c..be21417f9c 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -23,27 +23,8 @@ import ( type WaitGroup struct { noCopy noCopy - // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. - // 64-bit atomic operations require 64-bit alignment, but 32-bit - // compilers only guarantee that 64-bit fields are 32-bit aligned. - // For this reason on 32 bit architectures we need to check in state() - // if state1 is aligned or not, and dynamically "swap" the field order if - // needed. - state1 uint64 - state2 uint32 -} - -// state returns pointers to the state and sema fields stored within wg.state*. -func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { - if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { - // state1 is 64-bit aligned: nothing to do. - return &wg.state1, &wg.state2 - } else { - // state1 is 32-bit aligned but not 64-bit aligned: this means that - // (&state1)+4 is 64-bit aligned. - state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) - return (*uint64)(unsafe.Pointer(&state[1])), &state[0] - } + state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count. + sema uint32 } // Add adds delta, which may be negative, to the WaitGroup counter. @@ -60,9 +41,7 @@ func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { // new Add calls must happen after all previous Wait calls have returned. // See the WaitGroup example. func (wg *WaitGroup) Add(delta int) { - statep, semap := wg.state() if race.Enabled { - _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) @@ -70,14 +49,14 @@ func (wg *WaitGroup) Add(delta int) { race.Disable() defer race.Enable() } - state := atomic.AddUint64(statep, uint64(delta)<<32) + state := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. - race.Read(unsafe.Pointer(semap)) + race.Read(unsafe.Pointer(&wg.sema)) } if v < 0 { panic("sync: negative WaitGroup counter") @@ -93,13 +72,13 @@ func (wg *WaitGroup) Add(delta int) { // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity check to detect WaitGroup misuse. - if *statep != state { + if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. - *statep = 0 + wg.state.Store(0) for ; w != 0; w-- { - runtime_Semrelease(semap, false, 0) + runtime_Semrelease(&wg.sema, false, 0) } } @@ -110,13 +89,11 @@ func (wg *WaitGroup) Done() { // Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { - statep, semap := wg.state() if race.Enabled { - _ = *statep // trigger nil deref early race.Disable() } for { - state := atomic.LoadUint64(statep) + state := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { @@ -128,16 +105,16 @@ func (wg *WaitGroup) Wait() { return } // Increment waiters count. - if atomic.CompareAndSwapUint64(statep, state, state+1) { + if wg.state.CompareAndSwap(state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. - race.Write(unsafe.Pointer(semap)) + race.Write(unsafe.Pointer(&wg.sema)) } - runtime_Semacquire(semap) - if *statep != 0 { + runtime_Semacquire(&wg.sema) + if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { -- 2.48.1