Commit 9b97e435 authored by Kirill Smelkov

.

parent 3febc81c
...@@ -540,6 +540,9 @@ type BigFile struct { ...@@ -540,6 +540,9 @@ type BigFile struct {
// XXX mappings where client(s) requested isolation guarantee // XXX mappings where client(s) requested isolation guarantee
//mappings ... XXX -> watchers? //mappings ... XXX -> watchers?
// watchers attached to this file
watchers map[*Watcher]struct{}
} }
// blkLoadState represents a ZBlk load state/result. // blkLoadState represents a ZBlk load state/result.
...@@ -997,6 +1000,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context ...@@ -997,6 +1000,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
if re := end % f.blksize; re != 0 { if re := end % f.blksize; re != 0 {
aend += f.blksize - re aend += f.blksize - re
} }
// XXX use original dest if it can fit the data
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn) // XXX better ctx = transaction.PutIntoContext(ctx, txn)
...@@ -1017,9 +1021,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context ...@@ -1017,9 +1021,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
err := wg.Wait() err := wg.Wait()
if err != nil { if err != nil {
// XXX -> err2LogStatus return nil, err2LogStatus(err)
log.Errorf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
} }
return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
...@@ -1030,9 +1032,9 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context ...@@ -1030,9 +1032,9 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
// see "7) when we receive a FUSE read(#blk) request ..." in overview. // see "7) when we receive a FUSE read(#blk) request ..." in overview.
// //
// len(dest) == blksize. // len(dest) == blksize.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error { // called with head.zconnMu rlocked.
// XXX errctx? func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
// XXX locking defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)
// check if someone else is already loading this block // check if someone else is already loading this block
f.loadMu.Lock() f.loadMu.Lock()
...@@ -1053,30 +1055,71 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error { ...@@ -1053,30 +1055,71 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
case <-loading.ready: case <-loading.ready:
if loading.err == nil { if loading.err == nil {
copy(dest, loading.blkdata) copy(dest, loading.blkdata) // XXX copy
} }
return loading.err return loading.err
} }
} }
// no one was loading - we became responsible to load this block // no one was loading - we became responsible to load this block
blkdata, treepath, pathRevMax, err := f.zfile.LoadBlk(ctx, blk)
zfile := f.zfile
blkdata, treepath, pathRevMax, err := zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata loading.blkdata = blkdata
loading.err = err loading.err = err
// data loaded with error - cleanup .loading
if loading.err != nil {
close(loading.ready)
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
// we have the data - it can be used after watchers are updated
f.updateWatchers(blk, treepath, pathRevMax)
// data can be used now
close(loading.ready) close(loading.ready)
copy(dest, blkdata) // XXX copy
// store to kernel pagecache whole block that we've just loaded from database.
// This way, even if the user currently requested to read only small portion from it,
// it will prevent next e.g. consecutive user read request to again hit
// the DB, and instead will be served by kernel from its pagecache.
//
// We cannot do this directly from reading goroutine - while reading
// kernel FUSE is holding corresponding page in pagecache locked, and if
// we would try to update that same page in pagecache it would result
// in deadlock inside kernel.
//
// .loading cleanup is done once we are finished with putting the data into OS pagecache.
// If we do it earlier - a simultaneous read covered by the same block could result
// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
// and thus would trigger DB access again.
//
// XXX if direct-io: don't touch pagecache
go f.uploadBlk(blk, loading)
return nil
}
// only head/ has δbtree index. // updateWatchers complements readBlk and updates watchers of the file after a
if f.head.rev == 0 { // block was loaded from ZODB and before block data is returned to kernel.
//
// see "7.2) for all registered client@at watchers ..."
func (f *BigFile) updateWatchers(blk int64, treepath []zodb.IPersistent, pathRevMax zodb.Tid) {
// only head/ is being watched for
if f.head.rev != 0 {
return
}
// update δbtree index
bfdir := f.head.bfdir bfdir := f.head.bfdir
bfdir.indexMu.Lock() // XXX locking correct? bfdir.indexMu.Lock() // XXX locking correct?
bfdir.indexLooked.Add(f, treepath) bfdir.indexLooked.Add(f, treepath)
bfdir.indexMu.Unlock() bfdir.indexMu.Unlock()
}
// XXX before loading.ready? blkrevmax, _ := f.δFtail.LastRevOf(blk, f.zfile.PJar().At()) // XXX = f.head.zconn.At()
blkrevmax, _ := f.δFtail.LastRevOf(blk, zfile.PJar().At())
blkrevmax = tidmin(blkrevmax, pathRevMax) blkrevmax = tidmin(blkrevmax, pathRevMax)
/* /*
...@@ -1101,38 +1144,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error { ...@@ -1101,38 +1144,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
} }
*/ */
// data loaded with error - cleanup .loading
if loading.err != nil {
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
// data loaded ok
copy(dest, blkdata)
// store to kernel pagecache whole block that we've just loaded from database.
// This way, even if the user currently requested to read only small portion from it,
// it will prevent next e.g. consecutive user read request to again hit
// the DB, and instead will be served by kernel from its pagecache.
//
// We cannot do this directly from reading goroutine - while reading
// kernel FUSE is holding corresponding page in pagecache locked, and if
// we would try to update that same page in pagecache it would result
// in deadlock inside kernel.
//
// .loading cleanup is done once we are finished with putting the data into OS pagecache.
// If we do it earlier - a simultaneous read covered by the same block could result
// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
// and thus would trigger DB access again.
//
// XXX if direct-io: don't touch pagecache
go f.uploadBlk(blk, loading)
return nil
} }
// uploadBlk complements readBlk and uploads loaded blkdata into OS cache. // uploadBlk complements readBlk and uploads loaded blkdata into OS cache.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment