Commit 77e8f7f7 authored by Aaron Jacobs's avatar Aaron Jacobs

Add option for direct I/O.

See #25.
parents bf339162 df7caed6
......@@ -561,6 +561,10 @@ func (c *Connection) kernelResponseForOp(
out.OpenFlags |= uint32(fusekernel.OpenKeepCache)
}
if o.UseDirectIO {
out.OpenFlags |= uint32(fusekernel.OpenDirectIO)
}
case *fuseops.ReadFileOp:
// convertInMessage already set up the destination buffer to be at the end
// of the out message. We need only shrink to the right size based on how
......
......@@ -564,6 +564,15 @@ type OpenFileOp struct {
// is set to true, regardless of its value, at least for files opened in the
// same mode. (Cf. https://github.com/osxfuse/osxfuse/issues/223)
KeepPageCache bool
// Whether to use direct IO for this file handle. By default, the kernel
// suppresses what it sees as redundant operations (including reads beyond
// the precomputed EOF).
//
// Enabling direct IO ensures that all client operations reach the fuse
// layer. This allows for filesystems whose file sizes are not known in
// advance, for example, because contents are generated on the fly.
UseDirectIO bool
}
// Read data from a file previously opened with CreateFile or OpenFile.
......@@ -591,6 +600,8 @@ type ReadFileOp struct {
// (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand
// where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned
// by a previous call to LookUpInode, GetInodeAttributes, etc.
//
// If direct IO is enabled, semantics should match those of read(2).
BytesRead int
}
......
package dynamicfs
import (
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"github.com/jacobsa/timeutil"
)
// Create a file system that contains 2 files (`age` and `weekday`) and no
// directories. Every time the `age` file is opened, its contents are refreshed
// to show the number of seconds elapsed since the file system was created (as
// opposed to mounted). Every time the `weekday` file is opened, its contents
// are refreshed to reflect the current weekday.
//
// The contents of both of these files is updated within the filesystem itself,
// i.e., these changes do not go through the kernel. Additionally, file access
// times are not updated and file size is not known in advance and is set to 0.
// This simulates a filesystem that is backed by a dynamic data source where
// file metadata is not necessarily known before the file is read. For example,
// a filesystem backed by an expensive RPC or by a stream that's generated on
// the fly might not know data size ahead of time.
//
// This implementation depends on direct IO in fuse. Without it, all read
// operations are suppressed because the kernel detects that they read beyond
// the end of the files.
func NewDynamicFS(clock timeutil.Clock) (server fuse.Server, err error) {
createTime := clock.Now()
fs := &dynamicFS{
clock: clock,
createTime: createTime,
fileHandles: make(map[fuseops.HandleID]string),
}
server = fuseutil.NewFileSystemServer(fs)
return
}
type dynamicFS struct {
fuseutil.NotImplementedFileSystem
mu sync.Mutex
clock timeutil.Clock
createTime time.Time
nextHandle fuseops.HandleID
fileHandles map[fuseops.HandleID]string
}
const (
rootInode fuseops.InodeID = fuseops.RootInodeID + iota
ageInode
weekdayInode
)
type inodeInfo struct {
attributes fuseops.InodeAttributes
// File or directory?
dir bool
// For directories, children.
children []fuseutil.Dirent
}
// We have a fixed directory structure.
var gInodeInfo = map[fuseops.InodeID]inodeInfo{
// root
rootInode: {
attributes: fuseops.InodeAttributes{
Nlink: 1,
Mode: 0555 | os.ModeDir,
},
dir: true,
children: []fuseutil.Dirent{
{
Offset: 1,
Inode: ageInode,
Name: "age",
Type: fuseutil.DT_File,
},
{
Offset: 2,
Inode: weekdayInode,
Name: "weekday",
Type: fuseutil.DT_File,
},
},
},
// age
ageInode: {
attributes: fuseops.InodeAttributes{
Nlink: 1,
Mode: 0444,
},
},
// weekday
weekdayInode: {
attributes: fuseops.InodeAttributes{
Nlink: 1,
Mode: 0444,
// Size left at 0.
},
},
}
func findChildInode(
name string,
children []fuseutil.Dirent) (inode fuseops.InodeID, err error) {
for _, e := range children {
if e.Name == name {
inode = e.Inode
return
}
}
err = fuse.ENOENT
return
}
func (fs *dynamicFS) findUnusedHandle() fuseops.HandleID {
// TODO: Mutex annotation?
handle := fs.nextHandle
for _, exists := fs.fileHandles[handle]; exists; _, exists = fs.fileHandles[handle] {
handle++
}
fs.nextHandle = handle + 1
return handle
}
func (fs *dynamicFS) GetInodeAttributes(
ctx context.Context,
op *fuseops.GetInodeAttributesOp) (err error) {
// Find the info for this inode.
info, ok := gInodeInfo[op.Inode]
if !ok {
err = fuse.ENOENT
return
}
// Copy over its attributes.
op.Attributes = info.attributes
return
}
func (fs *dynamicFS) LookUpInode(
ctx context.Context,
op *fuseops.LookUpInodeOp) (err error) {
// Find the info for the parent.
parentInfo, ok := gInodeInfo[op.Parent]
if !ok {
err = fuse.ENOENT
return
}
// Find the child within the parent.
childInode, err := findChildInode(op.Name, parentInfo.children)
if err != nil {
return
}
// Copy over information.
op.Entry.Child = childInode
op.Entry.Attributes = gInodeInfo[childInode].attributes
return
}
func (fs *dynamicFS) OpenDir(
ctx context.Context,
op *fuseops.OpenDirOp) (err error) {
// Allow opening directory.
return
}
func (fs *dynamicFS) ReadDir(
ctx context.Context,
op *fuseops.ReadDirOp) (err error) {
// Find the info for this inode.
info, ok := gInodeInfo[op.Inode]
if !ok {
err = fuse.ENOENT
return
}
if !info.dir {
err = fuse.EIO
return
}
entries := info.children
// Grab the range of interest.
if op.Offset > fuseops.DirOffset(len(entries)) {
err = fuse.EIO
return
}
entries = entries[op.Offset:]
// Resume at the specified offset into the array.
for _, e := range entries {
n := fuseutil.WriteDirent(op.Dst[op.BytesRead:], e)
if n == 0 {
break
}
op.BytesRead += n
}
return
}
func (fs *dynamicFS) OpenFile(
ctx context.Context,
op *fuseops.OpenFileOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
var contents string
// Update file contents on (and only on) open.
switch op.Inode {
case ageInode:
now := fs.clock.Now()
ageInSeconds := int(now.Sub(fs.createTime).Seconds())
contents = fmt.Sprintf("This filesystem is %d seconds old.", ageInSeconds)
case weekdayInode:
contents = fmt.Sprintf("Today is %s.", fs.clock.Now().Weekday())
default:
err = fuse.EINVAL
return
}
handle := fs.findUnusedHandle()
fs.fileHandles[handle] = contents
op.UseDirectIO = true
op.Handle = handle
return
}
func (fs *dynamicFS) ReadFile(
ctx context.Context,
op *fuseops.ReadFileOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
contents, ok := fs.fileHandles[op.Handle]
if !ok {
log.Printf("ReadFile: no open file handle: %d", op.Handle)
err = fuse.EIO
return
}
reader := strings.NewReader(contents)
op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset)
if err == io.EOF {
err = nil
}
return
}
func (fs *dynamicFS) ReleaseFileHandle(
ctx context.Context,
op *fuseops.ReleaseFileHandleOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
_, ok := fs.fileHandles[op.Handle]
if !ok {
log.Printf("ReleaseFileHandle: bad handle: %d", op.Handle)
err = fuse.EIO
return
}
delete(fs.fileHandles, op.Handle)
return
}
func (fs *dynamicFS) StatFS(ctx context.Context,
op *fuseops.StatFSOp) (err error) {
return
}
package dynamicfs_test
import (
"testing"
"github.com/jacobsa/fuse/fusetesting"
"github.com/jacobsa/fuse/samples"
"github.com/jacobsa/fuse/samples/dynamicfs"
"bytes"
"fmt"
"io/ioutil"
"os"
"path"
"syscall"
"time"
. "github.com/jacobsa/oglematchers"
. "github.com/jacobsa/ogletest"
)
func TestDynamicFS(t *testing.T) { RunTests(t) }
type DynamicFSTest struct {
samples.SampleTest
}
func init() {
RegisterTestSuite(&DynamicFSTest{})
}
var gCreateTime = time.Date(2017, 5, 4, 14, 53, 10, 0, time.UTC)
func (t *DynamicFSTest) SetUp(ti *TestInfo) {
var err error
t.Clock.SetTime(gCreateTime)
t.Server, err = dynamicfs.NewDynamicFS(&t.Clock)
AssertEq(nil, err)
t.SampleTest.SetUp(ti)
}
func (t *DynamicFSTest) ReadDir_Root() {
entries, err := fusetesting.ReadDirPicky(t.Dir)
AssertEq(nil, err)
AssertEq(2, len(entries))
var fi os.FileInfo
fi = entries[0]
ExpectEq("age", fi.Name())
ExpectEq(0, fi.Size())
ExpectEq(0444, fi.Mode())
ExpectFalse(fi.IsDir())
fi = entries[1]
ExpectEq("weekday", fi.Name())
ExpectEq(0, fi.Size())
ExpectEq(0444, fi.Mode())
ExpectFalse(fi.IsDir())
}
func (t *DynamicFSTest) ReadDir_NonExistent() {
_, err := fusetesting.ReadDirPicky(path.Join(t.Dir, "nosuchfile"))
AssertNe(nil, err)
ExpectThat(err, Error(HasSubstr("no such file")))
}
func (t *DynamicFSTest) Stat_Age() {
fi, err := os.Stat(path.Join(t.Dir, "age"))
AssertEq(nil, err)
ExpectEq("age", fi.Name())
ExpectEq(0, fi.Size())
ExpectEq(0444, fi.Mode())
ExpectFalse(fi.IsDir())
ExpectEq(1, fi.Sys().(*syscall.Stat_t).Nlink)
}
func (t *DynamicFSTest) Stat_Weekday() {
fi, err := os.Stat(path.Join(t.Dir, "weekday"))
AssertEq(nil, err)
ExpectEq("weekday", fi.Name())
ExpectEq(0, fi.Size())
ExpectEq(0444, fi.Mode())
ExpectFalse(fi.IsDir())
ExpectEq(1, fi.Sys().(*syscall.Stat_t).Nlink)
}
func (t *DynamicFSTest) Stat_NonExistent() {
_, err := os.Stat(path.Join(t.Dir, "nosuchfile"))
AssertNe(nil, err)
ExpectThat(err, Error(HasSubstr("no such file")))
}
func (t *DynamicFSTest) ReadFile_AgeZero() {
t.Clock.SetTime(gCreateTime)
slice, err := ioutil.ReadFile(path.Join(t.Dir, "age"))
AssertEq(nil, err)
ExpectEq("This filesystem is 0 seconds old.", string(slice))
}
func (t *DynamicFSTest) ReadFile_Age1000() {
t.Clock.SetTime(gCreateTime.Add(1000 * time.Second))
slice, err := ioutil.ReadFile(path.Join(t.Dir, "age"))
AssertEq(nil, err)
ExpectEq("This filesystem is 1000 seconds old.", string(slice))
}
func (t *DynamicFSTest) ReadFile_WeekdayNow() {
now := t.Clock.Now()
// Does simulated clock advance itself by default?
// Manually set time to ensure it's frozen.
t.Clock.SetTime(now)
slice, err := ioutil.ReadFile(path.Join(t.Dir, "weekday"))
AssertEq(nil, err)
ExpectEq(fmt.Sprintf("Today is %s.", now.Weekday().String()), string(slice))
}
func (t *DynamicFSTest) ReadFile_WeekdayCreateTime() {
t.Clock.SetTime(gCreateTime)
slice, err := ioutil.ReadFile(path.Join(t.Dir, "weekday"))
AssertEq(nil, err)
ExpectEq(fmt.Sprintf("Today is %s.", gCreateTime.Weekday().String()), string(slice))
}
func (t *DynamicFSTest) ReadFile_AgeUnchangedForHandle() {
t.Clock.SetTime(gCreateTime.Add(100 * time.Second))
var err error
var file *os.File
file, err = os.Open(path.Join(t.Dir, "age"))
AssertEq(nil, err)
// Ensure that all reads from the same handle return the contents created at
// file open time.
func(file *os.File) {
defer file.Close()
var expectedContents string
var buffer bytes.Buffer
var bytesRead int64
expectedContents = "This filesystem is 100 seconds old."
bytesRead, err = buffer.ReadFrom(file)
AssertEq(nil, err)
ExpectEq(len(expectedContents), bytesRead)
ExpectEq(expectedContents, buffer.String())
t.Clock.SetTime(gCreateTime.Add(1000 * time.Second))
// Seek back to the beginning of the file. The contents should be unchanged
// for the life of the file handle.
_, err = file.Seek(0, 0)
AssertEq(nil, err)
buffer.Reset()
bytesRead, err = buffer.ReadFrom(file)
AssertEq(nil, err)
ExpectEq(len(expectedContents), bytesRead)
ExpectEq(expectedContents, buffer.String())
}(file)
// The clock was advanced while the old handle was open. The content change
// should be reflected by the new handle.
file, err = os.Open(path.Join(t.Dir, "age"))
AssertEq(nil, err)
func(file *os.File) {
defer file.Close()
expectedContents := "This filesystem is 1000 seconds old."
buffer := bytes.Buffer{}
bytesRead, err := buffer.ReadFrom(file)
AssertEq(nil, err)
ExpectEq(len(expectedContents), bytesRead)
ExpectEq(expectedContents, buffer.String())
}(file)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment