Commit 4174b84a authored by Kirill Smelkov

bigfile: BigFile backend to store data in ZODB

This adds transactionality and, with e.g. NEO[1], allows objects to be
distributed over the nodes of a cluster.

We hook into the ZODB two-phase commit process as a separate data manager,
and synchronize changes made to memory with changes to the objects only at
that time.

An alternative would be to get notified on every page change and mark the
appropriate object as dirty right at that moment.

But I wanted to stay close to the filesystem design (we don't get a
notification from the kernel for every file change) - that's why it is done
the first way.

[1] http://www.neoppod.org/
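
A rough usage sketch (mirroring the tests added below; names and sizes are
illustrative):

    root['zfile'] = f = ZBigFile(blksize)   # file whose data is stored in ZODB
    transaction.commit()

    fh  = f.fileh_open()
    vma = fh.mmap(0, blen)                  # map pages [0, blen) into memory
    # ... read / modify the mapped memory ...
    transaction.commit()                    # dirty pages -> ZBlk objects -> DB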
parent b3910de8
# -*- coding: utf-8 -*-
# Wendelin.bigfile | BigFile ZODB backend
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Open Source Initiative approved licenses and Convey
# the resulting work. Corresponding source of such a combination shall include
# the source code for all other software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
""" BigFile backed by ZODB objects

TODO big picture module description

Things are done this way (vs more ZODB-like way) because:

    - compatibility with FS approach (to be able to switch to fuse)
    - only one manager (vs two managers - bigfile & Connection and synchronizing them)
    - on abort no need to synchronize changes (= "only 1 manager" as above)
    - TODO ...
"""
from wendelin.bigfile import BigFile, WRITEOUT_STORE, WRITEOUT_MARKSTORED
from wendelin.lib.mem import bzero, memcpy
from transaction.interfaces import IDataManager, ISynchronizer
from persistent import Persistent, PickleCache
from BTrees.LOBTree import LOBTree
from zope.interface import implementer
# TODO document that first data access must be either after commit or Connection.add
# FIXME peak 2·Ndirty memory consumption on write (should be 1·NDirty)
# XXX write about why we can't memory-dirty -> ZBlk (memory could be modified
# later one more time)
# data of 1 file block as stored in ZODB
class ZBlk(Persistent):
    # ._v_blkdata   - bytes
    __slots__ = ('_v_blkdata')
    # NOTE _v_ - so that we can alter it without Persistent noticing -- we'll
    #      manage ZBlk states by ourselves explicitly.

    # TODO __getstate__ / __reduce__ so that it pickles minimally?

    # client requests us to load blkdata from DB
    # (DB -> ._v_blkdata -> memory-page)
    def loadblkdata(self):
        # ensure ._v_blkdata is loaded
        # (it could be not, if e.g. loadblk is called a second time for the same
        #  blk - we loaded it the first time and threw ._v_blkdata away as an
        #  unneeded intermediate copy, not to waste memory - see below)
        if self._v_blkdata is None:
            # ZODB says: further accesses will cause object data to be reloaded.
            self._p_invalidate()

        blkdata = self._v_blkdata
        assert blkdata is not None

        # do not waste memory - ._v_blkdata is used only as an intermediate copy
        # on the path from DB to fileh mapping. See also the counterpart action
        # in __getstate__().
        self._v_blkdata = None

        return blkdata

    # DB (through pickle) requests us to emit state to save
    # (DB <- ._v_blkdata <- memory-page)
    def __getstate__(self):
        # a request to pickle should come in only when zblk was set changed (by
        # storeblk), and only once.
        assert self._v_blkdata is not None
        # NOTE self._p_changed is not necessarily true here - e.g. it is not
        #      for newly created objects regardless of initial ._p_changed=True

        blkdata = self._v_blkdata
        # do not waste memory on duplicated data - as soon as blkdata lands
        # in the DB we can drop it here, because ._v_blkdata was only an
        # intermediate copy from fileh memory to the database.
        #
        # FIXME this works, but the transaction machinery first prepares all
        #       objects for commit and only then saves them. For us it means
        #       __getstate__ gets called very late, and for all objects, so we
        #       keep ._v_blkdata for all of them until the final phase =
        #       2·NDirty peak memory consumption.
        self._v_blkdata = None

        # XXX change .p_state to GHOST ?
        return blkdata

    # DB (through pickle) loads data to memory
    def __setstate__(self, state):
        self._v_blkdata = state
# XXX merge Persistent/BigFile comments into 1 place

# helper for ZBigFile - just redirect loadblk/storeblk back
# (because it is not possible to inherit from both Persistent and BigFile at
#  the same time - see below)
class _ZBigFile(BigFile):
    # .zself    - reference to ZBigFile
    def __new__(cls, zself, blksize):
        obj = BigFile.__new__(cls, blksize)
        obj.zself = zself
        return obj

    # redirect load/store to main class
    def loadblk(self, blk, buf):    return self.zself.loadblk(blk, buf)
    def storeblk(self, blk, buf):   return self.zself.storeblk(blk, buf)
# Persistent that never goes to ghost state, if it was ever uptodate.
# XXX move to a common place?
class LivePersistent(Persistent):
    # NOTE in ZODB < 3.10 (exactly: before commit a9cda7fb) objects coming from
    # the db are first created in a seemingly uptodate state and then deactivated,
    # just to register them, by the way, with the persistent machinery.
    #
    # In ZODB >= 3.10 such registration happens explicitly and no such first
    # deactivation is done.
    if not hasattr(PickleCache, 'new_ghost'):
        _v_registered = False   # were we registered with the Persistent machinery?
    else:
        _v_registered = True

    # don't allow us to go to ghost
    #
    # NOTE we can't use STICKY, as that state is assumed to be short-lived and
    # temporary by ZODB and is changed back to UPTODATE by persistent code.
    # In fact ZODB says: STICKY is UPTODATE + keep in memory.
    def _p_deactivate(self):
        # on ZODB < 3.10 we need to call Persistent._p_deactivate() the first time
        # - it registers the object with its own machinery on first ghostify
        # (which happens right before/at object load from DB); see comments in
        # Per__p_deactivate() in persistent.
        if not self._v_registered:
            Persistent._p_deactivate(self)
            # 1) prevents calling Persistent._p_deactivate() next time, and
            # 2) causes Persistent to change state to UPTODATE
            self._v_registered = True

        # just returning here won't let Persistent._p_deactivate() run and
        # thus we'll stay in a non-ghost state.
        return

    # when creating initially (in contrast to loading from DB), there is no need
    # to go through Persistent._p_deactivate() even for the first time
    def __init__(self):
        Persistent.__init__(self)
        self._v_registered = True
# NOTE Can't inherit from Persistent and BigFile at the same time - both are C
# types and their layouts conflict. Persistent must be here for the object to
# be tracked -> BigFile is moved to a data member (the same way, and for the
# same reason, it is done for PersistentList & PersistentDict).
class ZBigFile(LivePersistent):
    # the file is split into blocks; each block is stored as a separate object in the DB
    #
    #   .blksize
    #   .blktab       {} blk -> ZBlk(blkdata)
    #
    #   ._v_file      _ZBigFile helper
    #
    # NOTE Live: don't allow us to go to ghost state, so as not to lose ._v_file,
    #      which the DataManager _ZBigFileH refers to.

    def __init__(self, blksize):
        LivePersistent.__init__(self)
        self.__setstate__((blksize, LOBTree()))     # NOTE L is enough for blk_t

    # TODO verify get,set state
    def __getstate__(self):
        return (self.blksize, self.blktab)

    def __setstate__(self, state):
        self.blksize, self.blktab = state
        self._v_file = _ZBigFile(self, self.blksize)

    # load data   ZODB obj -> page
    def loadblk(self, blk, buf):
        zblk = self.blktab.get(blk)
        # no data written yet - a "hole" reads as all zeros
        if zblk is None:
            # nothing to do here - the memory buf obtained from the OS comes pre-cleared
            # XXX reenable once/if memory comes uninitialized here
            #bzero(buf)
            return

        # TODO use a specialized unpickler and load data directly into blk.
        #      also, in the DB it would be better to store {#blk -> #dataref}
        #      directly, without the ZBlk overhead
        blkdata = zblk.loadblkdata()
        assert len(blkdata) == self._v_file.blksize
        memcpy(buf, blkdata)        # FIXME memcpy

    # store data   dirty page -> ZODB obj
    def storeblk(self, blk, buf):
        zblk = self.blktab.get(blk)
        if zblk is None:
            zblk = self.blktab[blk] = ZBlk()

        zblk._v_blkdata = bytes(buf)    # FIXME does memcpy
        zblk._p_changed = True          # if zblk was already in the DB: _p_state -> CHANGED

    # bigfile-like
    def fileh_open(self):   return _ZBigFileH(self)
# BigFileH wrapper that also acts as a DataManager, proxying changes back to ZODB
# objects at two-phase-commit (TPC) level.
#
# NOTE several fileh can be opened for one ZBigFile - and this does not
#      conflict with the way ZODB organises its work - just that for fileh
#      handles ZBigFile acts as a database, and for the real ZODB database
#      it acts as (one of the) connections.
#
# NOTE ISynchronizer is used only to be able to join transactions without
#      tracking intermediate changes to pages.
@implementer(IDataManager)
@implementer(ISynchronizer)
class _ZBigFileH(object):
    # .zfile        ZBigFile we were opened for
    # .zfileh       handle for ^^^

    def __init__(self, zfile):
        self.zfile  = zfile
        self.zfileh = zfile._v_file.fileh_open()

        # FIXME zfile._p_jar could be None (e.g. when ZBigFile is newly created
        #       before the first commit)

        # IDataManager requires .transaction_manager
        self.transaction_manager = zfile._p_jar.transaction_manager

        # Hook into txn_manager so that we get a chance to run before
        # transaction.commit().   (see .beforeCompletion() for more details)
        self.transaction_manager.registerSynch(self)

        # XXX txn_manager unregisters synchs itself (it uses a weakset to keep references);
        # XXX however that unregistration is delayed until gc.collect() time, and
        # XXX maybe we should not perform any action right after the Connection is closed
        #     (as it is now, .beforeCompletion() continues to get notified)
    # ~~~~ BigFileH wrapper ~~~~
    def mmap(self, pgoffset, pglen):    return self.zfileh.mmap(pgoffset, pglen)

    # .dirty_writeout?  -- not needed - these are handled via
    # .dirty_discard?   -- transaction commit/abort

    # ~~~~ ISynchronizer ~~~~
    def beforeCompletion(self, txn):
        # if dirty, join every transaction so that it knows we want to participate in TPC.
        #
        # This is needed because we do not track every change to pages (and
        # then immediately join the txn, like ZODB Connection does for objects),
        # but instead join the txn here, right before commit.
        if not self.zfileh.isdirty():
            return

        txn.join(self)

        # XXX hack - join the Connection manually before transaction.commit() starts.
        #
        # The reason we do it here is that if we don't, and the Connection has not
        # yet joined the transaction (i.e. no changes were made to the Connection's
        # objects), then, as storeblk() running inside commit will cause changes to
        # ZODB objects, zconn will want to join the transaction, and that is
        # currently forbidden.
        #
        # A cleaner fix would be to teach transaction to allow joining
        # DataManagers while commit is running, but that comes with difficulties
        # if we want to support whole-transaction semantics (i.e. whether to
        # call the just-joined tpc_begin(), if we are already at commit()? And also
        # what to do about ordering - a newly joined sortKey() could be smaller
        # than that of DataManagers already done at the current TPC phase...)
        zconn = self.zfile._p_jar
        assert txn is zconn.transaction_manager.get()
        if zconn._needs_to_join:            # same as Connection._register(obj)
            txn.join(zconn)                 # on first obj, without registering
            zconn._needs_to_join = False    # anything.

    def afterCompletion(self, txn):
        pass

    # NOTE not always called - only at explicit transaction.begin()
    #      -> can't be used as a new-txn synchronization hook
    def newTransaction(self, txn):
        pass
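
    # NOTE (informational, per the transaction package's TPC protocol): a joined
    # resource manager sees the calls roughly in the order
    #     tpc_begin -> commit -> tpc_vote -> tpc_finish   (or tpc_abort on error)
    # which is why page writeout below is split between .commit()
    # (WRITEOUT_STORE) and .tpc_finish() (WRITEOUT_MARKSTORED).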

    # ~~~~ IDataManager ~~~~

    # key ordering us wrt other DataManagers in the tpc_*() calling sequence
    #
    # NOTE for our propagated-to-objects changes to reach ZODB, we need to act
    #      earlier than the ZODB DataManager managing zfile.blktab - hence the trick.
    def sortKey(self):
        # XXX _p_jar can be None - if the object is new?
        zkey = self.zfile._p_jar.sortKey()
        key  = "%s%s (%s:%s)" % (
                # first letter smaller and, if possible, readable
                zkey[0] > ' ' and ' ' or '\0',
                zkey[1:], type(self), id(self))
        assert key < zkey
        return key

    # abort a txn which is not in TPC phase
    def abort(self, txn):
        self.zfileh.dirty_discard()

    def tpc_begin(self, txn):
        # TODO probably take some lock and mark dirty pages RO, so that other
        # threads cannot change the data while the transaction is being committed
        pass

    def commit(self, txn):
        # propagate changes to objects, but do not mark pages as stored -
        # in case of a following tpc_abort() we'll just drop the dirty pages, and
        # changes to objects will be taken care of by the ZODB data manager (= Connection)
        self.zfileh.dirty_writeout(WRITEOUT_STORE)

    def tpc_vote(self, txn):
        # XXX what here? verify that zfile.blktab can be committed without errors?
        pass

    def tpc_finish(self, txn):
        # finish is just "mark dirty pages as stored ok"
        self.zfileh.dirty_writeout(WRITEOUT_MARKSTORED)

    # abort a txn which is in TPC phase
    def tpc_abort(self, txn):
        # this is like .abort(), but any subset (including none) of
        # tpc_{begin,commit,vote,finish} could have already been called
        #
        # we assume .tpc_finish() was not yet called    XXX write why
        # (because for tpc_abort to work after tpc_finish we would need to record
        #  which file parts we wrote and invalidate them -> simpler not to care,
        #  and also more correct - tpc_finish is assumed to be non-failing by
        #  ZODB design)
        self.abort(txn)
# FIXME for some (yet) unknown reason, py.test fails to import wendelin from tests without this __init__.py
# see e.g. http://stackoverflow.com/questions/10253826/path-issue-with-pytest-importerror-no-module-named-yadayadayada
# Wendelin.bigfile | common bits for ZODB-related tests
# TODO copyright/license
from ZODB.FileStorage import FileStorage
from ZODB import DB
# open stor/db/connection and return root obj
def dbopen(path):
    stor = FileStorage(path)
    db   = DB(stor)
    conn = db.open()
    root = conn.root()
    return root

# close db/connection/storage identified by root obj
def dbclose(root):
    conn = root._p_jar
    db   = conn.db()
    stor = db.storage

    conn.close()
    db.close()
    stor.close()
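
# A minimal usage sketch (illustrative; the path is hypothetical - the tests
# below build it from a temporary directory):
#
#   root = dbopen('/tmp/1.fs')
#   ...                         # work with ZODB objects under root
#   dbclose(root)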
# Wendelin.core.bigfile | Tests for ZODB BigFile backend
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Open Source Initiative approved licenses and Convey
# the resulting work. Corresponding source of such a combination shall include
# the source code for all other software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
from wendelin.bigfile.file_zodb import LivePersistent, ZBigFile
from wendelin.bigfile.tests.common_zodb import dbopen as z_dbopen, dbclose
from wendelin.bigfile import ram_reclaim
from persistent import UPTODATE, GHOST
import transaction
from tempfile import mkdtemp
from shutil import rmtree
from numpy import ndarray, array_equal, uint8, zeros
from pytest import raises
from six.moves import range as xrange
tmpd = None
blksize = 2*1024*1024   # XXX hardcoded
blen    = 32            # 32*2 = 64MB      # TODO set it higher by default ?

def dbopen():
    return z_dbopen('%s/1.fs' % tmpd)

def setup_module():
    global tmpd
    tmpd = mkdtemp('', 'bigzodb.')

def teardown_module():
    rmtree(tmpd)

# like db.cacheDetail(), but {} instead of []
def cacheInfo(db):
    return dict(db.cacheDetail())

# key for cacheInfo() result
def kkey(klass):
    return '%s.%s' % (klass.__module__, klass.__name__)
def test_livepersistent():
    root = dbopen()
    transaction.commit()    # set root._p_jar
    db = root._p_jar.db()

    # ~~~ test `obj initially created` case
    root['live'] = lp = LivePersistent()
    assert lp._p_jar   is None          # connection does not know about it yet
    assert lp._p_state == UPTODATE      # object initially created in uptodate

    # should not be in cache yet & thus should stay after gc
    db.cacheMinimize()
    assert lp._p_jar   is None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert kkey(LivePersistent) not in ci

    # should be registered to connection & cache after commit
    transaction.commit()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # should stay that way after cache gc
    db.cacheMinimize()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # ~~~ reopen & test `obj loaded from db` case
    dbclose(root)
    del root, db, lp

    root = dbopen()
    db = root._p_jar.db()

    # known to connection & cache & UPTODATE (ZODB < 3.10) or GHOST (ZODB >= 3.10)
    # right after first loading from DB
    lp = root['live']
    assert lp._p_jar   is not None
    assert lp._p_state in (UPTODATE, GHOST)
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # should be UPTODATE for sure after read access
    getattr(lp, 'attr', None)
    assert lp._p_jar   is not None
    assert lp._p_state is UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # does not go back to ghost on cache gc
    db.cacheMinimize()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # ok
    dbclose(root)
# i'th memory block as u8 ndarray
def Blk(vma, i):
    return ndarray(blksize, offset=i*blksize, buffer=vma, dtype=uint8)

def test_bigfile_filezodb():
    root = dbopen()
    root['zfile'] = f = ZBigFile(blksize)
    transaction.commit()

    fh  = f.fileh_open()    # TODO + ram
    vma = fh.mmap(0, blen)  # XXX assumes blksize == pagesize

    # verify that empty file reads as all zeros
    data0 = zeros(blksize, dtype=uint8)
    for i in xrange(blen):
        assert array_equal(data0, Blk(vma, i))

    # dirty data
    for i in xrange(blen):
        Blk(vma, i)[0] = i

    # verify that the changes are lost after abort
    transaction.abort()
    for i in xrange(blen):
        assert array_equal(data0, Blk(vma, i))

    # dirty & abort once again
    # (verifies that ZBigFile data manager re-registers with transaction)
    for i in xrange(blen):
        Blk(vma, i)[0] = i

    transaction.abort()
    for i in xrange(blen):
        assert array_equal(data0, Blk(vma, i))

    # dirty data & commit
    for i in xrange(blen):
        Blk(vma, i)[0] = i

    transaction.commit()

    # close DB and reopen everything
    # vma.unmap()
    del vma
    #fh.close()
    del fh
    dbclose(root)
    del root
    root = dbopen()
    f = root['zfile']
    fh  = f.fileh_open()    # TODO + ram
    vma = fh.mmap(0, blen)  # XXX assumes blksize == pagesize

    # verify data as re-loaded
    for i in xrange(blen):
        assert Blk(vma, i)[0] == i

    # evict all loaded pages and test loading them again
    # (verifies ZBlk.loadblkdata() & loadblk logic when loading data the second time)
    reclaimed = 0
    while 1:
        n = ram_reclaim()   # TODO + ram
        if n == 0:
            break
        reclaimed += n
    assert reclaimed >= blen    # XXX assumes pagesize=blksize

    for i in xrange(blen):
        assert Blk(vma, i)[0] == i

    # dirty once again & commit
    # (verifies ZBlk.__setstate__() & storeblk logic when storing data the second time)
    for i in xrange(blen):
        Blk(vma, i)[0] = i+1

    transaction.commit()

    # close DB and reopen everything
    del vma
    del fh
    dbclose(root)
    del root

    root = dbopen()
    f = root['zfile']
    fh  = f.fileh_open()    # TODO + ram
    vma = fh.mmap(0, blen)  # XXX assumes blksize == pagesize

    # verify data as re-loaded
    for i in xrange(blen):
        assert Blk(vma, i)[0] == i+1

    # ZBigFile should survive Persistent cache clearing and not go to ghost
    # state (else the logic to propagate changes from pages to objects would
    # subtly break after Persistent cache gc)
    db = root._p_jar.db()
    ci = cacheInfo(db)
    assert ci[kkey(ZBigFile)] == 1
    assert f._p_state == UPTODATE
    db.cacheMinimize()
    ci = cacheInfo(db)
    assert ci[kkey(ZBigFile)] == 1
    assert f._p_state == UPTODATE   # it would be GHOST without LivePersistent protection

    # verify that data changes propagation continues to work
    assert Blk(vma, 0)[0] == 1
    Blk(vma, 0)[0] = 99
    transaction.commit()

    del vma
    del fh
    dbclose(root)
    del db, root

    root = dbopen()
    f = root['zfile']
    fh  = f.fileh_open()    # TODO + ram
    vma = fh.mmap(0, blen)  # XXX assumes blksize == pagesize

    # verify data as re-loaded
    assert Blk(vma, 0)[0] == 99
    for i in xrange(1, blen):
        assert Blk(vma, i)[0] == i+1

    dbclose(root)
@@ -184,6 +184,12 @@ setup(
    install_requires = [
        'numpy',    # lib/mem

        # for ZBigFile
        # ( NOTE: ZODB3 3.11 just pulls in the latest ZODB _4_, so by specifying
        #   ZODB _3_ this way we allow external requirements to choose whether
        #   to use e.g. ZODB3.10 or ZODB4 )
        'ZODB3',

        'six',      # compat py2/py3
    ],