Commit 65a027f8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 096d7eb0
......@@ -557,7 +557,8 @@ type WatchLink struct {
// established watchs.
// XXX in-progress - where? -> nowhere; here only established watches are added
// XXX locking?
watchTab map[zodb.Oid]*Watch // {} foid -> Watch
// XXX -> watchTab?
fileTab map[zodb.Oid]*Watch // {} foid -> Watch
// IO
// acceptq chan string // (stream, msg) // client-initiated messages go here
......@@ -567,8 +568,8 @@ type WatchLink struct {
// Watch represents watching for changes to 1 BigFile.
type Watch struct {
wlink *WatchLink // link to client
file *BigFile // XXX needed?
link *WatchLink // link to client
file *BigFile // XXX needed?
// XXX locking
......@@ -1150,7 +1151,7 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []zodb
blkrevmax = tidmin(blkrevmax, pathRevMax)
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watchers {
for w := range f.watches {
w := w
wg.Go(func() error {
// XXX close watcher on any error
......@@ -1242,7 +1243,7 @@ retry:
// XXX error - when? or close watch on any error?
//
// XXX place=ok?
func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid()
defer xerr.Contextf(&err, "f<%s>: watch%d: pin #%d @%s", foid, w.link.id, blk, rev)
......@@ -1274,72 +1275,71 @@ func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error
return nil
}
// setupWatch sets up new FileWatch when client sends `watch <file> @<at>`.
// setupWatch sets up new Watch when client sends `watch <file> @<at>`.
//
// XXX place=ok?
func (watch *Watcher) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
// XXX locking
// XXX at = zobd.InvalidTid - remove watch
// XXX if watch was already established - we need to update it
w = watch.fileTab[foid]
w := wlink.fileTab[foid]
if w != nil {
// XXX update the watch
}
// watch was not previously established - set it up anew
// XXX locking
f := watch.head.bfdir.fileTab[foid]
f := wlink.head.bfdir.fileTab[foid]
if f == nil {
// by "invalidation protocol" watch is setup after data file was opened
return fmt.Errorf("file not yet known or is not a ZBigFile")
}
// XXX wait watch.head.zconn.At() ≥ at
// XXX wait wlink.head.zconn.At() ≥ at
panic("TODO")
}
// ---- Watch server ----
// Open serves /head/watch opens.
func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags?
head := watch.head
head := wnode.head
w := &Watcher{
wlink := &WatchLink{
sk: NewFileSock(),
id: atomic.AddInt32(&watch.idNext, +1),
id: atomic.AddInt32(&wnode.idNext, +1),
head: head,
fileTab: make(map[*FileWatch]struct{}),
fileTab: make(map[zodb.Oid]*Watch),
}
// XXX locking
// XXX del watchTab[w] on w.sk.File.Release
head.watchTab[w] = struct{}{}
// XXX del wlinkTab[w] on w.sk.File.Release
head.wlinkTab[wlink] = struct{}{}
go w.serveRX()
return w.sk.File(), fuse.OK
go wlink.serveRX()
return wlink.sk.File(), fuse.OK
}
// serveRX serves client originated watch requests and routes client replies to
// wcfs originated requests.
func (w *Watcher) serveRX() {
err := w._serveRX()
func (wlink *WatchLink) serveRX() {
err := wlink._serveRX()
_ = err
// XXX log error if !close
// XXX close if was not closed?
// XXX locking
delete(w.head.watchTab, w)
delete(wlink.head.wlinkTab, wlink)
}
func (w *Watcher) _serveRX() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve rx", w.id)
r := bufio.NewReader(w.sk)
func (wlink *WatchLink) _serveRX() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk)
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1?
......@@ -1360,10 +1360,10 @@ func (w *Watcher) _serveRX() (err error) {
// reply from client to to wcfs
reply := (stream % 2 == 0)
if reply {
w.rxMu.Lock()
rxq := w.rxTab[stream]
delete(w.rxTab, stream)
w.rxMu.Unlock()
wlink.rxMu.Lock()
rxq := wlink.rxTab[stream]
delete(wlink.rxTab, stream)
wlink.rxMu.Unlock()
if rxq == nil {
return fmt.Errorf("%d: reply on unexpected stream", stream)
......@@ -1378,7 +1378,8 @@ func (w *Watcher) _serveRX() (err error) {
return fmt.Errorf("%d: %s", stream, err)
}
err = w.setupWatch(foid, at)
// XXX ctx = ?
err = wlink.setupWatch(context.TODO(), foid, at) // XXX -> head.setupWatch?
if err != nil {
return fmt.Errorf("%d: %s", stream, err)
}
......@@ -1386,20 +1387,20 @@ func (w *Watcher) _serveRX() (err error) {
}
// sendReq sends wcfs-originated request to client and returns client response.
func (w *Watcher) sendReq(ctx context.Context, req string) (reply string, err error) {
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx
// XXX assert '\n' not in req
stream := uint64(2) // FIXME allocate stream anew as several in-flight sendReq are possible
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
w.rxMu.Lock()
w.rxTab[stream] = rxq // XXX assert .stream is not there?
w.rxMu.Unlock()
wlink.rxMu.Lock()
wlink.rxTab[stream] = rxq // XXX assert .stream is not there?
wlink.rxMu.Unlock()
// XXX lock tx
// XXX timeout write on ctx cancel
_, err = w.sk.Write([]byte(fmt.Sprintf("%d %s\n", stream, req)))
_, err = wlink.sk.Write([]byte(fmt.Sprintf("%d %s\n", stream, req)))
if err != nil {
return "", err
}
......@@ -1788,10 +1789,10 @@ func main() {
fsNode: newFSNode(fSticky),
rev: 0,
zconn: zhead,
watchTab: make(map[*Watcher]struct{}),
wlinkTab: make(map[*WatchLink]struct{}),
}
watch := &Watch{
wnode := &WatchNode{
fsNode: newFSNode(fSticky),
head: head,
}
......@@ -1859,7 +1860,7 @@ func main() {
mkdir(root, "head", head)
mkdir(head, "bigfile", bfdir)
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
mkfile(head, "watch", watch)
mkfile(head, "watch", wnode)
// for debugging/testing
_wcfs := newFSNode(fSticky)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment