Commit 95df0951 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 554500a0
...@@ -584,15 +584,19 @@ type WatchLink struct { ...@@ -584,15 +584,19 @@ type WatchLink struct {
id int32 // ID of this /head/watch handle (for debug log) id int32 // ID of this /head/watch handle (for debug log)
head *Head head *Head
// established watchs. // established watches.
// XXX in-progress - where? -> nowhere; here only established watches are added // XXX in-progress - where? -> (XXX no - see vvv) nowhere; here only established watches are added
// XXX -> in-progress here - so that access to new blocks after δFtail
// was queried also send pins.
//
// XXX locking? // XXX locking?
fileTab map[zodb.Oid]*Watch // {} foid -> Watch fileTab map[zodb.Oid]*Watch // {} foid -> Watch
// IO // IO
txMu sync.Mutex reqNext uint64 // stream ID for next wcfs-originated request
rxMu sync.Mutex txMu sync.Mutex
rxTab map[uint64]chan string // client replies go via here rxMu sync.Mutex
rxTab map[/*stream*/uint64]chan string // client replies go via here
} }
// Watch represents watching for changes to 1 BigFile over particular watch link. // Watch represents watching for changes to 1 BigFile over particular watch link.
...@@ -1299,15 +1303,13 @@ retry: ...@@ -1299,15 +1303,13 @@ retry:
} }
// -------- invalidation protocol notification/serving -------- // -------- invalidation protocol notification/serving --------
//
// (see "7.2) for all registered client@at watchers ...")
// pin makes sure that file[blk] on client side is the same as of @rev state. // pin makes sure that file[blk] on client side is the same as of @rev state.
// //
// rev must be ≤ w.at // rev must be ≤ w.at, and there must be no rev_next: rev < rev_next ≤ w.at.
// //
// XXX what is passed here is rev(blk, @head) - we need to consider rev(blk, @w.at)
//
// XXX describe more.
// XXX explain that if rev ≤ .at there is no rev_next: rev < rev_next ≤ at.
// XXX error - when? or close watch on any error? // XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid() foid := w.file.zfile.POid()
...@@ -1322,7 +1324,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1322,7 +1324,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
} }
if w.pinned.Has(blk) { if w.pinned.Has(blk) {
// XXX pinned has to be invalidated when w.at^ // XXX pinned has to be invalidated when w.at
return // already pinned return // already pinned
} }
...@@ -1375,10 +1377,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1375,10 +1377,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return fmt.Errorf("at is too far away back from head/at (%s)", headAt) return fmt.Errorf("at is too far away back from head/at (%s)", headAt)
} }
// TODO register w to f here early, so that READs going in parallel to us
// preparing and processing initial pins, also sends pin for read
// blocks. If we don't we can miss to send pin for a freshly read which
// could have revision > w.at .
w = &Watch{ w = &Watch{
link: wlink, link: wlink,
file: f, file: f,
...@@ -1386,13 +1384,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1386,13 +1384,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
pinned: make(SetI64), pinned: make(SetI64),
} }
// TODO register w to f here early, so that READs going in parallel to
// us preparing and processing initial pins, also send pins for read
// blocks. If we don't, we can miss to send pin for a freshly read
// block which could have revision > w.at .
toPin := map[int64]zodb.Tid{} // blk -> @rev toPin := map[int64]zodb.Tid{} // blk -> @rev
// XXX locking // XXX locking
// pin all tracked file blocks that were changed (at, head] range // pin all tracked file blocks that were changed in (at, head] range
// fhead := tidmin(f.δtail.Head(), headAt)
// for _, δfile := range f.δtail.SliceByRev(at, fhead) {
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) { for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks { for blk := range δfile.Blocks {
_, already := toPin[blk] _, already := toPin[blk]
...@@ -1473,7 +1474,7 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1473,7 +1474,7 @@ func (wlink *WatchLink) _serveRX() (err error) {
return fmt.Errorf("%s", err) return fmt.Errorf("%s", err)
} }
// reply from client to to wcfs // reply from client to wcfs
reply := (stream % 2 == 0) reply := (stream % 2 == 0)
if reply { if reply {
wlink.rxMu.Lock() wlink.rxMu.Lock()
...@@ -1518,7 +1519,7 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1518,7 +1519,7 @@ func (wlink *WatchLink) _serveRX() (err error) {
// sendReq sends wcfs-originated request to client and returns client response. // sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) { func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx // XXX err ctx
stream := uint64(2) // FIXME allocate stream anew as several in-flight sendReq are possible stream := atomic.AddUint64(&wlink.reqNext, +2)
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client) rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
wlink.rxMu.Lock() wlink.rxMu.Lock()
...@@ -1539,7 +1540,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, ...@@ -1539,7 +1540,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
} }
} }
// send sends a message message to client over specified stream ID. // send sends a message to client over specified stream ID.
// //
// Multiple send can be called simultaneously; send serializes writes. // Multiple send can be called simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error { func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment