Commit 01144536 authored by Aaron Jacobs's avatar Aaron Jacobs

Expanded interruptfs to also cover testing interrupts during flush.

parents 94e31a27 be5974e9
......@@ -39,22 +39,29 @@ var fooAttrs = fuseops.InodeAttributes{
Size: 1234,
}
// A file system containing exactly one file, named "foo". Reads to the file
// always hang until interrupted. Exposes a method for synchronizing with the
// arrival of a read.
// A file system containing exactly one file, named "foo". ReadFile and
// FlushFile ops can be made to hang until interrupted. Exposes a method for
// synchronizing with the arrival of a read or a flush.
//
// Must be created with New.
type InterruptFS struct {
fuseutil.NotImplementedFileSystem
mu sync.Mutex
readInFlight bool
readInFlightChanged sync.Cond
mu sync.Mutex
blockForReads bool // GUARDED_BY(mu)
blockForFlushes bool // GUARDED_BY(mu)
// Must hold the mutex when closing these.
readReceived chan struct{}
flushReceived chan struct{}
}
func New() (fs *InterruptFS) {
fs = &InterruptFS{}
fs.readInFlightChanged.L = &fs.mu
fs = &InterruptFS{
readReceived: make(chan struct{}),
flushReceived: make(chan struct{}),
}
return
}
......@@ -64,15 +71,29 @@ func New() (fs *InterruptFS) {
////////////////////////////////////////////////////////////////////////
// Block until the first read is received.
//
// LOCKS_EXCLUDED(fs.mu)
func (fs *InterruptFS) WaitForReadInFlight() {
func (fs *InterruptFS) WaitForFirstRead() {
<-fs.readReceived
}
// Block until the first flush is received.
func (fs *InterruptFS) WaitForFirstFlush() {
<-fs.flushReceived
}
// Enable blocking until interrupted for the next (and subsequent) read ops.
func (fs *InterruptFS) EnableReadBlocking() {
fs.mu.Lock()
defer fs.mu.Unlock()
for !fs.readInFlight {
fs.readInFlightChanged.Wait()
}
fs.blockForReads = true
}
// Enable blocking until interrupted for the next (and subsequent) flush ops.
func (fs *InterruptFS) EnableFlushBlocking() {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.blockForFlushes = true
}
////////////////////////////////////////////////////////////////////////
......@@ -128,22 +149,55 @@ func (fs *InterruptFS) OpenFile(
func (fs *InterruptFS) ReadFile(
ctx context.Context,
op *fuseops.ReadFileOp) (err error) {
// Signal that a read has been received.
fs.mu.Lock()
fs.readInFlight = true
fs.readInFlightChanged.Broadcast()
shouldBlock := fs.blockForReads
// Signal that a read has been received, if this is the first.
select {
case <-fs.readReceived:
default:
close(fs.readReceived)
}
fs.mu.Unlock()
// Wait for cancellation.
done := ctx.Done()
if done == nil {
panic("Expected non-nil channel.")
// Wait for cancellation if enabled.
if shouldBlock {
done := ctx.Done()
if done == nil {
panic("Expected non-nil channel.")
}
<-done
err = ctx.Err()
}
return
}
func (fs *InterruptFS) FlushFile(
ctx context.Context,
op *fuseops.FlushFileOp) (err error) {
fs.mu.Lock()
shouldBlock := fs.blockForFlushes
// Signal that a flush has been received, if this is the first.
select {
case <-fs.flushReceived:
default:
close(fs.flushReceived)
}
fs.mu.Unlock()
<-done
// Wait for cancellation if enabled.
if shouldBlock {
done := ctx.Done()
if done == nil {
panic("Expected non-nil channel.")
}
// Return the context's error.
err = ctx.Err()
<-done
err = ctx.Err()
}
return
}
......@@ -73,6 +73,7 @@ func (t *InterruptFSTest) StatFoo() {
func (t *InterruptFSTest) InterruptedDuringRead() {
var err error
t.fs.EnableReadBlocking()
// Start a sub-process that attempts to read the file.
cmd := exec.Command("cat", path.Join(t.Dir, "foo"))
......@@ -92,7 +93,7 @@ func (t *InterruptFSTest) InterruptedDuringRead() {
}()
// Wait for the read to make it to the file system.
t.fs.WaitForReadInFlight()
t.fs.WaitForFirstRead()
// The command should be hanging on the read, and not yet have returned.
select {
......@@ -111,3 +112,45 @@ func (t *InterruptFSTest) InterruptedDuringRead() {
ExpectThat(err, Error(HasSubstr("signal")))
ExpectThat(err, Error(HasSubstr("interrupt")))
}
func (t *InterruptFSTest) InterruptedDuringFlush() {
var err error
t.fs.EnableFlushBlocking()
// Start a sub-process that attempts to read the file.
cmd := exec.Command("cat", path.Join(t.Dir, "foo"))
var cmdOutput bytes.Buffer
cmd.Stdout = &cmdOutput
cmd.Stderr = &cmdOutput
err = cmd.Start()
AssertEq(nil, err)
// Wait for the command in the background, writing to a channel when it is
// finished.
cmdErr := make(chan error)
go func() {
cmdErr <- cmd.Wait()
}()
// Wait for the flush to make it to the file system.
t.fs.WaitForFirstFlush()
// The command should be hanging on the flush, and not yet have returned.
select {
case err = <-cmdErr:
AddFailure("Command returned early with error: %v", err)
AbortTest()
case <-time.After(10 * time.Millisecond):
}
// Send SIGINT.
cmd.Process.Signal(os.Interrupt)
// Now the command should return, with an appropriate error.
err = <-cmdErr
ExpectThat(err, Error(HasSubstr("signal")))
ExpectThat(err, Error(HasSubstr("interrupt")))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment