Commit b17aeb8c authored by Kirill Smelkov's avatar Kirill Smelkov

X Change FileSock to use xio.Pipe which is io.Pipe + support for IO cancellation

We need to do this because when e.g. a thread in client process dies and
wants to abort the whole process, kernel sends FUSE INTERRUPT request to
interrup READ syscall on-client pinner is waiting on for /head/watch.
And if this cancellation is not handled, the client process is left hang
forever and even kill -9 does not stop it.

xio.Pipe to be committed/pushed yet to go123.

test_wcfs_watch_robust becomes broken for now.
parent 00110399
......@@ -39,6 +39,7 @@ import (
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -271,9 +272,9 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
// FileSock provides streaming write/read operations for filesystem server that
// are correspondingly matched with read/write operations on filesystem user side.
type FileSock struct {
file *skFile // filesock's file peer
rx *io.PipeReader // socket reads from file here
tx *io.PipeWriter // socket writes to file here
file *skFile // filesock's file peer
rx *xio.PipeReader // socket reads from file here
tx *xio.PipeWriter // socket writes to file here
}
// skFile is File peer of FileSock.
......@@ -286,8 +287,8 @@ type FileSock struct {
type skFile struct {
nodefs.File
rx *io.PipeReader // file reads from socket here
tx *io.PipeWriter // file writes to socket here
rx *xio.PipeReader // file reads from socket here
tx *xio.PipeWriter // file writes to socket here
}
// NewFileSock creates new file socket.
......@@ -301,11 +302,11 @@ func NewFileSock() *FileSock {
}
sk.file = f
rx, tx := io.Pipe()
rx, tx := xio.Pipe()
sk.rx = rx
f .tx = tx
rx, tx = io.Pipe()
rx, tx = xio.Pipe()
f .rx = rx
sk.tx = tx
......@@ -332,15 +333,15 @@ func (sk *FileSock) File() nodefs.File {
// Write writes data to filesock.
//
// The data will be read by client reading from filesock's file.
// Write semantic is that of io.Writer.
func (sk *FileSock) Write(data []byte) (n int, err error) {
// Write semantic is that of xio.Writer.
func (sk *FileSock) Write(ctx context.Context, data []byte) (n int, err error) {
// XXX err ctx?
return sk.tx.Write(data)
return sk.tx.Write(ctx, data)
}
// Read implements nodefs.File and is paired with filesock.Write().
func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
n, err := f.rx.Read(dest) // XXX fctx.cancel
n, err := f.rx.Read(fctx, dest)
if n != 0 {
err = nil
}
......@@ -358,10 +359,10 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fu
// Read reads data from filesock.
//
// The data read will be that the client writes into filesock's file.
// Read semantic is that of io.Reader.
func (sk *FileSock) Read(dest []byte) (n int, err error) {
// Read semantic is that of xio.Reader.
func (sk *FileSock) Read(ctx context.Context, dest []byte) (n int, err error) {
// XXX err ctx?
return sk.rx.Read(dest)
return sk.rx.Read(ctx, dest)
}
// Write implements nodefs.File and is paired with filesock.Read()
......@@ -373,7 +374,7 @@ func (f *skFile) Write(data []byte, /*ignored*/off int64, fctx *fuse.Context) (u
data = data[:l]
}
n, err := f.tx.Write(data) // XXX fctx.cancel
n, err := f.tx.Write(fctx, data)
if n != 0 {
err = nil
}
......
......@@ -491,6 +491,7 @@ import (
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -924,7 +925,7 @@ retry:
// notify .wcfs/zhead
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
_, err := fmt.Fprintf(xio.BindWriter(sk, ctx), "%s\n", δZ.Tid)
if err != nil {
log.Errorf("%s", err) // XXX errctx + file, handle, reader pid
sk.Close()
......@@ -1731,13 +1732,14 @@ func (wlink *WatchLink) serve() {
func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk)
ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
wg, ctx := errgroup.WithContext(ctx)
r := bufio.NewReader(xio.BindReader(wlink.sk, ctx))
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
......@@ -1908,10 +1910,9 @@ func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) err
wlink.txMu.Lock()
defer wlink.txMu.Unlock()
// XXX timeout write on ctx cancel
pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt)
_, err := wlink.sk.Write(pkt)
_, err := wlink.sk.Write(ctx, pkt)
if err != nil {
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment