Commit 05bfe267 authored by Aaron Jacobs's avatar Aaron Jacobs

Added support for kernel interrupt requests.

An interrupt for the kernel will cause the original op's context to be
cancelled.

For GoogleCloudPlatform/gcsfuse#41.
parents 2cbbc756 fe90f319
......@@ -38,6 +38,14 @@ type Connection struct {
// For logging purposes only.
nextOpID uint32
mu sync.Mutex
// A map from bazilfuse request ID (*not* the op ID for logging used above)
// to a function that cancel's its associated context.
//
// GUARDED_BY(mu)
cancelFuncs map[bazilfuse.RequestID]func()
}
// Responsibility for closing the wrapped connection is transferred to the
......@@ -47,9 +55,10 @@ func newConnection(
logger *log.Logger,
wrapped *bazilfuse.Conn) (c *Connection, err error) {
c = &Connection{
logger: logger,
wrapped: wrapped,
parentCtx: parentCtx,
logger: logger,
wrapped: wrapped,
parentCtx: parentCtx,
cancelFuncs: make(map[bazilfuse.RequestID]func()),
}
return
......@@ -85,28 +94,121 @@ func (c *Connection) log(
c.logger.Println(msg)
}
// Set up state for an op that is about to be returned to the user.
func (c *Connection) beginOp() {
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) recordCancelFunc(
reqID bazilfuse.RequestID,
f func()) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.cancelFuncs[reqID]; ok {
panic(fmt.Sprintf("Already have cancel func for request %v", reqID))
}
c.cancelFuncs[reqID] = f
}
// Set up state for an op that is about to be returned to the user, given its
// underlying bazilfuse request.
//
// Return a context that should be used for the op.
//
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) beginOp(
bfReq bazilfuse.Request) (ctx context.Context) {
reqID := bfReq.Hdr().ID
// Note that the op is in flight.
c.opsInFlight.Add(1)
// Set up a cancellation function.
//
// Special case: On Darwin, osxfuse appears to aggressively reuse "unique"
// request IDs. This matters for Forget requests, which have no reply
// associated and therefore appear to have IDs that are immediately eligible
// for reuse. For these, we should not record any state keyed on their ID.
//
// Cf. https://github.com/osxfuse/osxfuse/issues/208
ctx = c.parentCtx
if _, ok := bfReq.(*bazilfuse.ForgetRequest); !ok {
var cancel func()
ctx, cancel = context.WithCancel(ctx)
c.recordCancelFunc(reqID, cancel)
}
return
}
// Clean up all state associated with an op to which the user has responded.
func (c *Connection) finishOp() {
// Clean up all state associated with an op to which the user has responded,
// given its underlying bazilfuse request. This must be called before a
// response is sent to the kernel, to avoid a race where the request's ID might
// be reused by osxfuse.
//
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) finishOp(bfReq bazilfuse.Request) {
c.mu.Lock()
defer c.mu.Unlock()
reqID := bfReq.Hdr().ID
// Even though the op is finished, context.WithCancel requires us to arrange
// for the cancellation function to be invoked. We also must remove it from
// our map.
//
// Special case: we don't do this for Forget requests. See the note in
// beginOp above.
if _, ok := bfReq.(*bazilfuse.ForgetRequest); !ok {
cancel, ok := c.cancelFuncs[reqID]
if !ok {
panic(fmt.Sprintf("Unknown request ID in finishOp: %v", reqID))
}
cancel()
delete(c.cancelFuncs, reqID)
}
// Decrement the in-flight counter.
c.opsInFlight.Done()
}
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) handleInterrupt(req *bazilfuse.InterruptRequest) {
c.mu.Lock()
defer c.mu.Unlock()
// NOTE(jacobsa): fuse.txt in the Linux kernel documentation
// (https://goo.gl/H55Dnr) defines the kernel <-> userspace protocol for
// interrupts.
//
// In particular, my reading of it is that an interrupt request cannot be
// delivered to userspace before the original request. The part about the
// race and EAGAIN appears to be aimed at userspace programs that
// concurrently process requests.
//
// So in this method we assume that if we can't find the ID to be
// interrupted, it means that the request has already been replied to.
cancel, ok := c.cancelFuncs[req.IntrID]
if !ok {
return
}
cancel()
}
// Read the next op from the kernel process. Return io.EOF if the kernel has
// closed the connection.
//
// This function delivers ops in exactly the order they are received from
// /dev/fuse. It must not be called multiple times concurrently.
//
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
var bfReq bazilfuse.Request
// Keep going until we find a request we know how to convert.
for {
// Read a bazilfuse request.
var bfReq bazilfuse.Request
bfReq, err = c.wrapped.ReadRequest()
if err != nil {
return
}
......@@ -118,8 +220,8 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
// Log the receipt of the operation.
c.log(opID, 1, "<- %v", bfReq)
// Special case: responding to this is required to make mounting work on OS
// X. We don't currently expose the capability for the file system to
// Special case: responding to statfs is required to make mounting work on
// OS X. We don't currently expose the capability for the file system to
// intercept this.
if statfsReq, ok := bfReq.(*bazilfuse.StatfsRequest); ok {
c.log(opID, 1, "-> (Statfs) OK")
......@@ -127,15 +229,22 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
continue
}
// Convert it.
// Special case: handle interrupt requests.
if interruptReq, ok := bfReq.(*bazilfuse.InterruptRequest); ok {
c.handleInterrupt(interruptReq)
continue
}
// Set up op dependencies.
opCtx := c.beginOp(bfReq)
logForOp := func(calldepth int, format string, v ...interface{}) {
c.log(opID, calldepth+1, format, v...)
}
finished := func(err error) { c.finishOp() }
finished := func(err error) { c.finishOp(bfReq) }
op = fuseops.Convert(c.parentCtx, bfReq, logForOp, finished)
c.beginOp()
op = fuseops.Convert(opCtx, bfReq, logForOp, finished)
return
}
}
......
......@@ -208,8 +208,8 @@ func (o *commonOp) Logf(format string, v ...interface{}) {
}
func (o *commonOp) Respond(err error) {
// Don't forget to report back to the connection that we are finished.
defer o.finished(err)
// Report that the user is responding.
o.finished(err)
// If successful, we should respond to bazilfuse with the appropriate struct.
if err == nil {
......
......@@ -27,7 +27,7 @@ import (
//
// Convert the supplied bazilfuse request struct to an Op. finished will be
// called with the error supplied to o.Respond when the user invokes that
// method.
// method, before a response is sent to the kernel.
//
// It is guaranteed that o != nil. If the op is unknown, a special unexported
// type will be used.
......@@ -197,15 +197,17 @@ func Convert(
case *bazilfuse.FsyncRequest:
// We don't currently support this for directories.
if typed.Dir {
return
}
to := &SyncFileOp{
Inode: InodeID(typed.Header.Node),
Handle: HandleID(typed.Handle),
to := &unknownOp{}
io = to
co = &to.commonOp
} else {
to := &SyncFileOp{
Inode: InodeID(typed.Header.Node),
Handle: HandleID(typed.Handle),
}
io = to
co = &to.commonOp
}
io = to
co = &to.commonOp
case *bazilfuse.FlushRequest:
to := &FlushFileOp{
......
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package interruptfs
import (
"fmt"
"os"
"sync"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
)
var rootAttrs = fuseops.InodeAttributes{
Nlink: 1,
Mode: os.ModeDir | 0777,
}
const fooID = fuseops.RootInodeID + 1
var fooAttrs = fuseops.InodeAttributes{
Nlink: 1,
Mode: 0777,
Size: 1234,
}
// A file system containing exactly one file, named "foo". Reads to the file
// always hang until interrupted. Exposes a method for synchronizing with the
// arrival of a read.
//
// Must be created with New.
type InterruptFS struct {
fuseutil.NotImplementedFileSystem
mu sync.Mutex
readInFlight bool
readInFlightChanged sync.Cond
}
func New() (fs *InterruptFS) {
fs = &InterruptFS{}
fs.readInFlightChanged.L = &fs.mu
return
}
////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
// Block until the first read is received.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *InterruptFS) WaitForReadInFlight() {
fs.mu.Lock()
defer fs.mu.Unlock()
for !fs.readInFlight {
fs.readInFlightChanged.Wait()
}
}
////////////////////////////////////////////////////////////////////////
// FileSystem methods
////////////////////////////////////////////////////////////////////////
func (fs *InterruptFS) Init(
op *fuseops.InitOp) {
var err error
defer fuseutil.RespondToOp(op, &err)
return
}
func (fs *InterruptFS) LookUpInode(
op *fuseops.LookUpInodeOp) {
var err error
defer fuseutil.RespondToOp(op, &err)
// We support only one parent.
if op.Parent != fuseops.RootInodeID {
err = fmt.Errorf("Unexpected parent: %v", op.Parent)
return
}
// We support only one name.
if op.Name != "foo" {
err = fuse.ENOENT
return
}
// Fill in the response.
op.Entry.Child = fooID
op.Entry.Attributes = fooAttrs
return
}
func (fs *InterruptFS) GetInodeAttributes(
op *fuseops.GetInodeAttributesOp) {
var err error
defer fuseutil.RespondToOp(op, &err)
switch op.Inode {
case fuseops.RootInodeID:
op.Attributes = rootAttrs
case fooID:
op.Attributes = fooAttrs
default:
err = fmt.Errorf("Unexpected inode ID: %v", op.Inode)
return
}
return
}
func (fs *InterruptFS) OpenFile(
op *fuseops.OpenFileOp) {
var err error
defer fuseutil.RespondToOp(op, &err)
return
}
func (fs *InterruptFS) ReadFile(
op *fuseops.ReadFileOp) {
var err error
defer fuseutil.RespondToOp(op, &err)
// Signal that a read has been received.
fs.mu.Lock()
fs.readInFlight = true
fs.readInFlightChanged.Broadcast()
fs.mu.Unlock()
// Wait for cancellation.
done := op.Context().Done()
if done == nil {
panic("Expected non-nil channel.")
}
<-done
// Return the context's error.
err = op.Context().Err()
return
}
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package interruptfs_test
import (
"bytes"
"os"
"os/exec"
"path"
"testing"
"time"
"github.com/jacobsa/fuse/fuseutil"
"github.com/jacobsa/fuse/samples"
"github.com/jacobsa/fuse/samples/interruptfs"
. "github.com/jacobsa/oglematchers"
. "github.com/jacobsa/ogletest"
)
func TestInterruptFS(t *testing.T) { RunTests(t) }
////////////////////////////////////////////////////////////////////////
// Boilerplate
////////////////////////////////////////////////////////////////////////
type InterruptFSTest struct {
samples.SampleTest
fs *interruptfs.InterruptFS
}
func init() { RegisterTestSuite(&InterruptFSTest{}) }
var _ SetUpInterface = &InterruptFSTest{}
var _ TearDownInterface = &InterruptFSTest{}
func (t *InterruptFSTest) SetUp(ti *TestInfo) {
var err error
// Create the file system.
t.fs = interruptfs.New()
AssertEq(nil, err)
t.Server = fuseutil.NewFileSystemServer(t.fs)
// Mount it.
t.SampleTest.SetUp(ti)
}
////////////////////////////////////////////////////////////////////////
// Test functions
////////////////////////////////////////////////////////////////////////
func (t *InterruptFSTest) StatFoo() {
fi, err := os.Stat(path.Join(t.Dir, "foo"))
AssertEq(nil, err)
ExpectEq("foo", fi.Name())
ExpectEq(0777, fi.Mode())
ExpectFalse(fi.IsDir())
}
func (t *InterruptFSTest) InterruptedDuringRead() {
var err error
// Start a sub-process that attempts to read the file.
cmd := exec.Command("cat", path.Join(t.Dir, "foo"))
var cmdOutput bytes.Buffer
cmd.Stdout = &cmdOutput
cmd.Stderr = &cmdOutput
err = cmd.Start()
AssertEq(nil, err)
// Wait for the command in the background, writing to a channel when it is
// finished.
cmdErr := make(chan error)
go func() {
cmdErr <- cmd.Wait()
}()
// Wait for the read to make it to the file system.
t.fs.WaitForReadInFlight()
// The command should be hanging on the read, and not yet have returned.
select {
case err = <-cmdErr:
AddFailure("Command returned early with error: %v", err)
AbortTest()
case <-time.After(10 * time.Millisecond):
}
// Send SIGINT.
cmd.Process.Signal(os.Interrupt)
// Now the command should return, with an appropriate error.
err = <-cmdErr
ExpectThat(err, Error(HasSubstr("signal")))
ExpectThat(err, Error(HasSubstr("interrupt")))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment