Commit 1a587945 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent efc51ca2
...@@ -86,6 +86,8 @@ func err2LogStatus(err error) fuse.Status { ...@@ -86,6 +86,8 @@ func err2LogStatus(err error) fuse.Status {
return fuse.EINTR return fuse.EINTR
} }
// XXX io.ErrClosedPipe -> EPIPE?
// otherwise log as warnings EINVAL and as errors everything else // otherwise log as warnings EINVAL and as errors everything else
switch e.(type) { switch e.(type) {
case *eInvalError: case *eInvalError:
...@@ -297,9 +299,6 @@ type skFile struct { ...@@ -297,9 +299,6 @@ type skFile struct {
// //
// After file socket is created, File return should be given to kernel for the // After file socket is created, File return should be given to kernel for the
// socket to be connected to an opened file. // socket to be connected to an opened file.
//
// Note: the node opening which gives FileSock, should not be "regular" - else
// read/write will be serialized by kernel (git.kernel.org/linus/9c225f2655)
func NewFileSock() *FileSock { func NewFileSock() *FileSock {
sk := &FileSock{} sk := &FileSock{}
f := &skFile{ f := &skFile{
...@@ -324,7 +323,10 @@ func NewFileSock() *FileSock { ...@@ -324,7 +323,10 @@ func NewFileSock() *FileSock {
// to be connected to the socket. // to be connected to the socket.
func (sk *FileSock) File() nodefs.File { func (sk *FileSock) File() nodefs.File {
// nonseekable & directio for opened file to have streaming semantic as // nonseekable & directio for opened file to have streaming semantic as
// if it was a socket. // if it was a socket. FOPEN_STREAM is used so that both read and write
// could be run simultaneously:
//
// https://marc.info/?l=linux-fsdevel&m=155364064907309&w=2
return &nodefs.WithFlags{ return &nodefs.WithFlags{
File: sk.file, File: sk.file,
FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO, FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO,
......
...@@ -1248,19 +1248,6 @@ retry: ...@@ -1248,19 +1248,6 @@ retry:
// ---- Watch server ---- // ---- Watch server ----
func (watch *Watch) GetAttr(out *fuse.Attr, f nodefs.File, fctx *fuse.Context) fuse.Status {
st := watch.fsNode.GetAttr(out, f, fctx)
// represent ourself as XXX (FileSock requirement)
// XXX S_IFSOCK does not work (LOOKUP returns inode, open gives: "No such device or address")
// XXX S_IFIFO does not work (the kernel shows the file, but it being
// FIFO makes the data go through kernel pipe, not via FUSE filesystem)
// XXX S_IFLNK - the kernel wants to follow the link
// XXX S_IFDIR - os.open complains "is a directory" (maybe could workaround)
// XXX S_IFCHR - fusermount always adds nodev mount option -> the device cannot be accessed
//out.Mode = syscall.S_IFSOCK | 0644
return st
}
// Open serves /head/watch opens. // Open serves /head/watch opens.
func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) { func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags? // XXX check flags?
...@@ -1280,7 +1267,7 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St ...@@ -1280,7 +1267,7 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
} }
// serve serves client originated watch requests. // serve serves client originated watch requests.
// XXX serves rx? // XXX serves rx? (-> and routes client replies ...)
func (w *Watcher) serve() { func (w *Watcher) serve() {
err := w._serve() err := w._serve()
_ = err _ = err
...@@ -1294,17 +1281,19 @@ func (w *Watcher) _serve() (err error) { ...@@ -1294,17 +1281,19 @@ func (w *Watcher) _serve() (err error) {
defer xerr.Contextf(&err, "watcher %d: serve", w.id) defer xerr.Contextf(&err, "watcher %d: serve", w.id)
r := bufio.NewReader(w.sk) r := bufio.NewReader(w.sk)
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1?
for { for {
l, err := r.ReadString('\n') // XXX limit accepted line len not to DOS l, err := r.ReadString('\n') // XXX limit accepted line len not to DOS
if err != nil { if err != nil {
return err // XXX err ctx? return err
} }
fmt.Printf("S: watch: rx: %q\n", l) fmt.Printf("S: watch: rx: %q\n", l)
stream, msg, err := parseWatchFrame(l) stream, msg, err := parseWatchFrame(l)
if err != nil { if err != nil {
// XXX write to peer too? (on which stream? -1?)
return fmt.Errorf("rx: %s", err) return fmt.Errorf("rx: %s", err)
} }
...@@ -1317,7 +1306,7 @@ func (w *Watcher) _serve() (err error) { ...@@ -1317,7 +1306,7 @@ func (w *Watcher) _serve() (err error) {
w.rxMu.Unlock() w.rxMu.Unlock()
if rxq == nil { if rxq == nil {
return fmt.Errorf("rx %d: reply on unexpected streamd", stream) return fmt.Errorf("rx %d: reply on unexpected stream", stream)
} }
rxq <- msg rxq <- msg
continue continue
...@@ -1326,7 +1315,6 @@ func (w *Watcher) _serve() (err error) { ...@@ -1326,7 +1315,6 @@ func (w *Watcher) _serve() (err error) {
// client-initiated request // client-initiated request
oid, at, err := parseWatch(msg) oid, at, err := parseWatch(msg)
if err != nil { if err != nil {
// XXX write to peer too
return fmt.Errorf("rx %d: %s", stream, err) return fmt.Errorf("rx %d: %s", stream, err)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment