Commit 600167f7 authored by Kirill Smelkov

.

parent 314dbb0d
@@ -553,7 +553,7 @@ type blkLoadState struct {
err error
}
// ----------------------------------------
// -------- 3) ZODB invariant --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
@@ -594,6 +594,8 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy
return 0
}
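// -------- aside: cache-policy classification (illustrative sketch) --------
// The hook above classifies objects by concrete type so that LOBTree/LOBucket
// are never evicted from the live cache. Below is a minimal self-contained
// sketch of the same pattern; the names (cachePolicy, pinObject, lobTree, ...)
// are invented stand-ins, not the real zodb/btree API.

package main

import "fmt"

type cachePolicy int

const (
	pinObject cachePolicy = 1 << iota // never evict the object itself
	keepState                         // keep its state in RAM as well
)

type lobTree struct{}   // stand-in for btree.LOBTree
type lobBucket struct{} // stand-in for btree.LOBucket

// classify pins tree/bucket nodes and leaves everything else evictable,
// mirroring how PCacheClassify returns 0 for non-btree objects.
func classify(obj interface{}) cachePolicy {
	switch obj.(type) {
	case *lobTree, *lobBucket:
		return pinObject | keepState
	}
	return 0
}

func main() {
	fmt.Println(classify(&lobTree{}))  // 3 -> pinned, state kept
	fmt.Println(classify("something")) // 0 -> evictable
}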
// -------- 4) ZODB invalidation -> OS cache --------
func traceZWatch(format string, argv ...interface{}) {
if !log.V(1) { // XXX -> 2?
return
@@ -971,9 +973,239 @@ func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release fun
return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}
// ----------------------------------------
// -------- 7) FUSE read(#blk) --------
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
f.head.zconnMu.RLock()
defer f.head.zconnMu.RUnlock()
// cap read request to file size
end := off + int64(len(dest)) // XXX overflow?
if end > f.size {
end = f.size
}
if end <= off {
// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
return fuse.ReadResultData(nil), fuse.OK
}
// widen read request to be aligned with blksize granularity
// (we can load only whole ZBlk* blocks)
aoff := off - (off % f.blksize)
aend := end
if re := end % f.blksize; re != 0 {
aend += f.blksize - re
}
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(asctx(fctx), f.head.zconn.txnCtx)
defer cancel()
// read/load all block(s) in parallel
wg, ctx := errgroup.WithContext(ctx)
for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
blkoff := blkoff
blk := blkoff / f.blksize
wg.Go(func() error {
δ := blkoff-aoff // blk position in dest
//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
})
}
err := wg.Wait()
if err != nil {
// XXX -> err2LogStatus
log.Errorf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
}
return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}
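// -------- aside: block alignment in Read (illustrative sketch) --------
// Read widens [off, end) to whole-block granularity because only whole ZBlk*
// blocks can be loaded. A minimal self-contained sketch of that arithmetic,
// with hypothetical numbers:

package main

import "fmt"

// align rounds off down and end up to blksize granularity, the same way
// Read computes [aoff, aend) before loading blocks.
func align(off, end, blksize int64) (aoff, aend int64) {
	aoff = off - (off % blksize) // round down to block start
	aend = end
	if re := end % blksize; re != 0 {
		aend += blksize - re // round up to block end
	}
	return aoff, aend
}

func main() {
	// a 100-byte read at offset 1000 with 512-byte blocks covers
	// blocks #1-#2, i.e. file bytes [512, 1536)
	aoff, aend := align(1000, 1100, 512)
	fmt.Println(aoff, aend) // 512 1536
}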
// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// XXX errctx?
// XXX locking
// check if someone else is already loading this block
f.loadMu.Lock()
loading, already := f.loading[blk]
if !already {
loading = &blkLoadState{
ready: make(chan struct{}),
}
f.loading[blk] = loading
}
f.loadMu.Unlock()
// if it is already loading - just wait for it
if already {
select {
case <-ctx.Done():
return ctx.Err()
case <-loading.ready:
if loading.err == nil {
copy(dest, loading.blkdata)
}
return loading.err
}
}
// no one was loading - we become responsible for loading this block
zfile := f.zfile
blkdata, treepath, pathRevMax, err := zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
close(loading.ready)
// only head/ has δbtree index.
if f.head.rev == 0 {
bfdir := f.head.bfdir
bfdir.indexMu.Lock() // XXX locking correct?
bfdir.indexLooked.Add(f, treepath)
bfdir.indexMu.Unlock()
}
// XXX before loading.ready?
blkrevmax, _ := f.δFtail.LastRevOf(blk, zfile.PJar().At())
blkrevmax = tidmin(blkrevmax, pathRevMax)
/*
// XXX remmapping - only if head.rev == 0
// XXX -> own func?
// XXX locking
for _, mapping := range f.mappings {
if revmax <= mapping.at || !mapping.blkrange.in(blk) {
continue // do nothing
}
if mapping.pinned.Contains(blk) {
continue // do nothing
}
rev = max(δFtail.by(blk) : _ <= mapping.at)
// XXX vvv -> go
client.remmap(mapping.addr[blk], file/@<rev>/data)
mapping.pinned.Add(blk)
}
*/
// data loaded with error - cleanup .loading
if loading.err != nil {
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
// data loaded ok
copy(dest, blkdata)
// store into the kernel pagecache the whole block that we've just loaded from the database.
// This way, even if the user currently requested to read only a small portion of it,
// the next, e.g. consecutive, user read request will not hit the DB again,
// and will instead be served by the kernel from its pagecache.
//
// We cannot do this directly from the reading goroutine - while handling a read,
// kernel FUSE keeps the corresponding pagecache page locked, and if we
// tried to update that same page in the pagecache it would deadlock
// inside the kernel.
//
// .loading cleanup is done once we have finished putting the data into the OS pagecache.
// If we did it earlier, a simultaneous read covered by the same block could
// miss both the kernel pagecache (if not yet updated) and .loading[blk]
// (already deleted), and would thus trigger DB access again.
//
// XXX if direct-io: don't touch pagecache
go f.uploadBlk(blk, loading)
return nil
}
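// -------- aside: per-block single-flight (illustrative sketch) --------
// f.loading implements a per-block single-flight: the first reader allocates
// the state and loads; concurrent readers wait on .ready and share the result.
// A minimal self-contained sketch of the pattern (simplified: no context
// cancellation and no pagecache upload step):

package main

import (
	"fmt"
	"sync"
	"time"
)

type loadState struct {
	ready chan struct{} // closed when data/err become valid
	data  string
	err   error
}

type file struct {
	mu      sync.Mutex
	loading map[int64]*loadState
}

func (f *file) load(blk int64) (string, error) {
	f.mu.Lock()
	ls, already := f.loading[blk]
	if !already {
		ls = &loadState{ready: make(chan struct{})}
		f.loading[blk] = ls
	}
	f.mu.Unlock()

	if already {
		<-ls.ready // someone else is loading - wait and reuse
		return ls.data, ls.err
	}

	// we became responsible for the load
	time.Sleep(10 * time.Millisecond) // simulate the DB access
	ls.data = fmt.Sprintf("blk%d-data", blk)
	close(ls.ready)
	return ls.data, ls.err
}

func main() {
	f := &file{loading: make(map[int64]*loadState)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			data, _ := f.load(7) // only one of the three loads actually runs
			fmt.Println(data)
		}()
	}
	wg.Wait()
}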
// uploadBlk complements readBlk and uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
head := f.head
// rlock zconnMu and make sure zwatcher is not asking us to pause.
// if it is - wait for a safer time in order not to deadlock.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
for {
head.zconnMu.RLock()
// help zwatcher if it asks us to pause uploads, so it can
// take zconnMu wlocked without deadlocks.
if head.pauseOSCacheUpload {
ready := head.continueOSCacheUpload
head.zconnMu.RUnlock()
<-ready
continue retry
}
break
}
// zwatcher is not currently trying to pause OS cache uploads.
// check if this block was already invalidated by zwatcher.
// if so don't upload the block into OS cache.
f.loadMu.Lock()
loading_ := f.loading[blk]
f.loadMu.Unlock()
if loading != loading_ {
head.zconnMu.RUnlock()
return
}
oid := f.zfile.POid()
// signal to zwatcher not to run while we are performing the upload.
// upload with zconnMu released, so that zwatcher can lock it, even if
// only to check inflightOSCacheUploads status.
atomic.AddInt32(&head.inflightOSCacheUploads, +1)
head.zconnMu.RUnlock()
st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)
f.loadMu.Lock()
bug := (loading != f.loading[blk])
if !bug {
delete(f.loading, blk)
}
f.loadMu.Unlock()
// signal to zwatcher that we are done and it can continue.
atomic.AddInt32(&head.inflightOSCacheUploads, -1)
if bug {
panic(fmt.Sprintf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk))
}
if st == fuse.OK {
return
}
// The pagecache update failed, but it must not fail (we verified on startup
// that pagecache control is supported by the kernel). We can live on
// correctly despite the error, but data access will likely be very slow.
// Tell the user about the problem.
log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}
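// -------- aside: pause/resume handshake (illustrative sketch) --------
// uploadBlk and zwatcher coordinate through pauseOSCacheUpload, a fresh
// continueOSCacheUpload channel per pause, and the inflightOSCacheUploads
// counter. A condensed self-contained sketch of that protocol; the watcher
// side shown here is an assumption inferred from the uploader code above:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type head struct {
	mu       sync.RWMutex
	pause    bool          // set by watcher with mu write-locked
	resume   chan struct{} // closed by watcher to release paused uploaders
	inflight int32
}

// upload mirrors uploadBlk's retry loop: back off while the watcher asks us
// to pause; otherwise mark ourselves inflight and release mu for the store.
func (h *head) upload(blk int64) {
	for {
		h.mu.RLock()
		if h.pause {
			resume := h.resume
			h.mu.RUnlock()
			<-resume // wait for the watcher to finish
			continue
		}
		break // exit the loop still holding the read lock
	}
	atomic.AddInt32(&h.inflight, +1)
	h.mu.RUnlock()
	time.Sleep(time.Millisecond) // the pagecache store happens here
	atomic.AddInt32(&h.inflight, -1)
	fmt.Println("uploaded blk", blk)
}

// watch plays the zwatcher role: ask uploaders to pause, drain the ones
// already inflight, work under the write lock, then let everyone resume.
func (h *head) watch() {
	h.mu.Lock()
	h.pause = true
	h.resume = make(chan struct{})
	h.mu.Unlock()
	for atomic.LoadInt32(&h.inflight) != 0 {
		time.Sleep(time.Millisecond) // drain inflight uploads
	}
	h.mu.Lock()
	// ... process invalidations here ...
	h.pause = false
	close(h.resume)
	h.mu.Unlock()
}

func main() {
	h := &head{resume: make(chan struct{})}
	var wg sync.WaitGroup
	for blk := int64(0); blk < 3; blk++ {
		wg.Add(1)
		go func(blk int64) { defer wg.Done(); h.upload(blk) }(blk)
	}
	done := make(chan struct{})
	go func() { h.watch(); close(done) }()
	wg.Wait()
	<-done
}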
// XXX Move Read here
// ----------------------------------------
@@ -1248,237 +1480,6 @@ func (f *BigFile) getattr(out *fuse.Attr) {
}
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
f.head.zconnMu.RLock()
defer f.head.zconnMu.RUnlock()
// cap read request to file size
end := off + int64(len(dest)) // XXX overflow?
if end > f.size {
end = f.size
}
if end <= off {
// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
return fuse.ReadResultData(nil), fuse.OK
}
// widen read request to be aligned with blksize granularity
// (we can load only whole ZBlk* blocks)
aoff := off - (off % f.blksize)
aend := end
if re := end % f.blksize; re != 0 {
aend += f.blksize - re
}
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(asctx(fctx), f.head.zconn.txnCtx)
defer cancel()
// read/load all block(s) in parallel
wg, ctx := errgroup.WithContext(ctx)
for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
blkoff := blkoff
blk := blkoff / f.blksize
wg.Go(func() error {
δ := blkoff-aoff // blk position in dest
//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
})
}
err := wg.Wait()
if err != nil {
// XXX -> err2LogStatus
log.Errorf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
}
return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}
// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// XXX errctx?
// XXX locking
// check if someone else is already loading this block
f.loadMu.Lock()
loading, already := f.loading[blk]
if !already {
loading = &blkLoadState{
ready: make(chan struct{}),
}
f.loading[blk] = loading
}
f.loadMu.Unlock()
// if it is already loading - just wait for it
if already {
select {
case <-ctx.Done():
return ctx.Err()
case <-loading.ready:
if loading.err == nil {
copy(dest, loading.blkdata)
}
return loading.err
}
}
// no one was loading - we become responsible for loading this block
zfile := f.zfile
blkdata, treepath, pathRevMax, err := zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
close(loading.ready)
// only head/ has δbtree index.
if f.head.rev == 0 {
bfdir := f.head.bfdir
bfdir.indexMu.Lock() // XXX locking correct?
bfdir.indexLooked.Add(f, treepath)
bfdir.indexMu.Unlock()
}
// XXX before loading.ready?
blkrevmax, _ := f.δFtail.LastRevOf(blk, zfile.PJar().At())
blkrevmax = tidmin(blkrevmax, pathRevMax)
/*
// XXX remmapping - only if head.rev == 0
// XXX -> own func?
// XXX locking
for _, mapping := range f.mappings {
if revmax <= mapping.at || !mapping.blkrange.in(blk) {
continue // do nothing
}
if mapping.pinned.Contains(blk) {
continue // do nothing
}
rev = max(δFtail.by(blk) : _ <= mapping.at)
// XXX vvv -> go
client.remmap(mapping.addr[blk], file/@<rev>/data)
mapping.pinned.Add(blk)
}
*/
// data loaded with error - cleanup .loading
if loading.err != nil {
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
// data loaded ok
copy(dest, blkdata)
// store into the kernel pagecache the whole block that we've just loaded from the database.
// This way, even if the user currently requested to read only a small portion of it,
// the next, e.g. consecutive, user read request will not hit the DB again,
// and will instead be served by the kernel from its pagecache.
//
// We cannot do this directly from the reading goroutine - while handling a read,
// kernel FUSE keeps the corresponding pagecache page locked, and if we
// tried to update that same page in the pagecache it would deadlock
// inside the kernel.
//
// .loading cleanup is done once we have finished putting the data into the OS pagecache.
// If we did it earlier, a simultaneous read covered by the same block could
// miss both the kernel pagecache (if not yet updated) and .loading[blk]
// (already deleted), and would thus trigger DB access again.
//
// XXX if direct-io: don't touch pagecache
go f.uploadBlk(blk, loading)
return nil
}
// uploadBlk complements readBlk and uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
head := f.head
// rlock zconnMu and make sure zwatcher is not asking us to pause.
// if it is - wait for a safer time in order not to deadlock.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
for {
head.zconnMu.RLock()
// help zwatcher if it asks us to pause uploads, so it can
// take zconnMu wlocked without deadlocks.
if head.pauseOSCacheUpload {
ready := head.continueOSCacheUpload
head.zconnMu.RUnlock()
<-ready
continue retry
}
break
}
// zwatcher is not currently trying to pause OS cache uploads.
// check if this block was already invalidated by zwatcher.
// if so don't upload the block into OS cache.
f.loadMu.Lock()
loading_ := f.loading[blk]
f.loadMu.Unlock()
if loading != loading_ {
head.zconnMu.RUnlock()
return
}
oid := f.zfile.POid()
// signal to zwatcher not to run while we are performing the upload.
// upload with zconnMu released, so that zwatcher can lock it, even if
// only to check inflightOSCacheUploads status.
atomic.AddInt32(&head.inflightOSCacheUploads, +1)
head.zconnMu.RUnlock()
st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)
f.loadMu.Lock()
bug := (loading != f.loading[blk])
if !bug {
delete(f.loading, blk)
}
f.loadMu.Unlock()
// signal to zwatcher that we are done and it can continue.
atomic.AddInt32(&head.inflightOSCacheUploads, -1)
if bug {
panic(fmt.Sprintf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk))
}
if st == fuse.OK {
return
}
// The pagecache update failed, but it must not fail (we verified on startup
// that pagecache control is supported by the kernel). We can live on
// correctly despite the error, but data access will likely be very slow.
// Tell the user about the problem.
log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}
// FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
...