Commit 0f398ab8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f2b8e63b
...@@ -641,7 +641,7 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) { ...@@ -641,7 +641,7 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) {
}() }()
if !strings.HasPrefix(msg, "watch ") { if !strings.HasPrefix(msg, "watch ") {
return 0, 0, fmt.Errorf("not a watch request") return 0, 0, fmt.Errorf("not a watch request: %q", msg)
} }
argv := strings.Split(msg[len("watch "):], " ") argv := strings.Split(msg[len("watch "):], " ")
if len(argv) != 2 { if len(argv) != 2 {
......
...@@ -423,6 +423,7 @@ import ( ...@@ -423,6 +423,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io"
stdlog "log" stdlog "log"
"os" "os"
// "runtime" // "runtime"
...@@ -1336,7 +1337,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1336,7 +1337,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
if rev == zodb.TidMax { if rev == zodb.TidMax {
panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk) panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
} }
return // already pinned XXX for simultaneous calls? return nil // already pinned XXX for simultaneous calls?
} }
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr)) ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
...@@ -1379,7 +1380,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1379,7 +1380,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
f := bfdir.fileTab[foid] f := bfdir.fileTab[foid]
if f == nil { if f == nil {
// by "invalidation protocol" watch is setup after data file was opened // by "invalidation protocol" watch is setup after data file was opened
return fmt.Errorf("file not yet known or is not a ZBigFile") return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile")
} }
w = &Watch{ w = &Watch{
...@@ -1404,7 +1405,8 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1404,7 +1405,8 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX <~> f.δtail.Head() ≥ at (?) // XXX <~> f.δtail.Head() ≥ at (?)
if at < bfdir.δFtail.Tail() { if at < bfdir.δFtail.Tail() {
return fmt.Errorf("at is too far away back from head/at (%s)", headAt) return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time))
} }
// TODO register w to f here early, so that READs going in parallel to // TODO register w to f here early, so that READs going in parallel to
...@@ -1482,14 +1484,14 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus ...@@ -1482,14 +1484,14 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
// XXX del wlinkTab[w] on w.sk.File.Release // XXX del wlinkTab[w] on w.sk.File.Release
head.wlinkTab[wlink] = struct{}{} head.wlinkTab[wlink] = struct{}{}
go wlink.serveRX() go wlink.serve()
return wlink.sk.File(), fuse.OK return wlink.sk.File(), fuse.OK
} }
// serveRX serves client initiated watch requests and routes client replies to // serve serves client initiated watch requests and routes client replies to
// wcfs initiated requests. // wcfs initiated pin requests.
func (wlink *WatchLink) serveRX() { func (wlink *WatchLink) serve() {
err := wlink._serveRX() err := wlink._serve()
// XXX log error if !close // XXX log error if !close
if err != nil { if err != nil {
log.Error(err) log.Error(err)
...@@ -1498,14 +1500,11 @@ func (wlink *WatchLink) serveRX() { ...@@ -1498,14 +1500,11 @@ func (wlink *WatchLink) serveRX() {
delete(wlink.head.wlinkTab, wlink) delete(wlink.head.wlinkTab, wlink)
} }
func (wlink *WatchLink) _serveRX() (err error) { func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id) defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk) r := bufio.NewReader(wlink.sk)
ctx := context.TODO() // XXX ctx = ? -> ctx of wcfs running ctx := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1?
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
...@@ -1514,10 +1513,14 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1514,10 +1513,14 @@ func (wlink *WatchLink) _serveRX() (err error) {
if err == nil { if err == nil {
err = err2 err = err2
} }
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1? -> better 0 as in HTTP/2?
// XXX then -> CloseWrite
}() }()
// close .sk on error/wcfs stopping or return. closing .sk wakes up rx // close .sk.rx on error/wcfs stopping or return. closing .sk.rx wakes up read(sk)
// on it and rx on client side. make sure to stop wg on error return. // (XXX and rx on client side). make sure to stop wg on error return.
retq := make(chan struct{}) retq := make(chan struct{})
defer close(retq) defer close(retq)
wg.Go(func() error { wg.Go(func() error {
...@@ -1529,7 +1532,8 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1529,7 +1532,8 @@ func (wlink *WatchLink) _serveRX() (err error) {
e = err // returned error e = err // returned error
} }
e2 := wlink.sk.Close() //e2 := wlink.sk.Close() // XXX -> CloseRead
e2 := wlink.sk.CloseRead()
if e == nil { if e == nil {
e = e2 e = e2
} }
...@@ -1543,6 +1547,10 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1543,6 +1547,10 @@ func (wlink *WatchLink) _serveRX() (err error) {
for { for {
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
if err != nil { if err != nil {
// r.Read is woken up by sk.CloseRead when serve decides to exit
if err == io.ErrClosedPipe {
err = nil
}
return err return err
} }
...@@ -1593,6 +1601,9 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1593,6 +1601,9 @@ func (wlink *WatchLink) _serveRX() (err error) {
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) { func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
defer xerr.Contextf(&err, "%d", stream) defer xerr.Contextf(&err, "%d", stream)
// FIXME error "at is too far away" from setupWatch shuold be not fatal
// (XXX bad parse watch -> fatal (not following protocol))
// XXX kill fatal replies? (i.e. make almost everything non-fatal ?)
err = wlink._handleWatch(ctx, msg) err = wlink._handleWatch(ctx, msg)
reply := "ok" reply := "ok"
if err != nil { if err != nil {
......
...@@ -878,8 +878,12 @@ def test_wcfs(): ...@@ -878,8 +878,12 @@ def test_wcfs():
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
# invalid requests -> wcfs replies error # invalid requests -> wcfs replies error
# XXX invalid request not following frame structure `<stream> ...`
wl = t.openwatch() wl = t.openwatch()
assert wl.sendReq(context.background(), b'bla bla') == b"error bad watch: not a watch request" assert wl.sendReq(context.background(), b'bla bla') == \
b'error bad watch: not a watch request: "bla bla"'
# wcfs must close watch link after invalid request # wcfs must close watch link after invalid request
_, _rx = select( _, _rx = select(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment