Commit d53271b9 authored by Kirill Smelkov's avatar Kirill Smelkov

bigfile/virtmem: Big Virtmem lock

At present several threads running can corrupt internal virtmem
datastructures (e.g. ram->lru_list, fileh->pagemap, etc).

This can happen even if we have zope instances only with 1 worker thread
- because there are other "system" thread, and python garbage collection
can trigger at any thread, so if a virtmem object, e.g. VMA or FileH was
there sitting at GC queue to be collected, their collection, and thus
e.g. vma_unmap() and fileh_close() will be called from
different-from-worker thread.

Because of that virtmem just has to be aware of threads not to allow
internal datastructure corruption.

On the other hand, the idea of introducing userspace virtual memory
manager turned out to be not so good from performance and complexity
point of view, and thus the plan is to try to move it back into the
kernel. This way it does not make sense to do a well-optimised locking
implementation for userspace version.

So we do just a simple single "protect-all" big lock for virtmem.

Of a particular note is interaction with Python's GIL - any long-lived
lock has to be taken with GIL released, because else it can deadlock:

    t1  t2

    G
    V   G
   !G   V
    G

so we introduce helpers to make sure the GIL is not taken, and to retake
it back if we were holding it initially.

Those helpers (py_gil_ensure_unlocked / py_gil_retake_if_waslocked) are
symmetrical opposites to what Python provides to make sure the GIL is
locked (via PyGILState_Ensure / PyGILState_Release).

Otherwise, the patch is more-or-less straightforward application for
one-big-lock to protect everything idea.
parent 78cbf2a0
......@@ -170,6 +170,14 @@ PYTEST_IGNORE := --ignore=3rdparty --ignore=build --ignore=t
test.py : bigfile/_bigfile.so
$(PYTEST) $(PYTEST_IGNORE)
# test.py via Valgrind (very slow)
test.py.vghel: bigfile/_bigfile.so
$(call vgxrun,--tool=helgrind, $(PYTEST) $(PYTEST_IGNORE))
test.py.drd: bigfile/_bigfile.so
$(call vgxrun,--tool=drd, $(PYTEST) $(PYTEST_IGNORE))
# test pagefault for double/real faults - it should crash
tfault := bigfile/tests/tfault
......
......@@ -319,6 +319,7 @@ PyFunc(pyfileh_isdirty, "isdirty() - are there any changes to fileh memory at al
if (!PyArg_ParseTuple(args, ""))
return NULL;
/* NOTE not strictly neccessary to virt_lock() for reading ->dirty */
return PyBool_FromLong(pyfileh->dirty);
}
......@@ -653,6 +654,7 @@ pyfileh_open(PyObject *pyfile0, PyObject *args)
{
PyBigFile *pyfile = upcast(PyBigFile *, pyfile0);
PyBigFileH *pyfileh;
/* NOTE no virtmem lock needed - default RAM does not change */
RAM *ram = ram_get_default(NULL); // TODO get ram from args
int err;
......@@ -726,6 +728,7 @@ static PyTypeObject PyBigFile_Type = {
PyFunc(pyram_reclaim, "ram_reclaim() -> reclaimed -- release some non-dirty ram back to OS")
(PyObject *self, PyObject *args)
{
/* NOTE no virtmem lock needed - default RAM does not change */
RAM *ram = ram_get_default(NULL); // TODO get ram from args
int reclaimed;
......@@ -743,6 +746,58 @@ static /*const*/ PyMethodDef pybigfile_modulemeths[] = {
};
/* GIL hooks for virtmem big lock */
static PyThreadState *_PyThreadState_Current_GET(void)
{
/* non-debug version of PyThreadState_GET() */
#if PY_MAJOR_VERSION < 3
return _PyThreadState_Current;
#else
return (PyThreadState*)_Py_atomic_load_relaxed(&_PyThreadState_Current);
#endif
}
static void *py_gil_ensure_unlocked(void)
{
/* make sure we don't hold python GIL (not to deadlock, as GIL oscillates)
*
* NOTE it is ok to get _PyThreadState_Current even without holding py gil -
* we only need to check whether ts_current != ts_my, and thus if this
* thread don't hold the gil, _PyThreadState_Current will be != ts_my
* for sure.
*
* NOTE2 we don't call PyThreadState_Get() as that thinks it is a bug when
* _PyThreadState_Current == NULL */
PyThreadState *ts_my = PyGILState_GetThisThreadState();
PyThreadState *ts_current = _PyThreadState_Current_GET();
PyThreadState *ts;
if (ts_my && (ts_my == ts_current)) {
ts = PyEval_SaveThread();
BUG_ON(ts != ts_my);
return ts_my;
}
return NULL;
}
static void py_gil_retake_if_waslocked(void *arg)
{
PyThreadState *ts_my = (PyThreadState *)arg;
/* retake GIL if we were holding it originally */
PyEval_RestoreThread(ts_my);
}
static const VirtGilHooks py_virt_gil_hooks = {
.gil_ensure_unlocked = py_gil_ensure_unlocked,
.gil_retake_if_waslocked = py_gil_retake_if_waslocked,
};
/* module init */
#if PY_MAJOR_VERSION >= 3
static /*const*/ PyModuleDef pybigfile_moduledef = {
......@@ -759,6 +814,9 @@ _init_bigfile(void)
PyObject *m;
int err;
/* setup virtmem gil hooks for python */
virt_lock_hookgil(&py_virt_gil_hooks);
/* setup pagefault handler right from the beginning - memory lazy-access
* fundamentally depends on it */
err = pagefault_init();
......
......@@ -65,25 +65,45 @@ static void on_pagefault(int sig, siginfo_t *si, void *_uc)
if (si->si_code != SEGV_ACCERR)
goto dont_handle;
// XXX locking
/* save errno, before doing any library calls XXX & the like ?
* (in case we'll handle the fault, and then will need to restore it) */
int save_errno = errno;
/* lock virtmem, so we can do further lookups / handling safely to
* concurrent access / changes.
*
* NOTE it is ok to call e.g. pthread_mutex_lock() from synchronous signal
* handler. */
virt_lock();
/* make sure we are not entering SIGSEGV handler recursively.
*
* we should not - double faulting from inside sighandler should just
* coredump (see comments wrt SA_NODEFER in pagefault_init()), but anyway -
* better check just in case.
*
* NOTE since we are under virtmem lock, here we can use just one static
* variable, instead of several per-thread ones. */
static int in_on_pagefault;
BUG_ON(in_on_pagefault);
++in_on_pagefault;
/* (1) addr -> vma ;lookup VMA covering faulting memory address */
vma = virt_lookup_vma(si->si_addr);
if (!vma)
if (!vma) {
--in_on_pagefault;
virt_unlock();
goto dont_handle; /* fault outside registered file slices */
}
/* now, since we found faulting address in registered memory areas, we know
* we should serve this pagefault. */
// TODO protect against different threads
/* save/restore errno XXX & the like ? */
int save_errno = errno;
vma_on_pagefault(vma, (uintptr_t)si->si_addr, write);
/* pagefault served - restore and return from sighandler */
--in_on_pagefault;
virt_unlock();
errno = save_errno;
return;
......
# Wendelin.core.bigfile | Threading tests
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Open Source Initiative approved licenses and Convey
# the resulting work. Corresponding source of such a combination shall include
# the source code for all other software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
from wendelin.bigfile import BigFile
from threading import Thread
from time import sleep
from wendelin.bigfile.tests.test_basic import bchr_py2
# Synthetic bigfile that verifies there is no concurrent calls to loadblk
class XBigFile(BigFile):
def __new__(cls, blksize):
obj = BigFile.__new__(cls, blksize)
obj.loadblk_counter = 0
return obj
def loadblk(self, blk, buf):
assert self.loadblk_counter == 0
# sleep with increased conter - to give chance for other threads to try
# to load the same block and catch that
self.loadblk_counter += 1
sleep(3)
# nothing to do - we just leave blk as is (all zeros)
self.loadblk_counter -= 1
assert self.loadblk_counter == 0
# XXX hack, hardcoded
MB = 1024*1024
PS = 2*MB
def test_thread_basic():
f = XBigFile(PS)
fh = f.fileh_open()
vma = fh.mmap(0, 4)
m = memoryview(vma)
# simple test that access vs access don't overlap
# ( look for assert & sleep in XBigFile.loadblk() )
Q = []
def access0():
Q.append(m[0])
t1 = Thread(target=access0)
t2 = Thread(target=access0)
t1.start()
t2.start()
t1.join()
t2.join()
assert Q == [bchr_py2(0), bchr_py2(0)]
......@@ -24,6 +24,7 @@
#include <wendelin/bigfile/file.h>
#include <wendelin/bigfile/pagemap.h>
#include <wendelin/bigfile/ram.h>
#include <wendelin/utils.h>
#include <wendelin/bug.h>
#include <ccan/minmax/minmax.h>
......@@ -40,6 +41,7 @@ static pgoff_t vma_addr_fpgoffset(VMA *vma, uintptr_t addr);
static int vma_page_ismapped(VMA *vma, Page *page);
static void vma_page_ensure_unmapped(VMA *vma, Page *page);
static void vma_page_ensure_notmappedrw(VMA *vma, Page *page);
static int __ram_reclaim(RAM *ram);
#define VIRT_DEBUG 0
#if VIRT_DEBUG
......@@ -48,6 +50,46 @@ static void vma_page_ensure_notmappedrw(VMA *vma, Page *page);
# define TRACE(msg, ...) do {} while(0)
#endif
/* global lock which protects manipulating virtmem data structures
*
* NOTE not scalable, but this is temporary solution - as we are going to move
* memory managment back into the kernel, where it is done properly.
*
* NOTE type is recursive to support deleting virtmem object via python
* finalizers (triggered either from decref or from gc - both from sighandler). */
static pthread_mutex_t virtmem_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static const VirtGilHooks *virtmem_gilhooks;
void virt_lock()
{
void *gilstate = NULL;
/* make sure we don't hold e.g. python GIL (not to deadlock, as GIL oscillates) */
if (virtmem_gilhooks)
gilstate = virtmem_gilhooks->gil_ensure_unlocked();
/* acquire virtmem lock */
xpthread_mutex_lock(&virtmem_lock);
/* retake GIL if we were holding it originally */
if (gilstate)
virtmem_gilhooks->gil_retake_if_waslocked(gilstate);
}
void virt_unlock()
{
xpthread_mutex_unlock(&virtmem_lock);
}
void virt_lock_hookgil(const VirtGilHooks *gilhooks)
{
BUG_ON(virtmem_gilhooks); /* prevent registering multiple times */
virtmem_gilhooks = gilhooks;
}
/* block/restore SIGSEGV for current thread - non on-pagefault code should not
* access any not-mmapped memory -> so on any pagefault we should just die with
* coredump, not try to incorrectly handle the pagefault.
......@@ -83,6 +125,7 @@ int fileh_open(BigFileH *fileh, BigFile *file, RAM *ram)
sigset_t save_sigset;
sigsegv_block(&save_sigset);
virt_lock();
bzero(fileh, sizeof(*fileh));
fileh->ramh = ramh_open(ram);
......@@ -96,6 +139,7 @@ int fileh_open(BigFileH *fileh, BigFile *file, RAM *ram)
pagemap_init(&fileh->pagemap, ilog2_exact(ram->pagesize));
out:
virt_unlock();
sigsegv_restore(&save_sigset);
return err;
}
......@@ -107,6 +151,7 @@ void fileh_close(BigFileH *fileh)
sigset_t save_sigset;
sigsegv_block(&save_sigset);
virt_lock();
/* it's an error to close fileh with existing mappings */
// XXX implement the same semantics usual files have wrt mmaps - if we release
......@@ -128,6 +173,7 @@ void fileh_close(BigFileH *fileh)
ramh_close(fileh->ramh);
bzero(fileh, sizeof(*fileh));
virt_unlock();
sigsegv_restore(&save_sigset);
}
......@@ -145,6 +191,7 @@ int fileh_mmap(VMA *vma, BigFileH *fileh, pgoff_t pgoffset, pgoff_t pglen)
sigset_t save_sigset;
sigsegv_block(&save_sigset);
virt_lock();
/* alloc vma->page_ismappedv[] */
bzero(vma, sizeof(*vma));
......@@ -165,8 +212,6 @@ int fileh_mmap(VMA *vma, BigFileH *fileh, pgoff_t pgoffset, pgoff_t pglen)
vma->fileh = fileh;
vma->f_pgoffset = pgoffset;
// XXX locking - linking up vs concurrent traversal
// XXX need to init vma->virt_list first?
/* hook vma to fileh->mmaps */
list_add_tail(&vma->same_fileh, &fileh->mmaps);
......@@ -175,6 +220,7 @@ int fileh_mmap(VMA *vma, BigFileH *fileh, pgoff_t pgoffset, pgoff_t pglen)
virt_register_vma(vma);
out:
virt_unlock();
sigsegv_restore(&save_sigset);
return err;
......@@ -196,9 +242,8 @@ void vma_unmap(VMA *vma)
Page *page;
sigset_t save_sigset;
// XXX locking vs concurrent access
sigsegv_block(&save_sigset);
virt_lock();
/* unregister from vmamap - so that pagefault handler does not recognize
* this area as valid */
......@@ -226,6 +271,7 @@ void vma_unmap(VMA *vma)
free(vma->page_ismappedv);
bzero(vma, sizeof(*vma));
virt_unlock();
sigsegv_restore(&save_sigset);
}
......@@ -234,7 +280,6 @@ void vma_unmap(VMA *vma)
* WRITEOUT / DISCARD *
**********************/
// XXX vs concurrent access in other threads
int fileh_dirty_writeout(BigFileH *fileh, enum WriteoutFlags flags)
{
Page *page;
......@@ -249,6 +294,7 @@ int fileh_dirty_writeout(BigFileH *fileh, enum WriteoutFlags flags)
return -EINVAL;
sigsegv_block(&save_sigset);
virt_lock();
/* write out dirty pages */
pagemap_for_each(page, &fileh->pagemap) {
......@@ -313,18 +359,19 @@ int fileh_dirty_writeout(BigFileH *fileh, enum WriteoutFlags flags)
fileh->dirty = 0;
out:
virt_unlock();
sigsegv_restore(&save_sigset);
return err;
}
// XXX vs concurrent access in other threads
void fileh_dirty_discard(BigFileH *fileh)
{
Page *page;
sigset_t save_sigset;
sigsegv_block(&save_sigset);
virt_lock();
/* XXX we scan whole file pages which could be slow
* TODO -> maintain something like separate dirty_list ? */
......@@ -333,6 +380,7 @@ void fileh_dirty_discard(BigFileH *fileh)
page_drop_memory(page);
fileh->dirty = 0;
virt_unlock();
sigsegv_restore(&save_sigset);
}
......@@ -349,12 +397,8 @@ static LIST_HEAD(vma_list);
/* lookup VMA covering `addr`. NULL if not found */
// XXX protection against concurrent vma_list updates & lookups
// XXX virt_lookup_vma() operates without taking locks - XXX no -> we'll use spinlock
// (we don't know whether
// address is ours while calling it) - so it must operate correctly in
// lock-free. Updates to vma_list should thus be also done carefully.
/* lookup VMA covering `addr`. NULL if not found
* (should be called with virtmem lock held) */
VMA *virt_lookup_vma(void *addr)
{
uintptr_t uaddr = (uintptr_t)addr;
......@@ -376,8 +420,8 @@ VMA *virt_lookup_vma(void *addr)
}
/* register VMA `vma` as covering some file view */
// XXX protection against concurrent updates & lookups
/* register VMA `vma` as covering some file view
* (should be called with virtmem lock held) */
void virt_register_vma(VMA *vma)
{
uintptr_t uaddr = vma->addr_start;
......@@ -395,8 +439,8 @@ void virt_register_vma(VMA *vma)
}
/* remove `area` from VMA registry. `area` must be registered before */
// XXX protection against concurrent updates & lookups
/* remove `area` from VMA registry. `area` must be registered before
* (should be called with virtmem lock held) */
void virt_unregister_vma(VMA *vma)
{
/* _init - to clear links, just in case */
......@@ -448,7 +492,9 @@ void *mem_xvalloc(void *addr, size_t len)
* PAGEFAULT HANDLER *
*********************/
/* pagefault entry when we know request came to our memory area */
/* pagefault entry when we know request came to our memory area
*
* (virtmem_lock already taken by caller) */
void vma_on_pagefault(VMA *vma, uintptr_t addr, int write)
{
pgoff_t pagen;
......@@ -471,7 +517,7 @@ void vma_on_pagefault(VMA *vma, uintptr_t addr, int write)
/* try to release some memory back to OS */
// XXX do we need and how to distinguish "no ram page" vs "no memory for `struct page`"?
// -> no we don't -- better allocate memory for struct pages for whole RAM at ram setup
if (!ram_reclaim(fileh->ramh->ram))
if (!__ram_reclaim(fileh->ramh->ram))
OOM();
continue;
}
......@@ -591,7 +637,7 @@ void vma_on_pagefault(VMA *vma, uintptr_t addr, int write)
***********/
#define RECLAIM_BATCH 64 /* how many pages to reclaim at once */
int ram_reclaim(RAM *ram)
static int __ram_reclaim(RAM *ram)
{
struct list_head *lru_list = &ram->lru_list;
struct list_head *hlru;
......@@ -630,6 +676,21 @@ int ram_reclaim(RAM *ram)
}
int ram_reclaim(RAM *ram)
{
int ret;
sigset_t save_sigset;
sigsegv_block(&save_sigset);
virt_lock();
ret = __ram_reclaim(ram);
virt_unlock();
sigsegv_restore(&save_sigset);
return ret;
}
/********************
* Internal helpers *
......
......@@ -269,6 +269,23 @@ void virt_unregister_vma(VMA *vma);
void *mem_valloc(void *addr, size_t len);
void *mem_xvalloc(void *addr, size_t len);
/* big virtmem lock */
void virt_lock(void);
void virt_unlock(void);
/* for thirdparty to hook into locking big virtmem lock process
* (e.g. for python to hook in its GIL release/reacquire) */
struct VirtGilHooks {
/* drop gil, if current thread hold it */
void * (*gil_ensure_unlocked) (void);
/* retake gil, if we were holding it at ->ensure_unlocked() stage */
void (*gil_retake_if_waslocked) (void *);
};
typedef struct VirtGilHooks VirtGilHooks;
void virt_lock_hookgil(const VirtGilHooks *gilhooks);
// XXX is this needed? think more
/* what happens on out-of-memory */
void OOM(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment