Commit 12628943 authored by Kirill Smelkov's avatar Kirill Smelkov

X make sure "bye" is always processed immediately - even if a handleWatch is currently blocked

parent 508a3d71
...@@ -1516,11 +1516,18 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1516,11 +1516,18 @@ func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id) defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk) r := bufio.NewReader(wlink.sk)
ctx := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout) ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
defer func() { defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
err2 := wg.Wait() err2 := wg.Wait()
if err == nil { if err == nil {
err = err2 err = err2
...@@ -1529,28 +1536,31 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1529,28 +1536,31 @@ func (wlink *WatchLink) _serve() (err error) {
// write to peer if it was logical error on client side // write to peer if it was logical error on client side
// then .sk.tx to wakeup rx on client side // then .sk.tx to wakeup rx on client side
if err != nil { if err != nil {
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", err)) _ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
} }
// close .sk.tx : this wakes up rx on client side.
err2 = wlink.sk.CloseWrite() err2 = wlink.sk.CloseWrite()
if err == nil { if err == nil {
err = err2 err = err2
} }
}() }()
// close .sk.rx on error/wcfs stopping or return. closing .sk.rx wakes up read(sk) // close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
// (XXX and rx on client side). make sure to stop wg on error return.
retq := make(chan struct{}) retq := make(chan struct{})
defer close(retq) defer close(retq)
wg.Go(func() error { wg.Go(func() error {
var e error // monitor is always canceled - either at parent ctx cancel, or
// upon return from serve (see "cancel all handlers ..." ^^^).
// If it was return - report returned error to wg.Wait, not "canceled".
<-ctx.Done()
e := ctx.Err()
select { select {
case <-ctx.Done(): default:
e = ctx.Err()
case <-retq: case <-retq:
e = err // returned error e = err // returned error
} }
//e2 := wlink.sk.Close() // XXX -> CloseRead
e2 := wlink.sk.CloseRead() e2 := wlink.sk.CloseRead()
if e == nil { if e == nil {
e = e2 e = e2
......
...@@ -1219,6 +1219,8 @@ def test_wcfs(): ...@@ -1219,6 +1219,8 @@ def test_wcfs():
# XXX commit after current file size -> watch # XXX commit after current file size -> watch
# XXX no reply to pin - killed
# XXX access to block not previously accessed but invalidated in ZODB # XXX access to block not previously accessed but invalidated in ZODB
...@@ -1258,8 +1260,6 @@ def test_wcfs(): ...@@ -1258,8 +1260,6 @@ def test_wcfs():
# XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and # XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
# setupWatch must send pins. # setupWatch must send pins.
# XXX no reply to pin - killed
# ---- misc --- # ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment