Commit 197f6c42 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bb8d466b
...@@ -23,7 +23,6 @@ package main ...@@ -23,7 +23,6 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
"syscall" "syscall"
log "github.com/golang/glog" log "github.com/golang/glog"
...@@ -185,10 +184,13 @@ type ZConn struct { ...@@ -185,10 +184,13 @@ type ZConn struct {
// read-only transaction under which we access zodb.Connection data. // read-only transaction under which we access zodb.Connection data.
txnCtx context.Context // XXX -> better directly store txn txnCtx context.Context // XXX -> better directly store txn
// for historic @<rev> acess the connection can be shared between several bigfiles. /* XXX redecided to purge files and @<rev>/ on atime timeout
// for historic @<rev> access the connection can be shared between several bigfiles.
// since we want to free such connections when no longer needed we // since we want to free such connections when no longer needed we
// return zodb.Connection back to zodb.DB when refcnt drops to 0. // return zodb.Connection back to zodb.DB when refcnt drops to 0.
refcnt int32 refcnt int32
*/
} }
// zopen opens new connection to ZODB database + associated read-only transaction. // zopen opens new connection to ZODB database + associated read-only transaction.
...@@ -213,10 +215,11 @@ func zopen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn, ...@@ -213,10 +215,11 @@ func zopen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn,
return &ZConn{ return &ZConn{
Connection: zconn, Connection: zconn,
txnCtx: txnCtx, txnCtx: txnCtx,
refcnt: 1, // refcnt: 1,
}, nil }, nil
} }
/*
// Release decrements reference count and releases connection back to zodb.DB // Release decrements reference count and releases connection back to zodb.DB
// if it is no longer used. // if it is no longer used.
func (zc *ZConn) Release() { func (zc *ZConn) Release() {
...@@ -249,8 +252,8 @@ func (zc *ZConn) Incref() { ...@@ -249,8 +252,8 @@ func (zc *ZConn) Incref() {
// if the connection for this @<rev> was already opened - it is shared. // if the connection for this @<rev> was already opened - it is shared.
func (r *Root) zopenAt(ctx context.Context, rev zodb.Tid) (_ *ZConn, err error) { func (r *Root) zopenAt(ctx context.Context, rev zodb.Tid) (_ *ZConn, err error) {
// check if zconn for @<rev> is already there // check if zconn for @<rev> is already there
r.zrevMu.Lock() r.revMu.Lock()
zconn := r.zrevTab[rev] zconn := r.revTab[rev]
if zconn != nil { if zconn != nil {
if atomic.LoadInt32(&zconn.refcnt) > 0 { if atomic.LoadInt32(&zconn.refcnt) > 0 {
zconn.Incref() zconn.Incref()
...@@ -258,43 +261,43 @@ func (r *Root) zopenAt(ctx context.Context, rev zodb.Tid) (_ *ZConn, err error) ...@@ -258,43 +261,43 @@ func (r *Root) zopenAt(ctx context.Context, rev zodb.Tid) (_ *ZConn, err error)
zconn = nil // in-progress destruction zconn = nil // in-progress destruction
} }
} }
r.zrevMu.Unlock() r.revMu.Unlock()
if zconn != nil { if zconn != nil {
return zconn, nil return zconn, nil
} }
// not there - without zrevMu lock proceed to open it // not there - without revMu lock proceed to open it
zconn, err = zopen(ctx, r.zdb, &zodb.ConnOptions{At: rev}) zconn, err = zopen(ctx, r.zdb, &zodb.ConnOptions{At: rev})
if err != nil { if err != nil {
return nil, err return nil, err
} }
// relock zrevTab and either register zconn, or returun another zconn2, // relock revTab and either register zconn, or returun another zconn2,
// that might have been opened while we were not holding zrevMu. // that might have been opened while we were not holding revMu.
r.zrevMu.Lock() r.revMu.Lock()
defer r.zrevMu.Unlock() defer r.revMu.Unlock()
zconn2 := r.zrevTab[rev] zconn2 := r.revTab[rev]
if zconn2 != nil { if zconn2 != nil {
if atomic.LoadInt32(&zconn2.refcnt) > 0 { if atomic.LoadInt32(&zconn2.refcnt) > 0 {
zconn.Release() zconn.Release() // FIXME aborts txn -> just drop zconn
zconn2.Incref() zconn2.Incref()
return zconn2, nil return zconn2, nil
} }
// else it was another in-progress destruction // else it was another in-progress destruction
} }
r.zrevTab[rev] = zconn r.revTab[rev] = zconn
// schedule del zrevTab[rev] on zconn destroy // schedule del revTab[rev] on zconn destroy
txn := transaction.Current(zconn.txnCtx) txn := transaction.Current(zconn.txnCtx)
txn.RegisterSync(&zrevTabUnregister{r, zconn}) txn.RegisterSync(&zrevTabUnregister{r, zconn})
return zconn, nil return zconn, nil
} }
// zrevTabUnregister unregisters zconn from root.zrevTab on zconn's transaction abort. // zrevTabUnregister unregisters zconn from root.revTab on zconn's transaction abort.
type zrevTabUnregister struct { type zrevTabUnregister struct {
root *Root root *Root
zconn *ZConn zconn *ZConn
...@@ -304,12 +307,13 @@ func (u *zrevTabUnregister) BeforeCompletion(txn transaction.Transaction) {} ...@@ -304,12 +307,13 @@ func (u *zrevTabUnregister) BeforeCompletion(txn transaction.Transaction) {}
func (u *zrevTabUnregister) AfterCompletion(txn transaction.Transaction) { func (u *zrevTabUnregister) AfterCompletion(txn transaction.Transaction) {
rev := u.zconn.At() rev := u.zconn.At()
u.root.zrevMu.Lock() u.root.revMu.Lock()
defer u.root.zrevMu.Unlock() defer u.root.revMu.Unlock()
// delete only if zconn is still registered - as another zconn2 might have // delete only if zconn is still registered - as another zconn2 might have
// been already registered instead while zconn was in zrevTab with refcnt=0. // been already registered instead while zconn was in revTab with refcnt=0.
if u.root.zrevTab[rev] == u.zconn { if u.root.revTab[rev] == u.zconn {
delete(u.root.zrevTab, rev) delete(u.root.revTab, rev)
} }
} }
*/
...@@ -390,6 +390,7 @@ import ( ...@@ -390,6 +390,7 @@ import (
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree" "lab.nexedi.com/kirr/neo/go/zodb/btree"
_ "lab.nexedi.com/kirr/neo/go/zodb/wks" _ "lab.nexedi.com/kirr/neo/go/zodb/wks"
...@@ -412,24 +413,23 @@ type Root struct { ...@@ -412,24 +413,23 @@ type Root struct {
// only one connection is used for head/ and only one for each @<rev>. // only one connection is used for head/ and only one for each @<rev>.
zdb *zodb.DB zdb *zodb.DB
// ZODB connection for head/ // directory + ZODB connection for head/
zheadMu sync.RWMutex // protects access to zhead & live _objects_ associated with it head *Head
zhead *ZConn // zwatcher resyncs zhead; others only read zhead objects.
// ZODB connections for @<rev>/ // directories + ZODB connections for @<rev>/
zrevMu sync.Mutex revMu sync.Mutex
zrevTab map[zodb.Tid]*ZConn revTab map[zodb.Tid]*Head
// XXX include?
// // {} rev -> @<rev>/
// mu sync.Mutex
// revTab map[zodb.Tid]*Rev
} }
// /(head|<rev>)/ - served by Head. XXX separate Rev? // /(head|<rev>)/ - served by Head.
type Head struct { type Head struct {
nodefs.Node nodefs.Node
// bigfile, at, watch, etc - all implicitly linked to by fs rev zodb.Tid // 0 for head/, !0 for @<rev>/
// bigfile/, at, watch, etc - all implicitly linked to by fs
// ZODB connection for everything under this head
zconnMu sync.RWMutex // protects access to zconn & live _objects_ associated with it
zconn *ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
} }
// /head/watch - served by Watch. // /head/watch - served by Watch.
...@@ -442,6 +442,7 @@ type Watch struct { ...@@ -442,6 +442,7 @@ type Watch struct {
// /(head|<rev>)/bigfile/ - served by BigFileDir. // /(head|<rev>)/bigfile/ - served by BigFileDir.
type BigFileDir struct { type BigFileDir struct {
nodefs.Node nodefs.Node
head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX> // {} oid -> <bigfileX>
mu sync.Mutex mu sync.Mutex
...@@ -452,8 +453,9 @@ type BigFileDir struct { ...@@ -452,8 +453,9 @@ type BigFileDir struct {
type BigFile struct { type BigFile struct {
nodefs.Node nodefs.Node
// this BigFile views ZODB via zconn // this BigFile is under head; it views ZODB via head.zconn
zconn *ZConn // parent's BigFileDir.head is the same.
head *Head
// ZBigFile top-level object. Kept activated during lifetime of current transaction. // ZBigFile top-level object. Kept activated during lifetime of current transaction.
zbf *ZBigFile zbf *ZBigFile
...@@ -629,9 +631,9 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error { ...@@ -629,9 +631,9 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) error {
// ---------------------------------------- // ----------------------------------------
// /(head/<rev>)/bigfile/ -> Lookup receives client request to create (head|<rev>)/bigfile/<bigfileX>. // /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) { func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
f, err := bfdir.lookup(out, name, fctx) // XXX reorder out? f, err := bfdir.lookup(out, name, fctx)
var inode *nodefs.Inode var inode *nodefs.Inode
if f != nil { if f != nil {
inode = f.Inode() inode = f.Inode()
...@@ -642,17 +644,21 @@ func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) ...@@ -642,17 +644,21 @@ func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context)
func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) { func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
defer xerr.Contextf(&err, "/bigfile: lookup %q", name) defer xerr.Contextf(&err, "/bigfile: lookup %q", name)
defer func() {
if f != nil {
f.getattr(out)
}
}()
oid, err := zodb.ParseOid(name) oid, err := zodb.ParseOid(name)
if err != nil { if err != nil {
return nil, eINVALf("not oid") return nil, eINVALf("not oid")
} }
bfdir.head.zconnMu.Lock()
defer bfdir.head.zconnMu.Unlock()
defer func() {
if f != nil {
f.getattr(out)
}
}()
// check to see if dir(oid) is already there // check to see if dir(oid) is already there
bfdir.mu.Lock() bfdir.mu.Lock()
f, already := bfdir.tab[oid] f, already := bfdir.tab[oid]
...@@ -663,7 +669,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) ...@@ -663,7 +669,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
} }
// not there - without bfdir lock proceed to open BigFile from ZODB // not there - without bfdir lock proceed to open BigFile from ZODB
f, err = bigopen(asctx(fctx), groot.zhead, oid) // XXX zhead -> head|rev.zconn f, err = bfdir.head.bigopen(asctx(fctx), oid)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -690,6 +696,9 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) ...@@ -690,6 +696,9 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
// XXX do we need to support unlink? (probably no) // XXX do we need to support unlink? (probably no)
// / -> Mkdir receives client request to create @<rev>/. // / -> Mkdir receives client request to create @<rev>/.
//
// it is not an error if @<rev>/ already exists - mkdir succeeds and EEXIST is not returned.
// in other words mkdir behaves here similarly to `mkdir -p`.
func (root *Root) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) { func (root *Root) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
inode, err := root.mkdir(name, fctx) // XXX ok to ignore mode? inode, err := root.mkdir(name, fctx) // XXX ok to ignore mode?
return inode, err2LogStatus(err) return inode, err2LogStatus(err)
...@@ -710,51 +719,56 @@ func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err e ...@@ -710,51 +719,56 @@ func (root *Root) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err e
} }
// check to see if dir(rev) is already there // check to see if dir(rev) is already there
root.zrevMu.Lock() root.revMu.Lock()
_, already := root.zrevTab[rev] _, already := root.revTab[rev]
root.zrevMu.Unlock() root.revMu.Unlock()
if already { if already {
return nil, syscall.EEXIST return nil, syscall.EEXIST
} }
// not there - without zrevMu lock proceed to open @rev view of ZODB // not there - without revMu lock proceed to open @rev view of ZODB
ctx := asctx(fctx) ctx := asctx(fctx)
zconnRev, err := groot.zopenAt(ctx, rev) // zconnRev, err := root.zopenAt(ctx, rev)
zconnRev, err := zopen(ctx, root.zdb, &zodb.ConnOptions{At: rev})
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer zconnRev.Release() // XXX ok?
// relock root and either mkdir or EEXIST if the directory was maybe // relock root and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding zrevMu. // simultanously created while we were not holding revMu.
root.zrevMu.Lock() root.revMu.Lock()
_, already = root.zrevTab[rev] _, already = root.revTab[rev]
if already { if already {
root.zrevMu.Unlock() root.revMu.Unlock()
// zconnRev.Release()
transaction.Current(zconnRev.txnCtx).Abort()
return nil, syscall.EEXIST return nil, syscall.EEXIST
} }
revDir := &Head{ // XXX -> Rev ? revDir := &Head{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
rev: rev,
zconn: zconnRev,
} }
root.zrevTab[rev] = zconnRev root.revTab[rev] = revDir
root.zrevMu.Unlock() root.revMu.Unlock()
// mkdir takes filesystem treeLock - do it outside zrevMu. // mkdir takes filesystem treeLock - do it outside revMu.
mkdir(root, name, revDir) mkdir(root, name, revDir)
return revDir.Inode(), nil return revDir.Inode(), nil
} }
// bigopen opens BigFile corresponding to oid on zconn. // bigopen opens BigFile corresponding to oid on head.zconn.
// //
// A ZBigFile corresponding to oid is activated and statted. // A ZBigFile corresponding to oid is activated and statted.
// //
// The whole result is returned as BigFile. // head.zconn must be locked.
func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err error) { func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
zconn := head.zconn
defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At()) defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())
// XXX better ctx = transaction.PutIntoContext(ctx, txn) // XXX better ctx = transaction.PutIntoContext(ctx, txn)
...@@ -794,10 +808,10 @@ func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err e ...@@ -794,10 +808,10 @@ func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err e
return nil, err return nil, err
} }
zconn.Incref() // zconn.Incref()
return &BigFile{ return &BigFile{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
zconn: zconn, head: head,
zbf: zbf, zbf: zbf,
zbfSize: zbfSize, zbfSize: zbfSize,
...@@ -809,11 +823,12 @@ func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err e ...@@ -809,11 +823,12 @@ func bigopen(ctx context.Context, zconn *ZConn, oid zodb.Oid) (_ *BigFile, err e
// Close release all resources of BigFile. // Close release all resources of BigFile.
func (f *BigFile) Close() error { func (f *BigFile) Close() error {
f.zbf.PDeactivate() f.zbf.PDeactivate() // XXX f.head.zconn must locked
f.zbf = nil f.zbf = nil
f.zconn.Release() // f.zconn.Release()
f.zconn = nil // f.zconn = nil
f.head = nil
return nil return nil
} }
...@@ -842,7 +857,8 @@ func (f *BigFile) getattr(out *fuse.Attr) { ...@@ -842,7 +857,8 @@ func (f *BigFile) getattr(out *fuse.Attr) {
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data. // /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) { func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
// XXX locking f.head.zconnMu.RLock()
defer f.head.zconnMu.RUnlock()
zbf := f.zbf zbf := f.zbf
...@@ -866,7 +882,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context ...@@ -866,7 +882,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn) // XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(asctx(fctx), f.zconn.txnCtx) ctx, cancel := xcontext.Merge(asctx(fctx), f.head.zconn.txnCtx)
defer cancel() defer cancel()
// read/load all block(s) in parallel // read/load all block(s) in parallel
...@@ -1014,11 +1030,9 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error { ...@@ -1014,11 +1030,9 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) error {
// /(head|<rev>)/at -> readAt serves read. // /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt() []byte { func (h *Head) readAt() []byte {
// XXX implemented only for Head, not Rev h.zconnMu.Lock()
root := groot defer h.zconnMu.Unlock()
root.zheadMu.Lock() return []byte(h.zconn.At().String())
defer root.zheadMu.Unlock()
return []byte(root.zhead.At().String())
} }
...@@ -1031,7 +1045,7 @@ func (h *Head) readAt() []byte { ...@@ -1031,7 +1045,7 @@ func (h *Head) readAt() []byte {
// - Mount: // - Mount:
// .Root() -> root Inode of the fs // .Root() -> root Inode of the fs
// .Connector() -> FileSystemConnector through which fs is mounted // .Connector() -> FileSystemConnector through which fs is mounted
var groot *Root //var groot *Root
var gfsconn *nodefs.FileSystemConnector var gfsconn *nodefs.FileSystemConnector
func main() { func main() {
...@@ -1070,13 +1084,19 @@ func main() { ...@@ -1070,13 +1084,19 @@ func main() {
} }
zhead.Cache().SetControl(&zodbCacheControl{}) // XXX +locking? zhead.Cache().SetControl(&zodbCacheControl{}) // XXX +locking?
// mount root // mount root + head/
head := &Head{
Node: nodefs.NewDefaultNode(),
rev: 0,
zconn: zhead,
}
root := &Root{ root := &Root{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
zstor: zstor, zstor: zstor,
zdb: zdb, zdb: zdb,
zhead: zhead, head: head,
zrevTab: make(map[zodb.Tid]*ZConn), revTab: make(map[zodb.Tid]*Head),
} }
opts := &fuse.MountOptions{ opts := &fuse.MountOptions{
...@@ -1091,7 +1111,7 @@ func main() { ...@@ -1091,7 +1111,7 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
groot = root // FIXME temp workaround (see ^^^) //groot = root // FIXME temp workaround (see ^^^)
gfsconn = fsconn // FIXME ----//---- gfsconn = fsconn // FIXME ----//----
// we require proper pagecache control (added to Linux 2.6.36 in 2010) // we require proper pagecache control (added to Linux 2.6.36 in 2010)
...@@ -1102,12 +1122,10 @@ func main() { ...@@ -1102,12 +1122,10 @@ func main() {
// add entries to / // add entries to /
mkfile(root, ".wcfs", NewStaticFile([]byte(zurl))) mkfile(root, ".wcfs", NewStaticFile([]byte(zurl)))
head := &Head{
Node: nodefs.NewDefaultNode(),
}
mkdir(root, "head", head) mkdir(root, "head", head)
mkdir(head, "bigfile", &BigFileDir{ mkdir(head, "bigfile", &BigFileDir{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
head: head,
tab: make(map[zodb.Oid]*BigFile), tab: make(map[zodb.Oid]*BigFile),
}) })
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at) mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment