Commit be3e090c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 41a21d20
......@@ -563,7 +563,7 @@ type WatchLink struct {
fileTab map[zodb.Oid]*Watch // {} foid -> Watch
// IO
// acceptq chan string // (stream, msg) // client-initiated messages go here
txMu sync.Mutex
rxMu sync.Mutex
rxTab map[uint64]chan string // client replies go via here
}
......@@ -1260,6 +1260,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
}
// XXX comment
// XXX file.δtail has not full info
rev, _ = w.file.δtail.LastRevOf(blk, w.at)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev))
......@@ -1277,6 +1278,8 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
......@@ -1287,7 +1290,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX if watch was already established - we need to update it
w := wlink.fileTab[foid]
if w != nil {
// XXX update the watch
panic("TODO") // XXX update the watch
}
// watch was not previously established - set it up anew
......@@ -1307,7 +1310,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return fmt.Errorf("at is too far away back from head/at")
}
toPin := map[int64]zodb.Tid // blk -> @rev
toPin := map[int64]zodb.Tid{} // blk -> @rev
// XXX f.δtail.Head() not neccessarily = head.At()
// (if f was not changed by a txn, f.δtail stays not updated) XXX correct?
......@@ -1331,7 +1334,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
blk := blk
rev := rev
wg.Go(func() error {
return wlink.pin(ctx, blk, rev)
return w.pin(ctx, blk, rev)
})
}
err = wg.Wait()
......@@ -1339,7 +1342,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
panic(err) // XXX
}
// XXX something else?
// XXX register w to f (here ?)
return nil
}
// Open serves /head/watch opens.
......@@ -1362,8 +1367,8 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
return wlink.sk.File(), fuse.OK
}
// serveRX serves client originated watch requests and routes client replies to
// wcfs originated requests.
// serveRX serves client initiated watch requests and routes client replies to
// wcfs initiated requests.
func (wlink *WatchLink) serveRX() {
err := wlink._serveRX()
_ = err
......@@ -1414,17 +1419,23 @@ func (wlink *WatchLink) _serveRX() (err error) {
return fmt.Errorf("%d: %s", stream, err)
}
err = wlink.setupWatch(context.TODO(), foid, at) // XXX ctx = ?
ctx := context.TODO() // XXX ctx = ?
err = wlink.setupWatch(ctx, foid, at)
if err != nil {
return fmt.Errorf("%d: %s", stream, err)
}
err = wlink.send(ctx, stream, "ok")
if err != nil {
panic(err) // XXX
}
}
}
// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx
// XXX assert '\n' not in req
stream := uint64(2) // FIXME allocate stream anew as several in-flight sendReq are possible
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
......@@ -1432,10 +1443,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
wlink.rxTab[stream] = rxq // XXX assert .stream is not there?
wlink.rxMu.Unlock()
// XXX lock tx
// XXX timeout write on ctx cancel
_, err = wlink.sk.Write([]byte(fmt.Sprintf("%d %s\n", stream, req)))
err = wlink.send(ctx, stream, req)
if err != nil {
return "", err
}
......@@ -1449,6 +1457,26 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
}
}
// send sends a message message to client over specified stream ID.
//
// Multiple send can be called simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
// XXX err ctx
// XXX assert '\n' not in msg
wlink.txMu.Lock()
defer wlink.txMu.Unlock()
// XXX timeout write on ctx cancel
pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
_, err := wlink.sk.Write(pkt)
if err != nil {
return err
}
return nil
}
// ---- Lookup ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment