Commit f447d0d1 authored by Kirill Smelkov

.

parent f6e10c9d
......@@ -253,12 +253,12 @@ package main
import (
"context"
"flag"
"fmt"
"log"
stdlog "log"
"os"
"sync"
"syscall"
log "github.com/golang/glog"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/xcontext"
......@@ -321,7 +321,7 @@ type BigFileData struct {
bigfile *BigFile
// inflight loadings of ZBigFile from ZODB.
// successful load results are kept here until blkdata is put to OS pagecache.
// successful load results are kept here until blkdata is put into OS pagecache.
loadMu sync.Mutex
loading map[int64]*blkLoadState // #blk -> {... blkdata}
}
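(Editorial illustration, not part of this commit: the loading map above deduplicates concurrent loads of the same block. A minimal sketch of that pattern in Go, assuming blkLoadState carries a ready channel, a blkdata field and an err field, and using a hypothetical loadBlkData helper for the actual ZODB access:)

func (bfdata *BigFileData) getBlk(ctx context.Context, blk int64) (*blkLoadState, error) {
	bfdata.loadMu.Lock()
	loading, already := bfdata.loading[blk]
	if !already {
		loading = &blkLoadState{ready: make(chan struct{})} // field names are assumptions
		bfdata.loading[blk] = loading
	}
	bfdata.loadMu.Unlock()

	if already {
		// another reader is already loading this block - wait for its result
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-loading.ready:
			return loading, loading.err
		}
	}

	// we are the first reader of this block - do the load and publish the result
	loading.blkdata, loading.err = loadBlkData(ctx, bfdata.bigfile, blk) // hypothetical helper
	close(loading.ready)
	return loading, loading.err
}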
......@@ -347,7 +347,7 @@ type blkLoadState struct {
func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (_ *nodefs.Inode, status fuse.Status) {
oid, err := zodb.ParseOid(name)
if err != nil {
log.Printf("/bigfile: mkdir %q: not-oid", name)
log.Warningf("/bigfile: mkdir %q: not-oid", name)
return nil, fuse.EINVAL
}
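(Editorial note, not part of this commit: this handler is reached when a client exposes a ZBigFile by creating a directory named after the file's ZODB oid under the bigfile root, e.g. something like mkdir <mntpt>/bigfile/0000000000000001; the exact textual oid format accepted by zodb.ParseOid is an assumption here. A name that does not parse as an oid is rejected with EINVAL as above.)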
......@@ -382,7 +382,7 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
zdb := zodb.NewDB(bfroot.zstor)
zconn, err := zdb.Open(ctx, &zodb.ConnOptions{}) // XXX .NoSync=true ?
if err != nil {
log.Printf("/bigfile: mkdir %q: %s", name, err)
log.Errorf("/bigfile: mkdir %q: %s", name, err)
return nil, fuse.EIO
}
......@@ -394,21 +394,21 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
case *zodb.NoDataError:
return nil, fuse.EINVAL // XXX what to do if it existed and was later deleted?
default:
log.Printf("/bigfile: mkdir %q: %s", name, err)
log.Errorf("/bigfile: mkdir %q: %s", name, err)
return nil, fuse.EIO
}
}
zbf, ok := xzbf.(*ZBigFile)
if !ok {
log.Printf("/bigfile: mkdir %q: %s is not a ZBigFile", name, typeOf(xzbf))
log.Warningf("/bigfile: mkdir %q: %s is not a ZBigFile", name, typeOf(xzbf))
return nil, fuse.EINVAL
}
// activate ZBigFile and keep it this way
err = zbf.PActivate(ctx)
if err != nil {
log.Printf("/bigfile: mkdir %q: %s", name, err)
log.Errorf("/bigfile: mkdir %q: %s", name, err)
return nil, fuse.EIO
}
defer func() {
......@@ -495,12 +495,13 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
defer cancel()
// widen read request to be aligned with blksize granularity
// (we can load only whole ZBlk* blocks)
end := off + int64(len(dest)) // XXX overflow?
aoff := off - (off % zbf.blksize)
aend := end + (zbf.blksize - (end % zbf.blksize))
dest = make([]byte, aend - aoff)
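// (illustration, not part of the commit: with blksize=4096, a 200-byte read
// at off=4000 gives end=4200 and is widened to aoff=0, aend=8192, i.e. the
// two whole blocks #0 and #1 will be loaded)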
// load all block(s) in parallel
// read/load all block(s) in parallel
wg, ctx := errgroup.WithContext(ctx)
for blkoff := aoff; blkoff < aend; blkoff += zbf.blksize {
blkoff := blkoff
......@@ -512,7 +513,7 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
err := wg.Wait()
if err != nil {
log.Printf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
log.Errorf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
}
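(Editorial illustration, not part of the commit: with errgroup, the per-block fan-out driving readBlk, whose signature appears below, would look roughly like this inside the loop; the exact slicing of dest is an assumption:)

	wg.Go(func() error {
		δ := blkoff - aoff
		return bfdata.readBlk(ctx, blkoff/zbf.blksize, dest[δ:δ+zbf.blksize])
	})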
......@@ -571,14 +572,14 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
// store to kernel pagecache the whole block that we've just loaded from the database.
// This way, even if the user currently requested to read only a small portion from it,
// the next, e.g. consecutive, user read request will not hit
// the DB again, and will instead be served by the kernel from its cache.
// the DB again, and will instead be served by the kernel from its pagecache.
//
// We cannot do this directly from the reading goroutine - while reading,
// kernel FUSE is holding the corresponding page in pagecache locked, and if
// we tried to update that same page in the cache, it would result
// we tried to update that same page in pagecache, it would result
// in a deadlock inside the kernel.
//
// .loading cleanup is done once we are finished with putting the data into OS cache.
// .loading cleanup is done once we are finished with putting the data into OS pagecache.
// If we do it earlier - a simultaneous read covered by the same block could see
// both the kernel pagecache not yet updated and .loading[blk] already empty,
// and would thus trigger DB access again.
......@@ -592,14 +593,15 @@ func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte)
delete(bfdata.loading, blk)
bfdata.loadMu.Unlock()
// XXX where to report error (-> log)
// EINVAL | ENOENT -> bug
// ENOMEM - kernel is already under memory pressure - we must not keep here
if st != fuse.OK {
return fmt.Errorf("bigfile %s: blk %d: -> pagecache: %s", zbf.POid(), blk, st)
if st == fuse.OK {
return
}
// pagecache update failed, but it must not happen (we verified on startup that
// pagecache control is supported by the kernel). We can correctly live on
// with the error, but data access will likely be very slow. Tell the user
// about the problem.
log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, reading from bigfile will be very slow)", zbf.POid(), blk, st)
}()
return nil
......@@ -621,8 +623,8 @@ func (bf *BigFile) readAt() []byte {
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because it is essentially the index of where to find ZBigFile data.
//
// For the data itself - we put it to kernel cache and always deactivate from
// ZODB right after that.
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// TODO set it to Connection.CacheControl
type zodbCacheControl struct {}
......@@ -641,8 +643,6 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
return false
}
// XXX option to prevent starting if wcfs was already started ?
// FIXME gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
// - Inode += .Mount() -> nodefs.Mount
......@@ -652,10 +652,14 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
var gfsconn *nodefs.FileSystemConnector
func main() {
log.SetPrefix("wcfs: ")
stdlog.SetPrefix("wcfs: ")
log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
defer log.Flush()
debug := flag.Bool("d", false, "debug")
autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
// XXX option to prevent starting if wcfs was already started ?
flag.Parse()
if len(flag.Args()) != 2 {
log.Fatalf("Usage: %s [OPTIONS] zurl mntpt", os.Args[0])
......@@ -671,7 +675,6 @@ func main() {
}
defer zstor.Close()
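(Editorial note, not part of this commit: wcfs is thus started with a ZODB URL and a mountpoint, e.g. something like wcfs -d file:///path/to/data.fs /mnt/wcfs; the file:// zurl scheme is an assumption here, as the storage-opening code is not shown in these hunks.)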
// mount root
opts := &fuse.MountOptions{
FsName: zurl,
......