diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index bbe05e041c78646e2594ea1492e97078bfa5b1a5..926bde723c2f75ad9ceef8bc8baeb4ec06f83828 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -21,7 +21,7 @@ struct SudoG int16 offset; // offset of case number int8 isfree; // offset of case number SudoG* link; - byte elem[8]; // synch data element (+ more) + byte* elem; // data element }; struct WaitQ @@ -38,8 +38,8 @@ struct Hchan bool closed; uint8 elemalign; Alg* elemalg; // interface for element type - uint32 sendx; // send index - uint32 recvx; // receive index + uint32 sendx; // send index + uint32 recvx; // receive index WaitQ recvq; // list of recv waiters WaitQ sendq; // list of send waiters SudoG* free; // freelist @@ -170,6 +170,7 @@ void runtime·chansend(Hchan *c, byte *ep, bool *pres) { SudoG *sg; + SudoG mysg; G* gp; if(c == nil) @@ -185,7 +186,6 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres) } runtime·lock(c); -loop: if(c->closed) goto closed; @@ -194,12 +194,12 @@ loop: sg = dequeue(&c->recvq, c); if(sg != nil) { - if(ep != nil) - c->elemalg->copy(c->elemsize, sg->elem, ep); - + runtime·unlock(c); + gp = sg->g; gp->param = sg; - runtime·unlock(c); + if(sg->elem != nil) + c->elemalg->copy(c->elemsize, sg->elem, ep); runtime·ready(gp); if(pres != nil) @@ -213,21 +213,22 @@ loop: return; } - sg = allocsg(c); - if(ep != nil) - c->elemalg->copy(c->elemsize, sg->elem, ep); + mysg.elem = ep; + mysg.g = g; + mysg.selgen = g->selgen; g->param = nil; g->status = Gwaiting; - enqueue(&c->sendq, sg); + enqueue(&c->sendq, &mysg); runtime·unlock(c); runtime·gosched(); - runtime·lock(c); - sg = g->param; - if(sg == nil) - goto loop; - freesg(c, sg); - runtime·unlock(c); + if(g->param == nil) { + runtime·lock(c); + if(!c->closed) + runtime·throw("chansend: spurious wakeup"); + goto closed; + } + return; asynch: @@ -240,17 +241,18 @@ asynch: *pres = false; return; } - sg = allocsg(c); + mysg.g = g; + mysg.elem = nil; + mysg.selgen = g->selgen; g->status = Gwaiting; - enqueue(&c->sendq, sg); + enqueue(&c->sendq, &mysg); runtime·unlock(c); runtime·gosched(); runtime·lock(c); goto asynch; } - if(ep != nil) - c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); + c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); if(++c->sendx == c->dataqsiz) c->sendx = 0; c->qcount++; @@ -258,7 +260,6 @@ asynch: sg = dequeue(&c->recvq, c); if(sg != nil) { gp = sg->g; - freesg(c, sg); runtime·unlock(c); runtime·ready(gp); } else @@ -277,6 +278,7 @@ void runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received) { SudoG *sg; + SudoG mysg; G *gp; if(c == nil) @@ -289,8 +291,6 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received) runtime·printf("chanrecv: chan=%p\n", c); runtime·lock(c); - -loop: if(c->dataqsiz > 0) goto asynch; @@ -299,13 +299,12 @@ loop: sg = dequeue(&c->sendq, c); if(sg != nil) { + runtime·unlock(c); + if(ep != nil) c->elemalg->copy(c->elemsize, ep, sg->elem); - c->elemalg->copy(c->elemsize, sg->elem, nil); - gp = sg->g; gp->param = sg; - runtime·unlock(c); runtime·ready(gp); if(selected != nil) @@ -321,25 +320,24 @@ loop: return; } - sg = allocsg(c); + mysg.elem = ep; + mysg.g = g; + mysg.selgen = g->selgen; g->param = nil; g->status = Gwaiting; - enqueue(&c->recvq, sg); + enqueue(&c->recvq, &mysg); runtime·unlock(c); runtime·gosched(); - runtime·lock(c); - sg = g->param; - if(sg == nil) - goto loop; + if(g->param == nil) { + runtime·lock(c); + if(!c->closed) + runtime·throw("chanrecv: spurious wakeup"); + goto closed; + } - if(ep != nil) - c->elemalg->copy(c->elemsize, ep, sg->elem); - c->elemalg->copy(c->elemsize, sg->elem, nil); if(received != nil) *received = true; - freesg(c, sg); - runtime·unlock(c); return; asynch: @@ -354,9 +352,11 @@ asynch: *received = false; return; } - sg = allocsg(c); + mysg.g = g; + mysg.elem = nil; + mysg.selgen = g->selgen; g->status = Gwaiting; - enqueue(&c->recvq, sg); + enqueue(&c->recvq, &mysg); runtime·unlock(c); runtime·gosched(); @@ -369,10 +369,10 @@ asynch: if(++c->recvx == c->dataqsiz) c->recvx = 0; c->qcount--; + sg = dequeue(&c->sendq, c); if(sg != nil) { gp = sg->g; - freesg(c, sg); runtime·unlock(c); runtime·ready(gp); } else @@ -721,7 +721,6 @@ selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so cas->so = so; cas->kind = CaseRecv; cas->u.recv.elemp = elem; - cas->u.recv.receivedp = nil; cas->u.recv.receivedp = received; if(debug) @@ -911,6 +910,7 @@ loop: } if(dfl != nil) { + selunlock(sel); cas = dfl; goto retc; } @@ -926,12 +926,12 @@ loop: switch(cas->kind) { case CaseRecv: + sg->elem = cas->u.recv.elemp; enqueue(&c->recvq, sg); break; case CaseSend: - if(c->dataqsiz == 0) - c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); + sg->elem = cas->u.elem; enqueue(&c->sendq, sg); break; } @@ -965,10 +965,8 @@ loop: cas = sel->scase[o]; c = cas->chan; - if(c->dataqsiz > 0) { -// prints("shouldnt happen\n"); - goto loop; - } + if(c->dataqsiz > 0) + runtime·throw("selectgo: shouldnt happen"); if(debug) runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n", @@ -977,12 +975,10 @@ loop: if(cas->kind == CaseRecv) { if(cas->u.recv.receivedp != nil) *cas->u.recv.receivedp = true; - if(cas->u.recv.elemp != nil) - c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem); - c->elemalg->copy(c->elemsize, sg->elem, nil); } freesg(c, sg); + selunlock(sel); goto retc; asyncrecv: @@ -998,35 +994,38 @@ asyncrecv: sg = dequeue(&c->sendq, c); if(sg != nil) { gp = sg->g; - freesg(c, sg); + selunlock(sel); runtime·ready(gp); + } else { + selunlock(sel); } goto retc; asyncsend: // can send to buffer - if(cas->u.elem != nil) - c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem); + c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem); if(++c->sendx == c->dataqsiz) c->sendx = 0; c->qcount++; sg = dequeue(&c->recvq, c); if(sg != nil) { gp = sg->g; - freesg(c, sg); + selunlock(sel); runtime·ready(gp); + } else { + selunlock(sel); } goto retc; syncrecv: // can receive from sleeping sender (sg) + selunlock(sel); if(debug) runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); if(cas->u.recv.receivedp != nil) *cas->u.recv.receivedp = true; if(cas->u.recv.elemp != nil) c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem); - c->elemalg->copy(c->elemsize, sg->elem, nil); gp = sg->g; gp->param = sg; runtime·ready(gp); @@ -1034,6 +1033,7 @@ syncrecv: rclose: // read at end of closed channel + selunlock(sel); if(cas->u.recv.receivedp != nil) *cas->u.recv.receivedp = false; if(cas->u.recv.elemp != nil) @@ -1042,18 +1042,16 @@ rclose: syncsend: // can send to sleeping receiver (sg) + selunlock(sel); if(debug) runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); - if(c->closed) - goto sclose; - c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); + if(sg->elem != nil) + c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); gp = sg->g; gp->param = sg; runtime·ready(gp); retc: - selunlock(sel); - // return to pc corresponding to chosen case pc = cas->pc; as = (byte*)selp + cas->so; @@ -1093,7 +1091,6 @@ runtime·closechan(Hchan *c) break; gp = sg->g; gp->param = nil; - freesg(c, sg); runtime·ready(gp); } @@ -1104,7 +1101,6 @@ runtime·closechan(Hchan *c) break; gp = sg->g; gp->param = nil; - freesg(c, sg); runtime·ready(gp); } @@ -1203,7 +1199,7 @@ allocsg(Hchan *c) if(sg != nil) { c->free = sg->link; } else - sg = runtime·mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem)); + sg = runtime·mal(sizeof(*sg)); sg->selgen = g->selgen; sg->g = g; sg->offset = 0; @@ -1220,6 +1216,7 @@ freesg(Hchan *c, SudoG *sg) runtime·throw("chan.freesg: already free"); sg->isfree = 1; sg->link = c->free; + sg->elem = nil; c->free = sg; } } diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go new file mode 100644 index 0000000000000000000000000000000000000000..31f6856e77fd90c4ab6e4f866e6adbf27ba9e662 --- /dev/null +++ b/src/pkg/runtime/chan_test.go @@ -0,0 +1,251 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime_test + +import ( + "runtime" + "sync/atomic" + "testing" +) + +func BenchmarkSelectUncontended(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + myc1 := make(chan int, 1) + myc2 := make(chan int, 1) + myc1 <- 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 + } + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkSelectContended(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + myc1 := make(chan int, procs) + myc2 := make(chan int, procs) + for p := 0; p < procs; p++ { + myc1 <- 0 + go func() { + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + select { + case <-myc1: + myc2 <- 0 + case <-myc2: + myc1 <- 0 + } + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkSelectNonblock(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + myc1 := make(chan int) + myc2 := make(chan int) + myc3 := make(chan int, 1) + myc4 := make(chan int, 1) + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + select { + case <-myc1: + default: + } + select { + case myc2 <- 0: + default: + } + select { + case <-myc3: + default: + } + select { + case myc4 <- 0: + default: + } + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkChanUncontended(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + myc := make(chan int, CallsPerSched) + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + myc <- 0 + } + for g := 0; g < CallsPerSched; g++ { + <-myc + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkChanContended(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + myc := make(chan int, procs*CallsPerSched) + for p := 0; p < procs; p++ { + go func() { + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + myc <- 0 + } + for g := 0; g < CallsPerSched; g++ { + <-myc + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkChanSync(b *testing.B) { + const CallsPerSched = 1000 + procs := 2 + N := int32(b.N / CallsPerSched / procs * procs) + c := make(chan bool, procs) + myc := make(chan int) + for p := 0; p < procs; p++ { + go func() { + for { + i := atomic.AddInt32(&N, -1) + if i < 0 { + break + } + for g := 0; g < CallsPerSched; g++ { + if i%2 == 0 { + <-myc + myc <- 0 + } else { + myc <- 0 + <-myc + } + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, 2*procs) + myc := make(chan int, chanSize) + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < CallsPerSched; g++ { + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + myc <- 1 + } + } + myc <- 0 + c <- foo == 42 + }() + go func() { + foo := 0 + for { + v := <-myc + if v == 0 { + break + } + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + <-c + } +} + +func BenchmarkChanProdCons0(b *testing.B) { + benchmarkChanProdCons(b, 0, 0) +} + +func BenchmarkChanProdCons10(b *testing.B) { + benchmarkChanProdCons(b, 10, 0) +} + +func BenchmarkChanProdCons100(b *testing.B) { + benchmarkChanProdCons(b, 100, 0) +} + +func BenchmarkChanProdConsWork0(b *testing.B) { + benchmarkChanProdCons(b, 0, 100) +} + +func BenchmarkChanProdConsWork10(b *testing.B) { + benchmarkChanProdCons(b, 10, 100) +} + +func BenchmarkChanProdConsWork100(b *testing.B) { + benchmarkChanProdCons(b, 100, 100) +}