From ec53627ed52e099685d4b60c19630ddcf22904d6 Mon Sep 17 00:00:00 2001 From: Russ Cox Date: Thu, 1 Apr 2010 11:56:18 -0700 Subject: [PATCH] runtime: correct memory leak in select * adds pass 3 to dequeue from channels eagerly various other cleanup/churn: * use switch on cas->send in each pass to factor out common code. * longer goto labels, commented at target * be more agressive about can't happen: throw instead of print + cope. * use "select" instead of "selectgo" in errors * use printf for debug prints when possible R=ken2, ken3 CC=golang-dev, r https://golang.org/cl/875041 --- src/pkg/runtime/chan.c | 304 +++++++++++++++++------------------------ test/chan/select2.go | 48 +++++++ 2 files changed, 173 insertions(+), 179 deletions(-) create mode 100644 test/chan/select2.go diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index ea3b493b62..6e3d81a96f 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -79,6 +79,7 @@ struct Select Scase* scase[1]; // one per case }; +static void dequeueg(WaitQ*, Hchan*); static SudoG* dequeue(WaitQ*, Hchan*); static void enqueue(WaitQ*, SudoG*); static SudoG* allocsg(Hchan*); @@ -126,19 +127,9 @@ makechan(Type *elem, uint32 hint) c->dataqsiz = hint; } - if(debug) { - prints("makechan: chan="); - ·printpointer(c); - prints("; elemsize="); - ·printint(elem->size); - prints("; elemalg="); - ·printint(elem->alg); - prints("; elemalign="); - ·printint(elem->align); - prints("; dataqsiz="); - ·printint(c->dataqsiz); - prints("\n"); - } + if(debug) + printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n", + c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz); return c; } @@ -175,7 +166,12 @@ incerr(Hchan* c) * occur. if pres is not nil, * then the protocol will not * sleep but return if it could - * not complete + * not complete. + * + * sleep can wake up with g->param == nil + * when a channel involved in the sleep has + * been closed. it is easiest to loop and re-run + * the operation; we'll see that it's now closed. */ void chansend(Hchan *c, byte *ep, bool *pres) @@ -187,9 +183,7 @@ chansend(Hchan *c, byte *ep, bool *pres) gosched(); if(debug) { - prints("chansend: chan="); - ·printpointer(c); - prints("; elem="); + printf("chansend: chan=%p; elem=", c); c->elemalg->print(c->elemsize, ep); prints("\n"); } @@ -292,11 +286,8 @@ chanrecv(Hchan* c, byte *ep, bool* pres) if(gcwaiting) gosched(); - if(debug) { - prints("chanrecv: chan="); - ·printpointer(c); - prints("\n"); - } + if(debug) + printf("chanrecv: chan=%p\n", c); lock(c); loop: @@ -471,13 +462,8 @@ void sel->tcase = size; sel->ncase = 0; *selp = sel; - if(debug) { - prints("newselect s="); - ·printpointer(sel); - prints(" size="); - ·printint(size); - prints("\n"); - } + if(debug) + printf("newselect s=%p size=%d\n", sel, size); } // selectsend(sel *byte, hchan *chan any, elem any) (selected bool); @@ -511,19 +497,9 @@ void ae = (byte*)&sel + eo; c->elemalg->copy(c->elemsize, cas->u.elem, ae); - if(debug) { - prints("selectsend s="); - ·printpointer(sel); - prints(" pc="); - ·printpointer(cas->pc); - prints(" chan="); - ·printpointer(cas->chan); - prints(" so="); - ·printint(cas->so); - prints(" send="); - ·printint(cas->send); - prints("\n"); - } + if(debug) + printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n", + sel, cas->pc, cas->chan, cas->so, cas->send); } // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool); @@ -553,19 +529,9 @@ void cas->send = 0; cas->u.elemp = *(byte**)((byte*)&sel + eo); - if(debug) { - prints("selectrecv s="); - ·printpointer(sel); - prints(" pc="); - ·printpointer(cas->pc); - prints(" chan="); - ·printpointer(cas->chan); - prints(" so="); - ·printint(cas->so); - prints(" send="); - ·printint(cas->send); - prints("\n"); - } + if(debug) + printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n", + sel, cas->pc, cas->chan, cas->so, cas->send); } @@ -590,17 +556,9 @@ void cas->send = 2; cas->u.elemp = nil; - if(debug) { - prints("selectdefault s="); - ·printpointer(sel); - prints(" pc="); - ·printpointer(cas->pc); - prints(" so="); - ·printint(cas->so); - prints(" send="); - ·printint(cas->send); - prints("\n"); - } + if(debug) + printf("selectdefault s=%p pc=%p so=%d send=%d\n", + sel, cas->pc, cas->so, cas->send); } static void @@ -657,15 +615,12 @@ void if(gcwaiting) gosched(); - if(debug) { - prints("selectgo: sel="); - ·printpointer(sel); - prints("\n"); - } + if(debug) + printf("select: sel=%p\n", sel); if(sel->ncase < 2) { if(sel->ncase < 1) - throw("selectgo: no cases"); + throw("select: no cases"); // make special case of one. } @@ -674,9 +629,8 @@ void p = fastrand1(); if(gcd(p, sel->ncase) == 1) break; - if(i > 1000) { - throw("selectgo: failed to select prime"); - } + if(i > 1000) + throw("select: failed to select prime"); } // select an initial offset @@ -700,43 +654,40 @@ loop: dfl = nil; for(i=0; incase; i++) { cas = sel->scase[o]; - - if(cas->send == 2) { // default - dfl = cas; - goto next1; - } - c = cas->chan; - if(c->dataqsiz > 0) { - if(cas->send) { - if(c->closed & Wclosed) - goto sclose; - if(c->qcount < c->dataqsiz) - goto asyns; - goto next1; + + switch(cas->send) { + case 0: // recv + if(c->dataqsiz > 0) { + if(c->qcount > 0) + goto asyncrecv; + } else { + sg = dequeue(&c->sendq, c); + if(sg != nil) + goto syncrecv; } - if(c->qcount > 0) - goto asynr; if(c->closed & Wclosed) goto rclose; - goto next1; - } + break; - if(cas->send) { + case 1: // send if(c->closed & Wclosed) goto sclose; - sg = dequeue(&c->recvq, c); - if(sg != nil) - goto gots; - goto next1; + if(c->dataqsiz > 0) { + if(c->qcount < c->dataqsiz) + goto asyncsend; + } else { + sg = dequeue(&c->recvq, c); + if(sg != nil) + goto syncsend; + } + break; + + case 2: // default + dfl = cas; + break; } - sg = dequeue(&c->sendq, c); - if(sg != nil) - goto gotr; - if(c->closed & Wclosed) - goto rclose; - next1: o += p; if(o >= sel->ncase) o -= sel->ncase; @@ -752,52 +703,34 @@ loop: for(i=0; incase; i++) { cas = sel->scase[o]; c = cas->chan; + sg = allocsg(c); + sg->offset = o; - if(c->dataqsiz > 0) { - if(cas->send) { - if(c->qcount < c->dataqsiz) { - prints("selectgo: pass 2 async send\n"); - goto asyns; - } - sg = allocsg(c); - sg->offset = o; - enqueue(&c->sendq, sg); - goto next2; - } - if(c->qcount > 0) { - prints("selectgo: pass 2 async recv\n"); - goto asynr; + switch(cas->send) { + case 0: // recv + if(c->dataqsiz > 0) { + if(c->qcount > 0) + throw("select: pass 2 async recv"); + } else { + if(dequeue(&c->sendq, c)) + throw("select: pass 2 sync recv"); } - sg = allocsg(c); - sg->offset = o; enqueue(&c->recvq, sg); - goto next2; - } - - if(cas->send) { - sg = dequeue(&c->recvq, c); - if(sg != nil) { - prints("selectgo: pass 2 sync send\n"); - g->selgen++; - goto gots; + break; + + case 1: // send + if(c->dataqsiz > 0) { + if(c->qcount < c->dataqsiz) + throw("select: pass 2 async send"); + } else { + if(dequeue(&c->recvq, c)) + throw("select: pass 2 sync send"); + c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); } - sg = allocsg(c); - sg->offset = o; - c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); enqueue(&c->sendq, sg); - goto next2; - } - sg = dequeue(&c->sendq, c); - if(sg != nil) { - prints("selectgo: pass 2 sync recv\n"); - g->selgen++; - goto gotr; + break; } - sg = allocsg(c); - sg->offset = o; - enqueue(&c->recvq, sg); - next2: o += p; if(o >= sel->ncase) o -= sel->ncase; @@ -810,6 +743,24 @@ loop: sellock(sel); sg = g->param; + + // pass 3 - dequeue from unsuccessful chans + // otherwise they stack up on quiet channels + for(i=0; incase; i++) { + if(sg == nil || o != sg->offset) { + cas = sel->scase[o]; + c = cas->chan; + if(cas->send) + dequeueg(&c->sendq, c); + else + dequeueg(&c->recvq, c); + } + + o += p; + if(o >= sel->ncase) + o -= sel->ncase; + } + if(sg == nil) goto loop; @@ -822,19 +773,9 @@ loop: goto loop; } - if(debug) { - prints("wait-return: sel="); - ·printpointer(sel); - prints(" c="); - ·printpointer(c); - prints(" cas="); - ·printpointer(cas); - prints(" send="); - ·printint(cas->send); - prints(" o="); - ·printint(o); - prints("\n"); - } + if(debug) + printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n", + sel, c, cas, cas->send, o); if(!cas->send) { if(cas->u.elemp != nil) @@ -844,7 +785,8 @@ loop: freesg(c, sg); goto retc; -asynr: +asyncrecv: + // can receive from buffer if(cas->u.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem); c->recvdataq = c->recvdataq->link; @@ -857,7 +799,8 @@ asynr: } goto retc; -asyns: +asyncsend: + // can send to buffer if(cas->u.elem != nil) c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem); c->senddataq = c->senddataq->link; @@ -870,17 +813,10 @@ asyns: } goto retc; -gotr: - // recv path to wakeup the sender (sg) - if(debug) { - prints("gotr: sel="); - ·printpointer(sel); - prints(" c="); - ·printpointer(c); - prints(" o="); - ·printint(o); - prints("\n"); - } +syncrecv: + // can receive from sleeping sender (sg) + if(debug) + printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); if(cas->u.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); gp = sg->g; @@ -889,23 +825,17 @@ gotr: goto retc; rclose: + // read at end of closed channel if(cas->u.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.elemp, nil); c->closed |= Rclosed; incerr(c); goto retc; -gots: - // send path to wakeup the receiver (sg) - if(debug) { - prints("gots: sel="); - ·printpointer(sel); - prints(" c="); - ·printpointer(c); - prints(" o="); - ·printint(o); - prints("\n"); - } +syncsend: + // can send to sleeping receiver (sg) + if(debug) + printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); if(c->closed & Wclosed) goto sclose; c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); @@ -915,12 +845,14 @@ gots: goto retc; sclose: + // send on closed channel incerr(c); goto retc; retc: selunlock(sel); + // return to pc corresponding to chosen case ·setcallerpc(&sel, cas->pc); as = (byte*)&sel + cas->so; freesel(sel); @@ -1020,6 +952,20 @@ loop: return sgp; } +static void +dequeueg(WaitQ *q, Hchan *c) +{ + SudoG **l, *sgp; + + for(l=&q->first; (sgp=*l) != nil; l=&sgp->link) { + if(sgp->g == g) { + *l = sgp->link; + freesg(c, sgp); + break; + } + } +} + static void enqueue(WaitQ *q, SudoG *sgp) { diff --git a/test/chan/select2.go b/test/chan/select2.go new file mode 100644 index 0000000000..e24c51ed16 --- /dev/null +++ b/test/chan/select2.go @@ -0,0 +1,48 @@ +// $G $D/$F.go && $L $F.$A && ./$A.out + +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import "runtime" + +func sender(c chan int, n int) { + for i := 0; i < n; i++ { + c <- 1 + } +} + +func receiver(c, dummy chan int, n int) { + for i := 0; i < n; i++ { + select { + case <-c: + // nothing + case <-dummy: + panic("dummy") + } + } +} + +func main() { + runtime.MemProfileRate = 0 + + c := make(chan int) + dummy := make(chan int) + + // warm up + go sender(c, 100000) + receiver(c, dummy, 100000) + runtime.GC() + runtime.MemStats.Alloc = 0 + + // second time shouldn't increase footprint by much + go sender(c, 100000) + receiver(c, dummy, 100000) + runtime.GC() + + if runtime.MemStats.Alloc > 1e5 { + println("BUG: too much memory for 100,000 selects:", runtime.MemStats.Alloc) + } +} -- 2.50.0