]> Cypherpunks repositories - gostls13.git/commitdiff
close/closed on chans
authorKen Thompson <ken@golang.org>
Fri, 13 Mar 2009 23:47:54 +0000 (16:47 -0700)
committerKen Thompson <ken@golang.org>
Fri, 13 Mar 2009 23:47:54 +0000 (16:47 -0700)
R=r
OCL=26281
CL=26285

src/runtime/chan.c

index a15e50dc02976505a5e62d86ecbcfab3de8edcc2..c5e53410e85f0638527625aa33f68dc7eb112b29 100644 (file)
@@ -9,10 +9,10 @@ static        Lock            chanlock;
 
 enum
 {
-       Wclosed         = 0x0001,
-       Rclosed         = 0xfffe,
-       Rincr           = 0x0002,
-       Rmax            = 0x8000,
+       Wclosed         = 0x0001,       // writer has closed
+       Rclosed         = 0x0002,       // reader has seen close
+       Eincr           = 0x0004,       // increment errors
+       Emax            = 0x0800,       // error limit before throw
 };
 
 typedef        struct  Hchan   Hchan;
@@ -41,8 +41,7 @@ struct        WaitQ
 struct Hchan
 {
        uint16  elemsize;
-       uint16  closed;                 // Wclosed closed() hash been called
-                                       // Rclosed read-count after closed()
+       uint16  closed;                 // Wclosed Rclosed errorcount
        uint32  dataqsiz;               // size of the circular q
        uint32  qcount;                 // total data in the q
        Alg*    elemalg;                // interface for element type
@@ -143,6 +142,16 @@ sys·newchan(uint32 elemsize, uint32 elemalg, uint32 hint,
        }
 }
 
+static void
+incerr(Hchan* c)
+{
+       c->closed += Eincr;
+       if(c->closed & Emax) {
+               unlock(&chanlock);
+               throw("too many operations on a closed channel");
+       }
+}
+
 /*
  * generic single channel send/recv
  * if the bool pointer is nil,
@@ -167,9 +176,13 @@ sendchan(Hchan *c, byte *ep, bool *pres)
        }
 
        lock(&chanlock);
+
        if(c->dataqsiz > 0)
                goto asynch;
 
+       if(c->closed & Wclosed)
+               goto closed;
+
        sg = dequeue(&c->recvq, c);
        if(sg != nil) {
                if(ep != nil)
@@ -209,7 +222,10 @@ sendchan(Hchan *c, byte *ep, bool *pres)
        return;
 
 asynch:
-       while(c->qcount >= c->dataqsiz) {
+       if(c->closed & Wclosed)
+               goto closed;
+
+       if(c->qcount >= c->dataqsiz) {
                if(pres != nil) {
                        unlock(&chanlock);
                        *pres = false;
@@ -222,6 +238,7 @@ asynch:
                sys·Gosched();
 
                lock(&chanlock);
+               goto asynch;
        }
        if(ep != nil)
                c->elemalg->copy(c->elemsize, c->senddataq->elem, ep);
@@ -238,6 +255,13 @@ asynch:
                unlock(&chanlock);
        if(pres != nil)
                *pres = true;
+       return;
+
+closed:
+       incerr(c);
+       if(pres != nil)
+               *pres = false;
+       unlock(&chanlock);
 }
 
 static void
@@ -256,6 +280,9 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
        if(c->dataqsiz > 0)
                goto asynch;
 
+       if(c->closed & Wclosed)
+               goto closed;
+
        sg = dequeue(&c->sendq, c);
        if(sg != nil) {
                c->elemalg->copy(c->elemsize, ep, sg->elem);
@@ -285,6 +312,12 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
 
        lock(&chanlock);
        sg = g->param;
+
+       if(c->closed & Wclosed) {
+               freesg(c, sg);
+               goto closed;
+       }
+
        c->elemalg->copy(c->elemsize, ep, sg->elem);
        freesg(c, sg);
        unlock(&chanlock);
@@ -293,7 +326,10 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
        return;
 
 asynch:
-       while(c->qcount <= 0) {
+       if(c->qcount <= 0) {
+               if(c->closed & Wclosed)
+                       goto closed;
+
                if(pres != nil) {
                        unlock(&chanlock);
                        *pres = false;
@@ -304,7 +340,9 @@ asynch:
                enqueue(&c->recvq, sg);
                unlock(&chanlock);
                sys·Gosched();
+
                lock(&chanlock);
+               goto asynch;
        }
        c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem);
        c->recvdataq = c->recvdataq->link;
@@ -315,10 +353,23 @@ asynch:
                freesg(c, sg);
                unlock(&chanlock);
                ready(gp);
-       } else
-               unlock(&chanlock);
+               if(pres != nil)
+                       *pres = true;
+               return;
+       }
+
+       unlock(&chanlock);
        if(pres != nil)
                *pres = true;
+       return;
+
+closed:
+       c->elemalg->copy(c->elemsize, ep, nil);
+       c->closed |= Rclosed;
+       incerr(c);
+       if(pres != nil)
+               *pres = false;
+       unlock(&chanlock);
 }
 
 // chansend1(hchan *chan any, elem any);
@@ -602,10 +653,14 @@ loop:
                        if(cas->send) {
                                if(c->qcount < c->dataqsiz)
                                        goto asyns;
+                               if(c->closed & Wclosed)
+                                       goto gots;
                                goto next1;
                        }
                        if(c->qcount > 0)
                                goto asynr;
+                       if(c->closed & Wclosed)
+                               goto gotr;
                        goto next1;
                }
 
@@ -613,11 +668,15 @@ loop:
                        sg = dequeue(&c->recvq, c);
                        if(sg != nil)
                                goto gots;
+                       if(c->closed & Wclosed)
+                               goto gots;
                        goto next1;
                }
                sg = dequeue(&c->sendq, c);
                if(sg != nil)
                        goto gotr;
+               if(c->closed & Wclosed)
+                       goto gotr;
 
        next1:
                o += p;
@@ -764,6 +823,13 @@ gotr:
                sys·printint(o);
                prints("\n");
        }
+       if(c->closed & Wclosed) {
+               if(cas->u.elemp != nil)
+                       c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
+               c->closed |= Rclosed;
+               incerr(c);
+               goto retc;
+       }
        if(cas->u.elemp != nil)
                c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
        gp = sg->g;
@@ -782,6 +848,10 @@ gots:
                sys·printint(o);
                prints("\n");
        }
+       if(c->closed & Wclosed) {
+               incerr(c);
+               goto retc;
+       }
        c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
        gp = sg->g;
        gp->param = sg;
@@ -803,35 +873,47 @@ retc:
 void
 sys·closechan(Hchan *c)
 {
-       if(c == nil)
-               throw("closechan: channel not allocated");
-
-       // if wclosed already set
-       // work has been done - just return
-       if(c->closed & Wclosed)
-               return;
+       SudoG *sg;
+       G* gp;
 
-       // set wclosed
+       lock(&chanlock);
+       incerr(c);
        c->closed |= Wclosed;
+
+       // release all readers
+       for(;;) {
+               sg = dequeue(&c->recvq, c);
+               if(sg == nil)
+                       break;
+               gp = sg->g;
+               gp->param = nil;
+               freesg(c, sg);
+               ready(gp);
+       }
+
+       // release all writers
+       for(;;) {
+               sg = dequeue(&c->sendq, c);
+               if(sg == nil)
+                       break;
+               gp = sg->g;
+               gp->param = nil;
+               freesg(c, sg);
+               ready(gp);
+       }
+
+       unlock(&chanlock);
 }
 
 // closedchan(sel *byte) bool;
 void
 sys·closedchan(Hchan *c, bool closed)
 {
-       if(c == nil)
-               throw("closedchan: channel not allocated");
 
+       // test Rclosed
        closed = 0;
-
-       // test rclosed
-       if(c->closed & Rclosed) {
-               // see if rclosed has been set a lot
-               if(c->closed & Rmax)
-                       throw("closedchan: ignored");
-               c->closed += Rincr;
+       if(c->closed & Rclosed)
                closed = 1;
-       }
        FLUSH(&closed);
 }
 
@@ -892,11 +974,13 @@ allocsg(Hchan *c)
 static void
 freesg(Hchan *c, SudoG *sg)
 {
-       if(sg->isfree)
-               throw("chan.freesg: already free");
-       sg->isfree = 1;
-       sg->link = c->free;
-       c->free = sg;
+       if(sg != nil) {
+               if(sg->isfree)
+                       throw("chan.freesg: already free");
+               sg->isfree = 1;
+               sg->link = c->free;
+               c->free = sg;
+       }
 }
 
 static uint32