Commit 89ad3a79 authored by Kirill Smelkov

X Don't keep ZBigFile activated during whole current transaction

If we keep it activated, a problem can arise at Resync time: if Resync
sees that the old zhead.At is outside of DB.δtail coverage, it
invalidates all zhead.cache objects, not only the changed ones. That
means that even an unchanged ZBigFile that is being kept activated will
be invalidated, and oops - PInvalidate will panic.
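
For reference, the failing path can be sketched in terms of the zodb/go
persistency API (a sketch only: the keepActivated helper and its names
are illustrative, not code from this patch; it assumes "context" and
"lab.nexedi.com/kirr/neo/go/zodb" are imported):

    // keepActivated shows why holding an object active across transaction
    // boundaries is unsafe: a later Resync whose old at falls outside
    // DB.δtail coverage invalidates every object in the connection's live
    // cache, and PInvalidate panics on an object that is still in use.
    func keepActivated(ctx context.Context, zbf zodb.IPersistent) error {
        if err := zbf.PActivate(ctx); err != nil {
            return err
        }
        // zbf is now "in use" until a matching PDeactivate.
        // ... transactions come and go; zbf is deliberately left activated ...
        // ... Resync with old at outside DB.δtail -> whole-cache invalidation
        //     -> zbf.PInvalidate() -> panic, since zbf was never deactivated.
        return nil
    }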

We could avoid this by deactivating all ZBigFiles on each transaction
update, but that can be too expensive if there are many ZBigFiles.

We could avoid the problem in another way - extend the ZODB/go API so
that a connection can request that DB.δtail keeps covering it. That in
turn would mean we also have to extend the ZODB/go API to release a
connection from imposing such a constraint on the DB. Even if the first
step could be done via e.g. another flag, the second step - release -
is not very clear: we already have connection "release" on transaction
completion, and adding e.g. conn.Close() in addition to that would be
ambiguous for users. Also, if wcfs is slow to process invalidations for
some reason, such a constraint would mean DB.δtail would grow
indefinitely.

-> We can solve the problem in another way: don't keep ZBigFile always
activated, and just do activation/deactivation as when working with
ZODB objects regularly. This does not add any complication to the code
flow, and from the performance point of view we can practically avoid
the slowdown by teaching zodbCacheControl to also pin ZBigFile in the
live cache.
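
A condensed sketch of the resulting access pattern (the withZBigFile
helper is illustrative, not part of this patch): every operation
activates the ZBigFile only for its own duration, and since
zodbCacheControl now keeps ZBigFile state in the live cache, the
repeated PActivate calls normally do not touch the database:

    // withZBigFile runs use(zbf) with f.zbf activated only for the duration
    // of the call, instead of keeping it activated for the whole transaction.
    func (f *BigFile) withZBigFile(ctx context.Context, use func(zbf *ZBigFile) error) error {
        zbf := f.zbf
        if err := zbf.PActivate(ctx); err != nil { // usually served from live cache
            return err
        }
        defer zbf.PDeactivate() // zbf may be invalidated/evicted again once we are done
        return use(zbf)
    }

This way Resync can safely invalidate ZBigFile objects, because nothing
holds them activated across transaction boundaries.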
parent 5f7c757e
@@ -494,13 +494,13 @@ type BigFile struct {
 	// parent's BigFileDir.head is the same.
 	head *Head

-	// ZBigFile top-level object. Kept activated during lifetime of current transaction.
-	// XXX kill "Kept activated ..."
+	// ZBigFile top-level object
 	zbf *ZBigFile

-	// zbf.Size(). It is constant during lifetime of current transaction.
+	// things read/computed from .zbf; constant during lifetime of current transaction.
 	blksize int64    // zbf.blksize
 	zbfSize int64    // zbf.Size()	XXX -> .size
+	rev     zodb.Tid // last revision that modified file data

 	// tail change history of this file.
 	δFtail *ΔTailI64 // [](rev↑, []#blk)
@@ -550,6 +550,12 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
 	case *btree.LOBTree:
 	case *btree.LOBucket:

+	// for performance reason we also keep ZBigFile in cache.
+	//
+	// ZBigFile is top-level object that is used on every block load, and
+	// it would be a waste to evict ZBigFile state from cache.
+	case *ZBigFile:
+
 	// FIXME we need to keep ZBigFile in cache: even if we keep a pointer
 	// to ZBigFile, but it is allowed to drop its state, it will release
 	// pointer to LOBTree object and, consequently, that LOBTree object,
@@ -557,14 +563,8 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
 	// go runtime, and the cache will loose its weak reference to it.
 	// XXX however we cannot protect ZBigFile from releasing state - as
 	// any object can be explicitly invalidated.
-	// FIXME -> teach zodb.LiveCache to keep object by itself?
 	//
-	// we also keep ZBigFile alive because we want to make sure .blksize
-	// and (p. ref) .blktab do not change.
-	//
-	// XXX on every resync we deactivate/activate all bigfiles and restat them
-	// -> for efficiency better keep ZBigFile in live cache.
-	//case *ZBigFile:
+	// FIXME -> teach zodb.LiveCache to keep object by itself
 	}

 	return false
@@ -725,6 +725,7 @@ retry:
 			// persistent reference) do not change.
 			// XXX shutdown fs with ^^^ message.
+			panic("ZBigFile changed")
 		}

 		// make sure obj won't be garbage-collected until we finish handling it.
@@ -788,14 +789,7 @@ retry:
 	// resync .zhead to zδ.tid
 	// XXX -> Head.Resync() ?

-	// 1. deactivate changed ZBigFile (we keep all ZBigFiles activated during whole txn)
-	// XXX dir.fileMu locking (not needed because zconnMu locked)
-	for file := range toinvalidate {
-		file.zbf.PDeactivate()
-		file.zbfSize = -1
-	}
-
-	// 2. abort old and resync to new txn/at
+	// 1. abort old and resync to new txn/at
 	transaction.Current(zhead.txnCtx).Abort()
 	_, ctx = transaction.New(context.Background()) // XXX bg ok?
 	err = zhead.Resync(ctx, zδ.Tid)
@@ -804,15 +798,10 @@ retry:
 	}
 	zhead.txnCtx = ctx

-	// 3. reactivate/restat invalidated ZBigFile
+	// 2. restat invalidated ZBigFile
 	// XXX -> parallel?
 	// XXX locking
 	for file := range toinvalidate {
-		err := file.zbf.PActivate(ctx)
-		if err != nil {
-			panic(err)	// XXX
-		}
-
 		zbfSize, treePath, err := file.zbf.Size(ctx)
 		if err != nil {
 			panic(err)	// XXX
@@ -820,6 +809,8 @@ retry:
 		file.zbfSize = zbfSize
 		bfdir.indexLooked.Add(file, treePath)
+
+		file.rev = zhead.At()
 	}

 	// notify .wcfs/zhead
@@ -842,7 +833,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
 	defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)

 	fsconn := gfsconn
-	blksize := f.zbf.blksize
+	blksize := f.blksize
 	off := blk*blksize

 	var blkdata []byte = nil
@@ -1136,16 +1127,16 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
 		return nil, eINVALf("%s is not a ZBigFile", typeOf(xzbf))
 	}

-	// activate ZBigFile and keep it this way
+	// extract blksize, size and initial approximation for file revision
 	err = zbf.PActivate(ctx)
 	if err != nil {
 		return nil, err
 	}
-	defer func() {
-		if err != nil {
-			zbf.PDeactivate()
-		}
-	}()
+	blksize := zbf.blksize
+	// XXX it should be revision of both ZBigFile and its data. But we
+	// cannot get data revision without expensive scan of all objects under ZBigFile.
+	rev := zbf.PSerial()
+	zbf.PDeactivate()

 	zbfSize, treePath, err := zbf.Size(ctx)
 	if err != nil {
@@ -1157,7 +1148,9 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
 		fsNode:  newFSNode(&fsOptions{Sticky: false}), // XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
 		head:    head,
 		zbf:     zbf,
+		blksize: blksize,
 		zbfSize: zbfSize,
+		rev:     rev,

 		// XXX this is needed only for head/
 		δFtail: NewΔTailI64(zconn.At()),
@@ -1176,7 +1169,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er

 // Close release all resources of BigFile.
 func (f *BigFile) Close() error {
-	f.zbf.PDeactivate()	// XXX f.head.zconn must locked
+	// XXX locking?
 	f.zbf = nil

 //	f.zconn.Release()
@@ -1216,11 +1209,7 @@ func (f *BigFile) getattr(out *fuse.Attr) {
 	// .Blocks
 	// .Blksize

-	// FIXME lastChange should cover all bigfile data, not only ZBigFile itself
-	// XXX -> f.lastRev (initially serial, later ~ f.δFtail[-1])
-	lastChange := f.zbf.PSerial() // XXX activate (if zbf becomes not activated during txn)
-
-	mtime := lastChange.Time().Time
+	mtime := f.rev.Time().Time
 	out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
 }

@@ -1240,14 +1229,12 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
 		return fuse.ReadResultData(nil), fuse.OK
 	}

-	zbf := f.zbf
-
 	// widen read request to be aligned with blksize granularity
 	// (we can load only whole ZBlk* blocks)
-	aoff := off - (off % zbf.blksize)
+	aoff := off - (off % f.blksize)
 	aend := end
-	if re := end % zbf.blksize; re != 0 {
-		aend += zbf.blksize - re
+	if re := end % f.blksize; re != 0 {
+		aend += f.blksize - re
 	}
 	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

@@ -1257,13 +1244,13 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context

 	// read/load all block(s) in parallel
 	wg, ctx := errgroup.WithContext(ctx)
-	for blkoff := aoff; blkoff < aend; blkoff += zbf.blksize {
+	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
 		blkoff := blkoff
-		blk := blkoff / zbf.blksize
+		blk := blkoff / f.blksize
 		wg.Go(func() error {
 			δ := blkoff-aoff // blk position in dest
-			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, zbf.blksize)
-			return f.readBlk(ctx, blk, dest[δ:δ+zbf.blksize])
+			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
+			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
 		})
 	}

@@ -1422,7 +1409,6 @@ retry:
 	}

 	oid := f.zbf.POid()
-	blksize := f.zbf.blksize

 	// signal to zwatcher not to run while we are performing the upload.
 	// upload with released zconnMu so that zwatcher can lock it even if to
@@ -1430,7 +1416,7 @@ retry:
 	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
 	head.zconnMu.RUnlock()

-	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*blksize, loading.blkdata)
+	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

 	f.loadMu.Lock()
 	bug := (loading != f.loading[blk])