Commit d81d2cbb authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: tests: Start verifying state of OS file cache

For WCFS to be efficient it will have to carefully preserve OS cache on
file invalidations. As preparatory step establish infrastructure for
verifying state of OS file cache and start asserting on OS cache state
in a couple of places.

See comments added to tFile constructor that describe how OS cache state
verification is setup.

Some preliminary history:

8293025b    X Thoughts on how to avoid readahead touching pages of neighbour block
3054e4a3    X not touching neighbour block works via setting MADV_RANDOM in last 1/4 of every block
18362227    X #5 access still triggers read to #4 ?
17dbf94e    X Provide mlock2 fallback for Ubuntu
d134c0b9    X wcfs: test: try to live with only hard memlock limit adjusted
c2423296    X Fix mlock2 build on Debian 8
parent e3f2ee2d
...@@ -19,15 +19,118 @@ ...@@ -19,15 +19,118 @@
# cython: language_level=2 # cython: language_level=2
"""Package mm provides access to OS memory management interfaces.""" """Package mm provides access to OS memory management interfaces like mlock and mincore."""
from posix cimport mman from posix cimport mman
from cpython.exc cimport PyErr_SetFromErrno from cpython.exc cimport PyErr_SetFromErrno
#from libc.stdio cimport printf
# mlock2 is provided starting from glibc 2.27
cdef extern from *:
"""
#if defined(__GLIBC__)
#if !__GLIBC_PREREQ(2, 27)
#include <unistd.h>
#include <sys/syscall.h>
static int mlock2(const void *addr, size_t len, int flags) {
#ifndef SYS_mlock2
errno = ENOSYS;
return -1;
#else
long err = syscall(SYS_mlock2, addr, len, flags);
if (err != 0) {
errno = -err;
return -1;
}
return 0;
#endif
}
#endif
#endif
#ifndef MLOCK_ONFAULT
# define MLOCK_ONFAULT 1
#endif
"""
pass
cdef extern from "<sys/user.h>": cdef extern from "<sys/user.h>":
cpdef enum: cpdef enum:
PAGE_SIZE PAGE_SIZE
cpdef enum:
PROT_EXEC = mman.PROT_EXEC
PROT_READ = mman.PROT_READ
PROT_WRITE = mman.PROT_WRITE
PROT_NONE = mman.PROT_NONE
MLOCK_ONFAULT = mman.MLOCK_ONFAULT
MCL_CURRENT = mman.MCL_CURRENT
MCL_FUTURE = mman.MCL_FUTURE
#MCL_ONFAULT = mman.MCL_ONFAULT
MADV_NORMAL = mman.MADV_NORMAL
MADV_RANDOM = mman.MADV_RANDOM
MADV_SEQUENTIAL = mman.MADV_SEQUENTIAL
MADV_WILLNEED = mman.MADV_WILLNEED
MADV_DONTNEED = mman.MADV_DONTNEED
#MADV_FREE = mman.MADV_FREE
MADV_REMOVE = mman.MADV_REMOVE
MS_ASYNC = mman.MS_ASYNC
MS_SYNC = mman.MS_SYNC
MS_INVALIDATE = mman.MS_INVALIDATE
# incore returns bytearray vector indicating whether page of mem is in core or not.
#
# mem start must be page-aligned.
def incore(const unsigned char[::1] mem not None) -> bytearray:
cdef size_t size = mem.shape[0]
if size == 0:
return bytearray()
cdef const void *addr = &mem[0]
# size in pages; rounded up
cdef size_t pgsize = (size + (PAGE_SIZE-1)) // PAGE_SIZE
#printf("\n\n%p %ld\n", addr, size)
incore = bytearray(pgsize)
cdef unsigned char[::1] incorev = incore
cdef err = mman.mincore(<void *>addr, size, &incorev[0])
if err:
PyErr_SetFromErrno(OSError)
return incore
# lock locks mem pages to be resident in RAM.
#
# see mlock2(2) for description of flags.
def lock(const unsigned char[::1] mem not None, int flags):
cdef const void *addr = &mem[0]
cdef size_t size = mem.shape[0]
cdef err = mman.mlock2(addr, size, flags)
if err:
PyErr_SetFromErrno(OSError)
return
# unlock unlocks mem pages from being pinned in RAM.
def unlock(const unsigned char[::1] mem not None):
cdef const void *addr = &mem[0]
cdef size_t size = mem.shape[0]
cdef err = mman.munlock(addr, size)
if err:
PyErr_SetFromErrno(OSError)
return
from posix.types cimport off_t from posix.types cimport off_t
# map_ro memory-maps fd[offset +size) as read-only. # map_ro memory-maps fd[offset +size) as read-only.
...@@ -51,3 +154,17 @@ def unmap(const unsigned char[::1] mem not None): ...@@ -51,3 +154,17 @@ def unmap(const unsigned char[::1] mem not None):
PyErr_SetFromErrno(OSError) PyErr_SetFromErrno(OSError)
return return
# advise advises kernel about use of mem's memory.
#
# see madvise(2) for details.
def advise(const unsigned char[::1] mem not None, int advice):
cdef const void *addr = &mem[0]
cdef size_t size = mem.shape[0]
cdef err = mman.madvise(<void *>addr, size, advice)
if err:
PyErr_SetFromErrno(OSError)
return
...@@ -37,6 +37,7 @@ import sys, os, os.path ...@@ -37,6 +37,7 @@ import sys, os, os.path
from thread import get_ident as gettid from thread import get_ident as gettid
from time import gmtime from time import gmtime
from errno import EINVAL, ENOTCONN from errno import EINVAL, ENOTCONN
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, b from golang import go, chan, select, func, defer, b
from golang import context, time from golang import context, time
from zodbtools.util import ashex as h from zodbtools.util import ashex as h
...@@ -66,6 +67,12 @@ def setup_module(): ...@@ -66,6 +67,12 @@ def setup_module():
gorace += " " gorace += " "
os.environ["GORACE"] = gorace + "halt_on_error=1" os.environ["GORACE"] = gorace + "halt_on_error=1"
# ↑ memlock soft-limit till its hard maximum
# (tFile needs ~ 64M to mlock while default memlock soft-limit is usually 64K)
memlockS, memlockH = getrlimit(RLIMIT_MEMLOCK)
if memlockS != memlockH:
setrlimit(RLIMIT_MEMLOCK, (memlockH, memlockH))
global testdb, testzurl, testmntpt global testdb, testzurl, testmntpt
testdb = getTestDB() testdb = getTestDB()
testdb.setup() testdb.setup()
...@@ -527,11 +534,12 @@ class tDB(tWCFS): ...@@ -527,11 +534,12 @@ class tDB(tWCFS):
# tFile provides testing environment for one bigfile opened on wcfs. # tFile provides testing environment for one bigfile opened on wcfs.
# #
# ._blk() provides access to data of a block. # ._blk() provides access to data of a block. .cached() gives state of which
# .assertBlk/.assertData assert # blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
# on state of data. # on state of cache and data.
class tFile: class tFile:
# maximum number of pages we mmap for 1 file. # maximum number of pages we mmap for 1 file.
# this should be not big not to exceed mlock limit.
_max_tracked_pages = 8 _max_tracked_pages = 8
def __init__(t, tdb, zf, at=None): def __init__(t, tdb, zf, at=None):
...@@ -549,10 +557,57 @@ class tFile: ...@@ -549,10 +557,57 @@ class tFile:
st = os.fstat(t.f.fileno()) st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize assert st.st_blksize == t.blksize
# mmap the file past the end up to _max_tracked_pages # mmap the file past the end up to _max_tracked_pages and setup
# invariants on which we rely to verify OS cache state:
#
# 1. lock pages with MLOCK_ONFAULT: this way after a page is read by
# mmap access we have the guarantee from kernel that the page will
# stay in pagecache.
#
# 2. madvise memory with MADV_SEQUENTIAL and MADV_RANDOM in interleaved
# mode. This adjusts kernel readahead (which triggers for
# MADV_NORMAL or MADV_SEQUENTIAL vma) to not go over to next block
# and thus a read access to one block won't trigger implicit read
# access to its neighbour block.
#
# https://www.quora.com/What-heuristics-does-the-adaptive-readahead-implementation-in-the-Linux-kernel-use
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/madvise.c?h=v5.2-rc4#n51
#
# we don't use MADV_NORMAL instead of MADV_SEQUENTIAL, because for
# MADV_NORMAL, there is not only read-ahead, but also read-around,
# which might result in accessing previous block.
#
# we don't disable readahead universally, since enabled readahead
# helps to test how wcfs handles simultaneous read triggered by
# async kernel readahead vs wcfs uploading data for the same block
# into OS cache. Also, fully enabled readahead is how wcfs is
# actually used in practice.
assert t.blksize % mm.PAGE_SIZE == 0 assert t.blksize % mm.PAGE_SIZE == 0
t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize) t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)
mm.lock(t.fmmap, mm.MLOCK_ONFAULT)
for blk in range(t._max_tracked_pages):
blkmmap = t.fmmap[blk*t.blksize:(blk+1)*t.blksize]
# NOTE the kernel does not start readahead from access to
# MADV_RANDOM vma, but for a MADV_{NORMAL/SEQUENTIAL} vma it starts
# readahead which can go _beyond_ vma that was used to decide RA
# start. For this reason - to prevent RA started at one block to
# overlap with the next block, we put MADV_RANDOM vma at the end of
# every block covering last 1/8 of it.
# XXX implicit assumption that RA window is < 1/8·blksize
#
# NOTE with a block completely covered by MADV_RANDOM the kernel
# issues 4K sized reads; wcfs starts uploading into cache almost
# immediately, but the kernel still issues many reads to read the
# full 2MB of the block. This works slowly.
# XXX -> investigate and maybe make read(while-uploading) wait for
# uploading to complete and only then return? (maybe it will help
# performance even in normal case)
_ = len(blkmmap)*7//8
mm.advise(blkmmap[:_], mm.MADV_SEQUENTIAL)
mm.advise(blkmmap[_:], mm.MADV_RANDOM)
def close(t): def close(t):
t.tdb._files.remove(t) t.tdb._files.remove(t)
if t.fmmap is not None: if t.fmmap is not None:
...@@ -569,6 +624,24 @@ class tFile: ...@@ -569,6 +624,24 @@ class tFile:
if t.at is None: # notify tDB only for head/file access if t.at is None: # notify tDB only for head/file access
t.tdb._blkheadaccess(t.zf, blk) t.tdb._blkheadaccess(t.zf, blk)
# cached returns [] with indicating whether a file block is cached or not.
# 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached some not.
def cached(t):
l = t._sizeinblk()
incorev = mm.incore(t.fmmap[:l*t.blksize])
# incorev is in pages; convert to in blocks
assert t.blksize % mm.PAGE_SIZE == 0
blkpages = t.blksize // mm.PAGE_SIZE
cachev = [0.]*l
for i, v in enumerate(incorev):
blk = i // blkpages
cachev[blk] += bool(v)
for blk in range(l):
cachev[blk] /= blkpages
if cachev[blk] == int(cachev[blk]):
cachev[blk] = int(cachev[blk]) # 0.0 -> 0, 1.0 -> 1
return cachev
# _sizeinblk returns file size in blocks. # _sizeinblk returns file size in blocks.
def _sizeinblk(t): def _sizeinblk(t):
st = os.fstat(t.f.fileno()) st = os.fstat(t.f.fileno())
...@@ -577,6 +650,12 @@ class tFile: ...@@ -577,6 +650,12 @@ class tFile:
assert st.st_size // t.blksize <= t._max_tracked_pages assert st.st_size // t.blksize <= t._max_tracked_pages
return st.st_size // t.blksize return st.st_size // t.blksize
# assertCache asserts on state of OS cache for file.
#
# incorev is [] of 1/0 representing whether block data is present or not.
def assertCache(t, incorev):
assert t.cached() == incorev
# assertBlk asserts that file[blk] has data as expected. # assertBlk asserts that file[blk] has data as expected.
# #
# Expected data may be given with size < t.blksize. In such case the data # Expected data may be given with size < t.blksize. In such case the data
...@@ -603,12 +682,28 @@ class tFile: ...@@ -603,12 +682,28 @@ class tFile:
dataok += b'\0'*(t.blksize - len(dataok)) # tailing zeros dataok += b'\0'*(t.blksize - len(dataok)) # tailing zeros
assert blk < t._sizeinblk() assert blk < t._sizeinblk()
# access to this block must not trigger access to other blocks
incore_before = t.cached()
def _():
incore_after = t.cached()
incore_before[blk] = 'x'
incore_after [blk] = 'x'
assert incore_before == incore_after
defer(_)
cached = t.cached()[blk]
assert cached in (0, 1) # every check accesses a block in full
blkview = t._blk(blk) blkview = t._blk(blk)
assert t.cached()[blk] == cached
# verify full data of the block # verify full data of the block
# TODO(?) assert individually for every block's page? (easier debugging?) # TODO(?) assert individually for every block's page? (easier debugging?)
assert blkview.tobytes() == dataok assert blkview.tobytes() == dataok
# we just accessed the block in full - it has to be in OS cache completely
assert t.cached()[blk] == 1
# assertData asserts that file has data blocks as specified. # assertData asserts that file has data blocks as specified.
# #
...@@ -624,10 +719,15 @@ class tFile: ...@@ -624,10 +719,15 @@ class tFile:
if mtime is not None: if mtime is not None:
assert st.st_mtime == tidtime(mtime) assert st.st_mtime == tidtime(mtime)
cachev = t.cached()
for blk, dataok in enumerate(dataokv): for blk, dataok in enumerate(dataokv):
if dataok == 'x': if dataok == 'x':
continue continue
t.assertBlk(blk, dataok) t.assertBlk(blk, dataok)
cachev[blk] = 1
# all accessed blocks must be in cache after we touched them all
t.assertCache(cachev)
# ---- infrastructure: helpers to query dFtail/accessed history ---- # ---- infrastructure: helpers to query dFtail/accessed history ----
...@@ -676,11 +776,13 @@ def test_wcfs_basic(): ...@@ -676,11 +776,13 @@ def test_wcfs_basic():
# >>> file initially empty # >>> file initially empty
f = t.open(zf) f = t.open(zf)
f.assertCache([])
f.assertData ([], mtime=t.at0) f.assertData ([], mtime=t.at0)
# >>> (@at1) commit data -> we can see it on wcfs # >>> (@at1) commit data -> we can see it on wcfs
at1 = t.commit(zf, {2:'c1'}) at1 = t.commit(zf, {2:'c1'})
f.assertCache([0,0,0]) # initially not cached
f.assertData (['','','c1']) # TODO + mtime=t.head f.assertData (['','','c1']) # TODO + mtime=t.head
# >>> (@at2) commit again -> we can see both latest and snapshotted states # >>> (@at2) commit again -> we can see both latest and snapshotted states
...@@ -688,10 +790,13 @@ def test_wcfs_basic(): ...@@ -688,10 +790,13 @@ def test_wcfs_basic():
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'}) at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
# f @head # f @head
#f.assertCache([1,1,0,0,0,0]) TODO enable after wcfs supports invalidations
f.assertData (['','', 'c2', 'd2', 'x','x']) # TODO + mtime=t.head f.assertData (['','', 'c2', 'd2', 'x','x']) # TODO + mtime=t.head
f.assertCache([1,1,1,1,0,0])
# f @at1 # f @at1
f1 = t.open(zf, at=at1) f1 = t.open(zf, at=at1)
#f1.assertCache([0,0,1]) TODO enable after wcfs supports invalidations
f1.assertData (['','','c1']) # TODO + mtime=at1 f1.assertData (['','','c1']) # TODO + mtime=at1
...@@ -699,14 +804,19 @@ def test_wcfs_basic(): ...@@ -699,14 +804,19 @@ def test_wcfs_basic():
f2 = t.open(zf, at=at2) f2 = t.open(zf, at=at2)
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'}) at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
#f.assertCache([0,1,0,1,0,0]) TODO enable after wcfs supports invalidations
# f @head # f @head
#f.assertCache([0,1,0,1,0,0]) TODO enable after wcfs supports invalidations
f.assertData (['a3','','c3','d2','x','x']) # TODO + mtime=t.head f.assertData (['a3','','c3','d2','x','x']) # TODO + mtime=t.head
# f @at2 # f @at2
# NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed # NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed
#f2.assertCache([0,0,1,0,0,0]) TODO enable after wcfs supports invalidations
f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2 f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2
# f @at1 # f @at1
#f1.assertCache([1,1,1]) TODO enable after wcfs supports invalidations
f1.assertData (['','','c1']) # TODO mtime=at1 f1.assertData (['','','c1']) # TODO mtime=at1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment