Commit 0e6afd21 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9c3a2154
......@@ -491,7 +491,7 @@ type Head struct {
// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
// with additional locking protocol to avoid deadlocks (see below for
// pauseOSCacheUpload + ...).
zconnMu sync.RWMutex
zconnMu sync.RWMutex // XXX -> zheadMu ?
zconn *ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
......@@ -505,7 +505,6 @@ type Head struct {
// head/watch opens
// XXX protected by ... head.zconnMu ?
// XXX -> watchTab?
wlinkTab map[*WatchLink]struct{}
}
......@@ -1313,28 +1312,34 @@ retry:
// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev must be ≤ w.at, and there must be no rev_next: rev < rev_next ≤ w.at.
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid()
defer xerr.Contextf(&err, "f<%s>: watch%d: pin #%d @%s", foid, w.link.id, blk, rev)
revstr := rev.String()
if rev == zodb.TidMax {
revstr = "head"
}
defer xerr.Contextf(&err, "f<%s>: wlink%d: pin #%d @%s", foid, w.link.id, blk, revstr)
// XXX locking?
// XXX simultaneous calls?
// XXX rev=inf -> unpin to head
if rev > w.at {
panicf("f<%s>: watch%d: pin #%d @%s: watch.at (%s) < rev",
if !(rev == zodb.TidMax || rev <= w.at) {
panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
foid, w.link.id, blk, rev, w.at)
}
if w.pinned[blk] == rev {
// XXX pinned has to be invalidated when w.at↑
return // already pinned
if rev == zodb.TidMax {
panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
}
return // already pinned XXX for simultaneous calls?
}
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev))
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
if err != nil {
return err
}
......@@ -1343,7 +1348,12 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return fmt.Errorf("expect %q; got %q", "ack", ack)
}
if rev == zodb.TidMax {
delete(w.pinned, blk)
} else {
w.pinned[blk] = rev
}
return nil
}
......@@ -1402,12 +1412,12 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// blocks. If we don't, we can miss to send pin for a freshly read
// block which could have revision > w.at .
toPin := map[int64]zodb.Tid{} // blk -> @rev
// pin all tracked file blocks that were changed in (at, head] range.
// XXX locking
// pin all tracked file blocks that were changed in (at, head] range
// XXX blk in w.pinned; blk not in δ -> unpin to head
toPin := map[int64]zodb.Tid{} // blk -> @rev
f := w.file
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks {
......@@ -1416,11 +1426,20 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
continue
}
//toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
pinrev, _ := f.LastBlkRev(ctx, blk, at)
if w.pinned[blk] != pinrev { // XXX locking
toPin[blk] = pinrev
toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
}
}
// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
for blk, pinPrev := range w.pinned { // XXX locking
pinNew, pinning := toPin[blk]
if !pinning {
toPin[blk] = zodb.TidMax // @head
}
// don't bother to spawn .pin goroutines if pin revision is the same
if pinPrev == pinNew {
delete(toPin, blk)
}
}
......
......@@ -596,7 +596,7 @@ class tSrvReq:
# _pinAt returns which blocks needs to be pinned for zf@at.
#
# it does not take into account whether blocks are in cache or not and computes
# pin from all changes.
# pin from all changes. XXX desired behaviour?
@func(tWatch)
def _pinAt(w, zf, at): # -> pin = {} blk -> rev
t = w.tdb
......@@ -850,7 +850,7 @@ def test_wcfs():
# XXX both from scratch and going e.g. at1 -> at2 -> at3
# XXX going not only up, but also down at1 <- at2 <- at3 ?
# XXX going not only up, but also down at1 <- at2 <- at3 ? -> forbid
# XXX 2 (or more) opened watch for 1 file at the same time
# XXX watch for 2 files via single watch open
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment