Commit 13c24a35 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 85d86a32
......@@ -1194,86 +1194,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
return nil
}
// updateWatchers complements readBlk: it updates watchers of the file after a
// block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
// XXX -> pinWatchers? pinOnRead?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
}
fmt.Printf("S: read #%d -> update watchers (#%d)\n", blk, len(f.watchTab))
// update δFtail index
bfdir := f.head.bfdir
bfdir.δFmu.Lock() // XXX locking correct? XXX -> better push down?
bfdir.δFtail.Track(f, blk, treepath, zblk) // XXX pass in zblk.rev here?
bfdir.δFmu.Unlock()
// makes sure that file[blk] on clients side stays as of @w.at state.
// try to use blkrevMax only as the first cheap criteria to skip updating watchers.
// This is likely to be the case, since most watchers should be usually close to head.
// If using blkrevMax only turns out to be not sufficient, we'll
// consult δFtail, which might involve recomputing it.
blkrev := blkrevMax
blkrevRough := true
// XXX locking (f.watchTab)
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watchTab {
w := w
fmt.Printf("S: read -> update watchers: w @%s\n", w.at)
// XXX locking (w)
// the block is already covered by @w.at database view
if blkrev <= w.at {
continue
}
// if blkrev is rough estimation and that upper bound is > w.at
// we have to recompute ~exact file[blk] revision @head.
if blkrevRough {
blkrev, _ = f.LastBlkRev(ctx, blk, f.head.zconn.At())
blkrevRough = false
// XXX w.at could be only ↑ ? if not - locking is more complex
if blkrev <= w.at {
continue
}
}
// the block is newer - find out its revision as of @w.at and pin to that.
//
// We don't pin to w.at since if we would do so for several clients,
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at) // XXX move into go?
wg.Go(func() error {
// XXX close watcher on any error
return w.pin(ctx, blk, pinrev)
})
}
err := wg.Wait()
if err != nil {
panic(err) // XXX
}
}
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
head := f.head
......@@ -1403,6 +1323,86 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return nil
}
// updateWatchers complements readBlk: it updates watchers of the file after a
// block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
// XXX -> pinWatchers? pinOnRead?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
}
fmt.Printf("S: read #%d -> update watchers (#%d)\n", blk, len(f.watchTab))
// update δFtail index
bfdir := f.head.bfdir
bfdir.δFmu.Lock() // XXX locking correct? XXX -> better push down?
bfdir.δFtail.Track(f, blk, treepath, zblk) // XXX pass in zblk.rev here?
bfdir.δFmu.Unlock()
// makes sure that file[blk] on clients side stays as of @w.at state.
// try to use blkrevMax only as the first cheap criteria to skip updating watchers.
// This is likely to be the case, since most watchers should be usually close to head.
// If using blkrevMax only turns out to be not sufficient, we'll
// consult δFtail, which might involve recomputing it.
blkrev := blkrevMax
blkrevRough := true
// XXX locking (f.watchTab)
wg, ctx := errgroup.WithContext(ctx)
for w := range f.watchTab {
w := w
fmt.Printf("S: read -> update watchers: w @%s\n", w.at)
// XXX locking (w)
// the block is already covered by @w.at database view
if blkrev <= w.at {
continue
}
// if blkrev is rough estimation and that upper bound is > w.at
// we have to recompute ~exact file[blk] revision @head.
if blkrevRough {
blkrev, _ = f.LastBlkRev(ctx, blk, f.head.zconn.At())
blkrevRough = false
// XXX w.at could be only ↑ ? if not - locking is more complex
if blkrev <= w.at {
continue
}
}
// the block is newer - find out its revision as of @w.at and pin to that.
//
// We don't pin to w.at since if we would do so for several clients,
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at) // XXX move into go?
wg.Go(func() error {
// XXX close watcher on any error
return w.pin(ctx, blk, pinrev)
})
}
err := wg.Wait()
if err != nil {
panic(err) // XXX
}
}
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment