From 4523ee9ac8873552d6c64472c19d68760955d4a8 Mon Sep 17 00:00:00 2001 From: Ken Thompson Date: Fri, 13 Mar 2009 16:47:54 -0700 Subject: [PATCH] close/closed on chans R=r OCL=26281 CL=26285 --- src/runtime/chan.c | 150 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 117 insertions(+), 33 deletions(-) diff --git a/src/runtime/chan.c b/src/runtime/chan.c index a15e50dc02..c5e53410e8 100644 --- a/src/runtime/chan.c +++ b/src/runtime/chan.c @@ -9,10 +9,10 @@ static Lock chanlock; enum { - Wclosed = 0x0001, - Rclosed = 0xfffe, - Rincr = 0x0002, - Rmax = 0x8000, + Wclosed = 0x0001, // writer has closed + Rclosed = 0x0002, // reader has seen close + Eincr = 0x0004, // increment errors + Emax = 0x0800, // error limit before throw }; typedef struct Hchan Hchan; @@ -41,8 +41,7 @@ struct WaitQ struct Hchan { uint16 elemsize; - uint16 closed; // Wclosed closed() hash been called - // Rclosed read-count after closed() + uint16 closed; // Wclosed Rclosed errorcount uint32 dataqsiz; // size of the circular q uint32 qcount; // total data in the q Alg* elemalg; // interface for element type @@ -143,6 +142,16 @@ sys·newchan(uint32 elemsize, uint32 elemalg, uint32 hint, } } +static void +incerr(Hchan* c) +{ + c->closed += Eincr; + if(c->closed & Emax) { + unlock(&chanlock); + throw("too many operations on a closed channel"); + } +} + /* * generic single channel send/recv * if the bool pointer is nil, @@ -167,9 +176,13 @@ sendchan(Hchan *c, byte *ep, bool *pres) } lock(&chanlock); + if(c->dataqsiz > 0) goto asynch; + if(c->closed & Wclosed) + goto closed; + sg = dequeue(&c->recvq, c); if(sg != nil) { if(ep != nil) @@ -209,7 +222,10 @@ sendchan(Hchan *c, byte *ep, bool *pres) return; asynch: - while(c->qcount >= c->dataqsiz) { + if(c->closed & Wclosed) + goto closed; + + if(c->qcount >= c->dataqsiz) { if(pres != nil) { unlock(&chanlock); *pres = false; @@ -222,6 +238,7 @@ asynch: sys·Gosched(); lock(&chanlock); + goto asynch; } if(ep != nil) c->elemalg->copy(c->elemsize, c->senddataq->elem, ep); @@ -238,6 +255,13 @@ asynch: unlock(&chanlock); if(pres != nil) *pres = true; + return; + +closed: + incerr(c); + if(pres != nil) + *pres = false; + unlock(&chanlock); } static void @@ -256,6 +280,9 @@ chanrecv(Hchan* c, byte *ep, bool* pres) if(c->dataqsiz > 0) goto asynch; + if(c->closed & Wclosed) + goto closed; + sg = dequeue(&c->sendq, c); if(sg != nil) { c->elemalg->copy(c->elemsize, ep, sg->elem); @@ -285,6 +312,12 @@ chanrecv(Hchan* c, byte *ep, bool* pres) lock(&chanlock); sg = g->param; + + if(c->closed & Wclosed) { + freesg(c, sg); + goto closed; + } + c->elemalg->copy(c->elemsize, ep, sg->elem); freesg(c, sg); unlock(&chanlock); @@ -293,7 +326,10 @@ chanrecv(Hchan* c, byte *ep, bool* pres) return; asynch: - while(c->qcount <= 0) { + if(c->qcount <= 0) { + if(c->closed & Wclosed) + goto closed; + if(pres != nil) { unlock(&chanlock); *pres = false; @@ -304,7 +340,9 @@ asynch: enqueue(&c->recvq, sg); unlock(&chanlock); sys·Gosched(); + lock(&chanlock); + goto asynch; } c->elemalg->copy(c->elemsize, ep, c->recvdataq->elem); c->recvdataq = c->recvdataq->link; @@ -315,10 +353,23 @@ asynch: freesg(c, sg); unlock(&chanlock); ready(gp); - } else - unlock(&chanlock); + if(pres != nil) + *pres = true; + return; + } + + unlock(&chanlock); if(pres != nil) *pres = true; + return; + +closed: + c->elemalg->copy(c->elemsize, ep, nil); + c->closed |= Rclosed; + incerr(c); + if(pres != nil) + *pres = false; + unlock(&chanlock); } // chansend1(hchan *chan any, elem any); @@ -602,10 +653,14 @@ loop: if(cas->send) { if(c->qcount < c->dataqsiz) goto asyns; + if(c->closed & Wclosed) + goto gots; goto next1; } if(c->qcount > 0) goto asynr; + if(c->closed & Wclosed) + goto gotr; goto next1; } @@ -613,11 +668,15 @@ loop: sg = dequeue(&c->recvq, c); if(sg != nil) goto gots; + if(c->closed & Wclosed) + goto gots; goto next1; } sg = dequeue(&c->sendq, c); if(sg != nil) goto gotr; + if(c->closed & Wclosed) + goto gotr; next1: o += p; @@ -764,6 +823,13 @@ gotr: sys·printint(o); prints("\n"); } + if(c->closed & Wclosed) { + if(cas->u.elemp != nil) + c->elemalg->copy(c->elemsize, cas->u.elemp, nil); + c->closed |= Rclosed; + incerr(c); + goto retc; + } if(cas->u.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); gp = sg->g; @@ -782,6 +848,10 @@ gots: sys·printint(o); prints("\n"); } + if(c->closed & Wclosed) { + incerr(c); + goto retc; + } c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); gp = sg->g; gp->param = sg; @@ -803,35 +873,47 @@ retc: void sys·closechan(Hchan *c) { - if(c == nil) - throw("closechan: channel not allocated"); - - // if wclosed already set - // work has been done - just return - if(c->closed & Wclosed) - return; + SudoG *sg; + G* gp; - // set wclosed + lock(&chanlock); + incerr(c); c->closed |= Wclosed; + + // release all readers + for(;;) { + sg = dequeue(&c->recvq, c); + if(sg == nil) + break; + gp = sg->g; + gp->param = nil; + freesg(c, sg); + ready(gp); + } + + // release all writers + for(;;) { + sg = dequeue(&c->sendq, c); + if(sg == nil) + break; + gp = sg->g; + gp->param = nil; + freesg(c, sg); + ready(gp); + } + + unlock(&chanlock); } // closedchan(sel *byte) bool; void sys·closedchan(Hchan *c, bool closed) { - if(c == nil) - throw("closedchan: channel not allocated"); + // test Rclosed closed = 0; - - // test rclosed - if(c->closed & Rclosed) { - // see if rclosed has been set a lot - if(c->closed & Rmax) - throw("closedchan: ignored"); - c->closed += Rincr; + if(c->closed & Rclosed) closed = 1; - } FLUSH(&closed); } @@ -892,11 +974,13 @@ allocsg(Hchan *c) static void freesg(Hchan *c, SudoG *sg) { - if(sg->isfree) - throw("chan.freesg: already free"); - sg->isfree = 1; - sg->link = c->free; - c->free = sg; + if(sg != nil) { + if(sg->isfree) + throw("chan.freesg: already free"); + sg->isfree = 1; + sg->link = c->free; + c->free = sg; + } } static uint32 -- 2.48.1