]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: correct memory leak in select
authorRuss Cox <rsc@golang.org>
Thu, 1 Apr 2010 18:56:18 +0000 (11:56 -0700)
committerRuss Cox <rsc@golang.org>
Thu, 1 Apr 2010 18:56:18 +0000 (11:56 -0700)
  * adds pass 3 to dequeue from channels eagerly

various other cleanup/churn:
  * use switch on cas->send in each pass to
    factor out common code.
  * longer goto labels, commented at target
  * be more agressive about can't happen:
    throw instead of print + cope.
  * use "select" instead of "selectgo" in errors
  * use printf for debug prints when possible

R=ken2, ken3
CC=golang-dev, r
https://golang.org/cl/875041

src/pkg/runtime/chan.c
test/chan/select2.go [new file with mode: 0644]

index ea3b493b62395c88ff9e26713a328c9843471e7c..6e3d81a96f7986b24dcea15eb585cfe6d81b37ea 100644 (file)
@@ -79,6 +79,7 @@ struct        Select
        Scase*  scase[1];               // one per case
 };
 
+static void    dequeueg(WaitQ*, Hchan*);
 static SudoG*  dequeue(WaitQ*, Hchan*);
 static void    enqueue(WaitQ*, SudoG*);
 static SudoG*  allocsg(Hchan*);
@@ -126,19 +127,9 @@ makechan(Type *elem, uint32 hint)
                c->dataqsiz = hint;
        }
 
-       if(debug) {
-               prints("makechan: chan=");
-               ·printpointer(c);
-               prints("; elemsize=");
-               ·printint(elem->size);
-               prints("; elemalg=");
-               ·printint(elem->alg);
-               prints("; elemalign=");
-               ·printint(elem->align);
-               prints("; dataqsiz=");
-               ·printint(c->dataqsiz);
-               prints("\n");
-       }
+       if(debug)
+               printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n",
+                       c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz);
 
        return c;
 }
@@ -175,7 +166,12 @@ incerr(Hchan* c)
  * occur. if pres is not nil,
  * then the protocol will not
  * sleep but return if it could
- * not complete
+ * not complete.
+ *
+ * sleep can wake up with g->param == nil
+ * when a channel involved in the sleep has
+ * been closed.  it is easiest to loop and re-run
+ * the operation; we'll see that it's now closed.
  */
 void
 chansend(Hchan *c, byte *ep, bool *pres)
@@ -187,9 +183,7 @@ chansend(Hchan *c, byte *ep, bool *pres)
                gosched();
 
        if(debug) {
-               prints("chansend: chan=");
-               ·printpointer(c);
-               prints("; elem=");
+               printf("chansend: chan=%p; elem=", c);
                c->elemalg->print(c->elemsize, ep);
                prints("\n");
        }
@@ -292,11 +286,8 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
        if(gcwaiting)
                gosched();
 
-       if(debug) {
-               prints("chanrecv: chan=");
-               ·printpointer(c);
-               prints("\n");
-       }
+       if(debug)
+               printf("chanrecv: chan=%p\n", c);
 
        lock(c);
 loop:
@@ -471,13 +462,8 @@ void
        sel->tcase = size;
        sel->ncase = 0;
        *selp = sel;
-       if(debug) {
-               prints("newselect s=");
-               ·printpointer(sel);
-               prints(" size=");
-               ·printint(size);
-               prints("\n");
-       }
+       if(debug)
+               printf("newselect s=%p size=%d\n", sel, size);
 }
 
 // selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
@@ -511,19 +497,9 @@ void
        ae = (byte*)&sel + eo;
        c->elemalg->copy(c->elemsize, cas->u.elem, ae);
 
-       if(debug) {
-               prints("selectsend s=");
-               ·printpointer(sel);
-               prints(" pc=");
-               ·printpointer(cas->pc);
-               prints(" chan=");
-               ·printpointer(cas->chan);
-               prints(" so=");
-               ·printint(cas->so);
-               prints(" send=");
-               ·printint(cas->send);
-               prints("\n");
-       }
+       if(debug)
+               printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n",
+                       sel, cas->pc, cas->chan, cas->so, cas->send);
 }
 
 // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
@@ -553,19 +529,9 @@ void
        cas->send = 0;
        cas->u.elemp = *(byte**)((byte*)&sel + eo);
 
-       if(debug) {
-               prints("selectrecv s=");
-               ·printpointer(sel);
-               prints(" pc=");
-               ·printpointer(cas->pc);
-               prints(" chan=");
-               ·printpointer(cas->chan);
-               prints(" so=");
-               ·printint(cas->so);
-               prints(" send=");
-               ·printint(cas->send);
-               prints("\n");
-       }
+       if(debug)
+               printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n",
+                       sel, cas->pc, cas->chan, cas->so, cas->send);
 }
 
 
@@ -590,17 +556,9 @@ void
        cas->send = 2;
        cas->u.elemp = nil;
 
-       if(debug) {
-               prints("selectdefault s=");
-               ·printpointer(sel);
-               prints(" pc=");
-               ·printpointer(cas->pc);
-               prints(" so=");
-               ·printint(cas->so);
-               prints(" send=");
-               ·printint(cas->send);
-               prints("\n");
-       }
+       if(debug)
+               printf("selectdefault s=%p pc=%p so=%d send=%d\n",
+                       sel, cas->pc, cas->so, cas->send);
 }
 
 static void
@@ -657,15 +615,12 @@ void
        if(gcwaiting)
                gosched();
 
-       if(debug) {
-               prints("selectgo: sel=");
-               ·printpointer(sel);
-               prints("\n");
-       }
+       if(debug)
+               printf("select: sel=%p\n", sel);
 
        if(sel->ncase < 2) {
                if(sel->ncase < 1)
-                       throw("selectgo: no cases");
+                       throw("select: no cases");
                // make special case of one.
        }
 
@@ -674,9 +629,8 @@ void
                p = fastrand1();
                if(gcd(p, sel->ncase) == 1)
                        break;
-               if(i > 1000) {
-                       throw("selectgo: failed to select prime");
-               }
+               if(i > 1000)
+                       throw("select: failed to select prime");
        }
 
        // select an initial offset
@@ -700,43 +654,40 @@ loop:
        dfl = nil;
        for(i=0; i<sel->ncase; i++) {
                cas = sel->scase[o];
-
-               if(cas->send == 2) {    // default
-                       dfl = cas;
-                       goto next1;
-               }
-
                c = cas->chan;
-               if(c->dataqsiz > 0) {
-                       if(cas->send) {
-                               if(c->closed & Wclosed)
-                                       goto sclose;
-                               if(c->qcount < c->dataqsiz)
-                                       goto asyns;
-                               goto next1;
+
+               switch(cas->send) {
+               case 0: // recv
+                       if(c->dataqsiz > 0) {
+                               if(c->qcount > 0)
+                                       goto asyncrecv;
+                       } else {
+                               sg = dequeue(&c->sendq, c);
+                               if(sg != nil)
+                                       goto syncrecv;
                        }
-                       if(c->qcount > 0)
-                               goto asynr;
                        if(c->closed & Wclosed)
                                goto rclose;
-                       goto next1;
-               }
+                       break;
 
-               if(cas->send) {
+               case 1: // send
                        if(c->closed & Wclosed)
                                goto sclose;
-                       sg = dequeue(&c->recvq, c);
-                       if(sg != nil)
-                               goto gots;
-                       goto next1;
+                       if(c->dataqsiz > 0) {
+                               if(c->qcount < c->dataqsiz)
+                                       goto asyncsend;
+                       } else {
+                               sg = dequeue(&c->recvq, c);
+                               if(sg != nil)
+                                       goto syncsend;
+                       }
+                       break;
+
+               case 2: // default
+                       dfl = cas;
+                       break;
                }
-               sg = dequeue(&c->sendq, c);
-               if(sg != nil)
-                       goto gotr;
-               if(c->closed & Wclosed)
-                       goto rclose;
 
-       next1:
                o += p;
                if(o >= sel->ncase)
                        o -= sel->ncase;
@@ -752,52 +703,34 @@ loop:
        for(i=0; i<sel->ncase; i++) {
                cas = sel->scase[o];
                c = cas->chan;
+               sg = allocsg(c);
+               sg->offset = o;
 
-               if(c->dataqsiz > 0) {
-                       if(cas->send) {
-                               if(c->qcount < c->dataqsiz) {
-                                       prints("selectgo: pass 2 async send\n");
-                                       goto asyns;
-                               }
-                               sg = allocsg(c);
-                               sg->offset = o;
-                               enqueue(&c->sendq, sg);
-                               goto next2;
-                       }
-                       if(c->qcount > 0) {
-                               prints("selectgo: pass 2 async recv\n");
-                               goto asynr;
+               switch(cas->send) {
+               case 0: // recv
+                       if(c->dataqsiz > 0) {
+                               if(c->qcount > 0)
+                                       throw("select: pass 2 async recv");
+                       } else {
+                               if(dequeue(&c->sendq, c))
+                                       throw("select: pass 2 sync recv");
                        }
-                       sg = allocsg(c);
-                       sg->offset = o;
                        enqueue(&c->recvq, sg);
-                       goto next2;
-               }
-
-               if(cas->send) {
-                       sg = dequeue(&c->recvq, c);
-                       if(sg != nil) {
-                               prints("selectgo: pass 2 sync send\n");
-                               g->selgen++;
-                               goto gots;
+                       break;
+               
+               case 1: // send
+                       if(c->dataqsiz > 0) {
+                               if(c->qcount < c->dataqsiz)
+                                       throw("select: pass 2 async send");
+                       } else {
+                               if(dequeue(&c->recvq, c))
+                                       throw("select: pass 2 sync send");
+                               c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
                        }
-                       sg = allocsg(c);
-                       sg->offset = o;
-                       c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
                        enqueue(&c->sendq, sg);
-                       goto next2;
-               }
-               sg = dequeue(&c->sendq, c);
-               if(sg != nil) {
-                       prints("selectgo: pass 2 sync recv\n");
-                       g->selgen++;
-                       goto gotr;
+                       break;
                }
-               sg = allocsg(c);
-               sg->offset = o;
-               enqueue(&c->recvq, sg);
 
-       next2:
                o += p;
                if(o >= sel->ncase)
                        o -= sel->ncase;
@@ -810,6 +743,24 @@ loop:
 
        sellock(sel);
        sg = g->param;
+
+       // pass 3 - dequeue from unsuccessful chans
+       // otherwise they stack up on quiet channels
+       for(i=0; i<sel->ncase; i++) {
+               if(sg == nil || o != sg->offset) {
+                       cas = sel->scase[o];
+                       c = cas->chan;
+                       if(cas->send)
+                               dequeueg(&c->sendq, c);
+                       else
+                               dequeueg(&c->recvq, c);
+               }
+               
+               o += p;
+               if(o >= sel->ncase)
+                       o -= sel->ncase;
+       }
+
        if(sg == nil)
                goto loop;
 
@@ -822,19 +773,9 @@ loop:
                goto loop;
        }
 
-       if(debug) {
-               prints("wait-return: sel=");
-               ·printpointer(sel);
-               prints(" c=");
-               ·printpointer(c);
-               prints(" cas=");
-               ·printpointer(cas);
-               prints(" send=");
-               ·printint(cas->send);
-               prints(" o=");
-               ·printint(o);
-               prints("\n");
-       }
+       if(debug)
+               printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n",
+                       sel, c, cas, cas->send, o);
 
        if(!cas->send) {
                if(cas->u.elemp != nil)
@@ -844,7 +785,8 @@ loop:
        freesg(c, sg);
        goto retc;
 
-asynr:
+asyncrecv:
+       // can receive from buffer
        if(cas->u.elemp != nil)
                c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
        c->recvdataq = c->recvdataq->link;
@@ -857,7 +799,8 @@ asynr:
        }
        goto retc;
 
-asyns:
+asyncsend:
+       // can send to buffer
        if(cas->u.elem != nil)
                c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
        c->senddataq = c->senddataq->link;
@@ -870,17 +813,10 @@ asyns:
        }
        goto retc;
 
-gotr:
-       // recv path to wakeup the sender (sg)
-       if(debug) {
-               prints("gotr: sel=");
-               ·printpointer(sel);
-               prints(" c=");
-               ·printpointer(c);
-               prints(" o=");
-               ·printint(o);
-               prints("\n");
-       }
+syncrecv:
+       // can receive from sleeping sender (sg)
+       if(debug)
+               printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
        if(cas->u.elemp != nil)
                c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
        gp = sg->g;
@@ -889,23 +825,17 @@ gotr:
        goto retc;
 
 rclose:
+       // read at end of closed channel
        if(cas->u.elemp != nil)
                c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
        c->closed |= Rclosed;
        incerr(c);
        goto retc;
 
-gots:
-       // send path to wakeup the receiver (sg)
-       if(debug) {
-               prints("gots: sel=");
-               ·printpointer(sel);
-               prints(" c=");
-               ·printpointer(c);
-               prints(" o=");
-               ·printint(o);
-               prints("\n");
-       }
+syncsend:
+       // can send to sleeping receiver (sg)
+       if(debug)
+               printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
        if(c->closed & Wclosed)
                goto sclose;
        c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
@@ -915,12 +845,14 @@ gots:
        goto retc;
 
 sclose:
+       // send on closed channel
        incerr(c);
        goto retc;
 
 retc:
        selunlock(sel);
 
+       // return to pc corresponding to chosen case
        ·setcallerpc(&sel, cas->pc);
        as = (byte*)&sel + cas->so;
        freesel(sel);
@@ -1020,6 +952,20 @@ loop:
        return sgp;
 }
 
+static void
+dequeueg(WaitQ *q, Hchan *c)
+{
+       SudoG **l, *sgp;
+       
+       for(l=&q->first; (sgp=*l) != nil; l=&sgp->link) {
+               if(sgp->g == g) {
+                       *l = sgp->link;
+                       freesg(c, sgp);
+                       break;
+               }
+       }
+}
+
 static void
 enqueue(WaitQ *q, SudoG *sgp)
 {
diff --git a/test/chan/select2.go b/test/chan/select2.go
new file mode 100644 (file)
index 0000000..e24c51e
--- /dev/null
@@ -0,0 +1,48 @@
+// $G $D/$F.go && $L $F.$A && ./$A.out
+
+// Copyright 2010 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import "runtime"
+
+func sender(c chan int, n int) {
+       for i := 0; i < n; i++ {
+               c <- 1
+       }
+}
+
+func receiver(c, dummy chan int, n int) {
+       for i := 0; i < n; i++ {
+               select {
+               case <-c:
+                       // nothing
+               case <-dummy:
+                       panic("dummy")
+               }
+       }
+}
+
+func main() {
+       runtime.MemProfileRate = 0
+
+       c := make(chan int)
+       dummy := make(chan int)
+
+       // warm up
+       go sender(c, 100000)
+       receiver(c, dummy, 100000)
+       runtime.GC()
+       runtime.MemStats.Alloc = 0
+
+       // second time shouldn't increase footprint by much
+       go sender(c, 100000)
+       receiver(c, dummy, 100000)
+       runtime.GC()
+
+       if runtime.MemStats.Alloc > 1e5 {
+               println("BUG: too much memory for 100,000 selects:", runtime.MemStats.Alloc)
+       }
+}