Commit 3bd82127 authored by Kirill Smelkov's avatar Kirill Smelkov

lib/zodb: Add zconn_at draft (ZODB5 only)

For wendelin.core v2 we need a way to know at which particular database
state application-level ZODB connection is viewing the database. Knowing
that state, WCFS client library will interact with WCFS filesystem server
and, in simple terms, request the server to provide data as of that
particular database state.

Contrary to ZODB/go[1] ZODB/py does not provide the functionality to
obtain DB state of connection view, so we have to build it ourselves.
Let us call the function that for a client ZODB connection returns
database state corresponding to its database view as zconn_at.

It is relatively easy to implement zconn_at for ZODB5, since ZODB5
adopted MVCC uniformly and this patch does just that. However even with
ZODB5 currently all released ZODB5 versions have race in vs invalidations[2], and so the first ZODB5 release
with which zconn_at implemented here will work reliable should be
upcoming ZODB 5.5.2

It is TODO to implement zconn_at for ZODB4 and ZODB3, which organize
things differently.

Please note what would happen if zconn_at gives, even a bit, incorrect
answer: wcfs client will ask wcfs server to provide array data as of
different database state compared to current on-client ZODB connection.
This will result in that data accessed via ZBigArray will _not_
correspond to all other data accessed via regular ZODB mechanism.
It is, in other words, would be a data corruptions.

parent 8c0b7471
......@@ -17,13 +17,16 @@
# See COPYING file for full licensing terms.
# See for rationale and options.
from wendelin.lib.zodb import LivePersistent, deactivate_btree, dbclose
from wendelin.lib.zodb import LivePersistent, deactivate_btree, dbclose, zconn_at, zmajor
from wendelin.lib.testing import getTestDB
from persistent import Persistent, UPTODATE, GHOST, CHANGED
from ZODB import DB, POSException
from BTrees.IOBTree import IOBTree
import transaction
from transaction import TransactionManager
from golang import defer, func
from pytest import raises
import pytest; xfail = pytest.mark.xfail
testdb = None
......@@ -225,3 +228,89 @@ def test_deactivate_btree():
for obj in [B] + leafv:
assert obj._p_state == GHOST
assert obj not in cached
# verify that zconn_at gives correct answer.
@xfail(zmajor < 5, reason="zconn_at is TODO for ZODB4 and ZODB3")
def test_zconn_at():
stor = testdb.getZODBStorage()
db = DB(stor)
at0 = stor.lastTransaction()
# open connection, it must be viewing the database @at0
tm1 = TransactionManager()
conn1 =
assert zconn_at(conn1) == at0
# open another simultaneous connection
tm2 = TransactionManager()
conn2 =
assert zconn_at(conn2) == at0
# commit in conn1
root1 = conn1.root()
root1['z'] = 1
at1 = stor.lastTransaction()
# after commit conn1 view is updated; conn2 view stays @at0
assert zconn_at(conn1) == at1
assert zconn_at(conn2) == at0
# reopen conn1 -> view @at1
with raises(POSException.ConnectionStateError):
assert zconn_at(conn2) == at0
conn1_ =
assert conn1_ is conn1 # returned from DB pool
assert zconn_at(conn1) == at1
assert zconn_at(conn2) == at0
# commit empty transaction - view stays in sync with storage head
conn1_ =
assert conn1_ is conn1 # from DB pool
assert zconn_at(conn1) == at1
assert zconn_at(conn2) == at0
at1_ = stor.lastTransaction()
assert zconn_at(conn1) == at1_
assert zconn_at(conn2) == at0
# reopen conn2 -> view upated to @at1_
conn2_ =
assert conn2_ is conn2 # from DB pool
assert zconn_at(conn1) == at1_
assert zconn_at(conn2) == at1_
# verify with historic connection @at0
tm_old = TransactionManager()
conn_at0 =, at=at0)
assert conn_at0 is not conn1
assert conn_at0 is not conn2
assert zconn_at(conn_at0) == at0
# ---- misc ----
# zsync syncs ZODB storage.
# it is noop, if zstor does not support syncing (i.e. FileStorage has no .sync())
def zsync(zstor):
sync = getattr(zstor, 'sync', None)
if sync is not None:
......@@ -22,6 +22,8 @@
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB import DB
from ZODB import POSException
from ZODB.utils import p64, u64
from persistent import Persistent
from weakref import WeakSet
import gc
......@@ -137,6 +139,70 @@ def _deactivate_bucket(bucket):
# zconn_at returns tid as of which ZODB connection is viewing the database.
def zconn_at(zconn): # -> tid
assert isinstance(zconn, ZODB.Connection.Connection)
if zconn.opened is None: # zconn must be in "opened" state
raise POSException.ConnectionStateError("database connection is closed")
# ZODB5 uses MVCC uniformly
# zconn.db._storage always provides IMVCCStorage - either raw storage provides it,
# or DB wraps raw storage with MVCCAdapter.
# MVCCAdapter in turn uses either MVCCAdapterInstance (current) or
# HistoricalStorageAdapter, or UndoAdapterInstance. Retriving from those:
# MVCCAdapterInstance
# ._start
# HistoricalStorageAdapter
# ._before
# UndoAdapterInstance
# # no way to retrieve `at`, but .undo_instance() through which
# # UndoAdapterInstance is returnerd, is not used anywhere.
# For the reference: FileStorage, ZEO and NEO do not provide IMVCCStorage, thus
# for them we can rely on MVCCAdapterInstance.
# RelStorage is IMVCCStorage - TODO: how to extract at.
if zmajor >= 5:
zstor = zconn._storage
if isinstance(zstor, ZODB.mvccadapter.MVCCAdapterInstance):
return before2at(zstor._start)
if isinstance(zstor, ZODB.mvccadapter.HistoricalStorageAdapter):
return before2at(zstor._before)
raise AssertionError("zconn_at: TODO: add support for zstor %r" % zstor)
# Connection:
# .before !None for historic connections
# ._txn_time - if !None - set to tid of _next_ transaction
# XXX set to None initially - what to do?
# # XXX do something like that ZODB5 is doing:
# zconn._start = zconn._storage.lastTransaction() + 1
# # XXX _and_ check out queued invalidations
elif zmajor == 4:
raise AssertionError("zconn_at: TODO: add support for ZODB4")
raise AssertionError("zconn_at: TODO: add support for ZODB3")
# before2at converts tid that specifies database state as "before" into tid that
# specifies database state as "at".
def before2at(before): # -> at
return p64(u64(before) - 1)
# _zversion returns ZODB version object
def _zversion():
dzodb3 = pkg_resources.working_set.find(pkg_resources.Requirement.parse('ZODB3'))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment