Commit ad766f75 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Rework WatchLink.serve to rely on context cancellation to stop reading

Previously we were using .sk.CloseRead() to interrupt sk.Read(), but
that is not necessary since .sk, relying on xio.Pipe, implements
xio.Reader natively with full support for cancellation.

The original code to cancel via CloseRead comes from mid 2019 and predates

go123@7ad867a3
go123@0e368363
go123@0bdac628
go123@9db4dfac
go123@d2dc6c09

And in b17aeb8c and
6f0cdaff (wcfs: Provide isolation to clients), it seems, I missed to
update WatchLink.serve code to that.

Do that now because it simplifies code flow organization a bit.
parent 2fe57e8d
...@@ -1884,8 +1884,6 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1884,8 +1884,6 @@ func (wlink *WatchLink) _serve() (err error) {
ctx, cancel := context.WithCancel(ctx0) ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx) wg := xsync.NewWorkGroup(ctx)
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
defer func() { defer func() {
// cancel all handlers on both error and ok return. // cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client // ( ok return is e.g. when we received "bye", so if client
...@@ -1913,40 +1911,34 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1913,40 +1911,34 @@ func (wlink *WatchLink) _serve() (err error) {
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err)) _ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
} }
// close .sk.tx : this wakes up rx on client side. // close .sk
err2 = wlink.sk.CloseWrite() // closing .sk.tx wakes up rx on client side.
err2 = wlink.sk.Close()
if err == nil { if err == nil {
err = err2 err = err2
} }
}() }()
// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk). // cancel main thread on any watch handler error
retq := make(chan struct{}) ctx, mainCancel := context.WithCancel(ctx)
defer close(retq) defer mainCancel()
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
// monitor is always canceled - either at parent ctx cancel, or // monitor is always canceled - either due to parent ctx cancel, error in workgroup,
// upon return from serve (see "cancel all handlers ..." ^^^). // or return from serve and running "cancel all handlers ..." above
// If it was return - report returned error to wg.Wait, not "canceled".
<-ctx.Done() <-ctx.Done()
e := ctx.Err() mainCancel()
select { return nil
default:
case <-retq:
e = err // returned error
}
e2 := wlink.sk.CloseRead()
if e == nil {
e = e2
}
return e
}) })
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
for { for {
// NOTE r.Read is woken up by ctx cancel because wlink.sk implements xio.Reader natively
l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS
if err != nil { if err != nil {
// r.Read is woken up by sk.CloseRead when serve decides to exit if err == io.EOF {
if err == io.ErrClosedPipe || err == io.EOF { err = io.ErrUnexpectedEOF
}
if errors.Is(err, ctx.Err()) {
err = nil err = nil
} }
return err return err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment