Commit b4857f66 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dac44ea0
......@@ -1449,7 +1449,6 @@ func (wlink *WatchLink) serveRX() {
err := wlink._serveRX()
_ = err
// XXX log error if !close
// XXX close if was not closed?
// XXX locking
delete(wlink.head.wlinkTab, wlink)
}
......@@ -1458,13 +1457,44 @@ func (wlink *WatchLink) _serveRX() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk)
// XXX close .sk on error/wcfs stopping
ctx := context.TODO() // XXX ctx = ? -> ctx of wcfs running
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1?
wg, ctx := errgroup.WithContext(ctx)
defer func() {
err2 := wg.Wait()
if err == nil {
err = err2
}
}()
// close .sk on error/wcfs stopping or return. closing .sk wakes up rx on it.
// make sure to stop wg on error return.
retq := make(chan struct{})
defer close(retq)
wg.Go(func() error {
var e error
select {
case <-ctx.Done():
e = ctx.Err()
case <-retq:
e = err // returned error
}
e2 := wlink.sk.Close()
if e == nil {
e = e2
}
return e
})
// allow for only 1 watch request at a time
// TODO allow simultaneous watch requests for different files
var handlingWatch int32
for {
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
if err != nil {
......@@ -1494,10 +1524,14 @@ func (wlink *WatchLink) _serveRX() (err error) {
}
// client-initiated watch request
err = wlink.handleWatch(ctx, stream, msg)
if err != nil {
panic(err) // XXX
if atomic.LoadInt32(&handlingWatch) != 0 {
return fmt.Errorf("%d: another watch request is already in progress", stream)
}
atomic.StoreInt32(&handlingWatch, 1)
wg.Go(func() error {
defer atomic.StoreInt32(&handlingWatch, 0)
return wlink.handleWatch(ctx, stream, msg)
})
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment