Commit 8abfd27d authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Levin Zimmermann

wcfs: Add .wcfs/stats file with basic usage statistics

Report there number of inside-WCFS instances, e.g. number of tracked
BigFiles, WatchLinks etc, and also number of counted events, for example
how many times a pin event happened.

Soon we will need this statistics to implement tests e.g. for pinkilling
and other functionalities, and it might be also useful to have in general.

/reviewed-by @levin.zimmermann
/reviewed-on nexedi/wendelin.core!18
parent 96b216f6
...@@ -542,6 +542,9 @@ type Root struct { ...@@ -542,6 +542,9 @@ type Root struct {
// directories + ZODB connections for @<rev>/ // directories + ZODB connections for @<rev>/
revMu sync.Mutex revMu sync.Mutex
revTab map[zodb.Tid]*Head revTab map[zodb.Tid]*Head
// collected statistics
stats *Stats
} }
// /(head|<rev>)/ - served by Head. // /(head|<rev>)/ - served by Head.
...@@ -704,6 +707,14 @@ type blkPinState struct { ...@@ -704,6 +707,14 @@ type blkPinState struct {
err error err error
} }
// Stats keeps collected statistics.
//
// The statistics is accessible via .wcfs/stats file served by _wcfs_Stats.
type Stats struct {
pin atomic.Int64 // # of times wcfs issued pin request
}
// -------- ZODB cache control -------- // -------- ZODB cache control --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict // zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
...@@ -1472,6 +1483,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1472,6 +1483,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// perform IO without w.pinnedMu // perform IO without w.pinnedMu
w.pinnedMu.Unlock() w.pinnedMu.Unlock()
groot.stats.pin.Add(1)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr)) ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
w.pinnedMu.Lock() w.pinnedMu.Lock()
...@@ -2362,6 +2374,74 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse ...@@ -2362,6 +2374,74 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse
return sk.File(), fuse.OK return sk.File(), fuse.OK
} }
// _wcfs_Stats serves .wcfs/stats reads.
//
// In the output:
//
// - entries that start with capital letter, e.g. "Watch", indicate current
// number of named instances. This numbers can go up and down.
//
// - entries that start with lowercase letter, e.g. "pin", indicate number of
// named event occurrences. This numbers are cumulative counters and should
// never go down.
func _wcfs_Stats(fctx *fuse.Context) ([]byte, error) {
stats := ""
num := func(name string, value any) {
stats += fmt.Sprintf("%s\t: %d\n", name, value)
}
root := groot
head := root.head
bfdir := head.bfdir
// dump information detected at runtime
root.revMu.Lock()
lenRevTab := len(root.revTab)
root.revMu.Unlock()
head.wlinkMu.Lock()
ΣWatch := 0
ΣPinnedBlk := 0
lenWLinkTab := len(head.wlinkTab)
for wlink := range head.wlinkTab {
wlink.byfileMu.Lock()
ΣWatch += len(wlink.byfile)
for _, w := range wlink.byfile {
w.atMu.RLock()
w.pinnedMu.Lock()
ΣPinnedBlk += len(w.pinned)
w.pinnedMu.Unlock()
w.atMu.RUnlock()
}
wlink.byfileMu.Unlock()
}
head.wlinkMu.Unlock()
head.zheadMu.RLock()
bfdir.fileMu.Lock()
lenFileTab := len(bfdir.fileTab)
bfdir.fileMu.Unlock()
head.zheadMu.RUnlock()
gdebug.zheadSockTabMu.Lock()
lenZHeadSockTab := len(gdebug.zheadSockTab)
gdebug.zheadSockTabMu.Unlock()
num("BigFile", lenFileTab) // # of head/BigFile
num("RevHead", lenRevTab) // # of @revX/ directories
num("ZHeadLink", lenZHeadSockTab) // # of open .wcfs/zhead handles
num("WatchLink", lenWLinkTab) // # of open watchlinks
num("Watch", ΣWatch) // # of setup watches
num("PinnedBlk", ΣPinnedBlk) // # of currently on-client pinned blocks
// dump information collected in root.stats
s := root.stats
num("pin", s.pin.Load())
return []byte(stats), nil
}
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?) // TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
func main() { func main() {
...@@ -2465,6 +2545,7 @@ func _main() (err error) { ...@@ -2465,6 +2545,7 @@ func _main() (err error) {
zdb: zdb, zdb: zdb,
head: head, head: head,
revTab: make(map[zodb.Tid]*Head), revTab: make(map[zodb.Tid]*Head),
stats: &Stats{},
} }
opts := &fuse.MountOptions{ opts := &fuse.MountOptions{
...@@ -2532,6 +2613,10 @@ func _main() (err error) { ...@@ -2532,6 +2613,10 @@ func _main() (err error) {
fsNode: newFSNode(fSticky), fsNode: newFSNode(fSticky),
}) })
// .wcfs/stats - special file with collected statistics.
mkfile(&_wcfs, "stats", NewSmallFile(_wcfs_Stats))
// TODO handle autoexit // TODO handle autoexit
// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl // (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
// opened, so when all inodes has been forgotten - we know all wcfs.py clients exited) // opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
......
...@@ -367,6 +367,11 @@ class tWCFS(_tWCFS): ...@@ -367,6 +367,11 @@ class tWCFS(_tWCFS):
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil nogilready.recv() # wait till _abort_ontimeout enters nogil
t._stats_prev = None
t.assertStats({'BigFile': 0, 'RevHead': 0, 'ZHeadLink': 0,
'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0,
'pin': 0})
# _abort_ontimeout is in wcfs_test.pyx # _abort_ontimeout is in wcfs_test.pyx
# close closes connection to wcfs, unmounts the filesystem and makes sure # close closes connection to wcfs, unmounts the filesystem and makes sure
...@@ -394,6 +399,69 @@ class tWCFS(_tWCFS): ...@@ -394,6 +399,69 @@ class tWCFS(_tWCFS):
defer(t.wc.close) defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint) assert is_mountpoint(t.wc.mountpoint)
# assertStats asserts that content of .wcfs/stats eventually reaches expected state.
#
# For all keys k from kvok it verifies that eventually stats[k] == kvok[k]
# and that it stays that way.
#
# The state is asserted eventually instead of immediately - for both
# counters and instance values - because wcfs increments a counter
# _after_ corresponding event happened,
# and the tests can start to observe that state
# before wcfs actually does counter increment. For the similar reason we
# need to assert that the counters stay in expected state to make sure that
# no extra event happened. For instance values we need to assert
# eventually as well, because in many cases OS kernel sends events to wcfs
# asynchronously after client triggers an action.
#
# Note that the set of keys in kvok can be smaller than the full set of keys in stats.
def assertStats(t, kvok):
# kstats loads stats subset with kvok keys.
def kstats():
stats = t._loadStats()
kstats = {}
for k in kvok.keys():
kstats[k] = stats.get(k, None)
return kstats
# wait till stats reaches expected state
ctx = timeout()
while 1:
kv = kstats()
if kv == kvok:
break
if ctx.err() is not None:
assert kv == kvok, "stats did not reach expected state"
tdelay()
# make sure that it stays that way for some time
# we do not want to make the assertion time big because it will results
# in slowing down all tests
for _ in range(3):
tdelay()
kv = kstats()
assert kv == kvok, "stats did not stay at expected state"
# _loadStats loads content of .wcfs/stats .
def _loadStats(t): # -> {}
stats = {}
for l in t.wc._read(".wcfs/stats").splitlines():
# key : value
k, v = l.split(':')
k = k.strip()
v = v.strip()
stats[k] = int(v)
# verify that keys remains the same and that cumulative counters do not decrease
if t._stats_prev is not None:
assert stats.keys() == t._stats_prev.keys()
for k in stats.keys():
if k[0].islower():
assert stats[k] >= t._stats_prev[k], k
t._stats_prev = stats
return stats
class tDB(tWCFS): class tDB(tWCFS):
# __init__ initializes test database and wcfs. # __init__ initializes test database and wcfs.
...@@ -465,6 +533,8 @@ class tDB(tWCFS): ...@@ -465,6 +533,8 @@ class tDB(tWCFS):
assert len(t._wlinks) == 0 assert len(t._wlinks) == 0
t._wc_zheadfh.close() t._wc_zheadfh.close()
t.assertStats({'PinnedBlk': 0}) # FIXME + WatchLink, Watch, ZHeadLink
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
def open(t, zf, at=None): # -> tFile def open(t, zf, at=None): # -> tFile
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment