Commit 6b6f98c5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 08da1d81
......@@ -1101,7 +1101,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
}
// no one was loading - we became responsible for loading this block
blkdata, treepath, zblkrev, err := f.zfile.LoadBlk(ctx, blk)
blkdata, treepath, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
......@@ -1116,7 +1116,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see updateWatchers comments)
f.updateWatchers(ctx, blk, treepath, zblkrev)
f.updateWatchers(ctx, blk, treepath, blkrevMax)
// data can be used now
close(loading.ready)
......@@ -1153,8 +1153,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
//func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, pathRevMax zodb.Tid) {
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblkrev zodb.Tid) {
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, blkrevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
......@@ -1166,15 +1165,48 @@ func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btre
bfdir.δFtail.Track(f, treepath) // XXX pass in zblk.oid / zblk.rev here?
bfdir.δFmu.Unlock()
blkrev, _ := f.LastBlkRev(blk, f.head.zconn.At())
// XXX ^^^ merge in zblkrev
// make sure that file[blk] on the client side stays as of @w.at state.
// try to use blkrevMax only as a first cheap criterion to skip updating watchers.
// This is likely to be the case, since most watchers should usually be close to head.
// If using blkrevMax alone turns out to be insufficient, we'll
// consult δFtail, which might involve recomputing it.
blkrev := blkrevMax
blkrevRough := true
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watches {
w := w
// XXX locking
// the block is already covered by @w.at database view
if blkrev <= w.at {
continue
}
// if blkrev is a rough estimate and that upper bound is > w.at
// we have to recompute the ~exact file[blk] revision @head.
if blkrevRough {
blkrev, _ = f.LastBlkRev(blk, f.head.zconn.At())
blkrevRough = false
if blkrev <= w.at {
continue
}
}
// the block is newer - find out its revision as of @w.at and pin to that.
//
// We don't pin to w.at since if we would do so for several clients,
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := w.file.LastBlkRev(blk, w.at) // XXX move into go?
wg.Go(func() error {
// XXX close watcher on any error
return w.pinIfNewer(ctx, blk, blkrev)
return w.pin(ctx, blk, pinrev)
})
}
err := wg.Wait()
......@@ -1293,28 +1325,6 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return nil
}
// pinIfNewer keeps file[blk] on the client side as of the @w.at state.
//
// rev is the revision of blk as of head. When rev <= w.at the block is
// already covered by the @w.at database view and nothing is done.
// Otherwise the block is pinned to the revision it had as of @w.at.
func (w *Watch) pinIfNewer(ctx context.Context, blk int64, rev zodb.Tid) error {
	// XXX locking

	if rev > w.at {
		// The block is newer than the watcher's view: look up its revision
		// as of @w.at and pin to that.
		//
		// Pinning to w.at itself is avoided on purpose: with several clients
		// on different w.at the file cache would be lost. Pinning to the
		// particular block revision keeps the pinned revision the same on
		// all clients, so the file cache is shared.
		pinRev, _ := w.file.LastBlkRev(blk, w.at)
		return w.pin(ctx, blk, pinRev)
	}

	// the block is already covered by @w.at database view
	return nil
}
// setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
......
......@@ -445,9 +445,10 @@ func (bf *zBigFileState) PySetState(pystate interface{}) (err error) {
//
// - BTree path in .blktab for loaded block,
// - max(_.serial for _ in ZBlk(#blk), all BTree/Bucket that lead to ZBlk)
// which provides a rough upper-bound estimate for file[blk] revision.
//
// XXX load into user-provided buf.
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, pathRevMax zodb.Tid, err error) {
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath []btree.LONode, blkRevMax zodb.Tid, err error) {
defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk)
err = bf.PActivate(ctx)
......@@ -456,16 +457,16 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
}
defer bf.PDeactivate()
pathRevMax = 0
blkRevMax = 0
xzblk, ok, err := bf.blktab.VGet(ctx, blk, func(node btree.LONode) {
treePath = append(treePath, node)
pathRevMax = tidmax(pathRevMax, node.PSerial())
blkRevMax = tidmax(blkRevMax, node.PSerial())
})
if err != nil {
return nil, nil, 0, err
}
if !ok {
return make([]byte, bf.blksize), treePath, pathRevMax, nil
return make([]byte, bf.blksize), treePath, blkRevMax, nil
}
zblk, ok := xzblk.(zBlk)
......@@ -477,7 +478,7 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
if err != nil {
return nil, nil, 0, err
}
pathRevMax = tidmax(pathRevMax, zblkrev)
blkRevMax = tidmax(blkRevMax, zblkrev)
l := int64(len(blkdata))
if l > bf.blksize {
......@@ -494,7 +495,7 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
zblk.bindZFile(bf, blk)
//log.Printf("ZBigFile.loadblk(%d) -> %dB", blk, len(blkdata))
return blkdata, treePath, pathRevMax, nil
return blkdata, treePath, blkRevMax, nil
}
// Size returns whole file size.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment