Commit 29d35b93 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

New class LatencyMap for doing latency measurements.

parent 3dac50b0
...@@ -22,6 +22,7 @@ GOFILES=misc.go\ ...@@ -22,6 +22,7 @@ GOFILES=misc.go\
pathdebug.go \ pathdebug.go \
opcode.go \ opcode.go \
pathops.go \ pathops.go \
latencymap.go
include $(GOROOT)/src/Make.pkg include $(GOROOT)/src/Make.pkg
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"os" "os"
"reflect" "reflect"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
"unsafe" "unsafe"
...@@ -72,11 +71,7 @@ type MountState struct { ...@@ -72,11 +71,7 @@ type MountState struct {
buffers *BufferPool buffers *BufferPool
RecordStatistics bool RecordStatistics bool
statisticsMutex sync.Mutex *LatencyMap
operationCounts map[string]int64
// In nanoseconds.
operationLatencies map[string]int64
} }
// Mount filesystem on mountPoint. // Mount filesystem on mountPoint.
...@@ -87,9 +82,7 @@ func (me *MountState) Mount(mountPoint string) os.Error { ...@@ -87,9 +82,7 @@ func (me *MountState) Mount(mountPoint string) os.Error {
} }
me.mountPoint = mp me.mountPoint = mp
me.mountFile = file me.mountFile = file
me.LatencyMap = NewLatencyMap()
me.operationCounts = make(map[string]int64)
me.operationLatencies = make(map[string]int64)
return nil return nil
} }
...@@ -135,26 +128,11 @@ func NewMountState(fs RawFileSystem) *MountState { ...@@ -135,26 +128,11 @@ func NewMountState(fs RawFileSystem) *MountState {
} }
func (me *MountState) Latencies() map[string]float64 { func (me *MountState) Latencies() map[string]float64 {
me.statisticsMutex.Lock() return me.LatencyMap.Latencies(1e-3)
defer me.statisticsMutex.Unlock()
r := make(map[string]float64)
for k, v := range me.operationCounts {
r[k] = 1e-6 * float64(me.operationLatencies[k]) / float64(v)
}
return r
} }
func (me *MountState) OperationCounts() map[string]int64 { func (me *MountState) OperationCounts() map[string]int {
me.statisticsMutex.Lock() return me.LatencyMap.Counts()
defer me.statisticsMutex.Unlock()
r := make(map[string]int64)
for k, v := range me.operationCounts {
r[k] = v
}
return r
} }
func (me *MountState) BufferPoolStats() string { func (me *MountState) BufferPoolStats() string {
...@@ -188,26 +166,12 @@ func (me *MountState) discardRequest(req *request) { ...@@ -188,26 +166,12 @@ func (me *MountState) discardRequest(req *request) {
endNs := time.Nanoseconds() endNs := time.Nanoseconds()
dt := endNs - req.startNs dt := endNs - req.startNs
me.statisticsMutex.Lock()
defer me.statisticsMutex.Unlock()
opname := operationName(req.inHeader.Opcode) opname := operationName(req.inHeader.Opcode)
key := opname me.LatencyMap.AddMany(
me.operationCounts[key] += 1 []LatencyArg{
me.operationLatencies[key] += dt {opname, "", dt},
{opname + "-dispatch", "", req.dispatchNs - req.startNs},
key += "-dispatch" {opname + "-write", "", endNs - req.preWriteNs}})
me.operationLatencies[key] += (req.dispatchNs - req.startNs)
me.operationCounts[key] += 1
key = opname + "-write"
me.operationLatencies[key] += (endNs - req.preWriteNs)
me.operationCounts[key] += 1
recDt := time.Nanoseconds() - endNs
key = "measurement"
me.operationCounts[key] += 1
me.operationLatencies[key] += recDt
} }
me.buffers.FreeBuffer(req.inputBuf) me.buffers.FreeBuffer(req.inputBuf)
......
package fuse
import (
"fmt"
"sort"
"sync"
)
type latencyMapEntry struct {
count int
ns int64
}
type LatencyArg struct {
Name string
Arg string
DtNs int64
}
type LatencyMap struct {
sync.Mutex
stats map[string]*latencyMapEntry
secondaryStats map[string]map[string]int64
}
func NewLatencyMap() *LatencyMap {
m := &LatencyMap{}
m.stats = make(map[string]*latencyMapEntry)
m.secondaryStats = make(map[string]map[string]int64)
return m
}
func (me *LatencyMap) AddMany(args []LatencyArg) {
me.Mutex.Lock()
defer me.Mutex.Unlock()
for _, v := range args {
me.add(v.Name, v.Arg, v.DtNs)
}
}
func (me *LatencyMap) Add(name string, arg string, dtNs int64) {
me.Mutex.Lock()
defer me.Mutex.Unlock()
me.add(name, arg, dtNs)
}
func (me *LatencyMap) add(name string, arg string, dtNs int64) {
e := me.stats[name]
if e == nil {
e = new(latencyMapEntry)
me.stats[name] = e
}
e.count ++
e.ns += dtNs
if arg != "" {
m, ok := me.secondaryStats[name]
if !ok {
m = make(map[string]int64)
me.secondaryStats[name] = m
}
}
}
func (me *LatencyMap) Counts() map[string]int {
me.Mutex.Lock()
defer me.Mutex.Unlock()
r := make(map[string]int)
for k, v := range me.stats {
r[k] = v.count
}
return r
}
// Latencies returns a map. Use 1e-3 for unit to get ms
// results.
func (me *LatencyMap) Latencies(unit float64) map[string]float64 {
me.Mutex.Lock()
defer me.Mutex.Unlock()
r := make(map[string]float64)
mult := 1 / (1e9 * unit)
for key, ent := range me.stats {
lat := mult * float64(ent.ns) / float64(ent.count)
r[key] = lat
}
return r
}
func (me *LatencyMap) TopArgs(name string) []string {
me.Mutex.Lock()
defer me.Mutex.Unlock()
counts := me.secondaryStats[name]
results := make([]string, 0, len(counts))
for k, v := range counts {
results = append(results, fmt.Sprintf("% 9d %s", v, k))
}
sort.SortStrings(results)
return results
}
package fuse
import (
"fmt"
"testing"
)
var _ = fmt.Println
func TestLatencyMap(t *testing.T) {
fmt.Println("TestLatencyMap")
m := NewLatencyMap()
m.Add("foo", "", 0.1e9)
m.Add("foo", "", 0.2e9)
l := m.Latencies(1e-3)
if l["foo"] != 150 {
t.Error("unexpected latency", l)
}
}
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"log" "log"
"math" "math"
"regexp" "regexp"
"sort"
"syscall" "syscall"
"unsafe" "unsafe"
"io/ioutil" "io/ioutil"
...@@ -144,21 +143,6 @@ func CheckSuccess(e os.Error) { ...@@ -144,21 +143,6 @@ func CheckSuccess(e os.Error) {
} }
} }
// For printing latency data.
func PrintMap(m map[string]float64) {
keys := make([]string, len(m))
for k, _ := range m {
keys = append(keys, k)
}
sort.SortStrings(keys)
for _, k := range keys {
if m[k] > 0 {
fmt.Println(k, m[k])
}
}
}
func MyPID() string { func MyPID() string {
v, _ := os.Readlink("/proc/self") v, _ := os.Readlink("/proc/self")
return v return v
......
...@@ -104,7 +104,7 @@ func FloatMapToBytes(m map[string]float64) []byte { ...@@ -104,7 +104,7 @@ func FloatMapToBytes(m map[string]float64) []byte {
} }
// Ugh - generics. // Ugh - generics.
func IntMapToBytes(m map[string]int64) []byte { func IntMapToBytes(m map[string]int) []byte {
keys := make([]string, 0, len(m)) keys := make([]string, 0, len(m))
for k, _ := range m { for k, _ := range m {
keys = append(keys, k) keys = append(keys, k)
...@@ -153,7 +153,8 @@ func (me *FileSystemDebug) AddFileSystemConnector(conn *FileSystemConnector) { ...@@ -153,7 +153,8 @@ func (me *FileSystemDebug) AddFileSystemConnector(conn *FileSystemConnector) {
} }
func hotPaths(timing *TimingFileSystem) []byte { func hotPaths(timing *TimingFileSystem) []byte {
hot, unique := timing.HotPaths("GetAttr") hot := timing.HotPaths("GetAttr")
unique := len(hot)
top := 20 top := 20
start := len(hot) - top start := len(hot) - top
if start < 0 { if start < 0 {
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
) )
func TestPathDebug(t *testing.T) { func TestPathDebug(t *testing.T) {
fs := &DefaultFileSystem{} debugFs := NewFileSystemDebug()
debugFs := NewFileSystemDebug(fs) debugFs.Original = &DefaultFileSystem{}
debugFs.Add("test-entry", func()[]byte { return []byte("test-content"); }) debugFs.Add("test-entry", func()[]byte { return []byte("test-content"); })
connector := NewFileSystemConnector(debugFs) connector := NewFileSystemConnector(debugFs)
......
package fuse package fuse
import ( import (
"sync"
"time" "time"
"log" "log"
"fmt" "fmt"
"sort"
) )
var _ = log.Print var _ = log.Print
...@@ -15,18 +13,13 @@ var _ = fmt.Print ...@@ -15,18 +13,13 @@ var _ = fmt.Print
type TimingFileSystem struct { type TimingFileSystem struct {
WrappingFileSystem WrappingFileSystem
statisticsLock sync.Mutex *LatencyMap
latencies map[string]int64
counts map[string]int64
pathCounts map[string]map[string]int64
} }
func NewTimingFileSystem(fs FileSystem) *TimingFileSystem { func NewTimingFileSystem(fs FileSystem) *TimingFileSystem {
t := new(TimingFileSystem) t := new(TimingFileSystem)
t.LatencyMap = NewLatencyMap()
t.Original = fs t.Original = fs
t.latencies = make(map[string]int64)
t.counts = make(map[string]int64)
t.pathCounts = make(map[string]map[string]int64)
return t return t
} }
...@@ -35,56 +28,20 @@ func (me *TimingFileSystem) startTimer(name string, arg string) (closure func()) ...@@ -35,56 +28,20 @@ func (me *TimingFileSystem) startTimer(name string, arg string) (closure func())
return func() { return func() {
dt := (time.Nanoseconds() - start) / 1e6 dt := (time.Nanoseconds() - start) / 1e6
me.statisticsLock.Lock() me.LatencyMap.Add(name, arg, dt)
defer me.statisticsLock.Unlock()
me.counts[name] += 1
me.latencies[name] += dt
m, ok := me.pathCounts[name]
if !ok {
m = make(map[string]int64)
me.pathCounts[name] = m
}
m[arg] += 1
} }
} }
func (me *TimingFileSystem) OperationCounts() map[string]int64 { func (me *TimingFileSystem) OperationCounts() map[string]int {
me.statisticsLock.Lock() return me.LatencyMap.Counts()
defer me.statisticsLock.Unlock()
r := make(map[string]int64)
for k, v := range me.counts {
r[k] = v
}
return r
} }
func (me *TimingFileSystem) Latencies() map[string]float64 { func (me *TimingFileSystem) Latencies() map[string]float64 {
me.statisticsLock.Lock() return me.LatencyMap.Latencies(1e-3)
defer me.statisticsLock.Unlock()
r := make(map[string]float64)
for k, v := range me.counts {
lat := float64(me.latencies[k]) / float64(v)
r[k] = lat
}
return r
} }
func (me *TimingFileSystem) HotPaths(operation string) (paths []string, uniquePaths int) { func (me *TimingFileSystem) HotPaths(operation string) (paths []string) {
me.statisticsLock.Lock() return me.LatencyMap.TopArgs(operation)
defer me.statisticsLock.Unlock()
counts := me.pathCounts[operation]
results := make([]string, 0, len(counts))
for k, v := range counts {
results = append(results, fmt.Sprintf("% 9d %s", v, k))
}
sort.SortStrings(results)
return results, len(counts)
} }
func (me *TimingFileSystem) GetAttr(name string) (*Attr, Status) { func (me *TimingFileSystem) GetAttr(name string) (*Attr, Status) {
......
package fuse package fuse
import ( import (
"sync"
"time" "time"
) )
...@@ -9,16 +8,13 @@ import ( ...@@ -9,16 +8,13 @@ import (
type TimingRawFileSystem struct { type TimingRawFileSystem struct {
WrappingRawFileSystem WrappingRawFileSystem
statisticsLock sync.Mutex *LatencyMap
latencies map[string]int64
counts map[string]int64
} }
func NewTimingRawFileSystem(fs RawFileSystem) *TimingRawFileSystem { func NewTimingRawFileSystem(fs RawFileSystem) *TimingRawFileSystem {
t := new(TimingRawFileSystem) t := new(TimingRawFileSystem)
t.Original = fs t.Original = fs
t.latencies = make(map[string]int64) t.LatencyMap = NewLatencyMap()
t.counts = make(map[string]int64)
return t return t
} }
...@@ -27,23 +23,12 @@ func (me *TimingRawFileSystem) startTimer(name string) (closure func()) { ...@@ -27,23 +23,12 @@ func (me *TimingRawFileSystem) startTimer(name string) (closure func()) {
return func() { return func() {
dt := (time.Nanoseconds() - start) / 1e6 dt := (time.Nanoseconds() - start) / 1e6
me.statisticsLock.Lock() me.LatencyMap.Add(name, "", dt)
defer me.statisticsLock.Unlock()
me.counts[name] += 1
me.latencies[name] += dt
} }
} }
func (me *TimingRawFileSystem) Latencies() map[string]float64 { func (me *TimingRawFileSystem) Latencies() map[string]float64 {
me.statisticsLock.Lock() return me.LatencyMap.Latencies(1e-3)
defer me.statisticsLock.Unlock()
r := make(map[string]float64)
for k, v := range me.counts {
r[k] = float64(me.latencies[k]) / float64(v)
}
return r
} }
func (me *TimingRawFileSystem) Init(h *InHeader, input *InitIn) (*InitOut, Status) { func (me *TimingRawFileSystem) Init(h *InHeader, input *InitIn) (*InitOut, Status) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment