Commit b3c92399 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 83f50497
......@@ -475,9 +475,6 @@ type BigFile struct {
// tail change history of this file.
δFtail *ΔTailI64 // [](rev↑, []#blk)
// TODO -> δFtail
// lastChange zodb.Tid // last change to whole bigfile as of .zconn.At view
// inflight loadings of ZBigFile from ZODB.
// successfull load results are kept here until blkdata is put into OS pagecache.
loadMu sync.Mutex
......@@ -531,7 +528,9 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
//
// we also keep ZBigFile alive because we want to make sure .blksize
// and (p. ref) .blktab do not change.
// XXX do we really need to keep ZBigFile alive for that?
//
// XXX on every resynce we deactivate/activate all bigfiles and restat them
// -> for efficiency better keep ZBigFile in live cache.
//case *ZBigFile:
}
......@@ -547,6 +546,7 @@ func traceWatch(format string, argv ...interface{}) {
}
// zwatcher watches for ZODB changes.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context) (err error) {
defer xerr.Contextf(&err, "zwatch") // XXX more in context?
......@@ -555,8 +555,9 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
traceWatch(">>>")
zwatchq := make(chan zodb.CommitEvent)
root.zstor.AddWatch(zwatchq)
at0 := root.zstor.AddWatch(zwatchq) // XXX -> to main thread to avoid race
defer root.zstor.DelWatch(zwatchq)
_ = at0 // XXX
var zevent zodb.CommitEvent
var ok bool
......@@ -578,12 +579,12 @@ func (root *Root) zwatcher(ctx context.Context) (err error) {
traceWatch("zevent: %s", zevent)
}
root.zhandle1(zevent)
root.zδhandle1(zevent)
}
}
// zhandle1 handles 1 event from ZODB notification.
func (root *Root) zhandle1(zevent zodb.CommitEvent) {
// zδhandle1 handles 1 change event from ZODB notification.
func (root *Root) zδhandle1(zevent zodb.CommitEvent) {
// while we are invalidating OS cache, make sure that nothing, that
// even reads /head/bigfile/*, is running (see 4.6).
root.head.zconnMu.Lock()
......@@ -594,7 +595,7 @@ func (root *Root) zhandle1(zevent zodb.CommitEvent) {
toinvalidate := map[*BigFile]SetI64{} // {} file -> set(#blk)
// zevent = (tid^, []oid)
// zevent = (tid, []oid)
for _, oid := range zevent.Changev {
// XXX zhead.Cache() lock/unlock + comment it is not really needed
obj := zhead.Cache().Get(oid)
......@@ -647,22 +648,52 @@ func (root *Root) zhandle1(zevent zodb.CommitEvent) {
runtime.KeepAlive(obj)
}
//wg = ...
ctx := context.TODO()
wg, ctx := errgroup.WithContext(context.TODO())
for file, blkmap := range toinvalidate {
for blk := range blkmap {
go file.invalidateBlk(ctx, blk) // XXX -> wg.Go
wg.Go(func() error {
return file.invalidateBlk(ctx, blk)
})
}
}
err := wg.Wait()
if err != nil {
panic(err) // XXX
}
// resync .zhead to zevent.tid
// XXX -> Head.Resync()
// XXX zbf deacticate/activate
// XXX -> Head.Resync() ?
// 1. deactivate all ZBigFile (we keep them activated during whole txn)
// XXX dir.mu locking (not needed bcause zconnMu locked)
for _, file := range bfdir.fileTab {
file.zbf.PDeactivate()
file.zbfSize = -1 // just in case
}
// 2. abort old and resync to new txn/at
transaction.Current(zhead.txnCtx).Abort()
txn, ctx := transaction.New(context.Background()) // XXX bg ok?
zhead.Resync(txn, zevent.Tid)
zhead.txnCtx = ctx
// 3. reactivate/restat all ZBigFile
// XXX -> parallel?
// XXX locking
for _, file := range bfdir.fileTab {
err := file.zbf.PActivate(ctx)
if err != nil {
panic(err) // XXX
}
zbfSize, err := file.zbf.Size(ctx)
if err != nil {
panic(err) // XXX
}
file.zbfSize = zbfSize
}
// notify .wcfs/zhead
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", zevent.Tid)
......@@ -676,9 +707,9 @@ func (root *Root) zhandle1(zevent zodb.CommitEvent) {
// invalidateBlk invalidates 1 file block. XXX
//
// called with f.head.zconnMu wlocked.
// see "4.4) for all file/blk to in invalidate we do"
//
// XXX see "4.4) for all file/blk to in invalidate we do"
// called with f.head.zconnMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
fsconn := gfsconn
blksize := f.zbf.blksize
......@@ -825,7 +856,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
return f, nil
}
// XXX do we need to support unlink? (probably no)
// XXX do we need to support unlink? -> no, @<revX>/ are automatically garbage-collected.
// / -> Mkdir receives client request to create @<rev>/.
//
......@@ -979,7 +1010,9 @@ func (f *BigFile) Close() error {
// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
// XXX locking
f.head.zconnMu.RLock()
defer f.head.zconnMu.RUnlock()
f.getattr(out)
return fuse.OK
}
......@@ -991,8 +1024,8 @@ func (f *BigFile) getattr(out *fuse.Attr) {
// .Blksize
// FIXME lastChange should cover all bigfile data, not only ZBigFile itself
//mtime := &f.lastChange.Time().Time
lastChange := f.zbf.PSerial()
//mtime := f.δFtail[-1] || zbf.PSerial?
lastChange := f.zbf.PSerial() // XXX activate (if zbf becomes not activated during txn)
mtime := lastChange.Time().Time
out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment