if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
- // buf points into the same allocation, elemtype is persistent
- // and SudoG's are referenced from G so can't be collected.
+ // buf points into the same allocation, elemtype is persistent.
+ // SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(gomallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
if size > 0 && elem.size != 0 {
c.dataqsiz = uint(size)
if debugChan {
- println("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size)
+ print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
return false
}
gopark(nil, nil, "chan send (nil chan)")
- return false // not reached
+ gothrow("unreachable")
}
if debugChan {
- println("chansend: chan=", c)
+ print("chansend: chan=", c, "\n")
}
if raceenabled {
// no receiver available: block on this channel.
gp := getg()
mysg := acquireSudog()
+ mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
}
gp := getg()
mysg := acquireSudog()
+ mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
goparkunlock(&c.lock, "chan send")
// someone woke us up - try again
- if mysg.releasetime != 0 {
- t1 = int64(mysg.releasetime)
+ if mysg.releasetime > 0 {
+ t1 = mysg.releasetime
}
releaseSudog(mysg)
lock(&c.lock)
}
goready(gp)
}
+ unlock(&c.lock)
+}
+
+// entry points for <- c from compiled code
+//go:nosplit
+func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
+ chanrecv(t, c, elem, true)
+}
+
+//go:nosplit
+func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
+ _, received = chanrecv(t, c, elem, true)
+ return
+}
+
+// chanrecv receives on channel c and writes the received data to ep.
+// ep may be nil, in which case received data is ignored.
+// If block == false and no elements are available, returns (false, false).
+// Otherwise, if c is closed, zeros *ep and returns (true, false).
+// Otherwise, fills in *ep with an element and returns (true, true).
+func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
+ // raceenabled: don't need to check ep, as it is always on the stack.
+
+ if debugChan {
+ print("chanrecv: chan=", c, "\n")
+ }
+
+ if c == nil {
+ if !block {
+ return
+ }
+ gopark(nil, nil, "chan receive (nil chan)")
+ gothrow("unreachable")
+ }
+
+ // Fast path: check for failed non-blocking operation without acquiring the lock.
+ //
+ // After observing that the channel is not ready for receiving, we observe that the
+ // channel is not closed. Each of these observations is a single word-sized read
+ // (first c.sendq.first or c.qcount, and second c.closed).
+ // Because a channel cannot be reopened, the later observation of the channel
+ // being not closed implies that it was also not closed at the moment of the
+ // first observation. We behave as if we observed the channel at that moment
+ // and report that the receive cannot proceed.
+ //
+ // The order of operations is important here: reversing the operations can lead to
+ // incorrect behavior when racing with a close.
+ if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
+ c.dataqsiz > 0 && atomicloaduint(&c.qcount) == 0) &&
+ atomicload(&c.closed) == 0 {
+ return
+ }
+
+ var t0 int64
+ if blockprofilerate > 0 {
+ t0 = cputicks()
+ }
+
+ lock(&c.lock)
+ if c.dataqsiz == 0 { // synchronous channel
+ if c.closed != 0 {
+ return recvclosed(c, ep)
+ }
+ sg := c.sendq.dequeue()
+ if sg != nil {
+ if raceenabled {
+ racesync(c, sg)
+ }
+ unlock(&c.lock)
+
+ if ep != nil {
+ memmove(ep, sg.elem, uintptr(c.elemsize))
+ }
+ gp := sg.g
+ gp.param = unsafe.Pointer(sg)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
+ }
+ goready(gp)
+ selected = true
+ received = true
+ return
+ }
+
+ if !block {
+ unlock(&c.lock)
+ return
+ }
+
+ // no sender available: block on this channel.
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = ep
+ mysg.waitlink = nil
+ gp.waiting = mysg
+ mysg.g = gp
+ mysg.selectdone = nil
+ gp.param = nil
+ c.recvq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan receive")
+
+ // someone woke us up
+ gp.waiting = nil
+ if mysg.releasetime > 0 {
+ blockevent(mysg.releasetime-t0, 2)
+ }
+ releaseSudog(mysg)
+
+ if gp.param != nil {
+ // a sender sent us some data. It already wrote to ep.
+ selected = true
+ received = true
+ return
+ }
+
+ lock(&c.lock)
+ if c.closed == 0 {
+ gothrow("chanrecv: spurious wakeup")
+ }
+ return recvclosed(c, ep)
+ }
+
+ // asynchronous channel
+ // wait for some data to appear
+ var t1 int64
+ for c.qcount <= 0 {
+ if c.closed != 0 {
+ selected, received = recvclosed(c, ep)
+ if t1 > 0 {
+ blockevent(t1-t0, 2)
+ }
+ return
+ }
+
+ if !block {
+ unlock(&c.lock)
+ return
+ }
+
+ // wait for someone to send an element
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = nil
+ mysg.g = gp
+ mysg.selectdone = nil
+
+ c.recvq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan receive")
+
+ // someone woke us up - try again
+ if mysg.releasetime > 0 {
+ t1 = mysg.releasetime
+ }
+ releaseSudog(mysg)
+ lock(&c.lock)
+ }
+
+ if raceenabled {
+ raceacquire(chanbuf(c, c.recvx))
+ racerelease(chanbuf(c, c.recvx))
+ }
+ if ep != nil {
+ memmove(ep, chanbuf(c, c.recvx), uintptr(c.elemsize))
+ }
+ memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
+
+ c.recvx++
+ if c.recvx == c.dataqsiz {
+ c.recvx = 0
+ }
+ c.qcount--
+
+ // ping a sender now that there is space
+ sg := c.sendq.dequeue()
+ if sg != nil {
+ gp := sg.g
+ unlock(&c.lock)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
+ }
+ goready(gp)
+ } else {
+ unlock(&c.lock)
+ }
+
+ if t1 > 0 {
+ blockevent(t1-t0, 2)
+ }
+ selected = true
+ received = true
+ return
+}
+
+// recvclosed is a helper function for chanrecv. Handles cleanup
+// when the receiver encounters a closed channel.
+// Caller must hold c.lock, recvclosed will release the lock.
+func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
+ if raceenabled {
+ raceacquire(unsafe.Pointer(c))
+ }
unlock(&c.lock)
+ if ep != nil {
+ memclr(ep, uintptr(c.elemsize))
+ }
+ return true, false
+}
+
+// compiler implements
+//
+// select {
+// case c <- v:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if selectnbsend(c, v) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
+func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
+ return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
+}
+
+// compiler implements
+//
+// select {
+// case v = <-c:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if selectnbrecv(&v, c) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
+func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
+ selected, _ = chanrecv(t, c, elem, false)
+ return
+}
+
+// compiler implements
+//
+// select {
+// case v, ok = <-c:
+// ... foo
+// default:
+// ... bar
+// }
+//
+// as
+//
+// if c != nil && selectnbrecv2(&v, &ok, c) {
+// ... foo
+// } else {
+// ... bar
+// }
+//
+func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
+ // TODO(khr): just return 2 values from this function, now that it is in Go.
+ selected, *received = chanrecv(t, c, elem, false)
+ return
+}
+
+func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
+ return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
+}
+
+func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
+ return chanrecv(t, c, elem, !nb)
}
func reflect_chanlen(c *hchan) int {
static void enqueue(WaitQ*, SudoG*);
static void racesync(Hchan*, SudoG*);
-/*
- * generic single channel send/recv
- * if the bool pointer is nil,
- * then the full exchange will
- * occur. if pres is not nil,
- * then the protocol will not
- * sleep but return if it could
- * not complete.
- *
- * sleep can wake up with g->param == nil
- * when a channel involved in the sleep has
- * been closed. it is easiest to loop and re-run
- * the operation; we'll see that it's now closed.
- */
-static bool
-chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
-{
- SudoG *sg;
- SudoG mysg;
- G* gp;
- int64 t0;
-
- if(raceenabled)
- runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), chansend);
-
- if(c == nil) {
- USED(t);
- if(!block)
- return false;
- runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan send (nil chan)"));
- return false; // not reached
- }
-
- if(raceenabled)
- runtime·racereadpc(c, pc, chansend);
-
- // Fast path: check for failed non-blocking operation without acquiring the lock.
- //
- // After observing that the channel is not closed, we observe that the channel is
- // not ready for sending. Each of these observations is a single word-sized read
- // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
- // Because a closed channel cannot transition from 'ready for sending' to
- // 'not ready for sending', even if the channel is closed between the two observations,
- // they imply a moment between the two when the channel was both not yet closed
- // and not ready for sending. We behave as if we observed the channel at that moment,
- // and report that the send cannot proceed.
- //
- // It is okay if the reads are reordered here: if we observe that the channel is not
- // ready for sending and then observe that it is not closed, that implies that the
- // channel wasn't closed during the first observation.
- if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil) ||
- (c->dataqsiz > 0 && c->qcount == c->dataqsiz)))
- return false;
-
- t0 = 0;
- mysg.releasetime = 0;
- if(runtime·blockprofilerate > 0) {
- t0 = runtime·cputicks();
- mysg.releasetime = -1;
- }
-
- runtime·lock(&c->lock);
- if(c->closed)
- goto closed;
-
- if(c->dataqsiz > 0)
- goto asynch;
-
- sg = dequeue(&c->recvq);
- if(sg != nil) {
- if(raceenabled)
- racesync(c, sg);
- runtime·unlock(&c->lock);
-
- gp = sg->g;
- gp->param = sg;
- if(sg->elem != nil)
- runtime·memmove(sg->elem, ep, c->elemsize);
- if(sg->releasetime)
- sg->releasetime = runtime·cputicks();
- runtime·ready(gp);
- return true;
- }
-
- if(!block) {
- runtime·unlock(&c->lock);
- return false;
- }
-
- mysg.elem = ep;
- mysg.g = g;
- mysg.selectdone = nil;
- g->param = nil;
- enqueue(&c->sendq, &mysg);
- runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan send"));
-
- if(g->param == nil) {
- runtime·lock(&c->lock);
- if(!c->closed)
- runtime·throw("chansend: spurious wakeup");
- goto closed;
- }
-
- if(mysg.releasetime > 0)
- runtime·blockevent(mysg.releasetime - t0, 2);
-
- return true;
-
-asynch:
- if(c->closed)
- goto closed;
-
- if(c->qcount >= c->dataqsiz) {
- if(!block) {
- runtime·unlock(&c->lock);
- return false;
- }
- mysg.g = g;
- mysg.elem = nil;
- mysg.selectdone = nil;
- enqueue(&c->sendq, &mysg);
- runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan send"));
-
- runtime·lock(&c->lock);
- goto asynch;
- }
-
- if(raceenabled) {
- runtime·raceacquire(chanbuf(c, c->sendx));
- runtime·racerelease(chanbuf(c, c->sendx));
- }
-
- runtime·memmove(chanbuf(c, c->sendx), ep, c->elemsize);
- if(++c->sendx == c->dataqsiz)
- c->sendx = 0;
- c->qcount++;
-
- sg = dequeue(&c->recvq);
- if(sg != nil) {
- gp = sg->g;
- runtime·unlock(&c->lock);
- if(sg->releasetime)
- sg->releasetime = runtime·cputicks();
- runtime·ready(gp);
- } else
- runtime·unlock(&c->lock);
- if(mysg.releasetime > 0)
- runtime·blockevent(mysg.releasetime - t0, 2);
- return true;
-
-closed:
- runtime·unlock(&c->lock);
- runtime·panicstring("send on closed channel");
- return false; // not reached
-}
-
-
-static bool
-chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
-{
- SudoG *sg;
- SudoG mysg;
- G *gp;
- int64 t0;
-
- // raceenabled: don't need to check ep, as it is always on the stack.
-
- if(debug)
- runtime·printf("chanrecv: chan=%p\n", c);
-
- if(c == nil) {
- USED(t);
- if(!block)
- return false;
- runtime·park(nil, nil, runtime·gostringnocopy((byte*)"chan receive (nil chan)"));
- return false; // not reached
- }
-
- // Fast path: check for failed non-blocking operation without acquiring the lock.
- //
- // After observing that the channel is not ready for receiving, we observe that the
- // channel is not closed. Each of these observations is a single word-sized read
- // (first c.sendq.first or c.qcount, and second c.closed).
- // Because a channel cannot be reopened, the later observation of the channel
- // being not closed implies that it was also not closed at the moment of the
- // first observation. We behave as if we observed the channel at that moment
- // and report that the receive cannot proceed.
- //
- // The order of operations is important here: reversing the operations can lead to
- // incorrect behavior when racing with a close.
- if(!block && ((c->dataqsiz == 0 && c->sendq.first == nil) ||
- (c->dataqsiz > 0 && runtime·atomicloadp((void**)&c->qcount) == 0)) &&
- !runtime·atomicload(&c->closed))
- return false;
-
- t0 = 0;
- mysg.releasetime = 0;
- if(runtime·blockprofilerate > 0) {
- t0 = runtime·cputicks();
- mysg.releasetime = -1;
- }
-
- runtime·lock(&c->lock);
- if(c->dataqsiz > 0)
- goto asynch;
-
- if(c->closed)
- goto closed;
-
- sg = dequeue(&c->sendq);
- if(sg != nil) {
- if(raceenabled)
- racesync(c, sg);
- runtime·unlock(&c->lock);
-
- if(ep != nil)
- runtime·memmove(ep, sg->elem, c->elemsize);
- gp = sg->g;
- gp->param = sg;
- if(sg->releasetime)
- sg->releasetime = runtime·cputicks();
- runtime·ready(gp);
-
- if(received != nil)
- *received = true;
- return true;
- }
-
- if(!block) {
- runtime·unlock(&c->lock);
- return false;
- }
-
- mysg.elem = ep;
- mysg.g = g;
- mysg.selectdone = nil;
- g->param = nil;
- enqueue(&c->recvq, &mysg);
- runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan receive"));
-
- if(g->param == nil) {
- runtime·lock(&c->lock);
- if(!c->closed)
- runtime·throw("chanrecv: spurious wakeup");
- goto closed;
- }
-
- if(received != nil)
- *received = true;
- if(mysg.releasetime > 0)
- runtime·blockevent(mysg.releasetime - t0, 2);
- return true;
-
-asynch:
- if(c->qcount <= 0) {
- if(c->closed)
- goto closed;
-
- if(!block) {
- runtime·unlock(&c->lock);
- if(received != nil)
- *received = false;
- return false;
- }
- mysg.g = g;
- mysg.elem = nil;
- mysg.selectdone = nil;
- enqueue(&c->recvq, &mysg);
- runtime·parkunlock(&c->lock, runtime·gostringnocopy((byte*)"chan receive"));
-
- runtime·lock(&c->lock);
- goto asynch;
- }
-
- if(raceenabled) {
- runtime·raceacquire(chanbuf(c, c->recvx));
- runtime·racerelease(chanbuf(c, c->recvx));
- }
-
- if(ep != nil)
- runtime·memmove(ep, chanbuf(c, c->recvx), c->elemsize);
- runtime·memclr(chanbuf(c, c->recvx), c->elemsize);
- if(++c->recvx == c->dataqsiz)
- c->recvx = 0;
- c->qcount--;
-
- sg = dequeue(&c->sendq);
- if(sg != nil) {
- gp = sg->g;
- runtime·unlock(&c->lock);
- if(sg->releasetime)
- sg->releasetime = runtime·cputicks();
- runtime·ready(gp);
- } else
- runtime·unlock(&c->lock);
-
- if(received != nil)
- *received = true;
- if(mysg.releasetime > 0)
- runtime·blockevent(mysg.releasetime - t0, 2);
- return true;
-
-closed:
- if(ep != nil)
- runtime·memclr(ep, c->elemsize);
- if(received != nil)
- *received = false;
- if(raceenabled)
- runtime·raceacquire(c);
- runtime·unlock(&c->lock);
- if(mysg.releasetime > 0)
- runtime·blockevent(mysg.releasetime - t0, 2);
- return true;
-}
-
-#pragma textflag NOSPLIT
-func chanrecv1(t *ChanType, c *Hchan, elem *byte) {
- chanrecv(t, c, elem, true, nil);
-}
-
-// chanrecv2(hchan *chan any, elem *any) (received bool);
-#pragma textflag NOSPLIT
-func chanrecv2(t *ChanType, c *Hchan, elem *byte) (received bool) {
- chanrecv(t, c, elem, true, &received);
-}
-
-// compiler implements
-//
-// select {
-// case c <- v:
-// ... foo
-// default:
-// ... bar
-// }
-//
-// as
-//
-// if selectnbsend(c, v) {
-// ... foo
-// } else {
-// ... bar
-// }
-//
-#pragma textflag NOSPLIT
-func selectnbsend(t *ChanType, c *Hchan, elem *byte) (selected bool) {
- selected = chansend(t, c, elem, false, runtime·getcallerpc(&t));
-}
-
-// compiler implements
-//
-// select {
-// case v = <-c:
-// ... foo
-// default:
-// ... bar
-// }
-//
-// as
-//
-// if selectnbrecv(&v, c) {
-// ... foo
-// } else {
-// ... bar
-// }
-//
-#pragma textflag NOSPLIT
-func selectnbrecv(t *ChanType, elem *byte, c *Hchan) (selected bool) {
- selected = chanrecv(t, c, elem, false, nil);
-}
-
-// compiler implements
-//
-// select {
-// case v, ok = <-c:
-// ... foo
-// default:
-// ... bar
-// }
-//
-// as
-//
-// if c != nil && selectnbrecv2(&v, &ok, c) {
-// ... foo
-// } else {
-// ... bar
-// }
-//
-#pragma textflag NOSPLIT
-func selectnbrecv2(t *ChanType, elem *byte, received *bool, c *Hchan) (selected bool) {
- selected = chanrecv(t, c, elem, false, received);
-}
-
-#pragma textflag NOSPLIT
-func reflect·chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool) {
- selected = chansend(t, c, elem, !nb, runtime·getcallerpc(&t));
-}
-
-func reflect·chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool, received bool) {
- received = false;
- selected = chanrecv(t, c, elem, !nb, &received);
-}
+// TODO(khr): temporary placeholders until the rest of this code is moved to Go.
+extern byte runtime·chansend;
+extern byte runtime·chanrecv;
static int64
selectsize(int32 size)
case CaseSend:
if(raceenabled)
- runtime·racereadpc(c, cas->pc, chansend);
+ runtime·racereadpc(c, cas->pc, &runtime·chansend);
if(c->closed)
goto sclose;
if(c->dataqsiz > 0) {
if(raceenabled) {
if(cas->kind == CaseRecv && cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chanrecv);
else if(cas->kind == CaseSend)
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chansend);
}
selunlock(sel);
// can receive from buffer
if(raceenabled) {
if(cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chanrecv);
runtime·raceacquire(chanbuf(c, c->recvx));
runtime·racerelease(chanbuf(c, c->recvx));
}
if(raceenabled) {
runtime·raceacquire(chanbuf(c, c->sendx));
runtime·racerelease(chanbuf(c, c->sendx));
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chansend);
}
runtime·memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
if(++c->sendx == c->dataqsiz)
// can receive from sleeping sender (sg)
if(raceenabled) {
if(cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chanrecv);
racesync(c, sg);
}
selunlock(sel);
syncsend:
// can send to sleeping receiver (sg)
if(raceenabled) {
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, &runtime·chansend);
racesync(c, sg);
}
selunlock(sel);