Commit 79426f97 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Stop serializing writes - the kernel is multithreaded too.

parent d0bfebb3
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
const ( const (
// bufSize should be a power of two to minimize lossage in // bufSize should be a power of two to minimize lossage in
// BufferPool. // BufferPool.
bufSize = (1 << 18) bufSize = (1 << 16)
maxRead = bufSize - PAGESIZE maxRead = bufSize - PAGESIZE
) )
...@@ -43,6 +43,8 @@ type fuseRequest struct { ...@@ -43,6 +43,8 @@ type fuseRequest struct {
// Start timestamp for timing info. // Start timestamp for timing info.
startNs int64 startNs int64
dispatchNs int64
preWriteNs int64
} }
// TODO - should gather stats and expose those for performance tuning. // TODO - should gather stats and expose those for performance tuning.
...@@ -65,8 +67,8 @@ type MountState struct { ...@@ -65,8 +67,8 @@ type MountState struct {
// I/O with kernel and daemon. // I/O with kernel and daemon.
mountFile *os.File mountFile *os.File
errorChannel chan os.Error errorChannel chan os.Error
outputChannel chan *fuseRequest
// Run each operation in its own Go-routine. // Run each operation in its own Go-routine.
threaded bool threaded bool
...@@ -150,16 +152,13 @@ func (me *MountState) Mount(mountPoint string) os.Error { ...@@ -150,16 +152,13 @@ func (me *MountState) Mount(mountPoint string) os.Error {
func (me *MountState) Loop(threaded bool) { func (me *MountState) Loop(threaded bool) {
me.threaded = threaded me.threaded = threaded
if me.threaded { if me.threaded {
me.outputChannel = make(chan *fuseRequest, 100)
me.errorChannel = make(chan os.Error, 100) me.errorChannel = make(chan os.Error, 100)
go me.asyncWriterThread()
go me.DefaultErrorHandler() go me.DefaultErrorHandler()
} }
me.loop() me.loop()
if me.threaded { if me.threaded {
close(me.outputChannel)
close(me.errorChannel) close(me.errorChannel)
} }
} }
...@@ -196,11 +195,12 @@ func (me *MountState) Write(req *fuseRequest) { ...@@ -196,11 +195,12 @@ func (me *MountState) Write(req *fuseRequest) {
return return
} }
if me.threaded { _, err := Writev(me.mountFile.Fd(), req.serialized)
me.outputChannel <- req if err != nil {
} else { me.Error(os.NewError(fmt.Sprintf("writer: Writev %v failed, err: %v", req.serialized, err)))
me.syncWrite(req)
} }
me.discardFuseRequest(req)
} }
func NewMountState(fs RawFileSystem) *MountState { func NewMountState(fs RawFileSystem) *MountState {
...@@ -253,48 +253,45 @@ func (me *MountState) Stats() string { ...@@ -253,48 +253,45 @@ func (me *MountState) Stats() string {
return strings.Join(lines, "\n") return strings.Join(lines, "\n")
} }
////////////////
// Private routines.
func (me *MountState) asyncWriterThread() {
for packet := range me.outputChannel {
me.syncWrite(packet)
}
}
func (me *MountState) syncWrite(req *fuseRequest) {
_, err := Writev(me.mountFile.Fd(), req.serialized)
if err != nil {
me.Error(os.NewError(fmt.Sprintf("writer: Writev %v failed, err: %v", req.serialized, err)))
}
me.discardFuseRequest(req)
}
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Logic for the control loop. // Logic for the control loop.
func (me *MountState) newFuseRequest() (*fuseRequest, os.Error) { func (me *MountState) newFuseRequest() (*fuseRequest) {
req := new(fuseRequest) req := new(fuseRequest)
req.status = OK req.status = OK
req.startNs = time.Nanoseconds()
req.inputBuf = me.buffers.AllocBuffer(bufSize) req.inputBuf = me.buffers.AllocBuffer(bufSize)
return req
}
func (me *MountState) readRequest(req *fuseRequest) os.Error {
n, err := me.mountFile.Read(req.inputBuf) n, err := me.mountFile.Read(req.inputBuf)
// If we start timing before the read, we may take into
// account waiting for input into the timing.
req.startNs = time.Nanoseconds()
req.inputBuf = req.inputBuf[0:n] req.inputBuf = req.inputBuf[0:n]
return err
return req, err
} }
func (me *MountState) discardFuseRequest(req *fuseRequest) { func (me *MountState) discardFuseRequest(req *fuseRequest) {
dt := time.Nanoseconds() - req.startNs endNs := time.Nanoseconds()
dt := endNs - req.startNs
me.statisticsMutex.Lock() me.statisticsMutex.Lock()
defer me.statisticsMutex.Unlock() defer me.statisticsMutex.Unlock()
key := operationName(req.inHeader.Opcode) opname := operationName(req.inHeader.Opcode)
key := opname
me.operationCounts[key] += 1 me.operationCounts[key] += 1
me.operationLatencies[key] += dt / 1e6 me.operationLatencies[key] += dt / 1e6
key += "-dispatch"
me.operationLatencies[key] += (req.dispatchNs - req.startNs) / 1e6
me.operationCounts[key] += 1
key = opname + "-write"
me.operationLatencies[key] += (endNs - req.preWriteNs) / 1e6
me.operationCounts[key] += 1
me.buffers.FreeBuffer(req.inputBuf) me.buffers.FreeBuffer(req.inputBuf)
me.buffers.FreeBuffer(req.flatData) me.buffers.FreeBuffer(req.flatData)
} }
...@@ -302,7 +299,9 @@ func (me *MountState) discardFuseRequest(req *fuseRequest) { ...@@ -302,7 +299,9 @@ func (me *MountState) discardFuseRequest(req *fuseRequest) {
func (me *MountState) loop() { func (me *MountState) loop() {
// See fuse_kern_chan_receive() // See fuse_kern_chan_receive()
for { for {
req, err := me.newFuseRequest() req := me.newFuseRequest()
err := me.readRequest(req)
if err != nil { if err != nil {
errNo := OsErrorToFuseError(err) errNo := OsErrorToFuseError(err)
...@@ -337,6 +336,7 @@ func (me *MountState) loop() { ...@@ -337,6 +336,7 @@ func (me *MountState) loop() {
} }
func (me *MountState) handle(req *fuseRequest) { func (me *MountState) handle(req *fuseRequest) {
req.dispatchNs = time.Nanoseconds()
req.arg = bytes.NewBuffer(req.inputBuf) req.arg = bytes.NewBuffer(req.inputBuf)
err := binary.Read(req.arg, binary.LittleEndian, &req.inHeader) err := binary.Read(req.arg, binary.LittleEndian, &req.inHeader)
if err == os.EOF { if err == os.EOF {
...@@ -347,6 +347,7 @@ func (me *MountState) handle(req *fuseRequest) { ...@@ -347,6 +347,7 @@ func (me *MountState) handle(req *fuseRequest) {
return return
} }
me.dispatch(req) me.dispatch(req)
req.preWriteNs = time.Nanoseconds()
me.Write(req) me.Write(req)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment