Commit 3154ae82 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a5304b31
...@@ -607,7 +607,10 @@ type Watch struct { ...@@ -607,7 +607,10 @@ type Watch struct {
// XXX locking // XXX locking
at zodb.Tid // requested to be watched @at at zodb.Tid // requested to be watched @at
pinned SetI64 // blocks that are already pinned to be ≤ at // pinned SetI64 // blocks that are already pinned to be ≤ at
// {} blk -> rev
pinned map[int64]zodb.Tid // blocks that are already pinned to be ≤ at
} }
// -------- 3) Cache invariant -------- // -------- 3) Cache invariant --------
...@@ -1318,13 +1321,14 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1318,13 +1321,14 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// XXX locking? // XXX locking?
// XXX simultaneous calls? // XXX simultaneous calls?
// XXX rev=inf -> unpin to head
if rev > w.at { if rev > w.at {
panicf("f<%s>: watch%d: pin #%d @%s: watch.at (%s) < rev", panicf("f<%s>: watch%d: pin #%d @%s: watch.at (%s) < rev",
foid, w.link.id, blk, rev, w.at) foid, w.link.id, blk, rev, w.at)
} }
if w.pinned.Has(blk) { if w.pinned[blk] == rev {
// XXX pinned has to be invalidated when w.at↑ // XXX pinned has to be invalidated when w.at↑
return // already pinned return // already pinned
} }
...@@ -1338,7 +1342,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1338,7 +1342,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return fmt.Errorf("expect %q; got %q", "ack", ack) return fmt.Errorf("expect %q; got %q", "ack", ack)
} }
w.pinned.Add(blk) w.pinned[blk] = rev
return nil return nil
} }
...@@ -1382,7 +1386,8 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1382,7 +1386,8 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
link: wlink, link: wlink,
file: f, file: f,
at: at, at: at,
pinned: make(SetI64), //pinned: make(SetI64),
pinned: make(map[int64]zodb.Tid),
} }
// TODO register w to f here early, so that READs going in parallel to // TODO register w to f here early, so that READs going in parallel to
...@@ -1402,7 +1407,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1402,7 +1407,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
continue continue
} }
toPin[blk], _ = f.LastBlkRev(ctx, blk, at) //toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
pinrev, _ := f.LastBlkRev(ctx, blk, at)
if w.pinned[blk] != pinrev { // XXX locking
toPin[blk] = pinrev
}
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment