Commit 12628943 authored by Kirill Smelkov's avatar Kirill Smelkov

X make sure "bye" is always processed immediately - even if a handleWatch is currently blocked

parent 508a3d71
......@@ -1516,11 +1516,18 @@ func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk)
ctx := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
wg, ctx := errgroup.WithContext(ctx)
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
err2 := wg.Wait()
if err == nil {
err = err2
......@@ -1529,28 +1536,31 @@ func (wlink *WatchLink) _serve() (err error) {
// write to peer if it was logical error on client side
// then .sk.tx to wakeup rx on client side
if err != nil {
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", err))
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
}
// close .sk.tx : this wakes up rx on client side.
err2 = wlink.sk.CloseWrite()
if err == nil {
err = err2
}
}()
// close .sk.rx on error/wcfs stopping or return. closing .sk.rx wakes up read(sk)
// (XXX and rx on client side). make sure to stop wg on error return.
// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
retq := make(chan struct{})
defer close(retq)
wg.Go(func() error {
var e error
// monitor is always canceled - either at parent ctx cancel, or
// upon return from serve (see "cancel all handlers ..." ^^^).
// If it was return - report returned error to wg.Wait, not "canceled".
<-ctx.Done()
e := ctx.Err()
select {
case <-ctx.Done():
e = ctx.Err()
default:
case <-retq:
e = err // returned error
}
//e2 := wlink.sk.Close() // XXX -> CloseRead
e2 := wlink.sk.CloseRead()
if e == nil {
e = e2
......
......@@ -1219,6 +1219,8 @@ def test_wcfs():
# XXX commit after current file size -> watch
# XXX no reply to pin - killed
# XXX access to block not previously accessed but invalidated in ZODB
......@@ -1258,8 +1260,6 @@ def test_wcfs():
# XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
# setupWatch must send pins.
# XXX no reply to pin - killed
# ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment