]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: speed up receive on empty closed channel
authorBen Schwartz <bemasc@google.com>
Mon, 10 Jun 2019 16:29:23 +0000 (12:29 -0400)
committerIan Lance Taylor <iant@golang.org>
Sun, 22 Mar 2020 20:37:22 +0000 (20:37 +0000)
Currently, nonblocking receive on an open channel is about
700 times faster than nonblocking receive on a closed channel.
This change makes closed channels equally fast.

Fixes #32529.  Includes a correction based on #36714.

relevant benchstat output:
name                       old time/op    new time/op    delta
MakeChan/Byte-40            140ns ± 4%     137ns ± 7%   -2.38%  (p=0.023 n=17+19)
MakeChan/Int-40             174ns ± 5%     173ns ± 6%     ~     (p=0.437 n=18+19)
MakeChan/Ptr-40             315ns ±15%     301ns ±15%     ~     (p=0.051 n=20+20)
MakeChan/Struct/0-40        123ns ± 8%      99ns ±11%  -19.18%  (p=0.000 n=20+17)
MakeChan/Struct/32-40       297ns ± 8%     241ns ±18%  -19.13%  (p=0.000 n=20+20)
MakeChan/Struct/40-40       344ns ± 5%     273ns ±23%  -20.49%  (p=0.000 n=20+20)
ChanNonblocking-40         0.32ns ± 2%    0.32ns ± 2%   -1.25%  (p=0.000 n=19+18)
SelectUncontended-40       5.72ns ± 1%    5.71ns ± 2%     ~     (p=0.326 n=19+19)
SelectSyncContended-40     10.9µs ±10%    10.6µs ± 3%   -2.77%  (p=0.009 n=20+16)
SelectAsyncContended-40    1.00µs ± 0%    1.10µs ± 0%  +10.75%  (p=0.000 n=18+19)
SelectNonblock-40          1.22ns ± 2%    1.21ns ± 4%     ~     (p=0.141 n=18+19)
ChanUncontended-40          240ns ± 4%     233ns ± 4%   -2.82%  (p=0.000 n=20+20)
ChanContended-40           86.7µs ± 0%    82.7µs ± 0%   -4.64%  (p=0.000 n=20+19)
ChanSync-40                 294ns ± 7%     284ns ± 9%   -3.44%  (p=0.006 n=20+20)
ChanSyncWork-40            38.4µs ±19%    34.0µs ± 4%  -11.33%  (p=0.000 n=20+18)
ChanProdCons0-40           1.50µs ± 1%    1.63µs ± 0%   +8.53%  (p=0.000 n=19+19)
ChanProdCons10-40          1.17µs ± 0%    1.18µs ± 1%   +0.44%  (p=0.000 n=19+20)
ChanProdCons100-40          985ns ± 0%     959ns ± 1%   -2.64%  (p=0.000 n=20+20)
ChanProdConsWork0-40       1.50µs ± 0%    1.60µs ± 2%   +6.54%  (p=0.000 n=18+20)
ChanProdConsWork10-40      1.26µs ± 0%    1.26µs ± 2%   +0.40%  (p=0.015 n=20+19)
ChanProdConsWork100-40     1.27µs ± 0%    1.22µs ± 0%   -4.15%  (p=0.000 n=20+19)
SelectProdCons-40          1.50µs ± 1%    1.53µs ± 1%   +1.95%  (p=0.000 n=20+20)
ChanCreation-40            82.1ns ± 5%    81.6ns ± 7%     ~     (p=0.483 n=19+19)
ChanSem-40                  877ns ± 0%     719ns ± 0%  -17.98%  (p=0.000 n=18+19)
ChanPopular-40             1.75ms ± 2%    1.78ms ± 3%   +1.76%  (p=0.002 n=20+19)
ChanClosed-40               215ns ± 1%       0ns ± 6%  -99.82%  (p=0.000 n=20+18)

Previously committed in CL 181543 and reverted in CL 216158.

Change-Id: Ib767b08d724cfad03598d77271dbc1087485feb8
Reviewed-on: https://go-review.googlesource.com/c/go/+/216818
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Keith Randall <khr@golang.org>
src/runtime/chan.go
src/runtime/chan_test.go

index c953b23add7e2211ea3a992135e30e9e0de4b390..1d4599e260dbff0dc4ec918728c52aaf06a098a5 100644 (file)
@@ -121,6 +121,21 @@ func chanbuf(c *hchan, i uint) unsafe.Pointer {
        return add(c.buf, uintptr(i)*uintptr(c.elemsize))
 }
 
+// full reports whether a send on c would block (that is, the channel is full).
+// It uses a single word-sized read of mutable state, so although
+// the answer is instantaneously true, the correct answer may have changed
+// by the time the calling function receives the return value.
+func full(c *hchan) bool {
+       // c.dataqsiz is immutable (never written after the channel is created)
+       // so it is safe to read at any time during channel operation.
+       if c.dataqsiz == 0 {
+               // Assumes that a pointer read is relaxed-atomic.
+               return c.recvq.first == nil
+       }
+       // Assumes that a uint read is relaxed-atomic.
+       return c.qcount == c.dataqsiz
+}
+
 // entry point for c <- x from compiled code
 //go:nosplit
 func chansend1(c *hchan, elem unsafe.Pointer) {
@@ -160,7 +175,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        //
        // After observing that the channel is not closed, we observe that the channel is
        // not ready for sending. Each of these observations is a single word-sized read
-       // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
+       // (first c.closed and second full()).
        // Because a closed channel cannot transition from 'ready for sending' to
        // 'not ready for sending', even if the channel is closed between the two observations,
        // they imply a moment between the two when the channel was both not yet closed
@@ -169,9 +184,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        //
        // It is okay if the reads are reordered here: if we observe that the channel is not
        // ready for sending and then observe that it is not closed, that implies that the
-       // channel wasn't closed during the first observation.
-       if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
-               (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
+       // channel wasn't closed during the first observation. However, nothing here
+       // guarantees forward progress. We rely on the side effects of lock release in
+       // chanrecv() and closechan() to update this thread's view of c.closed and full().
+       if !block && c.closed == 0 && full(c) {
                return false
        }
 
@@ -401,6 +417,16 @@ func closechan(c *hchan) {
        }
 }
 
+// empty reports whether a read from c would block (that is, the channel is
+// empty).  It uses a single atomic read of mutable state.
+func empty(c *hchan) bool {
+       // c.dataqsiz is immutable.
+       if c.dataqsiz == 0 {
+               return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
+       }
+       return atomic.Loaduint(&c.qcount) == 0
+}
+
 // entry points for <- c from compiled code
 //go:nosplit
 func chanrecv1(c *hchan, elem unsafe.Pointer) {
@@ -436,21 +462,36 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
        }
 
        // Fast path: check for failed non-blocking operation without acquiring the lock.
-       //
-       // After observing that the channel is not ready for receiving, we observe that the
-       // channel is not closed. Each of these observations is a single word-sized read
-       // (first c.sendq.first or c.qcount, and second c.closed).
-       // Because a channel cannot be reopened, the later observation of the channel
-       // being not closed implies that it was also not closed at the moment of the
-       // first observation. We behave as if we observed the channel at that moment
-       // and report that the receive cannot proceed.
-       //
-       // The order of operations is important here: reversing the operations can lead to
-       // incorrect behavior when racing with a close.
-       if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
-               c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
-               atomic.Load(&c.closed) == 0 {
-               return
+       if !block && empty(c) {
+               // After observing that the channel is not ready for receiving, we observe whether the
+               // channel is closed.
+               //
+               // Reordering of these checks could lead to incorrect behavior when racing with a close.
+               // For example, if the channel was open and not empty, was closed, and then drained,
+               // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
+               // we use atomic loads for both checks, and rely on emptying and closing to happen in
+               // separate critical sections under the same lock.  This assumption fails when closing
+               // an unbuffered channel with a blocked send, but that is an error condition anyway.
+               if atomic.Load(&c.closed) == 0 {
+                       // Because a channel cannot be reopened, the later observation of the channel
+                       // being not closed implies that it was also not closed at the moment of the
+                       // first observation. We behave as if we observed the channel at that moment
+                       // and report that the receive cannot proceed.
+                       return
+               }
+               // The channel is irreversibly closed. Re-check whether the channel has any pending data
+               // to receive, which could have arrived between the empty and closed checks above.
+               // Sequential consistency is also required here, when racing with such a send.
+               if empty(c) {
+                       // The channel is irreversibly closed and empty.
+                       if raceenabled {
+                               raceacquire(c.raceaddr())
+                       }
+                       if ep != nil {
+                               typedmemclr(c.elemtype, ep)
+                       }
+                       return true, false
+               }
        }
 
        var t0 int64
index 1180e76fcd84f43a31ce8b59fbee6d97bd5bb9fb..039a086e9b146a5684844203c143c944b9b84c2b 100644 (file)
@@ -1127,6 +1127,20 @@ func BenchmarkChanPopular(b *testing.B) {
        wg.Wait()
 }
 
+func BenchmarkChanClosed(b *testing.B) {
+       c := make(chan struct{})
+       close(c)
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       select {
+                       case <-c:
+                       default:
+                               b.Error("Unreachable")
+                       }
+               }
+       })
+}
+
 var (
        alwaysFalse = false
        workSink    = 0