]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: add fast paths to non-blocking channel operations
authorDmitriy Vyukov <dvyukov@google.com>
Mon, 25 Aug 2014 07:55:42 +0000 (11:55 +0400)
committerDmitriy Vyukov <dvyukov@google.com>
Mon, 25 Aug 2014 07:55:42 +0000 (11:55 +0400)
benchmark                      old ns/op     new ns/op     delta
BenchmarkChanNonblocking       27.8          7.80          -71.94%
BenchmarkChanNonblocking-2     79.1          3.94          -95.02%
BenchmarkChanNonblocking-4     71.2          2.04          -97.13%

LGTM=rsc
R=golang-codereviews, rsc, dave
CC=golang-codereviews
https://golang.org/cl/110580043

src/pkg/runtime/chan.go
src/pkg/runtime/chan.goc
src/pkg/runtime/chan.h
src/pkg/runtime/chan_test.go

index 67427e960e65581b5d77bfaf450ee83b8ae52a34..f9a540af3d502e9650fa8c77301d94e3ec7969ef 100644 (file)
@@ -96,17 +96,37 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
                println("chansend: chan=", c)
        }
 
+       if raceenabled {
+               fn := chansend
+               pc := **(**uintptr)(unsafe.Pointer(&fn))
+               racereadpc(unsafe.Pointer(c), pc, callerpc)
+       }
+
+       // Fast path: check for failed non-blocking operation without acquiring the lock.
+       //
+       // After observing that the channel is not closed, we observe that the channel is
+       // not ready for sending. Each of these observations is a single word-sized read
+       // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
+       // Because a closed channel cannot transition from 'ready for sending' to
+       // 'not ready for sending', even if the channel is closed between the two observations,
+       // they imply a moment between the two when the channel was both not yet closed
+       // and not ready for sending. We behave as if we observed the channel at that moment,
+       // and report that the send cannot proceed.
+       //
+       // It is okay if the reads are reordered here: if we observe that the channel is not
+       // ready for sending and then observe that it is not closed, that implies that the
+       // channel wasn't closed during the first observation.
+       if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
+               (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
+               return false
+       }
+
        var t0 int64
        if blockprofilerate > 0 {
                t0 = gocputicks()
        }
 
        golock(&c.lock)
-       if raceenabled {
-               fn := chansend
-               pc := **(**uintptr)(unsafe.Pointer(&fn))
-               racereadpc(unsafe.Pointer(c), pc, callerpc)
-       }
        if c.closed != 0 {
                gounlock(&c.lock)
                panic("send on closed channel")
index 7f6373dc8183f50546e9a2f9c435a7e45b79d227..4d4f366efa11cc902215035012e273553652c83b 100644 (file)
@@ -57,6 +57,27 @@ chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
                runtime·prints("\n");
        }
 
+       if(raceenabled)
+               runtime·racereadpc(c, pc, chansend);
+
+       // Fast path: check for failed non-blocking operation without acquiring the lock.
+       //
+       // After observing that the channel is not closed, we observe that the channel is
+       // not ready for sending. Each of these observations is a single word-sized read
+       // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
+       // Because a closed channel cannot transition from 'ready for sending' to
+       // 'not ready for sending', even if the channel is closed between the two observations,
+       // they imply a moment between the two when the channel was both not yet closed
+       // and not ready for sending. We behave as if we observed the channel at that moment,
+       // and report that the send cannot proceed.
+       //
+       // It is okay if the reads are reordered here: if we observe that the channel is not
+       // ready for sending and then observe that it is not closed, that implies that the
+       // channel wasn't closed during the first observation.
+       if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil) ||
+               (c->dataqsiz > 0 && c->qcount == c->dataqsiz)))
+               return false;
+
        t0 = 0;
        mysg.releasetime = 0;
        if(runtime·blockprofilerate > 0) {
@@ -65,8 +86,6 @@ chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
        }
 
        runtime·lock(&c->lock);
-       if(raceenabled)
-               runtime·racereadpc(c, pc, chansend);
        if(c->closed)
                goto closed;
 
@@ -183,6 +202,23 @@ chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
                return false;  // not reached
        }
 
+       // Fast path: check for failed non-blocking operation without acquiring the lock.
+       //
+       // After observing that the channel is not ready for receiving, we observe that the
+       // channel is not closed. Each of these observations is a single word-sized read
+       // (first c.sendq.first or c.qcount, and second c.closed).
+       // Because a channel cannot be reopened, the later observation of the channel
+       // being not closed implies that it was also not closed at the moment of the
+       // first observation. We behave as if we observed the channel at that moment
+       // and report that the receive cannot proceed.
+       //
+       // The order of operations is important here: reversing the operations can lead to
+       // incorrect behavior when racing with a close.
+       if(!block && ((c->dataqsiz == 0 && c->sendq.first == nil) ||
+               (c->dataqsiz > 0 && runtime·atomicloadp((void**)&c->qcount) == 0)) &&
+               !runtime·atomicload(&c->closed))
+               return false;
+
        t0 = 0;
        mysg.releasetime = 0;
        if(runtime·blockprofilerate > 0) {
index 5ebbcfd4da1e56082fb62cac467baa472999ca8d..52eb20099dcb69b86c06a77dba94a4ab4a8a3b6f 100644 (file)
@@ -20,7 +20,7 @@ struct        Hchan
        uintgo  dataqsiz;               // size of the circular q
        byte*   buf;
        uint16  elemsize;
-       bool    closed;
+       uint32  closed;
        Type*   elemtype;               // element type
        uintgo  sendx;                  // send index
        uintgo  recvx;                  // receive index
index bb0f28655dbf431a34ecc7a3116821c197f371d9..01632892ed2de8a1b69a31322bfc5720f74506d4 100644 (file)
@@ -198,6 +198,26 @@ func TestChan(t *testing.T) {
        }
 }
 
+func TestNonblockRecvRace(t *testing.T) {
+       n := 10000
+       if testing.Short() {
+               n = 100
+       }
+       for i := 0; i < n; i++ {
+               c := make(chan int, 1)
+               c <- 1
+               go func() {
+                       select {
+                       case <-c:
+                       default:
+                               t.Fatal("chan is not ready")
+                       }
+               }()
+               close(c)
+               <-c
+       }
+}
+
 func TestSelfSelect(t *testing.T) {
        // Ensure that send/recv on the same chan in select
        // does not crash nor deadlock.