Commit d245e5ef authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 56ee365c
......@@ -554,8 +554,12 @@ type BigFile struct {
loadMu sync.Mutex
loading map[int64]*blkLoadState // #blk -> {... blkdata}
// watches attached to this file
// XXX already in "established" state (i.e. initial watch request was answered with "ok")
// watches attached to this file.
//
// both watches in already "established" state (i.e. initial watch
// request was completed and answered with "ok"), and watches in
// progress of being established. XXX text
//
// XXX locking -> watchMu?
watches map[*Watch]struct{}
}
......@@ -608,7 +612,6 @@ type Watch struct {
// XXX locking
at zodb.Tid // requested to be watched @at
// pinned SetI64 // blocks that are already pinned to be ≤ at
// {} blk -> rev
pinned map[int64]zodb.Tid // blocks that are already pinned to be ≤ at
......@@ -1362,7 +1365,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return nil
}
// setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
......@@ -1391,13 +1394,12 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
link: wlink,
file: f,
at: at,
//pinned: make(SetI64),
pinned: make(map[int64]zodb.Tid),
}
}
// XXX check at >= w.at
// XXX check at >= w.at -> reject?
if !(at >= w.at) {
panic("TODO")
}
......@@ -1413,10 +1415,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
headAt, headAt.Time().Sub(at.Time().Time))
}
// TODO register w to f here early, so that READs going in parallel to
// us preparing and processing initial pins, also send pins for read
f := w.file
// register w to f here early, so that READs going in parallel to us
// preparing and processing initial pins, also send pins for read
// blocks. If we don't, we can miss to send pin for a freshly read
// block which could have revision > w.at .
// block which could have revision > w.at . XXX test
// XXX locking
// XXX register only if watch was created anew, not updated.
f.watches[w] = struct{}{}
// pin all tracked file blocks that were changed in (at, head] range.
......@@ -1424,7 +1432,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
toPin := map[int64]zodb.Tid{} // blk -> @rev
f := w.file
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks {
_, already := toPin[blk]
......@@ -1615,14 +1622,12 @@ func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg stri
err = wlink._handleWatch(ctx, msg)
reply := "ok"
if err != nil {
// logical error is reported back to client, but watch link remains live
reply = fmt.Sprintf("error %s", err)
err = nil
}
err2 := wlink.send(ctx, stream, reply)
if err == nil {
err = err2
}
err = wlink.send(ctx, stream, reply)
return err
}
......@@ -1889,10 +1894,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
loading: make(map[int64]*blkLoadState),
}
// only head/ needs f.δtail
// only head/ needs δFtail & f.δtail.
// only head/ needs δFtail, f.δtail and watches.
if head.rev == 0 {
head.bfdir.δFmu.Lock() // XXX locking ok?
head.bfdir.δFtail.Track(f, -1, sizePath, nil)
......@@ -1900,6 +1902,8 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
// FIXME: scan zfile.blktab - so that we can detect all btree changes
// see "XXX building δFtail lazily ..." in notes.txt
f.watches = make(map[*Watch]struct{})
}
return f, nil
......
......@@ -750,7 +750,7 @@ def watch(twlink, zf, at): # XXX -> ?
# blk ∈ pin_prev, blk ∉ pin -> unpin to head
elif blk in pin_prev and blk not in pin:
pin[blk] = None # XXX = head
pin[blk] = None # @head
# blk ∈ pin_prev, blk ∈ pin -> if rev different: use pin
elif blk in pin_prev and blk in pin:
......@@ -986,6 +986,8 @@ def test_wcfs():
assert ev == ['read pre', 'pin rx', 'pin ack', 'read post']
assert f.cached()[2] == 1
wl.close()
# XXX commit after current file size -> watch
......@@ -1031,6 +1033,8 @@ def test_wcfs():
# XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
# setupWatch must send pins.
# XXX no reply to pin - killed
# ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment