Commit c69cd3b5 authored by Aaron Jacobs's avatar Aaron Jacobs

Don't depend on fuseshim.Message.

parents 6dd4bf3b 4de334e0
......@@ -16,14 +16,18 @@ package fuse
import (
"fmt"
"io"
"log"
"os"
"path"
"runtime"
"sync"
"syscall"
"golang.org/x/net/context"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/internal/buffer"
"github.com/jacobsa/fuse/internal/fusekernel"
"github.com/jacobsa/fuse/internal/fuseshim"
)
......@@ -202,6 +206,60 @@ func (c *Connection) handleInterrupt(fuseID uint64) {
cancel()
}
func (c *Connection) allocateInMessage() (m *buffer.InMessage) {
// TODO(jacobsa): Use a freelist.
m = new(buffer.InMessage)
return
}
func (c *Connection) destroyInMessage(m *buffer.InMessage) {
// TODO(jacobsa): Use a freelist.
}
// Read the next message from the kernel. The message must later be destroyed
// using destroyInMessage.
func (c *Connection) readMessage() (m *buffer.InMessage, err error) {
// Allocate a message.
m = c.allocateInMessage()
// Loop past transient errors.
for {
// Lock and read.
//
// TODO(jacobsa): Ensure that we document concurrency constraints that make
// it safe, then kill the lock here.
c.wrapped.Rio.RLock()
err = m.Init(c.wrapped.Dev)
c.wrapped.Rio.RUnlock()
// Special cases:
//
// * ENODEV means fuse has hung up.
//
// * EINTR means we should try again. (This seems to happen often on
// OS X, cf. http://golang.org/issue/11180)
//
if pe, ok := err.(*os.PathError); ok {
switch pe.Err {
case syscall.ENODEV:
err = io.EOF
case syscall.EINTR:
err = nil
continue
}
}
if err != nil {
c.destroyInMessage(m)
m = nil
return
}
return
}
}
// Read the next op from the kernel process. Return io.EOF if the kernel has
// closed the connection.
//
......@@ -212,9 +270,9 @@ func (c *Connection) handleInterrupt(fuseID uint64) {
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
// Keep going until we find a request we know how to convert.
for {
// Read the next message from the fuseshim connection.
var m *fuseshim.Message
m, err = c.wrapped.ReadMessage()
// Read the next message from the kernel.
var m *buffer.InMessage
m, err = c.readMessage()
if err != nil {
return
}
......@@ -224,7 +282,7 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
c.nextOpID++
// Set up op dependencies.
opCtx := c.beginOp(m.Hdr.Opcode, m.Hdr.Unique)
opCtx := c.beginOp(m.Header().Opcode, m.Header().Unique)
var debugLogForOp func(int, string, ...interface{})
if c.debugLogger != nil {
......@@ -238,12 +296,11 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
fuseID uint64,
replyMsg []byte,
opErr error) (err error) {
// Make sure we destroy the message, as required by
// fuseshim.Connection.ReadMessage.
defer m.Destroy()
// Make sure we destroy the message, as required by readMessage.
defer c.destroyInMessage(m)
// Clean up state for this op.
c.finishOp(m.Hdr.Opcode, m.Hdr.Unique)
c.finishOp(m.Header().Opcode, m.Header().Unique)
// Debug logging
if c.debugLogger != nil {
......
......@@ -36,7 +36,7 @@ type internalOp interface {
//
// Special case: a zero return value means that the kernel is not expecting a
// response.
kernelResponse() (b buffer.Buffer)
kernelResponse() (b buffer.OutMessage)
}
// A function that sends a reply message back to the kernel for the request
......@@ -142,11 +142,11 @@ func (o *commonOp) Respond(err error) {
// If successful, we ask the op for an appopriate response to the kernel, and
// it is responsible for leaving room for the fusekernel.OutHeader struct.
// Otherwise, create our own.
var b buffer.Buffer
var b buffer.OutMessage
if err == nil {
b = o.op.kernelResponse()
} else {
b = buffer.New(0)
b = buffer.NewOutMessage(0)
}
// Fill in the header if a reply is needed.
......
This diff is collapsed.
......@@ -88,9 +88,9 @@ func (o *LookUpInodeOp) ShortDesc() (desc string) {
return
}
func (o *LookUpInodeOp) kernelResponse() (b buffer.Buffer) {
func (o *LookUpInodeOp) kernelResponse() (b buffer.OutMessage) {
size := fusekernel.EntryOutSize(o.protocol)
b = buffer.New(size)
b = buffer.NewOutMessage(size)
out := (*fusekernel.EntryOut)(b.Grow(size))
convertChildInodeEntry(&o.Entry, out)
......@@ -123,9 +123,9 @@ func (o *GetInodeAttributesOp) DebugString() string {
o.Attributes.DebugString())
}
func (o *GetInodeAttributesOp) kernelResponse() (b buffer.Buffer) {
func (o *GetInodeAttributesOp) kernelResponse() (b buffer.OutMessage) {
size := fusekernel.AttrOutSize(o.protocol)
b = buffer.New(size)
b = buffer.NewOutMessage(size)
out := (*fusekernel.AttrOut)(b.Grow(size))
out.AttrValid, out.AttrValidNsec = convertExpirationTime(o.AttributesExpiration)
convertAttributes(o.Inode, &o.Attributes, &out.Attr)
......@@ -157,9 +157,9 @@ type SetInodeAttributesOp struct {
AttributesExpiration time.Time
}
func (o *SetInodeAttributesOp) kernelResponse() (b buffer.Buffer) {
func (o *SetInodeAttributesOp) kernelResponse() (b buffer.OutMessage) {
size := fusekernel.AttrOutSize(o.protocol)
b = buffer.New(size)
b = buffer.NewOutMessage(size)
out := (*fusekernel.AttrOut)(b.Grow(size))
out.AttrValid, out.AttrValidNsec = convertExpirationTime(o.AttributesExpiration)
convertAttributes(o.Inode, &o.Attributes, &out.Attr)
......@@ -216,7 +216,7 @@ type ForgetInodeOp struct {
N uint64
}
func (o *ForgetInodeOp) kernelResponse() (b buffer.Buffer) {
func (o *ForgetInodeOp) kernelResponse() (b buffer.OutMessage) {
// No response.
return
}
......@@ -259,9 +259,9 @@ func (o *MkDirOp) ShortDesc() (desc string) {
return
}
func (o *MkDirOp) kernelResponse() (b buffer.Buffer) {
func (o *MkDirOp) kernelResponse() (b buffer.OutMessage) {
size := fusekernel.EntryOutSize(o.protocol)
b = buffer.New(size)
b = buffer.NewOutMessage(size)
out := (*fusekernel.EntryOut)(b.Grow(size))
convertChildInodeEntry(&o.Entry, out)
......@@ -311,9 +311,9 @@ func (o *CreateFileOp) ShortDesc() (desc string) {
return
}
func (o *CreateFileOp) kernelResponse() (b buffer.Buffer) {
func (o *CreateFileOp) kernelResponse() (b buffer.OutMessage) {
eSize := fusekernel.EntryOutSize(o.protocol)
b = buffer.New(eSize + unsafe.Sizeof(fusekernel.OpenOut{}))
b = buffer.NewOutMessage(eSize + unsafe.Sizeof(fusekernel.OpenOut{}))
e := (*fusekernel.EntryOut)(b.Grow(eSize))
convertChildInodeEntry(&o.Entry, e)
......@@ -357,9 +357,9 @@ func (o *CreateSymlinkOp) ShortDesc() (desc string) {
return
}
func (o *CreateSymlinkOp) kernelResponse() (b buffer.Buffer) {
func (o *CreateSymlinkOp) kernelResponse() (b buffer.OutMessage) {
size := fusekernel.EntryOutSize(o.protocol)
b = buffer.New(size)
b = buffer.NewOutMessage(size)
out := (*fusekernel.EntryOut)(b.Grow(size))
convertChildInodeEntry(&o.Entry, out)
......@@ -418,8 +418,8 @@ type RenameOp struct {
NewName string
}
func (o *RenameOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *RenameOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -439,8 +439,8 @@ type RmDirOp struct {
Name string
}
func (o *RmDirOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *RmDirOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -459,8 +459,8 @@ type UnlinkOp struct {
Name string
}
func (o *UnlinkOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *UnlinkOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -491,8 +491,8 @@ type OpenDirOp struct {
Handle HandleID
}
func (o *OpenDirOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(unsafe.Sizeof(fusekernel.OpenOut{}))
func (o *OpenDirOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.OpenOut{}))
out := (*fusekernel.OpenOut)(b.Grow(unsafe.Sizeof(fusekernel.OpenOut{})))
out.Fh = uint64(o.Handle)
......@@ -589,8 +589,8 @@ type ReadDirOp struct {
Data []byte
}
func (o *ReadDirOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(uintptr(len(o.Data)))
func (o *ReadDirOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(uintptr(len(o.Data)))
b.Append(o.Data)
return
}
......@@ -612,8 +612,8 @@ type ReleaseDirHandleOp struct {
Handle HandleID
}
func (o *ReleaseDirHandleOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *ReleaseDirHandleOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -643,8 +643,8 @@ type OpenFileOp struct {
Handle HandleID
}
func (o *OpenFileOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(unsafe.Sizeof(fusekernel.OpenOut{}))
func (o *OpenFileOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.OpenOut{}))
out := (*fusekernel.OpenOut)(b.Grow(unsafe.Sizeof(fusekernel.OpenOut{})))
out.Fh = uint64(o.Handle)
......@@ -680,8 +680,8 @@ type ReadFileOp struct {
Data []byte
}
func (o *ReadFileOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(uintptr(len(o.Data)))
func (o *ReadFileOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(uintptr(len(o.Data)))
b.Append(o.Data)
return
}
......@@ -756,8 +756,8 @@ type WriteFileOp struct {
Data []byte
}
func (o *WriteFileOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(unsafe.Sizeof(fusekernel.WriteOut{}))
func (o *WriteFileOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.WriteOut{}))
out := (*fusekernel.WriteOut)(b.Grow(unsafe.Sizeof(fusekernel.WriteOut{})))
out.Size = uint32(len(o.Data))
......@@ -788,8 +788,8 @@ type SyncFileOp struct {
Handle HandleID
}
func (o *SyncFileOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *SyncFileOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -848,8 +848,8 @@ type FlushFileOp struct {
Handle HandleID
}
func (o *FlushFileOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *FlushFileOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -870,8 +870,8 @@ type ReleaseFileHandleOp struct {
Handle HandleID
}
func (o *ReleaseFileHandleOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(0)
func (o *ReleaseFileHandleOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(0)
return
}
......@@ -888,7 +888,7 @@ func (o *unknownOp) ShortDesc() (desc string) {
return
}
func (o *unknownOp) kernelResponse() (b buffer.Buffer) {
func (o *unknownOp) kernelResponse() (b buffer.OutMessage) {
panic(fmt.Sprintf("Should never get here for unknown op: %s", o.ShortDesc()))
}
......@@ -907,8 +907,8 @@ type ReadSymlinkOp struct {
Target string
}
func (o *ReadSymlinkOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(uintptr(len(o.Target)))
func (o *ReadSymlinkOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(uintptr(len(o.Target)))
b.AppendString(o.Target)
return
}
......@@ -929,8 +929,8 @@ type InternalStatFSOp struct {
commonOp
}
func (o *InternalStatFSOp) kernelResponse() (b buffer.Buffer) {
b = buffer.New(unsafe.Sizeof(fusekernel.StatfsOut{}))
func (o *InternalStatFSOp) kernelResponse() (b buffer.OutMessage) {
b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.StatfsOut{}))
b.Grow(unsafe.Sizeof(fusekernel.StatfsOut{}))
return
......@@ -942,6 +942,6 @@ type InternalInterruptOp struct {
FuseID uint64
}
func (o *InternalInterruptOp) kernelResponse() (b buffer.Buffer) {
func (o *InternalInterruptOp) kernelResponse() (b buffer.OutMessage) {
panic("Shouldn't get here.")
}
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"fmt"
"io"
"syscall"
"unsafe"
"github.com/jacobsa/fuse/internal/fusekernel"
)
// All requests read from the kernel, without data, are shorter than
// this.
const pageSize = 4096
func init() {
// Confirm the page size.
if syscall.Getpagesize() != pageSize {
panic(fmt.Sprintf("Page size is unexpectedly %d", syscall.Getpagesize()))
}
}
// We size the buffer to have enough room for a fuse request plus data
// associated with a write request.
const bufSize = pageSize + MaxWriteSize
// An incoming message from the kernel, including leading fusekernel.InHeader
// struct. Provides storage for messages and convenient access to their
// contents.
type InMessage struct {
remaining []byte
storage [bufSize]byte
}
// Initialize with the data read by a single call to r.Read. The first call to
// Consume will consume the bytes directly after the fusekernel.InHeader
// struct.
func (m *InMessage) Init(r io.Reader) (err error) {
n, err := r.Read(m.storage[:])
if err != nil {
return
}
// Make sure the message is long enough.
const headerSize = unsafe.Sizeof(fusekernel.InHeader{})
if uintptr(n) < headerSize {
err = fmt.Errorf("Unexpectedly read only %d bytes.", n)
return
}
m.remaining = m.storage[headerSize:n]
// Check the header's length.
if int(m.Header().Len) != n {
err = fmt.Errorf(
"Header says %d bytes, but we read %d",
m.Header().Len,
n)
return
}
return
}
// Return a reference to the header read in the most recent call to Init.
func (m *InMessage) Header() (h *fusekernel.InHeader) {
h = (*fusekernel.InHeader)(unsafe.Pointer(&m.storage[0]))
return
}
// Return the number of bytes left to consume.
func (m *InMessage) Len() uintptr {
return uintptr(len(m.remaining))
}
// Consume the next n bytes from the message, returning a nil pointer if there
// are fewer than n bytes available.
func (m *InMessage) Consume(n uintptr) (p unsafe.Pointer) {
if m.Len() == 0 || n > m.Len() {
return
}
p = unsafe.Pointer(&m.remaining[0])
m.remaining = m.remaining[n:]
return
}
// Equivalent to Consume, except returns a slice of bytes. The result will be
// nil if Consume would fail.
func (m *InMessage) ConsumeBytes(n uintptr) (b []byte) {
if n > m.Len() {
return
}
b = m.remaining[:n]
m.remaining = m.remaining[n:]
return
}
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
// The maximum fuse write request size that InMessage can acommodate.
//
// Experimentally, OS X appears to cap the size of writes to 1 MiB, regardless
// of whether a larger size is specified in the mount options.
const MaxWriteSize = 1 << 20
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
// The maximum fuse write request size that InMessage can acommodate.
//
// Experimentally, Linux appears to refuse to honor a MaxWrite setting in an
// INIT response of more than 128 KiB.
const MaxWriteSize = 1 << 17
......@@ -21,25 +21,26 @@ import (
"github.com/jacobsa/fuse/internal/fusekernel"
)
// Buffer provides a mechanism for constructing a single contiguous fuse
// OutMessage provides a mechanism for constructing a single contiguous fuse
// message from multiple segments, where the first segment is always a
// fusekernel.OutHeader message.
//
// Must be created with New. Exception: the zero value has Bytes() == nil.
type Buffer struct {
// Must be created with NewOutMessage. Exception: the zero value has
// Bytes() == nil.
type OutMessage struct {
slice []byte
}
// Create a new buffer whose initial contents are a zeroed fusekernel.OutHeader
// message, and with room enough to grow by extra bytes.
func New(extra uintptr) (b Buffer) {
func NewOutMessage(extra uintptr) (b OutMessage) {
const headerSize = unsafe.Sizeof(fusekernel.OutHeader{})
b.slice = make([]byte, headerSize, headerSize+extra)
return
}
// Return a pointer to the header at the start of the buffer.
func (b *Buffer) OutHeader() (h *fusekernel.OutHeader) {
func (b *OutMessage) OutHeader() (h *fusekernel.OutHeader) {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b.slice))
h = (*fusekernel.OutHeader)(unsafe.Pointer(sh.Data))
return
......@@ -48,7 +49,7 @@ func (b *Buffer) OutHeader() (h *fusekernel.OutHeader) {
// Grow the buffer by the supplied number of bytes, returning a pointer to the
// start of the new segment. The sum of the arguments given to Grow must not
// exceed the argument given to New when creating the buffer.
func (b *Buffer) Grow(size uintptr) (p unsafe.Pointer) {
func (b *OutMessage) Grow(size uintptr) (p unsafe.Pointer) {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b.slice))
p = unsafe.Pointer(sh.Data + uintptr(sh.Len))
b.slice = b.slice[:len(b.slice)+int(size)]
......@@ -56,7 +57,7 @@ func (b *Buffer) Grow(size uintptr) (p unsafe.Pointer) {
}
// Equivalent to growing by the length of p, then copying p into the new segment.
func (b *Buffer) Append(p []byte) {
func (b *OutMessage) Append(p []byte) {
sh := reflect.SliceHeader{
Data: uintptr(b.Grow(uintptr(len(p)))),
Len: len(p),
......@@ -67,7 +68,7 @@ func (b *Buffer) Append(p []byte) {
}
// Equivalent to growing by the length of s, then copying s into the new segment.
func (b *Buffer) AppendString(s string) {
func (b *OutMessage) AppendString(s string) {
sh := reflect.SliceHeader{
Data: uintptr(b.Grow(uintptr(len(s)))),
Len: len(s),
......@@ -78,6 +79,6 @@ func (b *Buffer) AppendString(s string) {
}
// Return a reference to the current contents of the buffer.
func (b *Buffer) Bytes() []byte {
func (b *OutMessage) Bytes() []byte {
return b.slice
}
......@@ -125,10 +125,10 @@ type Conn struct {
MountError error
// File handle for kernel communication. Only safe to access if
// rio or wio is held.
dev *os.File
wio sync.RWMutex
rio sync.RWMutex
// Rio or Wio is held.
Dev *os.File
Wio sync.RWMutex
Rio sync.RWMutex
// Protocol version negotiated with InitRequest/InitResponse.
proto fusekernel.Protocol
......@@ -162,7 +162,7 @@ func Mount(dir string, options ...MountOption) (*Conn, error) {
if err != nil {
return nil, err
}
c.dev = f
c.Dev = f
if err := initMount(c, &conf); err != nil {
c.Close()
......@@ -513,16 +513,16 @@ func (malformedMessage) String() string {
// Close closes the FUSE connection.
func (c *Conn) Close() error {
c.wio.Lock()
defer c.wio.Unlock()
c.rio.Lock()
defer c.rio.Unlock()
return c.dev.Close()
c.Wio.Lock()
defer c.Wio.Unlock()
c.Rio.Lock()
defer c.Rio.Unlock()
return c.Dev.Close()
}
// caller must hold wio or rio
// caller must hold Wio or Rio
func (c *Conn) fd() int {
return int(c.dev.Fd())
return int(c.Dev.Fd())
}
func (c *Conn) Protocol() fusekernel.Protocol {
......@@ -536,9 +536,9 @@ func (c *Conn) Protocol() fusekernel.Protocol {
func (c *Conn) ReadMessage() (m *Message, err error) {
m = getMessage(c)
loop:
c.rio.RLock()
c.Rio.RLock()
n, err := syscall.Read(c.fd(), m.buf)
c.rio.RUnlock()
c.Rio.RUnlock()
if err == syscall.EINTR {
// OSXFUSE sends EINTR to userspace when a request interrupt
// completed before it got sent to userspace?
......@@ -1068,8 +1068,8 @@ func (c *Conn) writeToKernel(msg []byte) error {
}
func (c *Conn) WriteToKernel(msg []byte) error {
c.wio.RLock()
defer c.wio.RUnlock()
c.Wio.RLock()
defer c.Wio.RUnlock()
_, err := syscall.Write(c.fd(), msg)
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment