Commit 92bfd03e authored by Kirill Smelkov

bigfile: ZODB -> BigFileH invalidate propagation

Continuing the theme from the previous patch, this adds propagation of
invalidation messages from ZODB to BigFileH memory.

The use-case: one fileh mapping is created in one connection and another
in a second connection; after changes are made and committed in the
second connection, the first fileh has to invalidate the appropriate
already-loaded pages, so that its next transaction will not work with
stale data.
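
For example (a condensed sketch of the tests added below; database setup
and creation of the 'zarray3' array are omitted):

    tm1, tm2 = TransactionManager(), TransactionManager()
    conn1 = db.open(transaction_manager=tm1); a1 = conn1.root()['zarray3']
    conn2 = db.open(transaction_manager=tm2); a2 = conn2.root()['zarray3']

    a1[0:1] = [2]; tm1.commit()     # change + commit in connection 1
    tm2.commit()                    # just a transaction boundary for connection 2
    assert a2[0] == 2               # connection 2 must see fresh data, not stale pages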

To do this, we hook into ZBlk._p_invalidate() and propagate the
invalidation message to ZBigFile, which then notifies every ZBigFileH
opened through it to invalidate the corresponding page.
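
In code, the hook looks like this (a sketch mirroring the
ZBlk._p_invalidate() definition in the diff below):

    class ZBlk(Persistent):
        # DB notifies us this object has to be invalidated
        def _p_invalidate(self):
            # first propagate to ZBigFile, which fans out to all opened
            # ZBigFileH; then let ZODB invalidate the object state itself
            self._v_zfile.invalidateblk(self._v_blk)
            Persistent._p_invalidate(self)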

ZBlk -> ZBigFile lookup is done without storing a backpointer in ZODB -
instead, every time ZBigFile touches a ZBlk object (and thus potentially
does a GHOST -> Live transition on it), we (re-)bind it back to the
ZBigFile. Since ZBigFile is the only class that works with ZBlk objects,
it is safe to do so.
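
The (re-)binding is cheap and idempotent (a sketch mirroring bindzfile()
in the diff below; ._v_zfile/._v_blk are volatile attributes, so nothing
extra is stored in the DB):

    def bindzfile(self, zfile, blk):
        # bind; if already bound, the binding must stay the same
        if self._v_zfile is None:
            self._v_zfile, self._v_blk = zfile, blk
        else:
            assert self._v_zfile is zfile and self._v_blk == blk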

For ZBigFile to notify "all-opened-through-it" ZBigFileH, a weakset is
introduced to track them.
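
Every fileh_open() registers the new handle in that weakset, and
invalidation fans out over whatever handles are still alive (a sketch
mirroring the ZBigFile methods in the diff below):

    def fileh_open(self):
        fileh = _ZBigFileH(self)
        self._v_filehset.add(fileh)     # weak ref: dead handles drop out automatically
        return fileh

    def invalidateblk(self, blk):
        for fileh in self._v_filehset:
            fileh.invalidate_page(blk)  # XXX assumes blksize == pagesize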

The real page-invalidation work itself is done by virtmem (see the
previous patch).
parent cb779c7b
@@ -21,6 +21,7 @@ from wendelin.lib.zodb import dbclose
from wendelin.lib.testing import getTestDB
from persistent import UPTODATE
import transaction
from transaction import TransactionManager
from numpy import dtype, uint8, all, array_equal, arange
from threading import Thread
from six.moves import _thread
@@ -357,3 +358,51 @@ def test_zbigarray_vs_conn_migration():
    del a03
    dbclose(root03)


# underlying ZBigFile/ZBigFileH should properly handle 'invalidate' messages from DB
# ( NOTE this test is almost a dup of test_bigfile_filezodb_vs_cache_invalidation() )
def test_zbigarray_vs_cache_invalidation():
    root = testdb.dbopen()
    conn = root._p_jar
    db   = conn.db()
    conn.close()
    del root, conn

    tm1 = TransactionManager()
    tm2 = TransactionManager()

    conn1 = db.open(transaction_manager=tm1)
    root1 = conn1.root()

    # setup zarray
    root1['zarray3'] = a1 = ZBigArray((10,), uint8)
    tm1.commit()

    # set zarray initial data
    a1[0:1] = [1]               # XXX -> [0] = 1  after BigArray can
    tm1.commit()

    # read zarray in conn2
    conn2 = db.open(transaction_manager=tm2)
    root2 = conn2.root()
    a2 = root2['zarray3']

    assert a2[0:1] == [1]       # read data in conn2 + make sure it was read correctly
                                # XXX -> [0] == 1  after BigArray can

    # now zarray content is in both the ZODB.Connection cache and the _ZBigFileH
    # cache for each of conn1 and conn2. Modify data in conn1 and make sure it
    # fully propagates to conn2.
    a1[0:1] = [2]               # XXX -> [0] = 2  after BigArray can
    tm1.commit()
    tm2.commit()                # just a transaction boundary for tm2

    # data from tm1 should propagate -> ZODB -> ram pages for _ZBigFileH in conn2
    assert a2[0] == 2

    conn2.close()
    del conn2, root2
    dbclose(root1)
@@ -324,6 +324,21 @@ PyFunc(pyfileh_isdirty, "isdirty() - are there any changes to fileh memory at al
}


PyFunc(pyfileh_invalidate_page, "invalidate_page(pgoffset) - invalidate fileh page")
    (PyObject *pyfileh0, PyObject *args)
{
    PyBigFileH *pyfileh = upcast(PyBigFileH *, pyfileh0);
    Py_ssize_t pgoffset;    // XXX Py_ssize_t vs pgoff_t ?

    if (!PyArg_ParseTuple(args, "n", &pgoffset))
        return NULL;

    fileh_invalidate_page(pyfileh, pgoffset);
    Py_RETURN_NONE;
}


static void
pyfileh_dealloc(PyObject *pyfileh0)
{
@@ -365,6 +380,7 @@ static /*const*/ PyMethodDef pyfileh_methods[] = {
    {"dirty_writeout",  pyfileh_dirty_writeout,  METH_VARARGS, pyfileh_dirty_writeout_doc},
    {"dirty_discard",   pyfileh_dirty_discard,   METH_VARARGS, pyfileh_dirty_discard_doc},
    {"isdirty",         pyfileh_isdirty,         METH_VARARGS, pyfileh_isdirty_doc},
    {"invalidate_page", pyfileh_invalidate_page, METH_VARARGS, pyfileh_invalidate_page_doc},
    {NULL}
};
@@ -36,6 +36,7 @@ from persistent import Persistent, PickleCache
from BTrees.LOBTree import LOBTree
from zope.interface import implementer
from ZODB.Connection import Connection
from weakref import WeakSet
# TODO document that first data access must be either after commit or Connection.add
@@ -48,7 +49,9 @@ from ZODB.Connection import Connection
# data of 1 file block as stored in ZODB
class ZBlk(Persistent):
    # ._v_blkdata   - bytes
    # ._v_zfile     - ZBigFile | None
    # ._v_blk       - offset of this blk in ._v_zfile | None
    __slots__ = ('_v_blkdata', '_v_zfile', '_v_blk')
    # NOTE _v_ - so that we can alter it without Persistent noticing -- we'll
    #      manage ZBlk states by ourselves explicitly.
@@ -63,7 +66,9 @@ class ZBlk(Persistent):
        #  intermediate copy not to waste memory - see below)
        if self._v_blkdata is None:
            # ZODB says: further accesses will cause object data to be reloaded.
            # NOTE we call Persistent._p_invalidate() directly to avoid the
            #      virtmem->loadblk->invalidate callback
            Persistent._p_invalidate(self)

        blkdata = self._v_blkdata
        assert blkdata is not None
@@ -102,7 +107,40 @@ class ZBlk(Persistent):
    # DB (through pickle) loads data to memory
    def __setstate__(self, state):
        self._v_blkdata = state
        self._v_zfile   = None
        self._v_blk     = None

    # ZBlk as initially created (empty placeholder)
    def __init__(self):
        self._v_zfile = None
        self._v_blk   = None
        # NOTE ._v_blkdata is not set - the master will set it from outside

    # make this ZBlk know it represents zfile[blk]
    # NOTE this has to be called by the master every time a ZBlk object
    #      potentially goes from GHOST to Live state
    # NOTE it is ok to keep a reference to zfile (yes, it creates a
    #      ZBigFile->ZBlk->ZBigFile cycle, but that does not hurt).
    def bindzfile(self, zfile, blk):
        # bind; if already bound, the binding must stay the same
        if self._v_zfile is None:
            self._v_zfile = zfile
            self._v_blk   = blk
        else:
            assert self._v_zfile is zfile
            assert self._v_blk   == blk

    # DB notifies us this object has to be invalidated
    # (DB -> invalidate ._v_blkdata -> invalidate memory-page)
    def _p_invalidate(self):
        # on invalidation we must already be bound
        # (to know which ZBigFileH to propagate the invalidation to)
        assert self._v_zfile is not None
        assert self._v_blk   is not None
        self._v_zfile.invalidateblk(self._v_blk)
        Persistent._p_invalidate(self)
# XXX merge Persistent/BigFile comments into 1 place
@@ -177,6 +215,7 @@ class ZBigFile(LivePersistent):
    # .blktab       {} blk -> ZBlk(blkdata)
    # ._v_file      _ZBigFile helper
    # ._v_filehset  weakset( _ZBigFileH ) that we created
    #
    # NOTE Live: don't allow us to go to ghost state, so as not to lose ._v_file,
    #      which DataManager _ZBigFileH refers to.
@@ -193,6 +232,7 @@ class ZBigFile(LivePersistent):
    def __setstate__(self, state):
        self.blksize, self.blktab = state
        self._v_file = _ZBigFile(self, self.blksize)
        self._v_filehset = WeakSet()

    # load data   ZODB obj -> page
@@ -209,6 +249,7 @@ class ZBigFile(LivePersistent):
        # also in DB better store directly {#blk -> #dataref} without ZBlk overhead
        blkdata = zblk.loadblkdata()
        assert len(blkdata) == self._v_file.blksize
        zblk.bindzfile(self, blk)
        memcpy(buf, blkdata)    # FIXME memcpy
@@ -220,10 +261,21 @@ class ZBigFile(LivePersistent):
        zblk._v_blkdata = bytes(buf)    # FIXME does memcpy
        zblk._p_changed = True          # if zblk was already in DB: _p_state -> CHANGED
        zblk.bindzfile(self, blk)


    # invalidate data   .blktab[blk] invalidated -> invalidate page
    def invalidateblk(self, blk):
        for fileh in self._v_filehset:
            fileh.invalidate_page(blk)  # XXX assumes blksize == pagesize


    # bigfile-like
    def fileh_open(self):
        fileh = _ZBigFileH(self)
        self._v_filehset.add(fileh)
        return fileh
@@ -341,6 +393,9 @@ class _ZBigFileH(object):
    # .dirty_writeout?  -- not needed - these are handled via
    # .dirty_discard?   -- transaction commit/abort

    def invalidate_page(self, pgoffset):
        return self.zfileh.invalidate_page(pgoffset)

    # ~~~~ ISynchronizer ~~~~
    def beforeCompletion(self, txn):
@@ -21,6 +21,7 @@ from wendelin.lib.zodb import dbclose
from wendelin.lib.testing import getTestDB
from persistent import UPTODATE, GHOST
import transaction
from transaction import TransactionManager
from numpy import ndarray, array_equal, uint8, zeros
from threading import Thread
from six.moves import _thread
@@ -484,3 +485,57 @@ def test_bigfile_filezodb_vs_conn_migration():
    del vma03, fh03, f03
    dbclose(root03)


# ZBlk should properly handle 'invalidate' messages from DB
# ( NOTE this test is almost a dup of test_zbigarray_vs_cache_invalidation() )
def test_bigfile_filezodb_vs_cache_invalidation():
    root = dbopen()
    conn = root._p_jar
    db   = conn.db()
    conn.close()
    del root, conn

    tm1 = TransactionManager()
    tm2 = TransactionManager()

    conn1 = db.open(transaction_manager=tm1)
    root1 = conn1.root()

    # setup zfile with a fileh view to it
    root1['zfile3'] = f1 = ZBigFile(blksize)
    tm1.commit()

    fh1 = f1.fileh_open()
    tm1.commit()

    # set zfile initial data
    vma1 = fh1.mmap(0, 1)
    Blk(vma1, 0)[0] = 1
    tm1.commit()

    # read zfile and setup a fileh for it in conn2
    conn2 = db.open(transaction_manager=tm2)
    root2 = conn2.root()
    f2 = root2['zfile3']
    fh2 = f2.fileh_open()
    vma2 = fh2.mmap(0, 1)

    assert Blk(vma2, 0)[0] == 1 # read data in conn2 + make sure it was read correctly

    # now zfile content is in both the ZODB.Connection cache and the _ZBigFileH
    # cache for each of conn1 and conn2. Modify data in conn1 and make sure it
    # fully propagates to conn2.
    Blk(vma1, 0)[0] = 2
    tm1.commit()
    tm2.commit()                # just a transaction boundary for tm2

    # data from tm1 should propagate -> ZODB -> ram pages for _ZBigFileH in conn2
    assert Blk(vma2, 0)[0] == 2

    conn2.close()
    del conn2, root2
    dbclose(root1)