Commit 0e0b766d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 394de65d
......@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"math"
"sync/atomic"
"syscall"
log "github.com/golang/glog"
......@@ -100,7 +101,7 @@ func asctx(fctx *fuse.Context) context.Context {
}
// defaultNode should be used instead of nodefs.DefaultNode in wcfs.
// fsNode should be used instead of nodefs.DefaultNode in wcfs.
//
// nodefs.DefaultNode.Open returns ENOSYS. This is convenient for filesystems
// that have no dynamic files at all. But for filesystems, where there are some
......@@ -111,25 +112,53 @@ func asctx(fctx *fuse.Context) context.Context {
// In wcfs we have dynamic files (e.g. /head/watch) and this way we have to
// avoid returning ENOSYS on nodes, that do not need file handles.
//
// defaultNode is like nodefs.defaultNode, but Open returns to kernel fh=0 and
// fsNode is like nodefs.defaultNode, but Open returns to kernel fh=0 and
// FOPEN_KEEP_CACHE - similarly how openless case is handled there.
type defaultNode struct{
type fsNode struct{
nodefs.Node
// cache for path
// we don't use hardlinks / don't want to pay locks + traversal price every time.
xpath atomic.Value
}
func (n *defaultNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
func (n *fsNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
return &nodefs.WithFlags{
File: nil,
FuseFlags: fuse.FOPEN_KEEP_CACHE,
}, fuse.OK
}
func newDefaultNode() nodefs.Node {
return &defaultNode{
func newFSNode() fsNode { // NOTE not pointer
return fsNode{
Node: nodefs.NewDefaultNode(),
}
}
// path returns node path in its filesystem.
func (n *fsNode) path() string {
xpath := n.xpath.Load()
if xpath != nil {
return xpath.(string)
}
// slow part - let's construct the path and remember it
path := ""
inode := n.Inode()
for {
var name string
inode, name = inode.Parent()
if inode == nil {
break
}
path = "/" + name + path
}
n.xpath.Store(path)
return path
}
// NewStaticFile creates nodefs.Node for file with static data.
func NewStaticFile(data []byte) *SmallFile {
......@@ -140,7 +169,7 @@ func NewStaticFile(data []byte) *SmallFile {
// SmallFile is a nodefs.Node for file with potentially dynamic, but always small, data.
type SmallFile struct {
nodefs.Node
fsNode
fuseFlags uint32 // fuse.FOPEN_*
// readData gives whole file data
......@@ -149,7 +178,7 @@ type SmallFile struct {
func newSmallFile(readData func() []byte, fuseFlags uint32) *SmallFile {
return &SmallFile{
Node: newDefaultNode(),
fsNode: newFSNode(),
fuseFlags: fuseFlags,
readData: readData,
}
......
......@@ -410,7 +410,7 @@ import (
// Root represents root of wcfs filesystem.
type Root struct {
nodefs.Node
fsNode
// ZODB storage we work with
zstor zodb.IStorage
......@@ -431,7 +431,7 @@ type Root struct {
// /(head|<rev>)/ - served by Head.
type Head struct {
nodefs.Node
fsNode
rev zodb.Tid // 0 for head/, !0 for @<rev>/
bfdir *BigFileDir // bigfile/
......@@ -466,14 +466,14 @@ type Head struct {
// /head/watch - served by Watch.
type Watch struct {
nodefs.Node
fsNode
// TODO
}
// /(head|<rev>)/bigfile/ - served by BigFileDir.
type BigFileDir struct {
nodefs.Node
fsNode
head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX>
......@@ -490,7 +490,7 @@ type BigFileDir struct {
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
type BigFile struct {
nodefs.Node
fsNode
// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
// parent's BigFileDir.head is the same.
......@@ -822,7 +822,9 @@ retry:
// see "4.4) for all file/blk to in invalidate we do"
//
// called with f.head.zconnMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)
fsconn := gfsconn
blksize := f.zbf.blksize
off := blk*blksize
......@@ -846,7 +848,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
blkdata = make([]byte, blksize)
n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
if st != fuse.OK {
// XXX warn?
// XXX warn
}
blkdata = blkdata[:n]
}
......@@ -855,27 +857,29 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
// some data from its cache already. In such case we don't try to
// preserve the rest and drop what was read, to avoid keeping the
// system overloaded.
//
// if we have the data - preserve it under @revX/bigfile/file[blk].
if int64(len(blkdata)) == blksize {
// XXX -> go?
// store retrieved data back to OS cache for file @<rev>/file[blk]
blkrev, _ := f.δFtail.LastRevOf(blk, f.head.zconn.At())
frev, err := groot.mkrevfile(blkrev, f.zbf.POid())
if err != nil {
// XXX
panic(err)
}
st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK {
// XXX log - dup wrt readBlk -> common func. XXX -> uploadBlk
panic(st)
}
func() {
// store retrieved data back to OS cache for file @<rev>/file[blk]
blkrev, _ := f.δFtail.LastRevOf(blk, f.head.zconn.At())
frev, frelease, err := groot.mkrevfile(blkrev, f.zbf.POid())
if err != nil {
log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
}
defer frelease()
st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK {
log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path, st)
}
}()
}
// invalidate file/head/data[blk] in OS file cache.
st := fsconn.FileNotify(f.Inode(), off, blksize)
if st != fuse.OK {
// XXX (fatal error here) -> return just error
panic(st)
return syscall.Errno(st)
}
return nil
......@@ -883,7 +887,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
// invalidateAttr invalidates file attributes in kernel cache.
func (f *BigFile) invalidateAttr() (err error) {
defer xerr.Contextf(&err, "%s: invalidate attr", "XXX") // XXX -> f.path
defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
if st != fuse.OK {
......@@ -902,7 +906,9 @@ func (f *BigFile) invalidateAttr() (err error) {
// For kernel to know the inode mkrevfile issues regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// to use mkrevfile from under FUSE request handler as doing so might deadlock.
func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, err error) {
//
// Caller must call release when inode ID is no longer required to be present.
func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release func(), err error) {
fsconn := gfsconn
frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
......@@ -913,7 +919,7 @@ func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, err error)
if xfrev != nil {
// FIXME checking for "node{0}" is fragile, but currently no other way
if xfrev.String() != "node{0}" {
return xfrev.Node().(*BigFile), nil
return xfrev.Node().(*BigFile), func(){}, nil
}
}
......@@ -921,13 +927,12 @@ func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, err error)
frevospath := gmntpt + "/" + frevpath // now starting from OS /
f, err := os.Open(frevospath)
if err != nil {
return nil, err
return nil, nil, err
}
defer f.Close() // XXX -> return to caller to release when inode no longer needed
xfrev = fsconn.LookupNode(root.Inode(), frevpath)
// must be !nil as open succeeded XXX better recheck
return xfrev.Node().(*BigFile), nil
return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}
// ----------------------------------------
......@@ -952,7 +957,7 @@ func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context)
}
func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
defer xerr.Contextf(&err, "/XXXbigfile: lookup %q", name) // XXX name -> path
defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)
oid, err := zodb.ParseOid(name)
if err != nil {
......@@ -1016,7 +1021,7 @@ func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*node
}
func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
defer xerr.Contextf(&err, "/: lookup %q", name) // XXX name -> path?
defer xerr.Contextf(&err, "/: lookup %q", name)
var rev zodb.Tid
ok := false
......@@ -1060,13 +1065,13 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
// XXX -> newHead()
revDir = &Head{
Node: newDefaultNode(),
rev: rev,
zconn: zconnRev,
fsNode: newFSNode(),
rev: rev,
zconn: zconnRev,
}
bfdir := &BigFileDir{
Node: newDefaultNode(),
fsNode: newFSNode(),
head: revDir,
fileTab: make(map[zodb.Oid]*BigFile),
indexLooked: nil, // δbtree index not needed/used for @revX/
......@@ -1133,7 +1138,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
// zconn.Incref()
f := &BigFile{
Node: newDefaultNode(),
fsNode: newFSNode(),
head: head,
zbf: zbf,
zbfSize: zbfSize,
......@@ -1470,7 +1475,7 @@ func init() {
// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
nodefs.Node
fsNode
}
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
......@@ -1530,12 +1535,12 @@ func main() {
// mount root + head/
// XXX -> newHead()
head := &Head{
Node: newDefaultNode(),
rev: 0,
zconn: zhead,
fsNode: newFSNode(),
rev: 0,
zconn: zhead,
}
bfdir := &BigFileDir{
Node: newDefaultNode(),
fsNode: newFSNode(),
head: head,
fileTab: make(map[zodb.Oid]*BigFile),
indexLooked: δbtree.NewPathSet(),
......@@ -1543,7 +1548,7 @@ func main() {
head.bfdir = bfdir
root := &Root{
Node: newDefaultNode(),
fsNode: newFSNode(),
zstor: zstor,
zdb: zdb,
head: head,
......@@ -1583,9 +1588,9 @@ func main() {
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
// for debugging/testing
_wcfs := newDefaultNode()
mkdir(root, ".wcfs", _wcfs)
mkfile(_wcfs, "zurl", NewStaticFile([]byte(zurl)))
_wcfs := newFSNode()
mkdir(root, ".wcfs", &_wcfs)
mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))
// .wcfs/zhead - special file channel that sends zhead.at.
//
......@@ -1593,8 +1598,8 @@ func main() {
// zhead.at was, starting from the time when .wcfs/zhead was opened.
// There can be multiple openers. Once opened, the file must be read,
// as wcfs blocks waiting for data to be read before XXX.
mkfile(_wcfs, "zhead", &_wcfs_Zhead{
Node: newDefaultNode(),
mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
fsNode: newFSNode(),
})
// XXX place = ok?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment