#include "../../cmd/ld/textflag.h"
#define MAXALIGN 8
-#define NOSELGEN 1
typedef struct WaitQ WaitQ;
typedef struct SudoG SudoG;
struct SudoG
{
- G* g; // g and selgen constitute
- uint32 selgen; // a weak pointer to g
+ G* g;
+ uint32* selectdone;
SudoG* link;
int64 releasetime;
byte* elem; // data element
static void destroychan(Hchan*);
static void racesync(Hchan*, SudoG*);
-Hchan*
-runtime·makechan_c(ChanType *t, int64 hint)
+static Hchan*
+makechan(ChanType *t, int64 hint)
{
Hchan *c;
Type *elem;
void
reflect·makechan(ChanType *t, uint64 size, Hchan *c)
{
- c = runtime·makechan_c(t, size);
+ c = makechan(t, size);
FLUSH(&c);
}
void
runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
{
- ret = runtime·makechan_c(t, hint);
+ ret = makechan(t, hint);
FLUSH(&ret);
}
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
-void
-runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
+static bool
+chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
{
SudoG *sg;
SudoG mysg;
int64 t0;
if(raceenabled)
- runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), runtime·chansend);
+ runtime·racereadobjectpc(ep, t->elem, runtime·getcallerpc(&t), chansend);
if(c == nil) {
USED(t);
- if(pres != nil) {
- *pres = false;
- return;
- }
+ if(!block)
+ return false;
runtime·park(nil, nil, "chan send (nil chan)");
- return; // not reached
+ return false; // not reached
}
if(debug) {
runtime·lock(c);
if(raceenabled)
- runtime·racereadpc(c, pc, runtime·chansend);
+ runtime·racereadpc(c, pc, chansend);
if(c->closed)
goto closed;
if(sg->releasetime)
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
-
- if(pres != nil)
- *pres = true;
- return;
+ return true;
}
- if(pres != nil) {
+ if(!block) {
runtime·unlock(c);
- *pres = false;
- return;
+ return false;
}
mysg.elem = ep;
mysg.g = g;
- mysg.selgen = NOSELGEN;
+ mysg.selectdone = nil;
g->param = nil;
enqueue(&c->sendq, &mysg);
runtime·parkunlock(c, "chan send");
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
- return;
+ return true;
asynch:
if(c->closed)
goto closed;
if(c->qcount >= c->dataqsiz) {
- if(pres != nil) {
+ if(!block) {
runtime·unlock(c);
- *pres = false;
- return;
+ return false;
}
mysg.g = g;
mysg.elem = nil;
- mysg.selgen = NOSELGEN;
+ mysg.selectdone = nil;
enqueue(&c->sendq, &mysg);
runtime·parkunlock(c, "chan send");
runtime·ready(gp);
} else
runtime·unlock(c);
- if(pres != nil)
- *pres = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
- return;
+ return true;
closed:
runtime·unlock(c);
runtime·panicstring("send on closed channel");
+ return false; // not reached
}
-void
-runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
+static bool
+chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
{
SudoG *sg;
SudoG mysg;
if(c == nil) {
USED(t);
- if(selected != nil) {
- *selected = false;
- return;
- }
+ if(!block)
+ return false;
runtime·park(nil, nil, "chan receive (nil chan)");
- return; // not reached
+ return false; // not reached
}
t0 = 0;
sg->releasetime = runtime·cputicks();
runtime·ready(gp);
- if(selected != nil)
- *selected = true;
if(received != nil)
*received = true;
- return;
+ return true;
}
- if(selected != nil) {
+ if(!block) {
runtime·unlock(c);
- *selected = false;
- return;
+ return false;
}
mysg.elem = ep;
mysg.g = g;
- mysg.selgen = NOSELGEN;
+ mysg.selectdone = nil;
g->param = nil;
enqueue(&c->recvq, &mysg);
runtime·parkunlock(c, "chan receive");
*received = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
- return;
+ return true;
asynch:
if(c->qcount <= 0) {
if(c->closed)
goto closed;
- if(selected != nil) {
+ if(!block) {
runtime·unlock(c);
- *selected = false;
if(received != nil)
*received = false;
- return;
+ return false;
}
mysg.g = g;
mysg.elem = nil;
- mysg.selgen = NOSELGEN;
+ mysg.selectdone = nil;
enqueue(&c->recvq, &mysg);
runtime·parkunlock(c, "chan receive");
} else
runtime·unlock(c);
- if(selected != nil)
- *selected = true;
if(received != nil)
*received = true;
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
- return;
+ return true;
closed:
if(ep != nil)
c->elemtype->alg->copy(c->elemsize, ep, nil);
- if(selected != nil)
- *selected = true;
if(received != nil)
*received = false;
if(raceenabled)
runtime·unlock(c);
if(mysg.releasetime > 0)
runtime·blockevent(mysg.releasetime - t0, 2);
+ return true;
}
// chansend1(hchan *chan any, elem *any);
void
runtime·chansend1(ChanType *t, Hchan* c, byte *v)
{
- runtime·chansend(t, c, v, nil, runtime·getcallerpc(&t));
+ chansend(t, c, v, true, runtime·getcallerpc(&t));
}
// chanrecv1(hchan *chan any, elem *any);
void
runtime·chanrecv1(ChanType *t, Hchan* c, byte *v)
{
- runtime·chanrecv(t, c, v, nil, nil);
+ chanrecv(t, c, v, true, nil);
}
// chanrecv2(hchan *chan any, elem *any) (received bool);
void
runtime·chanrecv2(ChanType *t, Hchan* c, byte *v, bool received)
{
- runtime·chanrecv(t, c, v, nil, &received);
+ chanrecv(t, c, v, true, &received);
}
// func selectnbsend(c chan any, elem *any) bool
//
#pragma textflag NOSPLIT
void
-runtime·selectnbsend(ChanType *t, Hchan *c, byte *val, bool pres)
+runtime·selectnbsend(ChanType *t, Hchan *c, byte *val, bool res)
{
- runtime·chansend(t, c, val, &pres, runtime·getcallerpc(&t));
+ res = chansend(t, c, val, false, runtime·getcallerpc(&t));
+ FLUSH(&res);
}
// func selectnbrecv(elem *any, c chan any) bool
void
runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
{
- runtime·chanrecv(t, c, v, &selected, nil);
+ selected = chanrecv(t, c, v, false, nil);
+ FLUSH(&selected);
}
// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
void
runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
{
- runtime·chanrecv(t, c, v, &selected, received);
+ selected = chanrecv(t, c, v, false, received);
+ FLUSH(&selected);
}
// For reflect:
void
reflect·chansend(ChanType *t, Hchan *c, byte *val, bool nb, uintptr selected)
{
- bool *sp;
-
- if(nb) {
- selected = false;
- sp = (bool*)&selected;
- } else {
- *(bool*)&selected = true;
- FLUSH(&selected);
- sp = nil;
- }
- runtime·chansend(t, c, val, sp, runtime·getcallerpc(&t));
+ selected = chansend(t, c, val, !nb, runtime·getcallerpc(&t));
+ FLUSH(&selected);
}
// For reflect:
void
reflect·chanrecv(ChanType *t, Hchan *c, bool nb, byte *val, bool selected, bool received)
{
- bool *sp;
-
- if(nb) {
- selected = false;
- sp = &selected;
- } else {
- selected = true;
- FLUSH(&selected);
- sp = nil;
- }
received = false;
FLUSH(&received);
- runtime·chanrecv(t, c, val, sp, &received);
+ selected = chanrecv(t, c, val, !nb, &received);
+ FLUSH(&selected);
}
static Select* newselect(int32);
selectgo(Select **selp)
{
Select *sel;
- uint32 o, i, j, k;
+ uint32 o, i, j, k, done;
int64 t0;
Scase *cas, *dfl;
Hchan *c;
case CaseSend:
if(raceenabled)
- runtime·racereadpc(c, cas->pc, runtime·chansend);
+ runtime·racereadpc(c, cas->pc, chansend);
if(c->closed)
goto sclose;
if(c->dataqsiz > 0) {
// pass 2 - enqueue on all chans
+ done = 0;
for(i=0; i<sel->ncase; i++) {
o = sel->pollorder[i];
cas = &sel->scase[o];
c = cas->chan;
sg = &cas->sg;
sg->g = g;
- sg->selgen = g->selgen;
+ sg->selectdone = &done;
switch(cas->kind) {
case CaseRecv:
if(raceenabled) {
if(cas->kind == CaseRecv && cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
else if(cas->kind == CaseSend)
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
}
selunlock(sel);
// can receive from buffer
if(raceenabled) {
if(cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
runtime·raceacquire(chanbuf(c, c->recvx));
}
if(cas->receivedp != nil)
// can send to buffer
if(raceenabled) {
runtime·racerelease(chanbuf(c, c->sendx));
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
}
c->elemtype->alg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
if(++c->sendx == c->dataqsiz)
// can receive from sleeping sender (sg)
if(raceenabled) {
if(cas->sg.elem != nil)
- runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chanrecv);
+ runtime·racewriteobjectpc(cas->sg.elem, c->elemtype, cas->pc, chanrecv);
racesync(c, sg);
}
selunlock(sel);
syncsend:
// can send to sleeping receiver (sg)
if(raceenabled) {
- runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, runtime·chansend);
+ runtime·racereadobjectpc(cas->sg.elem, c->elemtype, cas->pc, chansend);
racesync(c, sg);
}
selunlock(sel);
return nil;
q->first = sgp->link;
- // if sgp is stale, ignore it
- if(sgp->selgen != NOSELGEN &&
- (sgp->selgen != sgp->g->selgen ||
- !runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
- //prints("INVALID PSEUDOG POINTER\n");
- goto loop;
+ // if sgp participates in a select and is already signaled, ignore it
+ if(sgp->selectdone != nil) {
+ // claim the right to signal
+ if(*sgp->selectdone != 0 || !runtime·cas(sgp->selectdone, 0, 1))
+ goto loop;
}
return sgp;