Commit 891b2d7a authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Implement a buffer pool, and pass to Read().

This reduces the overhead of allocating, zeroing and GC-ing large
amounts of memory.

For large reads into /dev/null, this provides a 22% speed-up.
parent 51f7b76c
......@@ -15,7 +15,7 @@ func main() {
threaded := flag.Bool("threaded", true, "switch off threading; print debugging messages.")
flag.Parse()
if flag.NArg() < 2 {
// TODO - where to get program name?
// TODO - where to get program name?
fmt.Println("usage: main ORIGINAL MOUNTPOINT")
os.Exit(2)
}
......@@ -35,4 +35,5 @@ func main() {
fmt.Printf("Mounted %s on %s (threaded=%v, debug=%v, cpus=%v)\n", orig, mountPoint, *threaded, *debug, cpus)
state.Loop(*threaded)
fmt.Println("Finished", state.Stats())
}
......@@ -102,7 +102,7 @@ func (self *DummyFuse) OpenDir(header *fuse.InHeader, input *fuse.OpenIn) (flags
type DummyFuseFile struct{}
func (self *DummyFuseFile) Read(*fuse.ReadIn) ([]byte, fuse.Status) {
func (self *DummyFuseFile) Read(*fuse.ReadIn, *fuse.BufferPool) ([]byte, fuse.Status) {
return []byte(""), fuse.ENOSYS
}
......
......@@ -140,9 +140,8 @@ type PassThroughFile struct {
file *os.File
}
func (self *PassThroughFile) Read(input *fuse.ReadIn) ([]byte, fuse.Status) {
buf := make([]byte, input.Size)
slice := buf[:]
func (self *PassThroughFile) Read(input *fuse.ReadIn, buffers *fuse.BufferPool) ([]byte, fuse.Status) {
slice := buffers.GetBuffer(input.Size)
n, err := self.file.ReadAt(slice, int64(input.Offset))
if err == os.EOF {
......
......@@ -10,6 +10,7 @@ GOFILES=misc.go\
mount.go\
types.go\
pathfilesystem.go \
bufferpool.go
include $(GOROOT)/src/Make.pkg
package fuse
import (
"sync"
"fmt"
)
const PAGESIZE int = 4096
// This implements a pool of buffers that returns slices with capacity
// (2^e * PAGESIZE) for e=0,1,... which have possibly been used, and
// may contain random contents.
type BufferPool struct {
lock sync.Mutex
// For each exponent a list of slice pointers.
buffersByExponent [][][]byte
}
// TODO: use table ?
func IntToExponent(z int) uint {
x := z
var exp uint = 0
for x > 1 {
exp++
x >>= 1
}
if z > (1 << exp) {
exp ++
}
return exp
}
func NewBufferPool() *BufferPool {
bp := new(BufferPool)
bp.buffersByExponent = make([][][]byte, 0, 8)
return bp
}
func (self *BufferPool) String() string {
s := ""
for exp, bufs := range(self.buffersByExponent) {
s = s + fmt.Sprintf("%d = %d\n", exp, len(bufs))
}
return s
}
func (self *BufferPool) getBuffer(sz int) []byte {
exponent := int(IntToExponent(sz) - IntToExponent(PAGESIZE))
self.lock.Lock()
defer self.lock.Unlock()
if (len(self.buffersByExponent) <= int(exponent)) {
return nil
}
bufferList := self.buffersByExponent[exponent]
if (len(bufferList) == 0) {
return nil
}
result := bufferList[len(bufferList)-1]
self.buffersByExponent[exponent] = self.buffersByExponent[exponent][:len(bufferList)-1]
if cap(result) < sz {
panic("returning incorrect buffer.")
}
return result
}
func (self *BufferPool) addBuffer(slice []byte) {
if cap(slice) & (PAGESIZE -1) != 0 {
return
}
pages := cap(slice) / PAGESIZE
if pages == 0 {
return
}
exp := IntToExponent(pages)
if (1 << exp) != pages {
return
}
self.lock.Lock()
defer self.lock.Unlock()
for len(self.buffersByExponent) <= int(exp) {
self.buffersByExponent = append(self.buffersByExponent, make([][]byte, 0))
}
self.buffersByExponent[exp] = append(self.buffersByExponent[exp], slice)
}
func (self *BufferPool) GetBuffer(size uint32) []byte {
sz := int(size)
if sz < PAGESIZE {
sz = PAGESIZE
}
rounded := 1 << IntToExponent(sz)
b := self.getBuffer(rounded)
if b != nil {
b = b[:size]
return b
}
return make([]byte, size, rounded)
}
package fuse
import (
"testing"
"fmt"
)
func TestIntToExponent(t *testing.T) {
e := IntToExponent(1)
if e != 0 {
t.Error("1", e)
}
e = IntToExponent(2)
if e != 1 {
t.Error("2", e)
}
e = IntToExponent(3)
if e != 2 {
t.Error("3", e)
}
e = IntToExponent(4)
if e != 2 {
t.Error("4", e)
}
}
func TestBufferPool(t *testing.T) {
bp := NewBufferPool()
b := bp.getBuffer(PAGESIZE-1)
if b != nil {
t.Error("bp 0")
}
b = bp.getBuffer(PAGESIZE)
if b != nil {
t.Error("bp 1")
}
s := make([]byte, PAGESIZE - 1)
bp.addBuffer(s)
b = bp.getBuffer(PAGESIZE -1)
if b != nil {
t.Error("bp 3")
}
s = make([]byte, PAGESIZE)
bp.addBuffer(s)
b = bp.getBuffer(PAGESIZE)
if b == nil {
t.Error("not found.")
}
b = bp.getBuffer(PAGESIZE)
if b != nil {
t.Error("should fail.")
}
bp.addBuffer(make([]byte, 3*PAGESIZE))
b = bp.getBuffer(2*PAGESIZE)
if b != nil {
t.Error("should fail.")
}
b = bp.getBuffer(4*PAGESIZE)
if b != nil {
t.Error("should fail.")
}
bp.addBuffer(make([]byte, 4*PAGESIZE))
fmt.Println(bp)
b = bp.getBuffer(2*PAGESIZE)
if b != nil {
t.Error("should fail.")
}
b = bp.getBuffer(4*PAGESIZE)
if b == nil {
t.Error("4*ps should succeed.")
}
}
......@@ -51,6 +51,9 @@ type MountState struct {
// Dump debug info onto stdout.
Debug bool
// For efficient reads.
buffers *BufferPool
}
func (self *MountState) RegisterFile(file RawFuseFile) uint64 {
......@@ -114,7 +117,7 @@ func (self *MountState) Mount(mountPoint string) os.Error {
}
// Normally, callers should run loop() and wait for FUSE to exit, but
// tests will want to run this in a goroutine.
// tests will want to run this in a goroutine.
func (self *MountState) Loop(threaded bool) {
self.threaded = threaded
if self.threaded {
......@@ -177,9 +180,15 @@ func NewMountState(fs RawFileSystem) *MountState {
self.openedFiles = make(map[uint64]RawFuseFile)
self.mountPoint = ""
self.fileSystem = fs
self.buffers = NewBufferPool()
return self
}
// TODO - more of them.
func (self *MountState) Stats() string {
return "buffers: " + self.buffers.String()
}
////////////////
// Private routines.
......@@ -194,8 +203,12 @@ func (self *MountState) syncWrite(packet [][]byte) {
if err != nil {
self.Error(os.NewError(fmt.Sprintf("writer: Writev %v failed, err: %v", packet, err)))
}
for _, v := range(packet) {
self.buffers.addBuffer(v)
}
}
////////////////////////////////////////////////////////////////
// Logic for the control loop.
......@@ -331,7 +344,7 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
case FUSE_OPEN:
out, status = doOpen(state, h, input.(*OpenIn))
case FUSE_READ:
flatData, status = doRead(state, h, input.(*ReadIn))
flatData, status = doRead(state, h, input.(*ReadIn), state.buffers)
case FUSE_WRITE:
out, status = doWrite(state, h, input.(*WriteIn), arg.Bytes())
case FUSE_FLUSH:
......@@ -482,8 +495,8 @@ func doRelease(state *MountState, header *InHeader, input *ReleaseIn) (out Empty
return nil, OK
}
func doRead(state *MountState, header *InHeader, input *ReadIn) (out []byte, code Status) {
output, code := state.FindFile(input.Fh).Read(input)
func doRead(state *MountState, header *InHeader, input *ReadIn, buffers *BufferPool) (out []byte, code Status) {
output, code := state.FindFile(input.Fh).Read(input, buffers)
return output, code
}
......
......@@ -528,7 +528,7 @@ type RawFileSystem interface {
}
type RawFuseFile interface {
Read(*ReadIn) ([]byte, Status)
Read(*ReadIn, *BufferPool) ([]byte, Status)
// u32 <-> u64 ?
Write(*WriteIn, []byte) (uint32, Status)
Flush() Status
......@@ -558,13 +558,13 @@ type PathFilesystem interface {
Open(name string, flags uint32) (file RawFuseFile, code Status)
// Where to hook up statfs?
//
//
// Unimplemented:
// RemoveXAttr, SetXAttr, GetXAttr, ListXAttr.
OpenDir(name string) (dir RawFuseDir, code Status)
// TODO - what is a good interface?
// TODO - what is a good interface?
Init() (*InitOut, Status)
Destroy()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment