Commit ea3987b5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3e68ba25
......@@ -23,6 +23,8 @@ package main
import (
"context"
"fmt"
"io"
"math"
"syscall"
log "github.com/golang/glog"
......@@ -37,6 +39,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
)
// ---- FUSE ----
// eInvalError is the error wrapper signifying that underlying error is about "invalid argument".
// err2LogStatus converts such errors into EINVAL return code + logs as warning.
......@@ -138,7 +141,7 @@ func (f *SmallFile) Read(_ nodefs.File, dest []byte, off int64, _ *fuse.Context)
// mkdir adds child to parent as directory.
//
// Note: parent must be already in the filesystem tree - i.e. associated
// with inode. if not - nodefs will panic in Inode.NewChild on nil dereference.
// with Inode. if not - nodefs will panic in Inode.NewChild on nil dereference.
func mkdir(parent nodefs.Node, name string, child nodefs.Node) {
parent.Inode().NewChild(name, true, child)
}
......@@ -166,6 +169,155 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
return fssrv, fsconn, nil
}
// FileSock is bidirectional channel associated with opened file.
//
// FileSock provides streaming write/read operations for filesystem server that
// are correspondingly matched with read/write operations on filesystem user side.
type FileSock struct {
file *skFile // filesock's file peer
rx *io.PipeReader // socket reads from file here
tx *io.PipeWriter // socket writes to file here
}
// skFile is File peer of FileSock.
//
// skFile.Read is connected with sk.Write.
// skFile.Write is connected with sk.Read.
//
// skFile is always created with nonseekable & directio flags, to support
// streaming semantics.
type skFile struct {
nodefs.File
rx *io.PipeReader // file reads from socket here
tx *io.PipeWriter // file writes to socket here
}
// NewFileSock creates new file socket.
//
// After file socket is created, File return should be given to kernel for the
// socket to be connected to an opened file.
func NewFileSock() *FileSock {
sk := &FileSock{}
f := &skFile{
File: nodefs.NewDefaultFile(),
}
sk.file = f
rx, tx := io.Pipe()
sk.rx = rx
f .tx = tx
rx, tx = io.Pipe()
sk.rx = rx
f .tx = tx
return sk
}
// File returns nodefs.File handle that is connected to the socket.
//
// The handle should be given to kernel as result of a file open, for that file
// to be connected to the socket.
func (sk *FileSock) File() nodefs.File {
// nonseekable & directio for opened file to have streaming semantic as
// if it was a socket.
return &nodefs.WithFlags{
File: sk.file,
FuseFlags: fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO,
}
}
// Write writes data to filesock.
//
// The data will be read by client reading from filesock's file.
// Write semantic is that of io.Writer.
func (sk *FileSock) Write(data []byte) (n int, err error) {
return sk.tx.Write(data)
}
// Read implements nodefs.File and is paired with filesock.Write().
func (f *skFile) Read(dest []byte, /*ignored*/off int64) (fuse.ReadResult, fuse.Status) {
n, err := f.rx.Read(dest)
if n != 0 {
err = nil
}
if err == io.EOF {
// XXX what here?
}
if err != nil {
return nil, err2LogStatus(err)
}
return fuse.ReadResultData(dest), fuse.OK
}
// Read reads data from filesock.
//
// The data read will be that the client writes into filesock's file.
// Read semantic is that of io.Reader.
func (sk *FileSock) Read(dest []byte) (n int, err error) {
return sk.rx.Read(dest)
}
// Write implements nodefs.File and is paired with filesock.Read()
func (f *skFile) Write(data []byte, /*ignored*/off int64) (uint32, fuse.Status) {
// cap data to 2GB (not 4GB not to overflow int on 32-bit platforms)
l := len(data)
if l > math.MaxInt32 {
l = math.MaxInt32
data = data[:l]
}
n, err := f.tx.Write(data)
if n != 0 {
err = nil
}
if err == io.ErrClosedPipe {
// XXX what here?
}
if err != nil {
return 0, err2LogStatus(err)
}
return uint32(n), fuse.OK
}
// CloseRead closes reading side of the socket.
func (sk *FileSock) CloseRead() error {
return sk.rx.Close()
}
// CloseWrite closes writing side of the socket.
func (sk *FileSock) CloseWrite() error {
return sk.tx.Close()
}
// Close closes the socket.
//
// it is semantically equivalent to CloseRead + CloseWrite.
func (sk *FileSock) Close() error {
err := sk.CloseRead()
err2 := sk.CloseWrite()
if err == nil {
err = err2
}
return err
}
// Flush implements nodefs.File to handle close(file) call.
func (f *skFile) Flush() fuse.Status {
err := f.rx.Close()
err2 := f.tx.Close()
if err == nil {
err = err2
}
return err2LogStatus(err)
}
// ---- ZODB ---
// typeOf returns ZODB type of an object.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment