Commit 193f7ae2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3154ae82
......@@ -1353,26 +1353,40 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
head := wlink.head
bfdir := head.bfdir
// XXX locking
// XXX at = zobd.InvalidTid - remove watch
// XXX if watch was already established - we need to update it
w := wlink.byfile[foid]
if w != nil {
panic("TODO") // XXX update the watch
}
if w == nil {
// watch was not previously established - set it up anew
// XXX locking
head := wlink.head
bfdir := head.bfdir
f := bfdir.fileTab[foid]
if f == nil {
// by "invalidation protocol" watch is setup after data file was opened
return fmt.Errorf("file not yet known or is not a ZBigFile")
}
w = &Watch{
link: wlink,
file: f,
at: at,
//pinned: make(SetI64),
pinned: make(map[int64]zodb.Tid),
}
}
// XXX check at >= w.at
if !(at >= w.at) {
panic("TODO")
}
headAt := head.zconn.At()
// XXX wait head.zconn.At() ≥ at
......@@ -1382,14 +1396,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return fmt.Errorf("at is too far away back from head/at (%s)", headAt)
}
w = &Watch{
link: wlink,
file: f,
at: at,
//pinned: make(SetI64),
pinned: make(map[int64]zodb.Tid),
}
// TODO register w to f here early, so that READs going in parallel to
// us preparing and processing initial pins, also send pins for read
// blocks. If we don't, we can miss to send pin for a freshly read
......@@ -1400,6 +1406,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX locking
// pin all tracked file blocks that were changed in (at, head] range
f := w.file
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks {
_, already := toPin[blk]
......
......@@ -671,6 +671,7 @@ def watch(w, zf, at): # XXX -> ?
# send watch request and check that we receive pins for in-cache blocks
# changed > at.
# XXX use timeout to detect wcfs replying less pins.
ctx, cancel = context.with_cancel(context.background())
wg = sync.WorkGroup(ctx)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment