Commit a4b30e86 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ca5e34cd
...@@ -495,6 +495,7 @@ type Watcher struct { ...@@ -495,6 +495,7 @@ type Watcher struct {
// established file watchers. // established file watchers.
// XXX in-progress - where? -> nowhere; here only established watches are added // XXX in-progress - where? -> nowhere; here only established watches are added
// XXX locking? // XXX locking?
// XXX -> {} foid -> FileWatch
fileTab map[*FileWatch]struct{} fileTab map[*FileWatch]struct{}
// IO // IO
...@@ -1239,9 +1240,8 @@ retry: ...@@ -1239,9 +1240,8 @@ retry:
// //
// XXX describe more. // XXX describe more.
// XXX explain that if rev ≤ .at there is no rev_next: rev < rev_next ≤ at. // XXX explain that if rev ≤ .at there is no rev_next: rev < rev_next ≤ at.
// XXX error - when? // XXX error - when? or close watch on any error?
// //
// XXX -> WatchFile.Pin(blk, at)
// XXX place=ok? // XXX place=ok?
func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid() foid := w.file.zfile.POid()
...@@ -1275,6 +1275,35 @@ func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error ...@@ -1275,6 +1275,35 @@ func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error
return nil return nil
} }
// setupWatch sets up new FileWatch when client sends `watch <file> @<at>`.
//
// XXX place=ok?
func (watch *Watcher) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
// XXX locking
// XXX at = zobd.InvalidTid - remove watch
// XXX if watch was already established - we need to update it
w = watch.fileTab[foid]
if w != nil {
// XXX update the watch
}
// watch was not previously established - set it up anew
// XXX locking
f := watch.head.bfdir.fileTab[foid]
if f == nil {
// by "invalidation protocol" watch is setup after data file was opened
return fmt.Errorf("file not yet known or is not a ZBigFile")
}
// XXX wait watch.head.zconn.At() ≥ at
}
// ---- Watch server ---- // ---- Watch server ----
...@@ -1344,18 +1373,15 @@ func (w *Watcher) _serveRX() (err error) { ...@@ -1344,18 +1373,15 @@ func (w *Watcher) _serveRX() (err error) {
continue continue
} }
// client-initiated request // client-initiated watch request
oid, at, err := parseWatch(msg) foid, at, err := parseWatch(msg)
if err != nil { if err != nil {
return fmt.Errorf("%d: %s", stream, err) return fmt.Errorf("%d: %s", stream, err)
} }
_ = oid err = w.setupWatch(foid, at)
_ = at
_, err = fmt.Fprintf(w.sk, "%d error TODO\n", stream)
if err != nil { if err != nil {
return err return fmt.Errorf("%d: %s", stream, err)
} }
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment