Scase* scase[1]; // one per case
};
+static void dequeueg(WaitQ*, Hchan*);
static SudoG* dequeue(WaitQ*, Hchan*);
static void enqueue(WaitQ*, SudoG*);
static SudoG* allocsg(Hchan*);
c->dataqsiz = hint;
}
- if(debug) {
- prints("makechan: chan=");
- ·printpointer(c);
- prints("; elemsize=");
- ·printint(elem->size);
- prints("; elemalg=");
- ·printint(elem->alg);
- prints("; elemalign=");
- ·printint(elem->align);
- prints("; dataqsiz=");
- ·printint(c->dataqsiz);
- prints("\n");
- }
+ if(debug)
+ printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n",
+ c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz);
return c;
}
* occur. if pres is not nil,
* then the protocol will not
* sleep but return if it could
- * not complete
+ * not complete.
+ *
+ * sleep can wake up with g->param == nil
+ * when a channel involved in the sleep has
+ * been closed. it is easiest to loop and re-run
+ * the operation; we'll see that it's now closed.
*/
void
chansend(Hchan *c, byte *ep, bool *pres)
gosched();
if(debug) {
- prints("chansend: chan=");
- ·printpointer(c);
- prints("; elem=");
+ printf("chansend: chan=%p; elem=", c);
c->elemalg->print(c->elemsize, ep);
prints("\n");
}
if(gcwaiting)
gosched();
- if(debug) {
- prints("chanrecv: chan=");
- ·printpointer(c);
- prints("\n");
- }
+ if(debug)
+ printf("chanrecv: chan=%p\n", c);
lock(c);
loop:
sel->tcase = size;
sel->ncase = 0;
*selp = sel;
- if(debug) {
- prints("newselect s=");
- ·printpointer(sel);
- prints(" size=");
- ·printint(size);
- prints("\n");
- }
+ if(debug)
+ printf("newselect s=%p size=%d\n", sel, size);
}
// selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
ae = (byte*)&sel + eo;
c->elemalg->copy(c->elemsize, cas->u.elem, ae);
- if(debug) {
- prints("selectsend s=");
- ·printpointer(sel);
- prints(" pc=");
- ·printpointer(cas->pc);
- prints(" chan=");
- ·printpointer(cas->chan);
- prints(" so=");
- ·printint(cas->so);
- prints(" send=");
- ·printint(cas->send);
- prints("\n");
- }
+ if(debug)
+ printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n",
+ sel, cas->pc, cas->chan, cas->so, cas->send);
}
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
cas->send = 0;
cas->u.elemp = *(byte**)((byte*)&sel + eo);
- if(debug) {
- prints("selectrecv s=");
- ·printpointer(sel);
- prints(" pc=");
- ·printpointer(cas->pc);
- prints(" chan=");
- ·printpointer(cas->chan);
- prints(" so=");
- ·printint(cas->so);
- prints(" send=");
- ·printint(cas->send);
- prints("\n");
- }
+ if(debug)
+ printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n",
+ sel, cas->pc, cas->chan, cas->so, cas->send);
}
cas->send = 2;
cas->u.elemp = nil;
- if(debug) {
- prints("selectdefault s=");
- ·printpointer(sel);
- prints(" pc=");
- ·printpointer(cas->pc);
- prints(" so=");
- ·printint(cas->so);
- prints(" send=");
- ·printint(cas->send);
- prints("\n");
- }
+ if(debug)
+ printf("selectdefault s=%p pc=%p so=%d send=%d\n",
+ sel, cas->pc, cas->so, cas->send);
}
static void
if(gcwaiting)
gosched();
- if(debug) {
- prints("selectgo: sel=");
- ·printpointer(sel);
- prints("\n");
- }
+ if(debug)
+ printf("select: sel=%p\n", sel);
if(sel->ncase < 2) {
if(sel->ncase < 1)
- throw("selectgo: no cases");
+ throw("select: no cases");
// make special case of one.
}
p = fastrand1();
if(gcd(p, sel->ncase) == 1)
break;
- if(i > 1000) {
- throw("selectgo: failed to select prime");
- }
+ if(i > 1000)
+ throw("select: failed to select prime");
}
// select an initial offset
dfl = nil;
for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o];
-
- if(cas->send == 2) { // default
- dfl = cas;
- goto next1;
- }
-
c = cas->chan;
- if(c->dataqsiz > 0) {
- if(cas->send) {
- if(c->closed & Wclosed)
- goto sclose;
- if(c->qcount < c->dataqsiz)
- goto asyns;
- goto next1;
+
+ switch(cas->send) {
+ case 0: // recv
+ if(c->dataqsiz > 0) {
+ if(c->qcount > 0)
+ goto asyncrecv;
+ } else {
+ sg = dequeue(&c->sendq, c);
+ if(sg != nil)
+ goto syncrecv;
}
- if(c->qcount > 0)
- goto asynr;
if(c->closed & Wclosed)
goto rclose;
- goto next1;
- }
+ break;
- if(cas->send) {
+ case 1: // send
if(c->closed & Wclosed)
goto sclose;
- sg = dequeue(&c->recvq, c);
- if(sg != nil)
- goto gots;
- goto next1;
+ if(c->dataqsiz > 0) {
+ if(c->qcount < c->dataqsiz)
+ goto asyncsend;
+ } else {
+ sg = dequeue(&c->recvq, c);
+ if(sg != nil)
+ goto syncsend;
+ }
+ break;
+
+ case 2: // default
+ dfl = cas;
+ break;
}
- sg = dequeue(&c->sendq, c);
- if(sg != nil)
- goto gotr;
- if(c->closed & Wclosed)
- goto rclose;
- next1:
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o];
c = cas->chan;
+ sg = allocsg(c);
+ sg->offset = o;
- if(c->dataqsiz > 0) {
- if(cas->send) {
- if(c->qcount < c->dataqsiz) {
- prints("selectgo: pass 2 async send\n");
- goto asyns;
- }
- sg = allocsg(c);
- sg->offset = o;
- enqueue(&c->sendq, sg);
- goto next2;
- }
- if(c->qcount > 0) {
- prints("selectgo: pass 2 async recv\n");
- goto asynr;
+ switch(cas->send) {
+ case 0: // recv
+ if(c->dataqsiz > 0) {
+ if(c->qcount > 0)
+ throw("select: pass 2 async recv");
+ } else {
+ if(dequeue(&c->sendq, c))
+ throw("select: pass 2 sync recv");
}
- sg = allocsg(c);
- sg->offset = o;
enqueue(&c->recvq, sg);
- goto next2;
- }
-
- if(cas->send) {
- sg = dequeue(&c->recvq, c);
- if(sg != nil) {
- prints("selectgo: pass 2 sync send\n");
- g->selgen++;
- goto gots;
+ break;
+
+ case 1: // send
+ if(c->dataqsiz > 0) {
+ if(c->qcount < c->dataqsiz)
+ throw("select: pass 2 async send");
+ } else {
+ if(dequeue(&c->recvq, c))
+ throw("select: pass 2 sync send");
+ c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
}
- sg = allocsg(c);
- sg->offset = o;
- c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg);
- goto next2;
- }
- sg = dequeue(&c->sendq, c);
- if(sg != nil) {
- prints("selectgo: pass 2 sync recv\n");
- g->selgen++;
- goto gotr;
+ break;
}
- sg = allocsg(c);
- sg->offset = o;
- enqueue(&c->recvq, sg);
- next2:
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
sellock(sel);
sg = g->param;
+
+ // pass 3 - dequeue from unsuccessful chans
+ // otherwise they stack up on quiet channels
+ for(i=0; i<sel->ncase; i++) {
+ if(sg == nil || o != sg->offset) {
+ cas = sel->scase[o];
+ c = cas->chan;
+ if(cas->send)
+ dequeueg(&c->sendq, c);
+ else
+ dequeueg(&c->recvq, c);
+ }
+
+ o += p;
+ if(o >= sel->ncase)
+ o -= sel->ncase;
+ }
+
if(sg == nil)
goto loop;
goto loop;
}
- if(debug) {
- prints("wait-return: sel=");
- ·printpointer(sel);
- prints(" c=");
- ·printpointer(c);
- prints(" cas=");
- ·printpointer(cas);
- prints(" send=");
- ·printint(cas->send);
- prints(" o=");
- ·printint(o);
- prints("\n");
- }
+ if(debug)
+ printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n",
+ sel, c, cas, cas->send, o);
if(!cas->send) {
if(cas->u.elemp != nil)
freesg(c, sg);
goto retc;
-asynr:
+asyncrecv:
+ // can receive from buffer
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
}
goto retc;
-asyns:
+asyncsend:
+ // can send to buffer
if(cas->u.elem != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
c->senddataq = c->senddataq->link;
}
goto retc;
-gotr:
- // recv path to wakeup the sender (sg)
- if(debug) {
- prints("gotr: sel=");
- ·printpointer(sel);
- prints(" c=");
- ·printpointer(c);
- prints(" o=");
- ·printint(o);
- prints("\n");
- }
+syncrecv:
+ // can receive from sleeping sender (sg)
+ if(debug)
+ printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g;
goto retc;
rclose:
+ // read at end of closed channel
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
c->closed |= Rclosed;
incerr(c);
goto retc;
-gots:
- // send path to wakeup the receiver (sg)
- if(debug) {
- prints("gots: sel=");
- ·printpointer(sel);
- prints(" c=");
- ·printpointer(c);
- prints(" o=");
- ·printint(o);
- prints("\n");
- }
+syncsend:
+ // can send to sleeping receiver (sg)
+ if(debug)
+ printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
if(c->closed & Wclosed)
goto sclose;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
goto retc;
sclose:
+ // send on closed channel
incerr(c);
goto retc;
retc:
selunlock(sel);
+ // return to pc corresponding to chosen case
·setcallerpc(&sel, cas->pc);
as = (byte*)&sel + cas->so;
freesel(sel);
return sgp;
}
+static void
+dequeueg(WaitQ *q, Hchan *c)
+{
+ SudoG **l, *sgp;
+
+ for(l=&q->first; (sgp=*l) != nil; l=&sgp->link) {
+ if(sgp->g == g) {
+ *l = sgp->link;
+ freesg(c, sgp);
+ break;
+ }
+ }
+}
+
static void
enqueue(WaitQ *q, SudoG *sgp)
{