Commit a41635da authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7ce5b4c6
......@@ -235,6 +235,7 @@ func (sk *FileSock) File() nodefs.File {
// The data will be read by client reading from filesock's file.
// Write semantic is that of io.Writer.
func (sk *FileSock) Write(data []byte) (n int, err error) {
// XXX err ctx?
return sk.tx.Write(data)
}
......@@ -259,6 +260,7 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64) (fuse.ReadResult, fuse.
// The data read will be that the client writes into filesock's file.
// Read semantic is that of io.Reader.
func (sk *FileSock) Read(dest []byte) (n int, err error) {
// XXX err ctx?
return sk.rx.Read(dest)
}
......@@ -285,17 +287,17 @@ func (f *skFile) Write(data []byte, /*ignored*/off int64) (uint32, fuse.Status)
}
// CloseRead closes reading side of the socket.
// CloseRead closes reading side of filesock.
func (sk *FileSock) CloseRead() error {
return sk.rx.Close()
}
// CloseWrite closes writing side of the socket.
// CloseWrite closes writing side of filesock.
func (sk *FileSock) CloseWrite() error {
return sk.tx.Close()
}
// Close closes the socket.
// Close closes filesock.
//
// it is semantically equivalent to CloseRead + CloseWrite.
func (sk *FileSock) Close() error {
......
......@@ -529,13 +529,13 @@ func (cc *zodbCacheControl) WantEvict(obj zodb.IPersistent) bool {
// zwatcher watches for ZODB changes.
// see "4) when we receive an invalidation message from ZODB ..."
func (r *Root) zwatcher(ctx context.Context) (err error) {
func (root *Root) zwatcher(ctx context.Context) (err error) {
defer xerr.Contextf(&err, "zwatch") // XXX more in context?
// XXX unmount on error? -> always EIO?
zwatchq := make(chan zodb.CommitEvent)
r.zstor.AddWatch(zwatchq)
defer r.zstor.DelWatch(zwatchq)
root.zstor.AddWatch(zwatchq)
defer root.zstor.DelWatch(zwatchq)
var zevent zodb.CommitEvent
var ok bool
......@@ -551,19 +551,19 @@ func (r *Root) zwatcher(ctx context.Context) (err error) {
}
}
r.zhandle1(zevent)
root.zhandle1(zevent)
}
}
// zhandle1 handles 1 event from ZODB notification.
func (r *Root) zhandle1(zevent zodb.CommitEvent) {
func (root *Root) zhandle1(zevent zodb.CommitEvent) {
// while we are invalidating OS cache, make sure that nothing, that
// even reads /head/bigfile/*, is running (see 4.6).
r.head.zconnMu.Lock()
defer r.head.zconnMu.Unlock()
root.head.zconnMu.Lock()
defer root.head.zconnMu.Unlock()
zhead := r.head.zconn
bfdir := r.head.bfdir
zhead := root.head.zconn
bfdir := root.head.bfdir
toinvalidate := map[*BigFile]SetI64{} // {} file -> set(#blk)
......@@ -626,12 +626,15 @@ func (r *Root) zhandle1(zevent zodb.CommitEvent) {
// XXX resync .zhead to zevent.tid
/* XXX reenable
// notify .wcfs/zhead
for fh := range gdebug.zhead.fhTab {
fh.Write(fmt.Sprintf("%s\n", zevent.Tid)) // XXX -> fh = filesock
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", zevent.Tid)
if err != nil {
log.Error(err) // XXX errctx, -> warning?
sk.Close()
delete(gdebug.zheadSockTab, sk)
}
}
*/
}
// invalidateBlk invalidates 1 file block. XXX
......@@ -1151,6 +1154,30 @@ var gfsconn *nodefs.FileSystemConnector
// so we still have to reference the root via path.
var gmntpt string
// debugging
var gdebug = struct {
// .wcfs/zhead opens
// protected by groot.head.zconnMu
zheadSockTab map[*FileSock]struct{}
}{}
// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
nodefs.Node
}
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags?
sk := NewFileSock()
sk.CloseRead()
groot.head.zconnMu.Lock()
defer groot.head.zconnMu.Unlock()
gdebug.zheadSockTab[sk] = struct{}{}
return sk.File(), fuse.OK
}
func main() {
stdlog.SetPrefix("wcfs: ")
//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
......@@ -1242,9 +1269,9 @@ func main() {
// XXX ^^^ invalidate cache or direct io
// for debugging/testing
dotwcfs := nodefs.NewDefaultNode()
mkdir(root, ".wcfs", dotwcfs)
mkfile(dotwcfs, "zurl", NewStaticFile([]byte(zurl)))
_wcfs := nodefs.NewDefaultNode()
mkdir(root, ".wcfs", _wcfs)
mkfile(_wcfs, "zurl", NewStaticFile([]byte(zurl)))
// .wcfs/zhead - special file channel that sends zhead.at.
//
......@@ -1252,7 +1279,9 @@ func main() {
// zhead.at was, starting from the time when .wcfs/zhead was opened.
// There can be multiple openers. Once opened, the file must be read,
// as wcfs blocks waiting for data to be read.
//mkfile(dotwcfs, "zhead", ...) XXX reenable
mkfile(_wcfs, "zhead", &_wcfs_Zhead{
Node: nodefs.NewDefaultNode(),
})
// TODO handle autoexit
_ = autoexit
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment