...
 
Commits (2673)
# setup to run tests on Nexedi testing infrastructure.
# https://stack.nexedi.com/test_status
#storv = ['fs', 'zeo', 'neo'] # storage backends to test against
storv = ['fs', 'zeo'] # XXX reenable neo
# some bugs are only likely to trigger when there is only 1 or 2 main OS thread(s) in wcfs
# GOMAXPROCS='' means use `nproc`
gonprocv = ['1', '2', ''] # GOMAXPROCS=... to test against
# test.t & friends unit-test core of UVMM and are Go-, Python- and ZODB-storage independent.
# we don't run test.vg* because there is currently no valgrind on SlapOS.
for kind in ['t', 'fault', 'asan', 'tsan']:
t = 'test.%s' % kind
envadj = {}
# test.fault: on Ubuntu, when apport is registered in /proc/sys/kernel/core_pattern,
# it is apport which is run to process core. Apport dumps core for us, but
# refuses to run in parallel with the following message in its log:
# ERROR: apport ...: another apport instance is already running, aborting
# this way, when test.fault tests are run in parallel, some of them fail to
# find core corresponding to expected crash.
# -> force serial execution to workaround that.
if kind == 'fault':
envadj['MAKEFLAGS'] = '-j1'
TestCase(t, ['make', t], envadj=envadj) # TODO summaryf=TAP.summary
# test.py/<stor>-!wcfs runs unit- and functional- tests for wendelin.core in non-wcfs mode.
for stor in storv:
TestCase('test.py/%s-!wcfs' % stor, ['make', 'test.py'],
envadj={'WENDELIN_CORE_TEST_DB': '<%s>' % stor, 'WENDELIN_CORE_VIRTMEM': 'rw:uvmm'},
summaryf=PyTest.summary)
# test.go unit-tests Go bits in wcfs.
for nproc in gonprocv:
TestCase('test.go:%s' % nproc, ['make', 'test.go'],
envadj={'GOMAXPROCS': nproc})
# test.wcfs/<stor> runs unit tests for WCFS
# test.py/<stor>-wcfs runs unit- and functional- tests for wendelin.core in wcfs mode.
for stor in storv:
envdb = {'WENDELIN_CORE_TEST_DB': '<%s>' % stor}
for nproc in gonprocv:
TestCase('test.wcfs/%s:%s' % (stor, nproc), ['make', 'test.wcfs'],
envadj=dict(GOMAXPROCS=nproc, **envdb), summaryf=PyTest.summary)
for nproc in gonprocv:
TestCase('test.py/%s-wcfs:%s' % (stor, nproc), ['make', 'test.py'],
envadj=dict(WENDELIN_CORE_VIRTMEM='r:wcfs+w:uvmm', GOMAXPROCS=nproc, **envdb),
summaryf=PyTest.summary)
Wendelin.core change history
============================
0.14 aka 2.0.0.dev1 (2020-XX-YY)
--------------------------------
This is a major release that speeds up pagefault handling and reduces
wendelin.core RAM consumption dramatically:
The project switches to be mainly using kernel virtual memory manager.
Bigfiles are now primarily accessed with plain OS-level mmap to files from
synthetic WCFS filesystem. This removes overhead of handling pagefaults in
userspace and makes bigfile's cache (now it is the kernel's pagecache) to be
shared in between several processes.
In addition a custom coherency protocol is provided, which allows clients
that want to receive isolation guarantee ("I" from ACID) to still use the shared
cache and at the same time get bigfile view isolated from other's changes.
By default wendelin.core python client continues to provide full ACID semantics as
before.
Please see wcfs.go__ for description of the new filesystem.
.. XXX correct link -> nexedi, sha1 - fix.
__ https://lab.nexedi.com/kirr/wendelin.core/blob/t/wcfs/wcfs.go
0.13 (2019-06-18)
-----------------
......
# Wendelin.core | Instructions to build & test
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -23,6 +23,7 @@ PYTHON ?= python
PYTEST ?= $(PYTHON) -m pytest
PYBENCH ?= $(PYTHON) -m golang.cmd.pybench
VALGRIND?= valgrind
GO ?= go
# use the same C compiler as python
# (for example it could be `gcc -m64` on a 32bit userspace)
......@@ -33,14 +34,18 @@ ifeq ($(CC),)
$(error "Cannot defermine py-CC")
endif
all : bigfile/_bigfile.so
all : bigfile/_bigfile.so wcfs/wcfs
ccan_config := 3rdparty/ccan/config.h
bigfile/_bigfile.so : $(ccan_config) FORCE
$(PYTHON) setup.py build_dso --inplace
$(PYTHON) setup.py ll_build_ext --inplace
wcfs/wcfs: FORCE
cd wcfs && $(GO) build
FORCE :
......@@ -68,7 +73,7 @@ CFLAGS := -g -Wall -D_GNU_SOURCE -std=gnu99 -fplan9-extensions \
# XXX hack ugly
LOADLIBES=lib/bug.c lib/utils.c 3rdparty/ccan/ccan/tap/tap.c
TESTS := $(patsubst %.c,%,$(wildcard bigfile/tests/test_*.c))
test : test.t test.py test.fault test.asan test.tsan test.vgmem test.vghel test.vgdrd
test : test.t test.go test.wcfs test.py test.fault test.asan test.tsan test.vgmem test.vghel test.vgdrd
# TODO move XFAIL markers into *.c
......@@ -92,6 +97,7 @@ LINKC = $(LINK.c) $< $(LOADLIBES) $(LDLIBS) -o $@
# tests without instrumentation
test.t : $(TESTS:%=%.trun)
%.trun : %.t
#gdb -q -ex run -ex backtrace -ex quit $(XRUN<)
$(XRUN<)
%.t : %.c $(ccan_config)
......@@ -164,17 +170,25 @@ test.vgdrd: $(TESTS:%=%.vgdrdrun)
# run python tests
PYTEST_IGNORE := --ignore=3rdparty --ignore=build --ignore=t
test.py : bigfile/_bigfile.so
$(PYTEST) $(PYTEST_IGNORE)
# wcfs unit-tests
test.wcfs : bigfile/_bigfile.so wcfs/wcfs
$(PYTEST) wcfs/
# unit/functional tests for whole wendelin.core
test.py : bigfile/_bigfile.so wcfs/wcfs
$(PYTEST) $(PYTEST_IGNORE) --ignore=wcfs
# test.py via Valgrind (very slow)
test.py.vghel: bigfile/_bigfile.so
test.py.vghel: bigfile/_bigfile.so wcfs/wcfs
$(call vgxrun,--tool=helgrind, $(PYTEST) $(PYTEST_IGNORE))
test.py.drd: bigfile/_bigfile.so
test.py.drd: bigfile/_bigfile.so wcfs/wcfs
$(call vgxrun,--tool=drd, $(PYTEST) $(PYTEST_IGNORE))
# run go tests
test.go :
cd wcfs && $(GO) test -count=1 ./... # -count=1 disables tests caching
# test pagefault for double/real faults - it should crash
tfault := bigfile/tests/tfault
......@@ -192,5 +206,5 @@ bench : bench.t bench.py
bench.t : $(BENCHV.C:%=%.trun)
bench.py: bigfile/_bigfile.so
bench.py: bigfile/_bigfile.so wcfs/wcfs
$(PYBENCH) --count=3 --forked $(PYTEST_IGNORE)
......@@ -62,7 +62,7 @@ class _RAMFileH(object):
# mmap(2) allows mmaping past the end, but python's mmap does not.
# we workaround it with explicitly growing file as needed.
# however we need to protect against races between concurrent .mmap() calls.
# ._mmapmu is used for this.
# ._mmapmu is used for this. XXX -> our mmap?
self._mmapmu = threading.Lock()
def mmap(self, pgoffset, pglen):
......
......@@ -61,7 +61,8 @@ class ZBigArray(BigArray,
self.zfile = ZBigFile(blksize)
self._v_fileh = None
# __setstate__ (re-)loads ZBigArray data from DB, e.g. after live ZBigArray
# object was invalidated.
def __setstate__(self, state):
super(ZBigArray, self).__setstate__(state)
......@@ -73,18 +74,15 @@ class ZBigArray(BigArray,
if not hasattr(self, '_'+k):
setattr(self, '_'+k, v)
# NOTE __setstate__() is done after either
# - 1st time loading from DB, or
# - loading from DB after invalidation.
#
# as invalidation can happen e.g. by just changing .shape in another DB
# connection (e.g. resizing array and appending some data), via always
# resetting ._v_fileh we discard all data from it.
# (re-)set ._v_fileh; make sure not to lose fileh and its cache if
# .zfile has not changed. By preserving fileh we don't throw away data
# cache on e.g. just .shape change.
#
# IOW we discard whole cache just because some data were appended.
#
# -> TODO (optimize) do not throw away ._v_fileh if we can (.zfile not changed, etc)
self._v_fileh = None
# TODO test/bench that cache is preserved after .shape change (e.g. append)
_v_fileh = getattr(self, '_v_fileh', None)
if _v_fileh is not None and self.zfile is not _v_fileh.zfile:
_v_fileh = None
self._v_fileh = _v_fileh
# open fileh lazily, so that when we open it, zfile was already associated
......
# Wendelin.core.bigarray | Tests for ZBigArray
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import dtype, uint8, all, array_equal, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from pytest import raises
......@@ -255,7 +255,9 @@ def test_zbigarray_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -273,8 +275,8 @@ def test_zbigarray_vs_conn_migration():
# close conn, wait till T21 reopens it
del a11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open another connection. it must be different
# (see appropriate place in zfile test about why)
......@@ -282,28 +284,31 @@ def test_zbigarray_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zarray2-modified')
wait(ctx, 'T2-zarray2-modified')
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# wait until T1 finish setting up initial data and get its connection
# (see appropriate place in zfile tests for details)
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zarray and arrange timings so that T1 commits after zarray is
# modified, but before we commit/abort.
......@@ -312,21 +317,21 @@ def test_zbigarray_vs_conn_migration():
a21[0:1] = [21] # XXX -> [0] = 21 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zarray2 should stay unchanged
transaction.abort()
del a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zarray2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -346,62 +351,69 @@ def test_zbigarray_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
T12done = chan()
@func
def T12():
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
defer(conn12.close)
tell('T1-conn12-opened')
wait('T2-zarray2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zarray2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
# open, modify, commit
T22done = chan()
@func
def T22():
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
defer(conn22.close)
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zarray and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
a22[0:1] = [22] # XXX -> [0] = 22 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zarray
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def _(ctx):
c21_1.tell(ctx, 'T11-exit-command')
wg.go(_)
wg.wait()
# now verify that zarray2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
......
/_file_zodb.cpp
/_file_zodb.h
# -*- coding: utf-8 -*-
# BigFile submodule for Wendelin
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -21,4 +21,9 @@
"""TODO big module-level picture description"""
# preload golang.so -> libgolang.so. This way dynamic linker discovers where
# libgolang.so is, and so there will be no link failure due to libgolang.so not
# found, when our C++ libraries, that use libgolang.so, are loaded (e.g. libwcfs.so).
import golang
from ._bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED, ram_reclaim
/* Wendelin.bigfile | Python interface to memory/files
* Copyright (C) 2014-2019 Nexedi SA and Contributors.
* Copyright (C) 2014-2020 Nexedi SA and Contributors.
* Kirill Smelkov <kirr@nexedi.com>
*
* This program is free software: you can Use, Study, Modify and Redistribute
......@@ -414,6 +414,20 @@ PyFunc(pyfileh_invalidate_page, "invalidate_page(pgoffset) - invalidate fileh pa
}
/* uses_mmap_overlay() -> bool
 *
 * Report whether this file handle was opened in mmap-overlay mode, i.e.
 * whether base data for all VMAs of this fileh are taken as base-layer mmap.
 * Accepts no arguments.
 */
PyFunc(pyfileh_uses_mmap_overlay, "uses_mmap_overlay() - whether base data for all VMAs"
                                  " of this fileh are taken as base-layer mmap")
    (PyObject *pyfileh0, PyObject *args)
{
    PyBigFileH *pyfileh = container_of(pyfileh0, PyBigFileH, pyobj);
    BigFileH   *fileh   = &pyfileh->fileh;

    /* reject any arguments */
    if (!PyArg_ParseTuple(args, ""))
        return NULL;

    /* fileh->mmap_overlay was decided at fileh_open time */
    return PyBool_FromLong(fileh->mmap_overlay);
}
static void
pyfileh_dealloc(PyObject *pyfileh0)
{
......@@ -454,11 +468,12 @@ pyfileh_new(PyTypeObject *type, PyObject *args, PyObject *kw)
static /*const*/ PyMethodDef pyfileh_methods[] = {
{"mmap", pyfileh_mmap, METH_VARARGS, pyfileh_mmap_doc},
{"dirty_writeout", pyfileh_dirty_writeout, METH_VARARGS, pyfileh_dirty_writeout_doc},
{"dirty_discard", pyfileh_dirty_discard, METH_VARARGS, pyfileh_dirty_discard_doc},
{"isdirty", pyfileh_isdirty, METH_VARARGS, pyfileh_isdirty_doc},
{"invalidate_page", pyfileh_invalidate_page,METH_VARARGS, pyfileh_invalidate_page_doc},
{"mmap", pyfileh_mmap, METH_VARARGS, pyfileh_mmap_doc},
{"dirty_writeout", pyfileh_dirty_writeout, METH_VARARGS, pyfileh_dirty_writeout_doc},
{"dirty_discard", pyfileh_dirty_discard, METH_VARARGS, pyfileh_dirty_discard_doc},
{"isdirty", pyfileh_isdirty, METH_VARARGS, pyfileh_isdirty_doc},
{"invalidate_page", pyfileh_invalidate_page, METH_VARARGS, pyfileh_invalidate_page_doc},
{"uses_mmap_overlay", pyfileh_uses_mmap_overlay, METH_VARARGS, pyfileh_uses_mmap_overlay_doc},
{NULL}
};
......@@ -902,10 +917,42 @@ out:
}
/* PyBigFile: mmap methods.
* They redirect op X to type.blkmmapper.X without going to Python level */
/* pybigfile_mmap_setup_read implements BigFile.mmap_setup_read for PyBigFile:
 * it redirects the call to the C-level blkmmap_ops extracted from
 * type.blkmmapper, without going to Python level. */
static int
pybigfile_mmap_setup_read(VMA *vma, BigFile *file0, blk_t blk, size_t blklen)
{
    PyBigFile *file = container_of(file0, PyBigFile, file);
    ASSERT(file->blkmmap_ops != NULL);  /* must hold when mmap-overlay mode is in use */
    return file->blkmmap_ops->mmap_setup_read(vma, file0, blk, blklen);
}
/* pybigfile_remmap_blk_read implements BigFile.remmap_blk_read for PyBigFile:
 * it redirects the call to the C-level blkmmap_ops extracted from
 * type.blkmmapper, without going to Python level. */
static int
pybigfile_remmap_blk_read(VMA *vma, BigFile *file0, blk_t blk)
{
    PyBigFile *file = container_of(file0, PyBigFile, file);
    ASSERT(file->blkmmap_ops != NULL);  /* must hold when mmap-overlay mode is in use */
    return file->blkmmap_ops->remmap_blk_read(vma, file0, blk);
}
/* pybigfile_munmap implements BigFile.munmap for PyBigFile:
 * it redirects the call to the C-level blkmmap_ops extracted from
 * type.blkmmapper, without going to Python level. */
static int
pybigfile_munmap(VMA *vma, BigFile *file0)
{
    PyBigFile *file = container_of(file0, PyBigFile, file);
    ASSERT(file->blkmmap_ops != NULL);  /* must hold when mmap-overlay mode is in use */
    return file->blkmmap_ops->munmap(vma, file0);
}
/* bigfile_ops for PyBigFile:
 * - loadblk/storeblk go to Python-level .loadblk()/.storeblk() implementations;
 * - mmap methods are served by type.blkmmapper's C-level ops (see above). */
static const struct bigfile_ops pybigfile_ops = {
    .loadblk         = pybigfile_loadblk,
    .storeblk        = pybigfile_storeblk,
    //.release       =
    .mmap_setup_read = pybigfile_mmap_setup_read,
    .remmap_blk_read = pybigfile_remmap_blk_read,
    .munmap          = pybigfile_munmap,
};
......@@ -919,16 +966,23 @@ pyfileh_open(PyObject *pyfile0, PyObject *args)
RAM *ram = ram_get_default(NULL); // TODO get ram from args
int err;
if (!PyArg_ParseTuple(args, ""))
int mmap_overlay = -1; /* -1 means None; https://bugs.python.org/issue14705 */
if (!PyArg_ParseTuple(args, "|i", &mmap_overlay))
return NULL;
if (mmap_overlay == -1)
mmap_overlay = (pyfile->blkmmap_ops != NULL ? 1 : 0);
if (mmap_overlay && pyfile->blkmmap_ops == NULL)
return PyErr_Format(PyExc_TypeError,
"%s type does not provide blkmmapper", pyfile0->ob_type->tp_name);
pyfileh = PyType_New(PyBigFileH, &PyBigFileH_Type, NULL);
if (!pyfileh)
return NULL;
Py_INCREF(pyfile);
err = fileh_open(&pyfileh->fileh, &pyfile->file, ram);
err = fileh_open(&pyfileh->fileh, &pyfile->file, ram,
mmap_overlay ? MMAP_OVERLAY : DONT_MMAP_OVERLAY);
if (err) {
XPyErr_SetFromErrno();
Py_DECREF(pyfile);
......@@ -940,14 +994,67 @@ pyfileh_open(PyObject *pyfile0, PyObject *args)
}
/* pyfile_dealloc frees PyBigFile instance.
 * It complements pyfile_new: the reference to the blkmmapper capsule stored
 * there is dropped here. */
static void
pyfile_dealloc(PyObject *pyfile0)
{
    PyBigFile *pyfile = container_of(pyfile0, PyBigFile, pyobj);

    pyfile->blkmmap_ops = NULL;         /* pointed into capsule memory - must not outlive it */
    Py_CLEAR(pyfile->blkmmapper);       /* drop reference to type.blkmmapper capsule */
    pyfile->pyobj.ob_type->tp_free(&pyfile->pyobj);
}
static PyObject *
pyfile_new(PyTypeObject *type, PyObject *args, PyObject *kw)
{
PyBigFile *self;
PyBigFile *self;
PyObject *blkmmapper;
bigfile_ops *blkmmap_ops = NULL;
/* try to get type.blkmmapper and verify it provides IBlkMMapper interface */
blkmmapper = PyObject_GetAttrString((PyObject*)type, "blkmmapper");
PyErr_Clear(); /* GetAttr raises exception if there is no attribute */
if (blkmmapper) {
if (!PyCapsule_IsValid(blkmmapper, "wendelin.bigfile.IBlkMMapper")) {
Py_DECREF(blkmmapper);
return PyErr_Format(PyExc_TypeError,
"%s: .blkmmapper is not a valid pycapsule with mmap methods", type->tp_name);
}
blkmmap_ops = PyCapsule_GetPointer(blkmmapper, "wendelin.bigfile.IBlkMMapper");
if (blkmmap_ops == NULL) { /* just in case - must not fail */
Py_DECREF(blkmmapper);
return NULL;
}
if (blkmmap_ops->loadblk ||
blkmmap_ops->storeblk)
{
Py_DECREF(blkmmapper);
return PyErr_Format(PyExc_TypeError,
"%s: .blkmmapper: !mmap methods present", type->tp_name);
}
if (!(blkmmap_ops->mmap_setup_read &&
blkmmap_ops->remmap_blk_read &&
blkmmap_ops->munmap))
{
Py_DECREF(blkmmapper);
return PyErr_Format(PyExc_TypeError,
"%s: .blkmmapper: not all mmap methods present", type->tp_name);
}
}
self = (PyBigFile *)PyType_GenericNew(type, args, kw);
if (!self)
if (!self) {
Py_XDECREF(blkmmapper);
return NULL;
}
self->blkmmapper = blkmmapper;
self->blkmmap_ops = blkmmap_ops;
// FIXME "k" = unsigned long - we need size_t
static char *kw_list[] = {"blksize", NULL};
......@@ -967,7 +1074,7 @@ static PyMemberDef pyfile_members[] = {
};
static /*const*/ PyMethodDef pyfile_methods[] = {
{"fileh_open", pyfileh_open, METH_VARARGS, "fileh_open(ram=None) -> new file handle"},
{"fileh_open", pyfileh_open, METH_VARARGS, "fileh_open(ram=None, mmap_overlay=None) -> new file handle"},
{NULL}
};
......@@ -978,7 +1085,7 @@ static PyTypeObject PyBigFile_Type = {
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_methods = pyfile_methods,
.tp_members = pyfile_members,
.tp_dealloc = NULL, // XXX
.tp_dealloc = pyfile_dealloc,
.tp_new = pyfile_new,
.tp_doc = "Base class for creating BigFile(s)\n\nTODO describe", // XXX
};
......
......@@ -2,7 +2,7 @@
#define _WENDELIN_BIGFILE__BIGFILE_H
/* Wendelin.bigfile | Python interface to memory/files
* Copyright (C) 2014-2019 Nexedi SA and Contributors.
* Copyright (C) 2014-2020 Nexedi SA and Contributors.
* Kirill Smelkov <kirr@nexedi.com>
*
* This program is free software: you can Use, Study, Modify and Redistribute
......@@ -26,11 +26,14 @@
*
* - `BigFile` is base class that allows implementing BigFile backends in Python.
* Users can inherit from BigFile, implement loadblk/storeblk and this way
* provide access to data managed from Python to virtmem subsystem.
* provide access to data managed from Python to virtmem subsystem(*).
* - `BigFileH` represents virtmem file handle for opened BigFile.
* It can be mmap'ed and provides writeout control.
* - `VMA` represents mmap'ed part of a BigFileH.
* It provides buffer/memoryview interface for data access.
*
* (*) A subclass may additionally provide functionality to map file data into
* memory. Please see BigFile documentation for details.
*/
#include <Python.h>
......@@ -97,8 +100,18 @@ typedef struct PyBigFileH PyBigFileH;
/*
* BigFile that can be implemented in python
*
* Allows subclasses to implement .loadblk() (& friends) in python.
* Allows subclasses to implement .loadblk() and .storeblk() in python.
* For users .fileh_open() is exposed to get to file handles.
*
* A subclass may additionally provide functionality to map file data into
* memory: if subclass provides .blkmmapper attribute, it is treated as
* pycapsule with type "wendelin.bigfile.IBlkMMapper" and C-level bigfile_ops
* struct that provides .mmap_setup_read and other operations related to
* mmapping data. To avoid deadlocks all mmap-related functionality must be
* nogil and so cannot be implemented in Python.
*
* The primary user of .blkmmapper functionality is _ZBigFile which uses WCFS
* and mmaps files from it to provide memory mappings for ZBigFile data.
*/
struct PyBigFile {
PyObject pyobj;
......@@ -106,6 +119,11 @@ struct PyBigFile {
* automatically adds support for weakrefs for in-python defined children */
BigFile file;
/* blkmmapper is PyCapsule object with type.blkmmapper if BigFile subclass has it | NULL */
PyObject *blkmmapper;
/* bigfile_ops extracted from ^^^ capsule | NULL */
bigfile_ops *blkmmap_ops;
};
typedef struct PyBigFile PyBigFile;
......
# -*- coding: utf-8 -*-
# Wendelin.bigfile | WCFS part of BigFile ZODB backend
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# distutils: language=c++
"""Modules _file_zodb.pyx complements file_zodb.py with things that cannot be
implemented in Python.
It provides wcfs integration for ZBigFile handles opened with _use_wcfs=True.
"""
from __future__ import print_function, absolute_import
cdef extern from "wcfs/client/wcfs.h":
pass
cdef extern from "bigfile/_bigfile.h":
struct PyBigFile:
pass
ctypedef extern class wendelin.bigfile._bigfile.BigFile[object PyBigFile]:
pass
# ZBigFile_mmap_ops is virtmem mmap functions for _ZBigFile.
cdef extern from "<wendelin/bigfile/file.h>" nogil:
struct bigfile_ops:
pass
cdef extern from * nogil:
"""
extern const bigfile_ops ZBigFile_mmap_ops;
"""
const bigfile_ops ZBigFile_mmap_ops
import wcfs as pywcfs
from wendelin.lib import zodb as pyzodb
from wcfs.client cimport _wcfs as wcfs
from golang cimport error, nil, pyerror
from cpython cimport PyCapsule_New
from ZODB.Connection import Connection as ZConnection
from ZODB.utils import u64
from wendelin.lib.zodb import zconn_at
import weakref
# _ZBigFile is base class for ZBigFile that provides BigFile-line base.
#
# The other base line is from Persistent. It is not possible to inherit from
# both Persistent and BigFile at the same time since both are C types and their
# layouts conflict.
#
# _ZBigFile:
#
# - redirects loadblk/storeblk calls to ZBigFile.
# - provides blkmmapper with WCFS integration.
cdef public class _ZBigFile(BigFile) [object _ZBigFile, type _ZBigFile_Type]:
cdef object zself # reference to ZBigFile
cdef wcfs.FileH wfileh # WCFS file handle. Initially nil, opened by blkmmapper
# _new creates new _ZBigFile associated with ZBigFile zself.
# XXX Cython does not allow __new__ nor to change arguments passed to __cinit__ / __init__
@staticmethod
def _new(zself, blksize):
cdef _ZBigFile obj = _ZBigFile.__new__(_ZBigFile, blksize)
obj.zself = zself
obj.wfileh = nil
return obj
def __dealloc__(_ZBigFile zf):
cdef error err = nil
if zf.wfileh != nil:
err = zf.wfileh.close()
zf.wfileh = nil
if err != nil:
raise pyerror.from_error(err)
# redirect load/store to main class
def loadblk(self, blk, buf): return self.zself.loadblk(blk, buf)
def storeblk(self, blk, buf): return self.zself.storeblk(blk, buf)
# blkmmapper complements loadblk/storeblk and is pycapsule with virtmem mmap
# functions for _ZBigFile. MMap functions rely on .wfileh being initialized
# by .fileh_open()
blkmmapper = PyCapsule_New(<void*>&ZBigFile_mmap_ops, "wendelin.bigfile.IBlkMMapper", NULL)
# fileh_open wraps BigFile.fileh_open and makes sure that WCFS file handle
# corresponding to ZBigFile is opened if use_wcfs=True.
def fileh_open(_ZBigFile zf, bint use_wcfs):
mmap_overlay = False
cdef wcfs.PyFileH pywfileh
if use_wcfs:
mmap_overlay = True
if zf.wfileh == nil:
zconn = zf.zself._p_jar
assert zconn is not None
# join zconn to wconn; link to wconn from _ZBigFile
pywconn = pywconnOf(zconn)
pywfileh = pywconn.open(zf.zself._p_oid)
zf.wfileh = pywfileh.wfileh
return super(_ZBigFile, zf).fileh_open(mmap_overlay)
# pywconnOf establishes and returns (py) wcfs.Conn associated with zconn.
#
# returned wcfs.Conn will be maintained to keep in sync with zconn, and will be
# closed when zconn is destroyed.
#
# It is invalid to make multiple simultaneous calls to pywconnOf with the same zconn.
# (in ZODB/py objects for zconn must be used from under 1 thread only).
cdef wcfs.PyConn pywconnOf(zconn):
    # zconn must be a live, opened ZODB connection
    assert isinstance(zconn, ZConnection)
    assert zconn.opened

    # fast path: wconn was already established for this zconn
    wconn = getattr(zconn, '_wcfs_wconn', None)
    if wconn is not None:
        return wconn

    # zconn is not yet associated with wconn:
    # derive WCFS zurl from zconn's storage, join that WCFS server, and open a
    # wcfs connection viewing the database as of zconn's current view.
    zstor = zconn.db().storage
    zurl  = pyzodb.zstor_2zurl(zstor)
    wc    = pywcfs.join(zurl)
    wconn = wc.connect(zconn_at(zconn))

    zconn._wcfs_wconn = wconn

    # keep wconn view of the database in sync with zconn
    zconn._wcfs_wconn_zsync = ZSync(zconn, wconn)

    return wconn
# ZSync keeps wconn in sync with zconn.
#
# wconn will be closed once zconn is destroyed (not closed, which returns it
# back into DB pool).
class ZSync:
    """ZSync keeps wconn in sync with zconn.

    wconn is closed once zconn is destroyed (not closed, which returns it back
    into the DB pool).
    """
    # .zconn_ref    weakref[zodb.Connection]  ; weak ref so that we do not keep zconn alive
    # .wconn        (py) wcfs.Connection

    def __init__(zsync, zconn, wconn):
        # zconn must be open at association time
        assert zconn.opened
        zsync.wconn     = wconn
        # on zconn dealloc -> close wconn (see .on_zconn_dealloc)
        zsync.zconn_ref = weakref.ref(zconn, zsync.on_zconn_dealloc)

        # NOTE zconn.onOpenCallback is not enough: zconn.at can change even
        # without zconn.close/zconn.open, e.g.:
        #   zconn = DB.open(transaction_manager=tm)
        #   tm.commit()   # zconn.at updated  (zconn.afterCompletion -> zconn.newTransaction)
        #   tm.commit()   # zconn.at updated again
        zconn.onResyncCallback(zsync)

    # .zconn dealloc -> wconn.close
    def on_zconn_dealloc(zsync, _):
        zsync.wconn.close()

    # DB resyncs .zconn onto new database view.
    # -> resync .wconn to updated database view of ZODB connection.
    def on_connection_resync(zsync):
        zconn = zsync.zconn_ref()
        zsync.wconn.resync(zconn_at(zconn))
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// File file_zodb.cpp provides blkmmapper functions for _ZBigFile.
// MMapping is implemented via wcfs client.
#include "wcfs/client/wcfs.h"
#include "wendelin/bigfile/file.h"
#include "wendelin/bigfile/virtmem.h"
#include "bigfile/_bigfile.h"
#include "bigfile/_file_zodb.h"
#include <ccan/container_of/container_of.h>
// zfile_mmap_setup_read implements bigfile_ops.mmap_setup_read for _ZBigFile:
// it creates base-layer read mapping for file[blk +blklen) via the WCFS file
// handle opened by ZBigFile.fileh_open.
// Returns 0 on success, -1 on error (details only logged - see XXX below).
static int zfile_mmap_setup_read(VMA *vma, BigFile *file, blk_t blk, size_t blklen) {
    _ZBigFile*  _zfile = container_of(file, _ZBigFile, __pyx_base.file);
    wcfs::FileH fileh  = _zfile->wfileh;
    wcfs::Mapping mmap;
    error err;

    // .wfileh must have been set before virtmem calls us
    if (fileh == nil)
        panic("BUG: zfile_mmap_setup_read: ZBigFile.fileh_open did not set .wfileh");

    tie(mmap, err) = fileh->mmap(blk, blklen, vma);
    if (err != nil) {
        log::Errorf("%s", v(err));  // XXX no way to return error details to virtmem
        return -1;
    }

    return 0;
}
// zfile_remmap_blk_read implements bigfile_ops.remmap_blk_read for _ZBigFile:
// it remmaps block #blk of the WCFS mapping that serves vma.
// Returns 0 on success, -1 on error (details only logged - see XXX below).
static int zfile_remmap_blk_read(VMA *vma, BigFile *file, blk_t blk) {
    // the wcfs mapping serving this vma was stashed by mmap_setup_read
    wcfs::_Mapping* mmap   = static_cast<wcfs::_Mapping*>(vma->mmap_overlay_server);
    _ZBigFile*      _zfile = container_of(file, _ZBigFile, __pyx_base.file);

    // sanity check: vma's mapping must belong to _zfile's wcfs file handle
    if (mmap->fileh != _zfile->wfileh)
        panic("BUG: zfile_remmap_blk_read: vma and _zfile point to different wcfs::FileH");

    error err;
    err = mmap->remmap_blk(blk);
    if (err != nil) {
        log::Errorf("%s", v(err));  // XXX no way to return error details to virtmem
        return -1;
    }

    return 0;
}
// zfile_munmap implements bigfile_ops.munmap for _ZBigFile:
// it unmaps the WCFS mapping that serves vma.
// Returns 0 on success, -1 on error (details only logged - see XXX below).
static int zfile_munmap(VMA *vma, BigFile *file) {
    // the wcfs mapping serving this vma was stashed by mmap_setup_read
    wcfs::_Mapping* mmap   = static_cast<wcfs::_Mapping*>(vma->mmap_overlay_server);
    _ZBigFile*      _zfile = container_of(file, _ZBigFile, __pyx_base.file);

    // sanity check: vma's mapping must belong to _zfile's wcfs file handle
    // (fixed: the panic message used to misreport zfile_remmap_blk_read)
    if (mmap->fileh != _zfile->wfileh)
        panic("BUG: zfile_munmap: vma and _zfile point to different wcfs::FileH");

    error err;
    err = mmap->unmap();
    if (err != nil) {
        log::Errorf("%s", v(err));  // XXX no way to return error details to virtmem
        return -1;
    }

    return 0;
}
// NOTE reusing whole bigfile_ops for just .mmap* ops.
extern const bigfile_ops ZBigFile_mmap_ops;
// _mkZBigFile_mmap_ops returns bigfile_ops with only .mmap* entries filled in
// for _ZBigFile; .loadblk/.storeblk are left NULL (served from Python side).
static bigfile_ops _mkZBigFile_mmap_ops() {
    // workaround for "sorry, unimplemented: non-trivial designated initializers not supported"
    bigfile_ops _;
    _.mmap_setup_read = zfile_mmap_setup_read;
    _.remmap_blk_read = zfile_remmap_blk_read;
    _.munmap          = zfile_munmap;
    _.loadblk         = NULL;
    _.storeblk        = NULL;
    return _;
}   // fixed: dropped stray ';' after function body
const bigfile_ops ZBigFile_mmap_ops = _mkZBigFile_mmap_ops();
......@@ -50,6 +50,30 @@ The primary user of ZBigFile is ZBigArray (see bigarray/__init__.py and
bigarray/array_zodb.py), but ZBigFile itself can be used directly too.
Operating mode
--------------
Two operating modes are provided: "local-cache" and "shared-cache".
Local-cache is the mode wendelin.core was originally implemented with in 2015.
In this mode ZBigFile data is loaded from ZODB directly via current ZODB connection.
It was relatively straightforward to implement, but cached file data becomes
duplicated between ZODB connections of the current process and between
several client processes that use ZODB.
In shared-cache mode file's data is accessed through special filesystem for
which data cache is centrally maintained by OS kernel. This mode was added in
2020 and reduces wendelin.core RAM consumption dramatically. Note that even
though the cache is shared, isolation property is still fully provided. Please
see wcfs/wcfs.go which describes the filesystem and shared-cache mode in detail.
The mode of operation can be selected via environment variable::
$WENDELIN_CORE_VIRTMEM
rw:uvmm local-cache (i.e. !wcfs) (default)
r:wcfs+w:uvmm shared-cache (i.e. wcfs)
Data format
-----------
......@@ -88,7 +112,7 @@ For "2" we have
- low-overhead in terms of ZODB size (only part of a block is overwritten
in DB on single change), but
- high-overhead in terms of access time
(several objects need to be loaded for 1 block)
(several objects need to be loaded for 1 block(*))
In general it is not possible to have low-overhead for both i) access-time, and
ii) DB size, with approach where we do block objects representation /
......@@ -98,12 +122,36 @@ On the other hand, if object management is moved to DB *server* side, it is
possible to deduplicate them there and this way have low-overhead for both
access-time and DB size with just client storing 1 object per file block. This
will be our future approach after we teach NEO about object deduplication.
(*) wcfs loads ZBlk1 subobjects in parallel, but today ZODB storage servers do
not scale well on such highly-concurrent access.
"""
# ZBigFile organization
#
# TODO add top-level overview
#
# zfile (ZBigFile)
# .blksize
# .blktab LOBTree #blk -> ZBlk*
#
# ._v_file _ZBigFile
# ._v_filehset weakset(_ZBigFileH) created for zfile
#
# zfileh (_ZBigFileH)
#
# ZBigFile is kept as Live persistent because XXX
#
#
# DB -> ZBlk.blkdata (-> memory-page)
# (DB <- ) ZBlk.blkdata <- memory-page
#
# (DB -> invalidate ZBlk.blkdata -> invalidate memory-page)
# + FIXME topology changes are not handled correctly
# + FIXME ZBlk is ghostified
#
#
#
# As file pages are changed in RAM with changes being managed by virtmem
# subsystem, we need to propagate the changes to ZODB objects back at some time.
......@@ -125,13 +173,12 @@ will be our future approach after we teach NEO about object deduplication.
# between virtmem subsystem and ZODB, and virtmem->ZODB propagation happens only
# at commit time.
#
# Since, for performance reasons, virtmem subsystem is going away and BigFiles
# will be represented by real FUSE-based filesystem with virtual memory being
# done by kernel, where we cannot get callback on a page-dirtying, it is more
# natural to also use "2" here.
# ZBigFile follows second scheme and synchronizes dirty RAM with ZODB at commit time.
# See _ZBigFileH for details.
from wendelin.bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED
from wendelin.bigfile import WRITEOUT_STORE, WRITEOUT_MARKSTORED
from wendelin.bigfile._file_zodb import _ZBigFile
from wendelin.lib.mem import bzero, memcpy
from wendelin.lib.zodb import LivePersistent, deactivate_btree
......@@ -464,23 +511,6 @@ if ZBlk_fmt_write not in ZBlk_fmt_registry:
# ----------------------------------------
# helper for ZBigFile - just redirect loadblk/storeblk back
# (because it is not possible to inherit from both Persistent and BigFile at
#  the same time - see below)
class _ZBigFile(BigFile):
    # .zself - reference to ZBigFile

    def __new__(cls, zself, blksize):
        # NOTE __new__, not __init__: BigFile is a C type whose construction
        # takes blksize; the back-reference to the owning ZBigFile is attached
        # right after allocation.
        obj = BigFile.__new__(cls, blksize)
        obj.zself = zself
        return obj

    # redirect load/store to main class
    def loadblk(self, blk, buf):    return self.zself.loadblk(blk, buf)
    def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)
# ZBigFile implements BigFile backend with data stored in ZODB.
#
# NOTE Can't inherit from Persistent and BigFile at the same time - both are C
......@@ -503,6 +533,12 @@ class ZBigFile(LivePersistent):
LivePersistent.__init__(self)
self.__setstate__((blksize, LOBTree())) # NOTE L enough for blk_t
# TODO use custom class for .blktab with adjusted bucket size, something like
# class xLOBTree(LOBTree):
# __slots__ = ()
# max_leaf_size = ... # BTree's default = 60
# max_internal_size = ... # BTree's default = 500
# state is (.blksize, .blktab)
def __getstate__(self):
......@@ -510,7 +546,7 @@ class ZBigFile(LivePersistent):
def __setstate__(self, state):
self.blksize, self.blktab = state
self._v_file = _ZBigFile(self, self.blksize)
self._v_file = _ZBigFile._new(self, self.blksize)
self._v_filehset = WeakSet()
......@@ -560,17 +596,47 @@ class ZBigFile(LivePersistent):
    # invalidate data   .blktab[blk] invalidated -> invalidate page
    #
    # invalidateblk propagates ZODB invalidation of file block #blk to opened
    # file handles — but only to handles opened in !wcfs mode (see below).
    def invalidateblk(self, blk):
        for fileh in self._v_filehset:
            # wcfs: there is no need to propagate ZODB -> fileh invalidation by
            # client since WCFS handles invalidations from ZODB by itself.
            #
            # More: the algorithm to compute δ(ZODB) -> δ(blk) is more complex
            # than 1-1 ZBlk <-> blk mapping: ZBlk could stay constant, but if
            # ZBigFile.blktab topology is changed, affected file blocks have to
            # be invalidated. Currently both !wcfs and wcfs codepaths fail to
            # handle that, but wcfs will be improved and !wcfs will be deprecated.
            #
            # -> don't propagate ZODB -> WCFS invalidation by client to fully
            #    rely on and test wcfs subsystem.
            if fileh.uses_mmap_overlay():
                continue
            fileh.invalidate_page(blk)  # XXX assumes blksize == pagesize
# fileh_open is bigfile-like method that creates new file-handle object
# that is given to user for mmap.
def fileh_open(self):
fileh = _ZBigFileH(self)
#
# _use_wcfs is internal option and controls whether to use wcfs to access
# ZBigFile data:
#
# - True -> use wcfs
# - False -> don't use wcfs
# - not set -> behave according to global default
    def fileh_open(self, _use_wcfs=None):
        # _use_wcfs: internal option controlling whether to use wcfs to access
        # ZBigFile data:
        #
        #   - True     -> use wcfs
        #   - False    -> don't use wcfs
        #   - None     -> behave according to global default
        #     ($WENDELIN_CORE_VIRTMEM; see _default_use_wcfs)
        if _use_wcfs is None:
            _use_wcfs = self._default_use_wcfs()

        fileh = _ZBigFileH(self, _use_wcfs)
        # weakset: the handle is dropped automatically when user releases it
        self._v_filehset.add(fileh)
        return fileh
# _default_use_wcfs returns whether default virtmem setting is to use wcfs or not.
@staticmethod
def _default_use_wcfs():
virtmem = os.environ.get("WENDELIN_CORE_VIRTMEM", "rw:uvmm") # unset -> !wcfs
virtmem = virtmem.lower()
return {"r:wcfs+w:uvmm": True, "rw:uvmm": False}[virtmem]
# BigFileH wrapper that also acts as DataManager proxying changes ZODB <- virtmem
......@@ -609,15 +675,18 @@ class ZBigFile(LivePersistent):
# NOTE Bear in mind that after close, connection can be reopened in different
# thread - that's why we have to adjust registration to per-thread
# transaction_manager.
#
# See also _file_zodb.pyx -> ZSync which maintains and keeps zodb.Connection
# and wcfs.Connection in sync.
@implementer(IDataManager)
@implementer(ISynchronizer)
class _ZBigFileH(object):
# .zfile ZBigFile we were opened for
# .zfileh handle for ZBigFile in virtmem
def __init__(self, zfile):
def __init__(self, zfile, use_wcfs):
self.zfile = zfile
self.zfileh = zfile._v_file.fileh_open()
self.zfileh = zfile._v_file.fileh_open(use_wcfs)
# FIXME zfile._p_jar could be None (ex. ZBigFile is newly created
# before first commit)
......@@ -679,6 +748,9 @@ class _ZBigFileH(object):
    # invalidate_page invalidates the page at pgoffset in this file handle
    # (delegates to the underlying virtmem file handle).
    def invalidate_page(self, pgoffset):
        return self.zfileh.invalidate_page(pgoffset)

    # uses_mmap_overlay reports whether this handle serves data via mmap
    # overlay (i.e. presumably wcfs mode — delegated to underlying zfileh).
    def uses_mmap_overlay(self):
        return self.zfileh.uses_mmap_overlay()
# ~~~~ ISynchronizer ~~~~
def beforeCompletion(self, txn):
......
/* Wendelin.bigfile | virtual memory benchmarks
* Copyright (C) 2017-2019 Nexedi SA and Contributors.
* Copyright (C) 2017-2020 Nexedi SA and Contributors.
* Kirill Smelkov <kirr@nexedi.com>
*
* This program is free software: you can Use, Study, Modify and Redistribute
......@@ -80,7 +80,7 @@ void bench_pagefault() {
};
/* setup f mapping */
err = fileh_open(fh, &f, ram);
err = fileh_open(fh, &f, ram, DONT_MMAP_OVERLAY);
ok1(!err);
err = fileh_mmap(vma, fh, 0, npage);
......
# Wendelin.core.bigfile | Tests for ZODB BigFile backend
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import ndarray, array_equal, uint32, zeros, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from six import b
import struct
......@@ -75,6 +75,8 @@ def Blk(vma, i):
return ndarray(blksize32, offset=i*blksize, buffer=vma, dtype=uint32)
def test_bigfile_filezodb():
ram_reclaim_all() # reclaim pages allocated by previous tests
root = dbopen()
root['zfile'] = f = ZBigFile(blksize)
transaction.commit()
......@@ -137,7 +139,11 @@ def test_bigfile_filezodb():
# evict all loaded pages and test loading them again
# (verifies ZBlk.loadblkdata() & loadblk logic when loading data the second time)
reclaimed = ram_reclaim_all()
assert reclaimed >= blen # XXX assumes pagesize=blksize
if fh.uses_mmap_overlay():
# in mmap-overlay mode no on-client RAM is allocated for read data
assert reclaimed == 0
else:
assert reclaimed >= blen # XXX assumes pagesize=blksize
for i in xrange(blen):
assert array_equal(Blk(vma, i), dataX(i))
......@@ -223,7 +229,9 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -247,8 +255,8 @@ def test_bigfile_filezodb_vs_conn_migration():
# close conn, wait till T21 reopens it
del vma11, fh11, a11, f11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open another connection (e.g. for handling next request) which does
# not touch zfile at all, and arrange timings so that T2 modifies
......@@ -257,33 +265,36 @@ def test_bigfile_filezodb_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zfile2-modified')
wait(ctx, 'T2-zfile2-modified')
# XXX do we want to also modify some other objects?
# (but this have side effect for joining conn11_2 to txn)
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# - wait until T1 finish setting up initial data for zfile and closes connection.
# - open that connection before T1 is asleep - because ZODB organizes
# connection pool as stack (with correction for #active objects),
# we should get exactly the same connection T1 had.
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zfile and arrange timings so that T1 commits after zfile is
# modified, but before we commit/abort.
......@@ -295,21 +306,21 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma21, 0)[0] = 21
tell('T2-zfile2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zfile2 should stay unchanged
transaction.abort()
del vma21, fh21, a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zfile2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -334,39 +345,45 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
def T12():
T12done = chan()
@func
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
tell('T1-conn12-opened')
wait('T2-zfile2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zfile2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
conn12.close()
# open, modify, commit
def T22():
T22done = chan()
@func
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zfile and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
......@@ -375,24 +392,27 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma22, 0)[0] = 22
tell('T2-zfile2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zfile
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
conn22.close()
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def