Commit e410a527 authored by Keith Randall's avatar Keith Randall

runtime: simplify chan ops, take 2

This change is the same as CL #9345 which was reverted,
except for a small bug fix.

The only change is to the body of sendDirect and its callsite.
Also added a test.

The problem was during a channel send operation.  The target
of the send was a sleeping goroutine waiting to receive.  We
basically do:
1) Read the destination pointer out of the sudog structure
2) Copy the value we're sending to that destination pointer
Unfortunately, the previous change had a goroutine suspend
point between 1 & 2 (the call to sendDirect).  At that point
the destination goroutine's stack could be copied (shrunk).
The pointer we read in step 1 is no longer valid for step 2.

Fixed by not allowing any suspension points between 1 & 2.
I suspect the old code worked correctly basically by accident.

Fixes #13169

The original 9345:

This change removes the retry mechanism we use for buffered channels.
Instead, any sender waking up a receiver or vice versa completes the
full protocol with its counterpart.  This means the counterpart does
not need to relock the channel when it wakes up.  (Currently
buffered channels need to relock on wakeup.)

For sends on a channel with waiting receivers, this change replaces
two copies (sender->queue, queue->receiver) with one (sender->receiver).
For receives on channels with a waiting sender, two copies are still required.

This change unifies to a large degree the algorithm for buffered
and unbuffered channels, simplifying the overall implementation.

Fixes #11506

Change-Id: I57dfa3fc219cffa4d48301ee15fe5479299efa09
Reviewed-on: https://go-review.googlesource.com/16740Reviewed-by: default avatarIan Lance Taylor <iant@golang.org>
parent 1b4d28f8
This diff is collapsed.
......@@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
k *scase
sglist *sudog
sgnext *sudog
futile byte
qp unsafe.Pointer
)
loop:
......@@ -317,15 +317,12 @@ loop:
switch cas.kind {
case caseRecv:
if c.dataqsiz > 0 {
if c.qcount > 0 {
goto asyncrecv
}
} else {
sg = c.sendq.dequeue()
if sg != nil {
goto syncrecv
}
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
......@@ -338,15 +335,12 @@ loop:
if c.closed != 0 {
goto sclose
}
if c.dataqsiz > 0 {
if c.qcount < c.dataqsiz {
goto asyncsend
}
} else {
sg = c.recvq.dequeue()
if sg != nil {
goto syncsend
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
......@@ -363,6 +357,9 @@ loop:
// pass 2 - enqueue on all chans
gp = getg()
done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas.c
......@@ -389,7 +386,7 @@ loop:
// wait for someone to wake us up
gp.param = nil
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
// someone woke us up
sellock(sel)
......@@ -432,16 +429,13 @@ loop:
}
if cas == nil {
futile = traceFutileWakeup
// This can happen if we were woken up by a close().
// TODO: figure that out explicitly so we don't need this loop.
goto loop
}
c = cas.c
if c.dataqsiz > 0 {
throw("selectgo: shouldn't happen")
}
if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
......@@ -470,7 +464,7 @@ loop:
selunlock(sel)
goto retc
asyncrecv:
bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
......@@ -485,29 +479,20 @@ asyncrecv:
if cas.receivedp != nil {
*cas.receivedp = true
}
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx))
typedmemmove(c.elemtype, cas.elem, qp)
}
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
memclr(qp, uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
sg = c.sendq.dequeue()
if sg != nil {
gp = sg.g
selunlock(sel)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
} else {
selunlock(sel)
}
selunlock(sel)
goto retc
asyncsend:
bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
......@@ -523,47 +508,18 @@ asyncsend:
c.sendx = 0
}
c.qcount++
sg = c.recvq.dequeue()
if sg != nil {
gp = sg.g
selunlock(sel)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
} else {
selunlock(sel)
}
selunlock(sel)
goto retc
syncrecv:
recv:
// can receive from sleeping sender (sg)
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
racesync(c, sg)
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
selunlock(sel)
recv(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "\n")
}
if cas.receivedp != nil {
*cas.receivedp = true
}
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, sg.elem)
}
sg.elem = nil
gp = sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
goto retc
rclose:
......@@ -580,29 +536,19 @@ rclose:
}
goto retc
syncsend:
// can send to sleeping receiver (sg)
send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
racesync(c, sg)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
selunlock(sel)
send(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncsend: sel=", sel, " c=", c, "\n")
}
if sg.elem != nil {
syncsend(c, sg, cas.elem)
}
sg.elem = nil
gp = sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
goto retc
retc:
if cas.releasetime > 0 {
......
// run
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
type T struct {
a, b, c int
}
func usestack() {
usestack1(32)
}
func usestack1(d int) byte {
if d == 0 {
return 0
}
var b [1024]byte
usestack1(d - 1)
return b[3]
}
const n = 100000
func main() {
c := make(chan interface{})
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < n; j++ {
c <- new(T)
}
done <- true
}()
go func() {
for j := 0; j < n; j++ {
_ = (<-c).(*T)
usestack()
}
done <- true
}()
}
for i := 0; i < 20; i++ {
<-done
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment