Commit b4f4e42b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ebf38926
...@@ -684,3 +684,9 @@ func (root *Root) StatFs() *fuse.StatfsOut { ...@@ -684,3 +684,9 @@ func (root *Root) StatFs() *fuse.StatfsOut {
NameLen: 255, // XXX ok? /proc uses the same NameLen: 255, // XXX ok? /proc uses the same
} }
} }
// ---- misc ----
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
...@@ -1095,7 +1095,8 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1095,7 +1095,8 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
} }
// noone was loading - we became responsible to load this block // noone was loading - we became responsible to load this block
blkdata, treepath, pathRevMax, err := f.zfile.LoadBlk(ctx, blk) // blkdata, treepath, pathRevMax, err := f.zfile.LoadBlk(ctx, blk)
blkdata, treepath, zblkrev, err := f.zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata loading.blkdata = blkdata
loading.err = err loading.err = err
...@@ -1110,7 +1111,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1110,7 +1111,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// we have the data - it can be used after watchers are updated // we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see updateWatcher comments) // XXX should we use ctx here? (see updateWatcher comments)
f.updateWatchers(ctx, blk, treepath, pathRevMax) f.updateWatchers(ctx, blk, treepath, zblkrev)
// data can be used now // data can be used now
close(loading.ready) close(loading.ready)
...@@ -1147,7 +1148,8 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1147,7 +1148,8 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// XXX do we really need to use/propagate caller contex here? ideally update // XXX do we really need to use/propagate caller contex here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout. // watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? // Should a READ interrupt cause watch update failure?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, pathRevMax zodb.Tid) { //func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, pathRevMax zodb.Tid) {
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblkrev zodb.Tid) {
// only head/ is being watched for // only head/ is being watched for
if f.head.rev != 0 { if f.head.rev != 0 {
return return
...@@ -1159,15 +1161,15 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre ...@@ -1159,15 +1161,15 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
bfdir.δFtail.Track(f, treepath) bfdir.δFtail.Track(f, treepath)
bfdir.δFmu.Unlock() bfdir.δFmu.Unlock()
blkrevmax, _ := f.LastBlkRev(blk, f.head.zconn.At()) blkrev, _ := f.LastBlkRev(blk, f.head.zconn.At())
blkrevmax = tidmin(blkrevmax, pathRevMax) // XXX ^^^ merge in zblkrev
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
for w := range f.watches { for w := range f.watches {
w := w w := w
wg.Go(func() error { wg.Go(func() error {
// XXX close watcher on any error // XXX close watcher on any error
return w.pin(ctx, blk, blkrevmax) return w.pinIfNewer(ctx, blk, blkrev)
}) })
} }
err := wg.Wait() err := wg.Wait()
...@@ -1247,7 +1249,10 @@ retry: ...@@ -1247,7 +1249,10 @@ retry:
// -------- invalidation protocol notification/serving -------- // -------- invalidation protocol notification/serving --------
// pin makes sure that file[blk] on client side is the same as of @rev state. XXX no // pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev must be ≤ w.at
//
// XXX what is passed here is rev(blk, @head) - we need to consider rev(blk, @w.at) // XXX what is passed here is rev(blk, @head) - we need to consider rev(blk, @w.at)
// //
// XXX describe more. // XXX describe more.
...@@ -1260,8 +1265,9 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1260,8 +1265,9 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// XXX locking? // XXX locking?
// XXX simultaneous calls? // XXX simultaneous calls?
if rev <= w.at { if rev > w.at {
return // client's view already coveris rev panicf("f<%s>: watch%d: pin #%d @%s: watch.at (%s) < rev",
foid, w.link.id, blk, rev, w.at)
} }
if w.pinned.Has(blk) { if w.pinned.Has(blk) {
...@@ -1269,11 +1275,6 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1269,11 +1275,6 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return // already pinned return // already pinned
} }
// FIXME LastBlkRev call not needed here - we get blk rev as argument. - XXX no
// XXX comment
// XXX file.δtail has not full info
rev, _ = w.file.LastBlkRev(blk, w.at)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev)) ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev))
if err != nil { if err != nil {
return err return err
...@@ -1287,6 +1288,28 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1287,6 +1288,28 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return nil return nil
} }
// pinIfNewer make sure that file[blk] on client side stays as of @w.at state.
//
// rev is blk revision as of head. If rev > w.at the block is pinned on client side.
func (w *Watch) pinIfNewer(ctx context.Context, blk int64, rev zodb.Tid) error {
// XXX locking
// the block is already covered by @w.at database view
if rev <= w.at {
return nil
}
// the block is newer - find out its revision as of @w.at and pin to that.
//
// We don't pin to w.at since if we would do so for several clients,
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
rev, _ = w.file.LastBlkRev(blk, w.at)
return w.pin(ctx, blk, rev)
}
// setupWatch sets up a Watch when client sends `watch <file> @<at>` request. // setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
// //
// XXX sends "pin" notifications; final "ok" must be sent by caller. // XXX sends "pin" notifications; final "ok" must be sent by caller.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment