int16 offset; // offset of case number
int8 isfree; // offset of case number
SudoG* link;
- byte elem[8]; // synch data element (+ more)
+ byte* elem; // data element
};
struct WaitQ
bool closed;
uint8 elemalign;
Alg* elemalg; // interface for element type
- uint32 sendx; // send index
- uint32 recvx; // receive index
+ uint32 sendx; // send index
+ uint32 recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
SudoG* free; // freelist
runtime·chansend(Hchan *c, byte *ep, bool *pres)
{
SudoG *sg;
+ SudoG mysg;
G* gp;
if(c == nil)
}
runtime·lock(c);
-loop:
if(c->closed)
goto closed;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
- if(ep != nil)
- c->elemalg->copy(c->elemsize, sg->elem, ep);
-
+ runtime·unlock(c);
+
gp = sg->g;
gp->param = sg;
- runtime·unlock(c);
+ if(sg->elem != nil)
+ c->elemalg->copy(c->elemsize, sg->elem, ep);
runtime·ready(gp);
if(pres != nil)
return;
}
- sg = allocsg(c);
- if(ep != nil)
- c->elemalg->copy(c->elemsize, sg->elem, ep);
+ mysg.elem = ep;
+ mysg.g = g;
+ mysg.selgen = g->selgen;
g->param = nil;
g->status = Gwaiting;
- enqueue(&c->sendq, sg);
+ enqueue(&c->sendq, &mysg);
runtime·unlock(c);
runtime·gosched();
- runtime·lock(c);
- sg = g->param;
- if(sg == nil)
- goto loop;
- freesg(c, sg);
- runtime·unlock(c);
+ if(g->param == nil) {
+ runtime·lock(c);
+ if(!c->closed)
+ runtime·throw("chansend: spurious wakeup");
+ goto closed;
+ }
+
return;
asynch:
*pres = false;
return;
}
- sg = allocsg(c);
+ mysg.g = g;
+ mysg.elem = nil;
+ mysg.selgen = g->selgen;
g->status = Gwaiting;
- enqueue(&c->sendq, sg);
+ enqueue(&c->sendq, &mysg);
runtime·unlock(c);
runtime·gosched();
runtime·lock(c);
goto asynch;
}
- if(ep != nil)
- c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
runtime·unlock(c);
runtime·ready(gp);
} else
runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
{
SudoG *sg;
+ SudoG mysg;
G *gp;
if(c == nil)
runtime·printf("chanrecv: chan=%p\n", c);
runtime·lock(c);
-
-loop:
if(c->dataqsiz > 0)
goto asynch;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
+ runtime·unlock(c);
+
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
-
gp = sg->g;
gp->param = sg;
- runtime·unlock(c);
runtime·ready(gp);
if(selected != nil)
return;
}
- sg = allocsg(c);
+ mysg.elem = ep;
+ mysg.g = g;
+ mysg.selgen = g->selgen;
g->param = nil;
g->status = Gwaiting;
- enqueue(&c->recvq, sg);
+ enqueue(&c->recvq, &mysg);
runtime·unlock(c);
runtime·gosched();
- runtime·lock(c);
- sg = g->param;
- if(sg == nil)
- goto loop;
+ if(g->param == nil) {
+ runtime·lock(c);
+ if(!c->closed)
+ runtime·throw("chanrecv: spurious wakeup");
+ goto closed;
+ }
- if(ep != nil)
- c->elemalg->copy(c->elemsize, ep, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
if(received != nil)
*received = true;
- freesg(c, sg);
- runtime·unlock(c);
return;
asynch:
*received = false;
return;
}
- sg = allocsg(c);
+ mysg.g = g;
+ mysg.elem = nil;
+ mysg.selgen = g->selgen;
g->status = Gwaiting;
- enqueue(&c->recvq, sg);
+ enqueue(&c->recvq, &mysg);
runtime·unlock(c);
runtime·gosched();
if(++c->recvx == c->dataqsiz)
c->recvx = 0;
c->qcount--;
+
sg = dequeue(&c->sendq, c);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
runtime·unlock(c);
runtime·ready(gp);
} else
cas->so = so;
cas->kind = CaseRecv;
cas->u.recv.elemp = elem;
- cas->u.recv.receivedp = nil;
cas->u.recv.receivedp = received;
if(debug)
}
if(dfl != nil) {
+ selunlock(sel);
cas = dfl;
goto retc;
}
switch(cas->kind) {
case CaseRecv:
+ sg->elem = cas->u.recv.elemp;
enqueue(&c->recvq, sg);
break;
case CaseSend:
- if(c->dataqsiz == 0)
- c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
+ sg->elem = cas->u.elem;
enqueue(&c->sendq, sg);
break;
}
cas = sel->scase[o];
c = cas->chan;
- if(c->dataqsiz > 0) {
-// prints("shouldnt happen\n");
- goto loop;
- }
+ if(c->dataqsiz > 0)
+ runtime·throw("selectgo: shouldnt happen");
if(debug)
runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n",
if(cas->kind == CaseRecv) {
if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = true;
- if(cas->u.recv.elemp != nil)
- c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
}
freesg(c, sg);
+ selunlock(sel);
goto retc;
asyncrecv:
sg = dequeue(&c->sendq, c);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
+ selunlock(sel);
runtime·ready(gp);
+ } else {
+ selunlock(sel);
}
goto retc;
asyncsend:
// can send to buffer
- if(cas->u.elem != nil)
- c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
+ c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
gp = sg->g;
- freesg(c, sg);
+ selunlock(sel);
runtime·ready(gp);
+ } else {
+ selunlock(sel);
}
goto retc;
syncrecv:
// can receive from sleeping sender (sg)
+ selunlock(sel);
if(debug)
runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = true;
if(cas->u.recv.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
- c->elemalg->copy(c->elemsize, sg->elem, nil);
gp = sg->g;
gp->param = sg;
runtime·ready(gp);
rclose:
// read at end of closed channel
+ selunlock(sel);
if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = false;
if(cas->u.recv.elemp != nil)
syncsend:
// can send to sleeping receiver (sg)
+ selunlock(sel);
if(debug)
runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
- if(c->closed)
- goto sclose;
- c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
+ if(sg->elem != nil)
+ c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
gp->param = sg;
runtime·ready(gp);
retc:
- selunlock(sel);
-
// return to pc corresponding to chosen case
pc = cas->pc;
as = (byte*)selp + cas->so;
break;
gp = sg->g;
gp->param = nil;
- freesg(c, sg);
runtime·ready(gp);
}
break;
gp = sg->g;
gp->param = nil;
- freesg(c, sg);
runtime·ready(gp);
}
if(sg != nil) {
c->free = sg->link;
} else
- sg = runtime·mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem));
+ sg = runtime·mal(sizeof(*sg));
sg->selgen = g->selgen;
sg->g = g;
sg->offset = 0;
runtime·throw("chan.freesg: already free");
sg->isfree = 1;
sg->link = c->free;
+ sg->elem = nil;
c->free = sg;
}
}
--- /dev/null
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package runtime_test
+
+import (
+ "runtime"
+ "sync/atomic"
+ "testing"
+)
+
+func BenchmarkSelectUncontended(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ myc1 := make(chan int, 1)
+ myc2 := make(chan int, 1)
+ myc1 <- 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ select {
+ case <-myc1:
+ myc2 <- 0
+ case <-myc2:
+ myc1 <- 0
+ }
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkSelectContended(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ myc1 := make(chan int, procs)
+ myc2 := make(chan int, procs)
+ for p := 0; p < procs; p++ {
+ myc1 <- 0
+ go func() {
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ select {
+ case <-myc1:
+ myc2 <- 0
+ case <-myc2:
+ myc1 <- 0
+ }
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkSelectNonblock(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ myc1 := make(chan int)
+ myc2 := make(chan int)
+ myc3 := make(chan int, 1)
+ myc4 := make(chan int, 1)
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ select {
+ case <-myc1:
+ default:
+ }
+ select {
+ case myc2 <- 0:
+ default:
+ }
+ select {
+ case <-myc3:
+ default:
+ }
+ select {
+ case myc4 <- 0:
+ default:
+ }
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkChanUncontended(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ myc := make(chan int, CallsPerSched)
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ myc <- 0
+ }
+ for g := 0; g < CallsPerSched; g++ {
+ <-myc
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkChanContended(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ myc := make(chan int, procs*CallsPerSched)
+ for p := 0; p < procs; p++ {
+ go func() {
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ myc <- 0
+ }
+ for g := 0; g < CallsPerSched; g++ {
+ <-myc
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkChanSync(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := 2
+ N := int32(b.N / CallsPerSched / procs * procs)
+ c := make(chan bool, procs)
+ myc := make(chan int)
+ for p := 0; p < procs; p++ {
+ go func() {
+ for {
+ i := atomic.AddInt32(&N, -1)
+ if i < 0 {
+ break
+ }
+ for g := 0; g < CallsPerSched; g++ {
+ if i%2 == 0 {
+ <-myc
+ myc <- 0
+ } else {
+ myc <- 0
+ <-myc
+ }
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, 2*procs)
+ myc := make(chan int, chanSize)
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ for g := 0; g < CallsPerSched; g++ {
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ myc <- 1
+ }
+ }
+ myc <- 0
+ c <- foo == 42
+ }()
+ go func() {
+ foo := 0
+ for {
+ v := <-myc
+ if v == 0 {
+ break
+ }
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ <-c
+ }
+}
+
+func BenchmarkChanProdCons0(b *testing.B) {
+ benchmarkChanProdCons(b, 0, 0)
+}
+
+func BenchmarkChanProdCons10(b *testing.B) {
+ benchmarkChanProdCons(b, 10, 0)
+}
+
+func BenchmarkChanProdCons100(b *testing.B) {
+ benchmarkChanProdCons(b, 100, 0)
+}
+
+func BenchmarkChanProdConsWork0(b *testing.B) {
+ benchmarkChanProdCons(b, 0, 100)
+}
+
+func BenchmarkChanProdConsWork10(b *testing.B) {
+ benchmarkChanProdCons(b, 10, 100)
+}
+
+func BenchmarkChanProdConsWork100(b *testing.B) {
+ benchmarkChanProdCons(b, 100, 100)
+}