Commit 3725aa97 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e05f89b1
......@@ -493,7 +493,7 @@ type Watcher struct {
head *Head
// established file watchers.
// XXX in-progress - where?
// XXX in-progress - where? -> nowhere; here only established watches are added
// XXX locking?
fileTab map[*FileWatch]struct{}
......@@ -1262,7 +1262,7 @@ func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error
// XXX comment
rev, _ = w.file.δFtail.LastRevOf(blk, w.at)
ack, err := w.link.send(ctx, fmt.Sprintf("pin %s #%s @%s", foid, blk, rev))
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%s @%s", foid, blk, rev))
if err != nil {
return err
}
......@@ -1294,14 +1294,14 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
// XXX del watchTab[w] on w.sk.File.Release
head.watchTab[w] = struct{}{}
go w.serve()
go w.serveRX()
return w.sk.File(), fuse.OK
}
// serve serves client originated watch requests.
// XXX serves rx? (-> and routes client replies ...)
func (w *Watcher) serve() {
err := w._serve()
// serveRX serves client originated watch requests and routes client replies to
// wcfs originated requests.
func (w *Watcher) serveRX() {
err := w._serveRX()
_ = err
// XXX log error if !close
// XXX close if was not closed?
......@@ -1309,15 +1309,15 @@ func (w *Watcher) serve() {
delete(w.head.watchTab, w)
}
func (w *Watcher) _serve() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve", w.id)
func (w *Watcher) _serveRX() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve rx", w.id)
r := bufio.NewReader(w.sk)
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1?
for {
l, err := r.ReadString('\n') // XXX limit accepted line len not to DOS
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
if err != nil {
return err
}
......@@ -1326,7 +1326,7 @@ func (w *Watcher) _serve() (err error) {
stream, msg, err := parseWatchFrame(l)
if err != nil {
return fmt.Errorf("rx: %s", err)
return fmt.Errorf("%s", err)
}
// reply from client to to wcfs
......@@ -1338,7 +1338,7 @@ func (w *Watcher) _serve() (err error) {
w.rxMu.Unlock()
if rxq == nil {
return fmt.Errorf("rx %d: reply on unexpected stream", stream)
return fmt.Errorf("%d: reply on unexpected stream", stream)
}
rxq <- msg
continue
......@@ -1347,7 +1347,7 @@ func (w *Watcher) _serve() (err error) {
// client-initiated request
oid, at, err := parseWatch(msg)
if err != nil {
return fmt.Errorf("rx %d: %s", stream, err)
return fmt.Errorf("%d: %s", stream, err)
}
_ = oid
......@@ -1360,6 +1360,34 @@ func (w *Watcher) _serve() (err error) {
}
}
// sendReq sends wcfs-originated request to client and returns client response.
func (w *Watcher) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx
// XXX assert '\n' not in req
stream = ... // XXX
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
w.rxMu.Lock()
w.rxTab[stream] = rxq // XXX assert .stream is not there?
w.rxMu.Unlock()
// XXX lock tx
// XXX timeout write on ctx cancel
err = w.sk.Write(fmt.Sprintf("%d %s\n", stream, req))
if err != nil {
return "", err
}
select {
case <-ctx.Done():
return "", ctx.Err()
case reply = <-rxq:
return reply, nil
}
}
// ---- Lookup ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment