]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: improve performance of sync channels
authorDmitriy Vyukov <dvyukov@google.com>
Wed, 20 Jul 2011 15:51:25 +0000 (11:51 -0400)
committerRuss Cox <rsc@golang.org>
Wed, 20 Jul 2011 15:51:25 +0000 (11:51 -0400)
1. SudoG always contains a pointer to the element
(thus no variable size, and less copying).
2. chansend/chanrecv allocate SudoG on the stack.
3. Copying of elements and gorotuine notifications
are moved out of critical sections.

benchmark                        old ns/op    new ns/op    delta
BenchmarkSelectUncontended          515.00       514.00   -0.19%
BenchmarkSelectUncontended-2        291.00       281.00   -3.44%
BenchmarkSelectUncontended-4        213.00       189.00  -11.27%
BenchmarkSelectUncontended-8         78.30        79.00   +0.89%
BenchmarkSelectContended            518.00       514.00   -0.77%
BenchmarkSelectContended-2          655.00       631.00   -3.66%
BenchmarkSelectContended-4         1026.00      1051.00   +2.44%
BenchmarkSelectContended-8         2026.00      2128.00   +5.03%
BenchmarkSelectNonblock             175.00       173.00   -1.14%
BenchmarkSelectNonblock-2            85.10        87.70   +3.06%
BenchmarkSelectNonblock-4            60.10        43.30  -27.95%
BenchmarkSelectNonblock-8            37.60        25.50  -32.18%
BenchmarkChanUncontended            109.00       114.00   +4.59%
BenchmarkChanUncontended-2           54.60        57.20   +4.76%
BenchmarkChanUncontended-4           27.40        28.70   +4.74%
BenchmarkChanUncontended-8           14.60        15.10   +3.42%
BenchmarkChanContended              108.00       114.00   +5.56%
BenchmarkChanContended-2            621.00       617.00   -0.64%
BenchmarkChanContended-4            759.00       677.00  -10.80%
BenchmarkChanContended-8           1635.00      1517.00   -7.22%
BenchmarkChanSync                   299.00       256.00  -14.38%
BenchmarkChanSync-2                5055.00      4624.00   -8.53%
BenchmarkChanSync-4                4998.00      4680.00   -6.36%
BenchmarkChanSync-8                5019.00      4760.00   -5.16%
BenchmarkChanProdCons0              316.00       274.00  -13.29%
BenchmarkChanProdCons0-2           1280.00       617.00  -51.80%
BenchmarkChanProdCons0-4           2433.00      1332.00  -45.25%
BenchmarkChanProdCons0-8           3651.00      1934.00  -47.03%
BenchmarkChanProdCons10             153.00       152.00   -0.65%
BenchmarkChanProdCons10-2           626.00       581.00   -7.19%
BenchmarkChanProdCons10-4          1440.00      1323.00   -8.12%
BenchmarkChanProdCons10-8          2036.00      2017.00   -0.93%

R=rsc, ken
CC=golang-dev
https://golang.org/cl/4790042

src/pkg/runtime/chan.c
src/pkg/runtime/chan_test.go [new file with mode: 0644]

index bbe05e041c78646e2594ea1492e97078bfa5b1a5..926bde723c2f75ad9ceef8bc8baeb4ec06f83828 100644 (file)
@@ -21,7 +21,7 @@ struct        SudoG
        int16   offset;         // offset of case number
        int8    isfree;         // offset of case number
        SudoG*  link;
-       byte    elem[8];        // synch data element (+ more)
+       byte*   elem;           // data element
 };
 
 struct WaitQ
@@ -38,8 +38,8 @@ struct        Hchan
        bool    closed;
        uint8   elemalign;
        Alg*    elemalg;                // interface for element type
-       uint32  sendx;  // send index
-       uint32  recvx;  // receive index
+       uint32  sendx;                  // send index
+       uint32  recvx;                  // receive index
        WaitQ   recvq;                  // list of recv waiters
        WaitQ   sendq;                  // list of send waiters
        SudoG*  free;                   // freelist
@@ -170,6 +170,7 @@ void
 runtime·chansend(Hchan *c, byte *ep, bool *pres)
 {
        SudoG *sg;
+       SudoG mysg;
        G* gp;
 
        if(c == nil)
@@ -185,7 +186,6 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres)
        }
 
        runtime·lock(c);
-loop:
        if(c->closed)
                goto closed;
 
@@ -194,12 +194,12 @@ loop:
 
        sg = dequeue(&c->recvq, c);
        if(sg != nil) {
-               if(ep != nil)
-                       c->elemalg->copy(c->elemsize, sg->elem, ep);
-
+               runtime·unlock(c);
+               
                gp = sg->g;
                gp->param = sg;
-               runtime·unlock(c);
+               if(sg->elem != nil)
+                       c->elemalg->copy(c->elemsize, sg->elem, ep);
                runtime·ready(gp);
 
                if(pres != nil)
@@ -213,21 +213,22 @@ loop:
                return;
        }
 
-       sg = allocsg(c);
-       if(ep != nil)
-               c->elemalg->copy(c->elemsize, sg->elem, ep);
+       mysg.elem = ep;
+       mysg.g = g;
+       mysg.selgen = g->selgen;
        g->param = nil;
        g->status = Gwaiting;
-       enqueue(&c->sendq, sg);
+       enqueue(&c->sendq, &mysg);
        runtime·unlock(c);
        runtime·gosched();
 
-       runtime·lock(c);
-       sg = g->param;
-       if(sg == nil)
-               goto loop;
-       freesg(c, sg);
-       runtime·unlock(c);
+       if(g->param == nil) {
+               runtime·lock(c);
+               if(!c->closed)
+                       runtime·throw("chansend: spurious wakeup");
+               goto closed;
+       }
+
        return;
 
 asynch:
@@ -240,17 +241,18 @@ asynch:
                        *pres = false;
                        return;
                }
-               sg = allocsg(c);
+               mysg.g = g;
+               mysg.elem = nil;
+               mysg.selgen = g->selgen;
                g->status = Gwaiting;
-               enqueue(&c->sendq, sg);
+               enqueue(&c->sendq, &mysg);
                runtime·unlock(c);
                runtime·gosched();
 
                runtime·lock(c);
                goto asynch;
        }
-       if(ep != nil)
-               c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
+       c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
        if(++c->sendx == c->dataqsiz)
                c->sendx = 0;
        c->qcount++;
@@ -258,7 +260,6 @@ asynch:
        sg = dequeue(&c->recvq, c);
        if(sg != nil) {
                gp = sg->g;
-               freesg(c, sg);
                runtime·unlock(c);
                runtime·ready(gp);
        } else
@@ -277,6 +278,7 @@ void
 runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
 {
        SudoG *sg;
+       SudoG mysg;
        G *gp;
 
        if(c == nil)
@@ -289,8 +291,6 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
                runtime·printf("chanrecv: chan=%p\n", c);
 
        runtime·lock(c);
-
-loop:
        if(c->dataqsiz > 0)
                goto asynch;
 
@@ -299,13 +299,12 @@ loop:
 
        sg = dequeue(&c->sendq, c);
        if(sg != nil) {
+               runtime·unlock(c);
+
                if(ep != nil)
                        c->elemalg->copy(c->elemsize, ep, sg->elem);
-               c->elemalg->copy(c->elemsize, sg->elem, nil);
-
                gp = sg->g;
                gp->param = sg;
-               runtime·unlock(c);
                runtime·ready(gp);
 
                if(selected != nil)
@@ -321,25 +320,24 @@ loop:
                return;
        }
 
-       sg = allocsg(c);
+       mysg.elem = ep;
+       mysg.g = g;
+       mysg.selgen = g->selgen;
        g->param = nil;
        g->status = Gwaiting;
-       enqueue(&c->recvq, sg);
+       enqueue(&c->recvq, &mysg);
        runtime·unlock(c);
        runtime·gosched();
 
-       runtime·lock(c);
-       sg = g->param;
-       if(sg == nil)
-               goto loop;
+       if(g->param == nil) {
+               runtime·lock(c);
+               if(!c->closed)
+                       runtime·throw("chanrecv: spurious wakeup");
+               goto closed;
+       }
 
-       if(ep != nil)
-               c->elemalg->copy(c->elemsize, ep, sg->elem);
-       c->elemalg->copy(c->elemsize, sg->elem, nil);
        if(received != nil)
                *received = true;
-       freesg(c, sg);
-       runtime·unlock(c);
        return;
 
 asynch:
@@ -354,9 +352,11 @@ asynch:
                                *received = false;
                        return;
                }
-               sg = allocsg(c);
+               mysg.g = g;
+               mysg.elem = nil;
+               mysg.selgen = g->selgen;
                g->status = Gwaiting;
-               enqueue(&c->recvq, sg);
+               enqueue(&c->recvq, &mysg);
                runtime·unlock(c);
                runtime·gosched();
 
@@ -369,10 +369,10 @@ asynch:
        if(++c->recvx == c->dataqsiz)
                c->recvx = 0;
        c->qcount--;
+
        sg = dequeue(&c->sendq, c);
        if(sg != nil) {
                gp = sg->g;
-               freesg(c, sg);
                runtime·unlock(c);
                runtime·ready(gp);
        } else
@@ -721,7 +721,6 @@ selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so
        cas->so = so;
        cas->kind = CaseRecv;
        cas->u.recv.elemp = elem;
-       cas->u.recv.receivedp = nil;
        cas->u.recv.receivedp = received;
 
        if(debug)
@@ -911,6 +910,7 @@ loop:
        }
 
        if(dfl != nil) {
+               selunlock(sel);
                cas = dfl;
                goto retc;
        }
@@ -926,12 +926,12 @@ loop:
 
                switch(cas->kind) {
                case CaseRecv:
+                       sg->elem = cas->u.recv.elemp;
                        enqueue(&c->recvq, sg);
                        break;
                
                case CaseSend:
-                       if(c->dataqsiz == 0)
-                               c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
+                       sg->elem = cas->u.elem;
                        enqueue(&c->sendq, sg);
                        break;
                }
@@ -965,10 +965,8 @@ loop:
        cas = sel->scase[o];
        c = cas->chan;
 
-       if(c->dataqsiz > 0) {
-//             prints("shouldnt happen\n");
-               goto loop;
-       }
+       if(c->dataqsiz > 0)
+               runtime·throw("selectgo: shouldnt happen");
 
        if(debug)
                runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n",
@@ -977,12 +975,10 @@ loop:
        if(cas->kind == CaseRecv) {
                if(cas->u.recv.receivedp != nil)
                        *cas->u.recv.receivedp = true;
-               if(cas->u.recv.elemp != nil)
-                       c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
-               c->elemalg->copy(c->elemsize, sg->elem, nil);
        }
 
        freesg(c, sg);
+       selunlock(sel);
        goto retc;
 
 asyncrecv:
@@ -998,35 +994,38 @@ asyncrecv:
        sg = dequeue(&c->sendq, c);
        if(sg != nil) {
                gp = sg->g;
-               freesg(c, sg);
+               selunlock(sel);
                runtime·ready(gp);
+       } else {
+               selunlock(sel);
        }
        goto retc;
 
 asyncsend:
        // can send to buffer
-       if(cas->u.elem != nil)
-               c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
+       c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
        if(++c->sendx == c->dataqsiz)
                c->sendx = 0;
        c->qcount++;
        sg = dequeue(&c->recvq, c);
        if(sg != nil) {
                gp = sg->g;
-               freesg(c, sg);
+               selunlock(sel);
                runtime·ready(gp);
+       } else {
+               selunlock(sel);
        }
        goto retc;
 
 syncrecv:
        // can receive from sleeping sender (sg)
+       selunlock(sel);
        if(debug)
                runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
        if(cas->u.recv.receivedp != nil)
                *cas->u.recv.receivedp = true;
        if(cas->u.recv.elemp != nil)
                c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
-       c->elemalg->copy(c->elemsize, sg->elem, nil);
        gp = sg->g;
        gp->param = sg;
        runtime·ready(gp);
@@ -1034,6 +1033,7 @@ syncrecv:
 
 rclose:
        // read at end of closed channel
+       selunlock(sel);
        if(cas->u.recv.receivedp != nil)
                *cas->u.recv.receivedp = false;
        if(cas->u.recv.elemp != nil)
@@ -1042,18 +1042,16 @@ rclose:
 
 syncsend:
        // can send to sleeping receiver (sg)
+       selunlock(sel);
        if(debug)
                runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
-       if(c->closed)
-               goto sclose;
-       c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
+       if(sg->elem != nil)
+               c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
        gp = sg->g;
        gp->param = sg;
        runtime·ready(gp);
 
 retc:
-       selunlock(sel);
-
        // return to pc corresponding to chosen case
        pc = cas->pc;
        as = (byte*)selp + cas->so;
@@ -1093,7 +1091,6 @@ runtime·closechan(Hchan *c)
                        break;
                gp = sg->g;
                gp->param = nil;
-               freesg(c, sg);
                runtime·ready(gp);
        }
 
@@ -1104,7 +1101,6 @@ runtime·closechan(Hchan *c)
                        break;
                gp = sg->g;
                gp->param = nil;
-               freesg(c, sg);
                runtime·ready(gp);
        }
 
@@ -1203,7 +1199,7 @@ allocsg(Hchan *c)
        if(sg != nil) {
                c->free = sg->link;
        } else
-               sg = runtime·mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem));
+               sg = runtime·mal(sizeof(*sg));
        sg->selgen = g->selgen;
        sg->g = g;
        sg->offset = 0;
@@ -1220,6 +1216,7 @@ freesg(Hchan *c, SudoG *sg)
                        runtime·throw("chan.freesg: already free");
                sg->isfree = 1;
                sg->link = c->free;
+               sg->elem = nil;
                c->free = sg;
        }
 }
diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go
new file mode 100644 (file)
index 0000000..31f6856
--- /dev/null
@@ -0,0 +1,251 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime_test
+
+import (
+       "runtime"
+       "sync/atomic"
+       "testing"
+)
+
+func BenchmarkSelectUncontended(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       myc1 := make(chan int, 1)
+                       myc2 := make(chan int, 1)
+                       myc1 <- 0
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       select {
+                                       case <-myc1:
+                                               myc2 <- 0
+                                       case <-myc2:
+                                               myc1 <- 0
+                                       }
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkSelectContended(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       myc1 := make(chan int, procs)
+       myc2 := make(chan int, procs)
+       for p := 0; p < procs; p++ {
+               myc1 <- 0
+               go func() {
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       select {
+                                       case <-myc1:
+                                               myc2 <- 0
+                                       case <-myc2:
+                                               myc1 <- 0
+                                       }
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkSelectNonblock(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       myc1 := make(chan int)
+                       myc2 := make(chan int)
+                       myc3 := make(chan int, 1)
+                       myc4 := make(chan int, 1)
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       select {
+                                       case <-myc1:
+                                       default:
+                                       }
+                                       select {
+                                       case myc2 <- 0:
+                                       default:
+                                       }
+                                       select {
+                                       case <-myc3:
+                                       default:
+                                       }
+                                       select {
+                                       case myc4 <- 0:
+                                       default:
+                                       }
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkChanUncontended(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       myc := make(chan int, CallsPerSched)
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       myc <- 0
+                               }
+                               for g := 0; g < CallsPerSched; g++ {
+                                       <-myc
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkChanContended(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, procs)
+       myc := make(chan int, procs*CallsPerSched)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       myc <- 0
+                               }
+                               for g := 0; g < CallsPerSched; g++ {
+                                       <-myc
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func BenchmarkChanSync(b *testing.B) {
+       const CallsPerSched = 1000
+       procs := 2
+       N := int32(b.N / CallsPerSched / procs * procs)
+       c := make(chan bool, procs)
+       myc := make(chan int)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       for {
+                               i := atomic.AddInt32(&N, -1)
+                               if i < 0 {
+                                       break
+                               }
+                               for g := 0; g < CallsPerSched; g++ {
+                                       if i%2 == 0 {
+                                               <-myc
+                                               myc <- 0
+                                       } else {
+                                               myc <- 0
+                                               <-myc
+                                       }
+                               }
+                       }
+                       c <- true
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+       }
+}
+
+func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
+       const CallsPerSched = 1000
+       procs := runtime.GOMAXPROCS(-1)
+       N := int32(b.N / CallsPerSched)
+       c := make(chan bool, 2*procs)
+       myc := make(chan int, chanSize)
+       for p := 0; p < procs; p++ {
+               go func() {
+                       foo := 0
+                       for atomic.AddInt32(&N, -1) >= 0 {
+                               for g := 0; g < CallsPerSched; g++ {
+                                       for i := 0; i < localWork; i++ {
+                                               foo *= 2
+                                               foo /= 2
+                                       }
+                                       myc <- 1
+                               }
+                       }
+                       myc <- 0
+                       c <- foo == 42
+               }()
+               go func() {
+                       foo := 0
+                       for {
+                               v := <-myc
+                               if v == 0 {
+                                       break
+                               }
+                               for i := 0; i < localWork; i++ {
+                                       foo *= 2
+                                       foo /= 2
+                               }
+                       }
+                       c <- foo == 42
+               }()
+       }
+       for p := 0; p < procs; p++ {
+               <-c
+               <-c
+       }
+}
+
+func BenchmarkChanProdCons0(b *testing.B) {
+       benchmarkChanProdCons(b, 0, 0)
+}
+
+func BenchmarkChanProdCons10(b *testing.B) {
+       benchmarkChanProdCons(b, 10, 0)
+}
+
+func BenchmarkChanProdCons100(b *testing.B) {
+       benchmarkChanProdCons(b, 100, 0)
+}
+
+func BenchmarkChanProdConsWork0(b *testing.B) {
+       benchmarkChanProdCons(b, 0, 100)
+}
+
+func BenchmarkChanProdConsWork10(b *testing.B) {
+       benchmarkChanProdCons(b, 10, 100)
+}
+
+func BenchmarkChanProdConsWork100(b *testing.B) {
+       benchmarkChanProdCons(b, 100, 100)
+}