Commit 5d775923 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bf88c4f5
......@@ -364,14 +364,15 @@ package main
// and a client that wants @rev data will get @rev data, even if it was this
// "old" client that triggered the pagefault(~).
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
//
// XXX For every ZODB connection a dedicated read-only transaction is maintained.
import (
......@@ -418,37 +419,39 @@ type Root struct {
// ZODB connections for @<rev>/
zrevMu sync.Mutex
zrevTab map[zodb.Tid]*ZConn
// XXX include?
// // {} rev -> @<rev>/
// mu sync.Mutex
// revTab map[zodb.Tid]*Rev
}
// /bigfile/ - served by BigFileRoot.
type BigFileRoot struct {
// /(head|<rev>)/ - served by Head. XXX separate Rev?
type Head struct {
nodefs.Node
// {} oid -> <bigfileX>/
mu sync.Mutex
tab map[zodb.Oid]*BigFileDir
// bigfile, at, watch, etc - all implicitly linked to by fs
}
// /bigfile/<bigfileX>/ - served by BigFileDir.
type BigFileDir struct {
// /head/watch - served by Watch.
type Watch struct {
nodefs.Node
oid zodb.Oid // oid of ZBigFile
// head/ is implicitly linked to by fs
// {} rev -> @<rev>/ bigfile snapshot
mu sync.Mutex
revTab map[zodb.Tid]*BigFileRev
// TODO
}
// /bigfile/<bigfileX>/(head|<rev>)/ - served by BigFileRev.
type BigFileRev struct {
// /(head|<rev>)/bigfile/ - served by BigFileDir.
type BigFileDir struct {
nodefs.Node
// data, at, invalidations, etc - all implicitly linked to by fs
// {} oid -> <bigfileX>
mu sync.Mutex
tab map[zodb.Oid]*BigFile
}
// /bigfile/<bigfileX>/(head|<rev>)/* - internally served by BigFile.
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
type BigFile struct {
nodefs.Node
// this BigFile views ZODB via zconn
zconn *ZConn
......@@ -463,18 +466,9 @@ type BigFile struct {
// TODO -> δFtail
// lastChange zodb.Tid // last change to whole bigfile as of .zconn.At view
}
// /bigfile/<bigfileX>/(head|<rev>)/data - served by BigFileData.
type BigFileData struct {
nodefs.Node
bigfile *BigFile
// inflight loadings of ZBigFile from ZODB.
// successfull load results are kept here until blkdata is put into OS pagecache.
//
// XXX -> BigFile ?
loadMu sync.Mutex
loading map[int64]*blkLoadState // #blk -> {... blkdata}
......@@ -524,6 +518,7 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
return false
}
/*
// zwatcher watches for ZODB changes.
// see "4) when we receive an invalidation message from ZODB ..."
func (r *Root) zwatcher(ctx context.Context) (err error) {
......@@ -550,7 +545,7 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) {
defer r.zheadMu.Unlock()
//toinvalidate := map[*ZBigFile]SetI64{} // {} zfile -> set(#blk)
toinvalidate := map[*BigFileData]SetI64{} // {} zfile -> set(#blk)
toinvalidate := map[*BigFile]SetI64{} // {} zfile -> set(#blk)
// zevent = (tid^, []oid)
for _, oid := range zevent.Changev {
......@@ -601,7 +596,7 @@ func (r *Root) zhandle1(zevent zodb.WatchEvent) {
// invalidateBlk invalidates 1 file block. XXX
// XXX see "4.4) for all file/blk to in invalidate we do"
func (f *BigFileData) invalidateBlk(ctx context.Context, blk int64) error {
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
fsconn := f.root().fsconn
off := blk*blksize
......@@ -630,20 +625,19 @@ func (f *BigFileData) invalidateBlk(ctx context.Context, blk int64) error {
panic("TODO")
}
*/
// ----------------------------------------
// /bigfile -> Mkdir receives client request to create /bigfile/<bigfileX>.
//
// It creates <bigfileX>/head/* along the way.
func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
inode, err := bfroot.mkdir(name, fctx) // XXX ok to ignore mode?
// /(head/<rev>)/bigfile/ -> Lookup receives client request to create (head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
inode, err := bfdir.lookup(out, name, fctx) // XXX reorder out?
return inode, err2LogStatus(err)
}
func (bfroot *BigFileRoot) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/bigfile: mkdir %q", name)
func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/bigfile: lookup %q", name)
oid, err := zodb.ParseOid(name)
if err != nil {
......@@ -651,139 +645,100 @@ func (bfroot *BigFileRoot) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Ino
}
// check to see if dir(oid) is already there
bfroot.mu.Lock()
_, already := bfroot.tab[oid]
bfroot.mu.Unlock()
bfdir.mu.Lock()
f, already := bfdir.tab[oid]
bfdir.mu.Unlock()
if already {
return nil, syscall.EEXIST
// XXX fill out
return f.Inode(), nil
}
// not there - without bfroot lock proceed to open BigFile from ZODB
bf, err := bigopen(asctx(fctx), groot.zhead, oid)
// not there - without bfdir lock proceed to open BigFile from ZODB
f, err = bigopen(asctx(fctx), groot.zhead, oid) // XXX zhead -> head|rev.zconn
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bf.Close()
}
}()
// relock bfroot and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding bfroot.mu
bfroot.mu.Lock()
_, already = bfroot.tab[oid]
// relock bfdir and either register f or, if the file was maybe
// simultanously created while we were not holding bfdir.mu, return that.
bfdir.mu.Lock()
f2, already := bfdir.tab[oid]
if already {
bfroot.mu.Unlock()
return nil, syscall.EEXIST
}
bfdir := &BigFileDir{
Node: nodefs.NewDefaultNode(),
oid: oid,
revTab: make(map[zodb.Tid]*BigFileRev),
}
bfhead := &BigFileRev{
Node: nodefs.NewDefaultNode(),
}
bfdata := &BigFileData{
Node: nodefs.NewDefaultNode(),
bigfile: bf,
loading: make(map[int64]*blkLoadState),
bfdir.mu.Unlock()
f.Close()
return f2.Inode(), nil // XXX fill out
}
bfroot.tab[oid] = bfdir
bfroot.mu.Unlock()
bfdir.tab[oid] = f
bfdir.mu.Unlock()
// mkdir takes filesystem treeLock - do it outside bfroot.mu
mkdir(bfroot, name, bfdir)
mkdir(bfdir, "head", bfhead)
mkfile(bfhead, "data", bfdata)
mkfile(bfhead, "at", NewSmallFile(bf.readAt)) // TODO mtime(at) = tidtime(at)
// XXX mkfile(bh, "invalidations", bh.inv)
// mkfile takes filesystem treeLock - do it outside bfdir.mu
mkfile(bfdir, name, f)
return bfdir.Inode(), nil
// XXX fill out
return f.Inode(), nil
}
// XXX do we need to support rmdir? (probably no)
// XXX do we need to support unlink? (probably no)
// /bigfile/<bigfileX> -> Mkdir receives client request to create @<tid>/.
func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
inode, err := bfdir.mkdir(name, fctx) // XXX ok to ignore mode?
// / -> Mkdir receives client request to create @<rev>/.
func (root *Root) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
inode, err := root.mkdir(name, fctx) // XXX ok to ignore mode?
return inode, err2LogStatus(err)
}
func (bfdir *BigFileDir) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/bigfile/%s: mkdir %q", bfdir.oid, name)
func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/: mkdir %q", name)
var tid zodb.Tid
var rev zodb.Tid
ok := false
if strings.HasPrefix(name, "@") {
tid, err = zodb.ParseTid(name[1:])
rev, err = zodb.ParseTid(name[1:])
ok = (err == nil)
}
if !ok {
return nil, eINVALf("not @tid")
return nil, eINVALf("not @rev")
}
// check to see if dir(tid) is already there
bfdir.mu.Lock()
_, already := bfdir.revTab[tid]
bfdir.mu.Unlock()
// check to see if dir(rev) is already there
root.zrevMu.Lock()
_, already := root.zrevTab[rev]
root.zrevMu.Unlock()
if already {
return nil, syscall.EEXIST
}
// not there - without bfdir lock proceed to open BigFile @tid view of ZODB
// not there - without zrevMu lock proceed to open @rev view of ZODB
ctx := asctx(fctx)
zconnRev, err := groot.zopenAt(ctx, tid)
zconnRev, err := groot.zopenAt(ctx, rev)
if err != nil {
return nil, err
}
defer zconnRev.Release()
defer zconnRev.Release() // XXX ok?
bf, err := bigopen(ctx, zconnRev, bfdir.oid)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bf.Close()
}
}()
// relock bfdir and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding bfroot.mu
bfdir.mu.Lock()
_, already = bfdir.revTab[tid]
// relock root and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding zrevMu.
root.zrevMu.Lock()
_, already = root.zrevTab[rev]
if already {
bfdir.mu.Unlock()
root.zrevMu.Unlock()
return nil, syscall.EEXIST
}
bfrev := &BigFileRev{
revDir := &Head{ // XXX -> Rev ?
Node: nodefs.NewDefaultNode(),
}
revdata := &BigFileData{
Node: nodefs.NewDefaultNode(),
bigfile: bf,
loading: make(map[int64]*blkLoadState),
}
root.zrevTab[rev] = zconnRev
root.zrevMu.Unlock()
bfdir.revTab[tid] = bfrev
bfdir.mu.Unlock()
// mkdir takes filesystem treeLock - do it outside bfroot.mu
mkdir(bfdir, name, bfrev)
mkfile(bfrev, "data", revdata)
// mkdir takes filesystem treeLock - do it outside zrevMu.
mkdir(root, name, revDir)
return bfrev.Inode(), nil
return revDir.Inode(), nil
}
......@@ -844,30 +799,28 @@ func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err e
}
// Close release all resources of BigFile.
func (bf *BigFile) Close() error {
bf.zbf.PDeactivate()
bf.zbf = nil
func (f *BigFile) Close() error {
f.zbf.PDeactivate()
f.zbf = nil
bf.zconn.Release()
bf.zconn = nil
f.zconn.Release()
f.zconn = nil
return nil
}
// /bigfile/<bigfileX>/head/data -> Getattr serves stat.
func (bfdata *BigFileData) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
// XXX locking
bf := bfdata.bigfile
out.Mode = fuse.S_IFREG | 0444
out.Size = uint64(bf.zbfSize)
out.Size = uint64(f.zbfSize)
// .Blocks
// .Blksize
// FIXME lastChange should cover all bigfile data, not only ZBigFile itself
//mtime := &bfdata.lastChange.Time().Time
lastChange := bf.zbf.PSerial()
//mtime := &f.lastChange.Time().Time
lastChange := f.zbf.PSerial()
mtime := lastChange.Time().Time
out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
......@@ -876,17 +829,16 @@ func (bfdata *BigFileData) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Con
}
// /bigfile/<bigfileX>/head/data -> Read serves reading bigfile data.
func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
// XXX locking
bf := bfdata.bigfile
zbf := bf.zbf
zbf := f.zbf
// cap read request to file size
end := off + int64(len(dest)) // XXX overflow?
if end > bf.zbfSize {
end = bf.zbfSize
if end > f.zbfSize {
end = f.zbfSize
}
if end <= off {
// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
......@@ -903,7 +855,7 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(asctx(fctx), bf.zconn.txnCtx)
ctx, cancel := xcontext.Merge(asctx(fctx), f.zconn.txnCtx)
defer cancel()
// read/load all block(s) in parallel
......@@ -914,7 +866,7 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
wg.Go(func() error {
δ := blkoff-aoff // blk position in dest
//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, zbf.blksize)
return bfdata.readBlk(ctx, blk, dest[δ:δ+zbf.blksize])
return f.readBlk(ctx, blk, dest[δ:δ+zbf.blksize])
})
}
......@@ -929,23 +881,23 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "6) when we receive a FUSE read(#blk) request ..." in overview.
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte) error {
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// XXX errctx?
// XXX locking
// check if someone else is already loading this block
bfdata.loadMu.Lock()
loading, already := bfdata.loading[blk]
f.loadMu.Lock()
loading, already := f.loading[blk]
if !already {
loading = &blkLoadState{
ready: make(chan struct{}),
}
bfdata.loading[blk] = loading
f.loading[blk] = loading
}
bfdata.loadMu.Unlock()
f.loadMu.Unlock()
// if it is already loading - just wait for it
if already {
......@@ -963,14 +915,14 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
// noone was loading - we became reponsible to load this block
zbf := bfdata.bigfile.zbf
zbf := f.zbf
blkdata, err := zbf.LoadBlk(ctx, blk) // XXX -> +blkrevmax1
loading.blkdata = blkdata
loading.err = err
close(loading.ready)
// XXX before loading.ready?
blkrevmax2, _ := bfdata.bigfile.δFtail.LastRevOf(blk, zbf.PJar().At())
blkrevmax2, _ := f.δFtail.LastRevOf(blk, zbf.PJar().At())
//revmax := min(blkrevmax1, blkrevmax2)
revmax := blkrevmax2
_ = revmax
......@@ -979,7 +931,7 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
// XXX remmapping
// XXX -> own func?
// XXX locking
for _, mapping := range bfdata.mappings {
for _, mapping := range f.mappings {
if revmax <= mapping.at || !mapping.blkrange.in(blk) {
continue // do nothing
}
......@@ -1001,9 +953,9 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
// data loaded with error - cleanup .loading
if loading.err != nil {
bfdata.loadMu.Lock()
delete(bfdata.loading, blk)
bfdata.loadMu.Unlock()
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
......@@ -1028,11 +980,11 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
// XXX locking - invalidation must make sure this workers are finished.
// XXX if direct-io: don't touch pagecache
st := gfsconn.FileNotifyStoreCache(bfdata.Inode(), blk*zbf.blksize, blkdata)
st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*zbf.blksize, blkdata)
bfdata.loadMu.Lock()
delete(bfdata.loading, blk)
bfdata.loadMu.Unlock()
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
if st == fuse.OK {
return
......@@ -1137,9 +1089,8 @@ func main() {
// add entries to /
mkfile(root, ".wcfs", NewStaticFile([]byte(zurl)))
mkdir(root, "bigfile", &BigFileRoot{
mkdir(root, "head", &Head{
Node: nodefs.NewDefaultNode(),
tab: make(map[zodb.Oid]*BigFileDir),
})
// TODO handle autoexit
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment