Commit ec53627e authored by Russ Cox's avatar Russ Cox

runtime: correct memory leak in select

  * adds pass 3 to dequeue from channels eagerly

various other cleanup/churn:
  * use switch on cas->send in each pass to
    factor out common code.
  * longer goto labels, commented at target
  * be more agressive about can't happen:
    throw instead of print + cope.
  * use "select" instead of "selectgo" in errors
  * use printf for debug prints when possible

R=ken2, ken3
CC=golang-dev, r
https://golang.org/cl/875041
parent 31693e9f
...@@ -79,6 +79,7 @@ struct Select ...@@ -79,6 +79,7 @@ struct Select
Scase* scase[1]; // one per case Scase* scase[1]; // one per case
}; };
static void dequeueg(WaitQ*, Hchan*);
static SudoG* dequeue(WaitQ*, Hchan*); static SudoG* dequeue(WaitQ*, Hchan*);
static void enqueue(WaitQ*, SudoG*); static void enqueue(WaitQ*, SudoG*);
static SudoG* allocsg(Hchan*); static SudoG* allocsg(Hchan*);
...@@ -126,19 +127,9 @@ makechan(Type *elem, uint32 hint) ...@@ -126,19 +127,9 @@ makechan(Type *elem, uint32 hint)
c->dataqsiz = hint; c->dataqsiz = hint;
} }
if(debug) { if(debug)
prints("makechan: chan="); printf("makechan: chan=%p; elemsize=%D; elemalg=%d; elemalign=%d; dataqsiz=%d\n",
·printpointer(c); c, (int64)elem->size, elem->alg, elem->align, c->dataqsiz);
prints("; elemsize=");
·printint(elem->size);
prints("; elemalg=");
·printint(elem->alg);
prints("; elemalign=");
·printint(elem->align);
prints("; dataqsiz=");
·printint(c->dataqsiz);
prints("\n");
}
return c; return c;
} }
...@@ -175,7 +166,12 @@ incerr(Hchan* c) ...@@ -175,7 +166,12 @@ incerr(Hchan* c)
* occur. if pres is not nil, * occur. if pres is not nil,
* then the protocol will not * then the protocol will not
* sleep but return if it could * sleep but return if it could
* not complete * not complete.
*
* sleep can wake up with g->param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/ */
void void
chansend(Hchan *c, byte *ep, bool *pres) chansend(Hchan *c, byte *ep, bool *pres)
...@@ -187,9 +183,7 @@ chansend(Hchan *c, byte *ep, bool *pres) ...@@ -187,9 +183,7 @@ chansend(Hchan *c, byte *ep, bool *pres)
gosched(); gosched();
if(debug) { if(debug) {
prints("chansend: chan="); printf("chansend: chan=%p; elem=", c);
·printpointer(c);
prints("; elem=");
c->elemalg->print(c->elemsize, ep); c->elemalg->print(c->elemsize, ep);
prints("\n"); prints("\n");
} }
...@@ -292,11 +286,8 @@ chanrecv(Hchan* c, byte *ep, bool* pres) ...@@ -292,11 +286,8 @@ chanrecv(Hchan* c, byte *ep, bool* pres)
if(gcwaiting) if(gcwaiting)
gosched(); gosched();
if(debug) { if(debug)
prints("chanrecv: chan="); printf("chanrecv: chan=%p\n", c);
·printpointer(c);
prints("\n");
}
lock(c); lock(c);
loop: loop:
...@@ -471,13 +462,8 @@ void ...@@ -471,13 +462,8 @@ void
sel->tcase = size; sel->tcase = size;
sel->ncase = 0; sel->ncase = 0;
*selp = sel; *selp = sel;
if(debug) { if(debug)
prints("newselect s="); printf("newselect s=%p size=%d\n", sel, size);
·printpointer(sel);
prints(" size=");
·printint(size);
prints("\n");
}
} }
// selectsend(sel *byte, hchan *chan any, elem any) (selected bool); // selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
...@@ -511,19 +497,9 @@ void ...@@ -511,19 +497,9 @@ void
ae = (byte*)&sel + eo; ae = (byte*)&sel + eo;
c->elemalg->copy(c->elemsize, cas->u.elem, ae); c->elemalg->copy(c->elemsize, cas->u.elem, ae);
if(debug) { if(debug)
prints("selectsend s="); printf("selectsend s=%p pc=%p chan=%p so=%d send=%d\n",
·printpointer(sel); sel, cas->pc, cas->chan, cas->so, cas->send);
prints(" pc=");
·printpointer(cas->pc);
prints(" chan=");
·printpointer(cas->chan);
prints(" so=");
·printint(cas->so);
prints(" send=");
·printint(cas->send);
prints("\n");
}
} }
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool); // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
...@@ -553,19 +529,9 @@ void ...@@ -553,19 +529,9 @@ void
cas->send = 0; cas->send = 0;
cas->u.elemp = *(byte**)((byte*)&sel + eo); cas->u.elemp = *(byte**)((byte*)&sel + eo);
if(debug) { if(debug)
prints("selectrecv s="); printf("selectrecv s=%p pc=%p chan=%p so=%d send=%d\n",
·printpointer(sel); sel, cas->pc, cas->chan, cas->so, cas->send);
prints(" pc=");
·printpointer(cas->pc);
prints(" chan=");
·printpointer(cas->chan);
prints(" so=");
·printint(cas->so);
prints(" send=");
·printint(cas->send);
prints("\n");
}
} }
...@@ -590,17 +556,9 @@ void ...@@ -590,17 +556,9 @@ void
cas->send = 2; cas->send = 2;
cas->u.elemp = nil; cas->u.elemp = nil;
if(debug) { if(debug)
prints("selectdefault s="); printf("selectdefault s=%p pc=%p so=%d send=%d\n",
·printpointer(sel); sel, cas->pc, cas->so, cas->send);
prints(" pc=");
·printpointer(cas->pc);
prints(" so=");
·printint(cas->so);
prints(" send=");
·printint(cas->send);
prints("\n");
}
} }
static void static void
...@@ -657,15 +615,12 @@ void ...@@ -657,15 +615,12 @@ void
if(gcwaiting) if(gcwaiting)
gosched(); gosched();
if(debug) { if(debug)
prints("selectgo: sel="); printf("select: sel=%p\n", sel);
·printpointer(sel);
prints("\n");
}
if(sel->ncase < 2) { if(sel->ncase < 2) {
if(sel->ncase < 1) if(sel->ncase < 1)
throw("selectgo: no cases"); throw("select: no cases");
// make special case of one. // make special case of one.
} }
...@@ -674,9 +629,8 @@ void ...@@ -674,9 +629,8 @@ void
p = fastrand1(); p = fastrand1();
if(gcd(p, sel->ncase) == 1) if(gcd(p, sel->ncase) == 1)
break; break;
if(i > 1000) { if(i > 1000)
throw("selectgo: failed to select prime"); throw("select: failed to select prime");
}
} }
// select an initial offset // select an initial offset
...@@ -700,43 +654,40 @@ loop: ...@@ -700,43 +654,40 @@ loop:
dfl = nil; dfl = nil;
for(i=0; i<sel->ncase; i++) { for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o]; cas = sel->scase[o];
if(cas->send == 2) { // default
dfl = cas;
goto next1;
}
c = cas->chan; c = cas->chan;
if(c->dataqsiz > 0) {
if(cas->send) { switch(cas->send) {
if(c->closed & Wclosed) case 0: // recv
goto sclose; if(c->dataqsiz > 0) {
if(c->qcount < c->dataqsiz) if(c->qcount > 0)
goto asyns; goto asyncrecv;
goto next1; } else {
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto syncrecv;
} }
if(c->qcount > 0)
goto asynr;
if(c->closed & Wclosed) if(c->closed & Wclosed)
goto rclose; goto rclose;
goto next1; break;
}
if(cas->send) { case 1: // send
if(c->closed & Wclosed) if(c->closed & Wclosed)
goto sclose; goto sclose;
sg = dequeue(&c->recvq, c); if(c->dataqsiz > 0) {
if(sg != nil) if(c->qcount < c->dataqsiz)
goto gots; goto asyncsend;
goto next1; } else {
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto syncsend;
}
break;
case 2: // default
dfl = cas;
break;
} }
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gotr;
if(c->closed & Wclosed)
goto rclose;
next1:
o += p; o += p;
if(o >= sel->ncase) if(o >= sel->ncase)
o -= sel->ncase; o -= sel->ncase;
...@@ -752,52 +703,34 @@ loop: ...@@ -752,52 +703,34 @@ loop:
for(i=0; i<sel->ncase; i++) { for(i=0; i<sel->ncase; i++) {
cas = sel->scase[o]; cas = sel->scase[o];
c = cas->chan; c = cas->chan;
sg = allocsg(c);
sg->offset = o;
if(c->dataqsiz > 0) { switch(cas->send) {
if(cas->send) { case 0: // recv
if(c->qcount < c->dataqsiz) { if(c->dataqsiz > 0) {
prints("selectgo: pass 2 async send\n"); if(c->qcount > 0)
goto asyns; throw("select: pass 2 async recv");
} } else {
sg = allocsg(c); if(dequeue(&c->sendq, c))
sg->offset = o; throw("select: pass 2 sync recv");
enqueue(&c->sendq, sg);
goto next2;
}
if(c->qcount > 0) {
prints("selectgo: pass 2 async recv\n");
goto asynr;
} }
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg); enqueue(&c->recvq, sg);
goto next2; break;
}
case 1: // send
if(cas->send) { if(c->dataqsiz > 0) {
sg = dequeue(&c->recvq, c); if(c->qcount < c->dataqsiz)
if(sg != nil) { throw("select: pass 2 async send");
prints("selectgo: pass 2 sync send\n"); } else {
g->selgen++; if(dequeue(&c->recvq, c))
goto gots; throw("select: pass 2 sync send");
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
} }
sg = allocsg(c);
sg->offset = o;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg); enqueue(&c->sendq, sg);
goto next2; break;
}
sg = dequeue(&c->sendq, c);
if(sg != nil) {
prints("selectgo: pass 2 sync recv\n");
g->selgen++;
goto gotr;
} }
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg);
next2:
o += p; o += p;
if(o >= sel->ncase) if(o >= sel->ncase)
o -= sel->ncase; o -= sel->ncase;
...@@ -810,6 +743,24 @@ loop: ...@@ -810,6 +743,24 @@ loop:
sellock(sel); sellock(sel);
sg = g->param; sg = g->param;
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
for(i=0; i<sel->ncase; i++) {
if(sg == nil || o != sg->offset) {
cas = sel->scase[o];
c = cas->chan;
if(cas->send)
dequeueg(&c->sendq, c);
else
dequeueg(&c->recvq, c);
}
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
}
if(sg == nil) if(sg == nil)
goto loop; goto loop;
...@@ -822,19 +773,9 @@ loop: ...@@ -822,19 +773,9 @@ loop:
goto loop; goto loop;
} }
if(debug) { if(debug)
prints("wait-return: sel="); printf("wait-return: sel=%p c=%p cas=%p send=%d o=%d\n",
·printpointer(sel); sel, c, cas, cas->send, o);
prints(" c=");
·printpointer(c);
prints(" cas=");
·printpointer(cas);
prints(" send=");
·printint(cas->send);
prints(" o=");
·printint(o);
prints("\n");
}
if(!cas->send) { if(!cas->send) {
if(cas->u.elemp != nil) if(cas->u.elemp != nil)
...@@ -844,7 +785,8 @@ loop: ...@@ -844,7 +785,8 @@ loop:
freesg(c, sg); freesg(c, sg);
goto retc; goto retc;
asynr: asyncrecv:
// can receive from buffer
if(cas->u.elemp != nil) if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem); c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link; c->recvdataq = c->recvdataq->link;
...@@ -857,7 +799,8 @@ asynr: ...@@ -857,7 +799,8 @@ asynr:
} }
goto retc; goto retc;
asyns: asyncsend:
// can send to buffer
if(cas->u.elem != nil) if(cas->u.elem != nil)
c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem); c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
c->senddataq = c->senddataq->link; c->senddataq = c->senddataq->link;
...@@ -870,17 +813,10 @@ asyns: ...@@ -870,17 +813,10 @@ asyns:
} }
goto retc; goto retc;
gotr: syncrecv:
// recv path to wakeup the sender (sg) // can receive from sleeping sender (sg)
if(debug) { if(debug)
prints("gotr: sel="); printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
·printpointer(sel);
prints(" c=");
·printpointer(c);
prints(" o=");
·printint(o);
prints("\n");
}
if(cas->u.elemp != nil) if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g; gp = sg->g;
...@@ -889,23 +825,17 @@ gotr: ...@@ -889,23 +825,17 @@ gotr:
goto retc; goto retc;
rclose: rclose:
// read at end of closed channel
if(cas->u.elemp != nil) if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, nil); c->elemalg->copy(c->elemsize, cas->u.elemp, nil);
c->closed |= Rclosed; c->closed |= Rclosed;
incerr(c); incerr(c);
goto retc; goto retc;
gots: syncsend:
// send path to wakeup the receiver (sg) // can send to sleeping receiver (sg)
if(debug) { if(debug)
prints("gots: sel="); printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
·printpointer(sel);
prints(" c=");
·printpointer(c);
prints(" o=");
·printint(o);
prints("\n");
}
if(c->closed & Wclosed) if(c->closed & Wclosed)
goto sclose; goto sclose;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
...@@ -915,12 +845,14 @@ gots: ...@@ -915,12 +845,14 @@ gots:
goto retc; goto retc;
sclose: sclose:
// send on closed channel
incerr(c); incerr(c);
goto retc; goto retc;
retc: retc:
selunlock(sel); selunlock(sel);
// return to pc corresponding to chosen case
·setcallerpc(&sel, cas->pc); ·setcallerpc(&sel, cas->pc);
as = (byte*)&sel + cas->so; as = (byte*)&sel + cas->so;
freesel(sel); freesel(sel);
...@@ -1020,6 +952,20 @@ loop: ...@@ -1020,6 +952,20 @@ loop:
return sgp; return sgp;
} }
static void
dequeueg(WaitQ *q, Hchan *c)
{
SudoG **l, *sgp;
for(l=&q->first; (sgp=*l) != nil; l=&sgp->link) {
if(sgp->g == g) {
*l = sgp->link;
freesg(c, sgp);
break;
}
}
}
static void static void
enqueue(WaitQ *q, SudoG *sgp) enqueue(WaitQ *q, SudoG *sgp)
{ {
......
// $G $D/$F.go && $L $F.$A && ./$A.out
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import "runtime"
func sender(c chan int, n int) {
for i := 0; i < n; i++ {
c <- 1
}
}
func receiver(c, dummy chan int, n int) {
for i := 0; i < n; i++ {
select {
case <-c:
// nothing
case <-dummy:
panic("dummy")
}
}
}
func main() {
runtime.MemProfileRate = 0
c := make(chan int)
dummy := make(chan int)
// warm up
go sender(c, 100000)
receiver(c, dummy, 100000)
runtime.GC()
runtime.MemStats.Alloc = 0
// second time shouldn't increase footprint by much
go sender(c, 100000)
receiver(c, dummy, 100000)
runtime.GC()
if runtime.MemStats.Alloc > 1e5 {
println("BUG: too much memory for 100,000 selects:", runtime.MemStats.Alloc)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment