enum
{
- Wclosed = 0x0001,
- Rclosed = 0xfffe,
- Rincr = 0x0002,
- Rmax = 0x8000,
+ Wclosed = 0x0001, // writer has closed
+ Rclosed = 0x0002, // reader has seen close
+ Eincr = 0x0004, // increment errors
+ Emax = 0x0800, // error limit before throw
};
typedef struct Hchan Hchan;
struct Hchan
{
uint16 elemsize;
- uint16 closed; // Wclosed closed() hash been called
- // Rclosed read-count after closed()
+ uint16 closed; // Wclosed Rclosed errorcount
uint32 dataqsiz; // size of the circular q
uint32 qcount; // total data in the q
Alg* elemalg; // interface for element type
}
}
+static void
+incerr(Hchan* c)
+{
+ c->closed += Eincr;
+ if(c->closed & Emax) {
+ unlock(&chanlock);
+ throw("too many operations on a closed channel");
+ }
+}
+
/*
* generic single channel send/recv
* if the bool pointer is nil,
}
lock(&chanlock);
+
if(c->dataqsiz > 0)
goto asynch;
+ if(c->closed & Wclosed)
+ goto closed;
+
sg = dequeue(&c->recvq, c);
if(sg != nil) {
if(ep != nil)
return;
asynch:
- while(c->qcount >= c->dataqsiz) {
+ if(c->closed & Wclosed)
+ goto closed;
+
+ if(c->qcount >= c->dataqsiz) {
if(pres != nil) {
unlock(&chanlock);
*pres = false;
sys·Gosched();
lock(&chanlock);
+ goto asynch;
}
if(ep != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, ep);
unlock(&chanlock);
if(pres != nil)
*pres = true;
+ return;
+
+closed:
+ incerr(c);
+ if(pres != nil)
+ *pres = false;
+ unlock(&chanlock);
}
static void
if(c->dataqsiz > 0)
goto asynch;
+ if(c->closed & Wclosed)
+ goto closed;
+
sg = dequeue(&c->sendq, c);
if(sg != nil) {
c->elemalg->copy(c->elemsize, ep, sg->elem);
lock(&chanlock);
sg = g->param;
+
+ if(c->closed & Wclosed) {
+ freesg(c, sg);
+ goto closed;
+ }
+
c->elemalg->copy(c->elemsize, ep, sg->elem);
freesg(c, sg);
unlock(&chanlock);
return;
asynch:
- while(c->qcount <= 0) {
+ if(c->qcount <= 0) {
+ if(c->closed & Wclosed)
+ goto closed;
+
if(pres != nil) {
unlock(&chanlock);
*pres = false;
enqueue(&c->recvq, sg);
unlock(&chanlock);
sys·Gosched();
+
lock(&chanlock);
+ goto asynch;
}
c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
freesg(c, sg);
unlock(&chanlock);
ready(gp);
- } else
- unlock(&chanlock);
+ if(pres != nil)
+ *pres = true;
+ return;
+ }
+
+ unlock(&chanlock);
if(pres != nil)
*pres = true;
+ return;
+
+closed:
+ c->elemalg->copy(c->elemsize, ep, nil);
+ c->closed |= Rclosed;
+ incerr(c);
+ if(pres != nil)
+ *pres = false;
+ unlock(&chanlock);
}
// chansend1(hchan *chan any, elem any);
if(cas->send) {
if(c->qcount < c->dataqsiz)
goto asyns;
+ if(c->closed & Wclosed)
+ goto gots;
goto next1;
}
if(c->qcount > 0)
goto asynr;
+ if(c->closed & Wclosed)
+ goto gotr;
goto next1;
}
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto gots;
+ if(c->closed & Wclosed)
+ goto gots;
goto next1;
}
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gotr;
+ if(c->closed & Wclosed)
+ goto gotr;
next1:
o += p;
sys·printint(o);
prints("\n");
}
+ if(c->closed & Wclosed) {
+ if(cas->u.elemp != nil)
+ c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
+ c->closed |= Rclosed;
+ incerr(c);
+ goto retc;
+ }
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g;
sys·printint(o);
prints("\n");
}
+ if(c->closed & Wclosed) {
+ incerr(c);
+ goto retc;
+ }
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
gp->param = sg;
void
sys·closechan(Hchan *c)
{
- if(c == nil)
- throw("closechan: channel not allocated");
-
- // if wclosed already set
- // work has been done - just return
- if(c->closed & Wclosed)
- return;
+ SudoG *sg;
+ G* gp;
- // set wclosed
+ lock(&chanlock);
+ incerr(c);
c->closed |= Wclosed;
+
+ // release all readers
+ for(;;) {
+ sg = dequeue(&c->recvq, c);
+ if(sg == nil)
+ break;
+ gp = sg->g;
+ gp->param = nil;
+ freesg(c, sg);
+ ready(gp);
+ }
+
+ // release all writers
+ for(;;) {
+ sg = dequeue(&c->sendq, c);
+ if(sg == nil)
+ break;
+ gp = sg->g;
+ gp->param = nil;
+ freesg(c, sg);
+ ready(gp);
+ }
+
+ unlock(&chanlock);
}
// closedchan(sel *byte) bool;
void
sys·closedchan(Hchan *c, bool closed)
{
- if(c == nil)
- throw("closedchan: channel not allocated");
+ // test Rclosed
closed = 0;
-
- // test rclosed
- if(c->closed & Rclosed) {
- // see if rclosed has been set a lot
- if(c->closed & Rmax)
- throw("closedchan: ignored");
- c->closed += Rincr;
+ if(c->closed & Rclosed)
closed = 1;
- }
FLUSH(&closed);
}
static void
freesg(Hchan *c, SudoG *sg)
{
- if(sg->isfree)
- throw("chan.freesg: already free");
- sg->isfree = 1;
- sg->link = c->free;
- c->free = sg;
+ if(sg != nil) {
+ if(sg->isfree)
+ throw("chan.freesg: already free");
+ sg->isfree = 1;
+ sg->link = c->free;
+ c->free = sg;
+ }
}
static uint32