Commit c39d5c67 authored by Julien Muchembled's avatar Julien Muchembled

Add support for latest versions of ZODB (4.4.3 & 5.0.1)

Many patches have been merged upstream :)

A notable change is that lastTransaction() does not ping the master anymore
(but it still causes a connection to the master if the client is disconnected).
parent 13911ca3
Pipeline #2043 skipped
Patch to ZODB 4.3.1 for ZODB unit tests.
See also monkey-patch to Connection.tpc_finish in neo/client/__init__.py
diff --git a/src/ZODB/interfaces.py b/src/ZODB/interfaces.py
index 44ded35..db5c525 100644
--- a/src/ZODB/interfaces.py
+++ b/src/ZODB/interfaces.py
@@ -776,6 +776,10 @@ def tpc_finish(transaction, func = lambda tid: None):
called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction.
+ The return value can be either None or a serial giving new
+ serial for objects whose ids were passed to previous store calls
+ in the same transaction, and for which no serial was returned
+ from either store or tpc_vote for objects passed to store.
"""
def tpc_vote(transaction):
@@ -794,8 +798,6 @@ def tpc_vote(transaction):
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
- After the tpc_vote call, new serials must have been returned,
- either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to
diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py
index e5c0526..0291654 100644
--- a/src/ZODB/tests/BasicStorage.py
+++ b/src/ZODB/tests/BasicStorage.py
@@ -69,8 +69,10 @@ def checkSerialIsNoneForInitialRevision(self):
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
- self._storage.tpc_finish(txn)
+ serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
+ if newrevid is None and serial is not None:
+ newrevid = serial
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
diff --git a/src/ZODB/tests/MTStorage.py b/src/ZODB/tests/MTStorage.py
index 4ea139c..347cae3 100644
--- a/src/ZODB/tests/MTStorage.py
+++ b/src/ZODB/tests/MTStorage.py
@@ -152,10 +152,12 @@ def dostore(self, i):
r2 = self.storage.tpc_vote(t)
self.pause()
- self.storage.tpc_finish(t)
+ serial = self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
+ if serial is not None and revid is None:
+ revid = serial
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
diff --git a/src/ZODB/tests/RevisionStorage.py b/src/ZODB/tests/RevisionStorage.py
index 2a5c370..0a0120a 100644
--- a/src/ZODB/tests/RevisionStorage.py
+++ b/src/ZODB/tests/RevisionStorage.py
@@ -150,10 +150,12 @@ def helper(tid, revid, x):
# Finish the transaction
r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
+ if serial is not None and newrevid is None:
+ newrevid = serial
return newrevid
revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2)
diff --git a/src/ZODB/tests/StorageTestBase.py b/src/ZODB/tests/StorageTestBase.py
index f0a2b72..5305807 100644
--- a/src/ZODB/tests/StorageTestBase.py
+++ b/src/ZODB/tests/StorageTestBase.py
@@ -132,7 +132,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials().
"""
- return handle_all_serials(oid, *args)[oid]
+ return handle_all_serials(oid, *args).get(oid)
def import_helper(name):
__import__(name)
@@ -189,7 +189,9 @@ def _dostore(self, oid=None, revid=None, data=None,
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
+ if serial is not None and revid is None:
+ revid = serial
except:
self._storage.tpc_abort(t)
raise
@@ -209,8 +211,8 @@ def _undo(self, tid, expected_oids=None, note=None):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- if expected_oids is not None:
+ serial = self._storage.tpc_finish(t)
+ if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
diff --git a/src/ZODB/tests/TransactionalUndoStorage.py b/src/ZODB/tests/TransactionalUndoStorage.py
index 0ee68b2..4dab889 100644
--- a/src/ZODB/tests/TransactionalUndoStorage.py
+++ b/src/ZODB/tests/TransactionalUndoStorage.py
@@ -73,6 +73,12 @@ def _transaction_vote(self, trans):
def _transaction_newserial(self, oid):
return self.__serials[oid]
+ def _transaction_finish(self, t, oid_list):
+ tid = self._storage.tpc_finish(t)
+ if tid is not None:
+ for oid in oid_list:
+ self.__serials[oid] = tid
+
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
@@ -82,7 +88,7 @@ def _multi_obj_transaction(self, objs):
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
@@ -219,9 +225,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
@@ -231,9 +237,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
@@ -289,10 +295,11 @@ def checkTwoObjectUndoAtOnce(self):
tid1 = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
# We may get the finalization stuff called an extra time,
# depending on the implementation.
- self.assertEqual(set(oids), set((oid1, oid2)))
+ if serial is None:
+ self.assertEqual(set(oids), {oid1, oid2})
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
@@ -326,7 +333,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -346,7 +353,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -358,10 +365,11 @@ def checkTwoObjectUndoAgain(self):
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
- self._storage.tpc_finish(t)
- eq(len(oids), 1)
- self.assertTrue(oid1 in oids)
- self.assertTrue(not oid2 in oids)
+ serial = self._storage.tpc_finish(t)
+ if serial is None:
+ eq(len(oids), 1)
+ self.assertTrue(oid1 in oids)
+ self.assertTrue(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
@@ -398,7 +406,7 @@ def checkNotUndoable(self):
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -685,9 +693,9 @@ def undo(i):
tid = p64(i + 1)
eq(txn.tid, tid)
- L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
- L2 = [(oid, revid, None) for _tid, oid, revid in orig
- if _tid == tid]
+ L1 = {(rec.oid, rec.tid, rec.data_txn) for rec in txn}
+ L2 = {(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid}
eq(L1, L2)
......@@ -159,11 +159,8 @@ class Storage(BaseStorage.BaseStorage,
except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid)
def sync(self, force=True):
# XXX: sync() is part of IMVCCStorage and we don't want to be called
# from afterCompletion() so it may not be a good place to ping the
# master here. See also monkey-patch in __init__.py
self.app.lastTransaction()
def sync(self):
return self.app.sync()
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
......@@ -186,7 +183,7 @@ class Storage(BaseStorage.BaseStorage,
def lastTransaction(self):
# Used in ZODB unit tests
return self.app.lastTransaction()
return self.app.last_tid
def _clear_temp(self):
raise NotImplementedError
......
......@@ -12,15 +12,11 @@
#
##############################################################################
import app # set up signal handers early enough to do it in the main thread
if 1:
def patch():
from hashlib import md5
from ZODB.Connection import Connection
def _check(f, *args):
h = md5(f.func_code.co_code).hexdigest()
assert h in args, h
H = lambda f: md5(f.func_code.co_code).hexdigest()
# Allow serial to be returned as late as tpc_finish
#
......@@ -28,9 +24,7 @@ if 1:
# removing the requirement to serialise second commit phase (tpc_vote
# to tpc_finish/tpc_abort).
_check(Connection.tpc_finish,
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
h = H(Connection.tpc_finish)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
......@@ -61,18 +55,27 @@ if 1:
# </patch>
self._tpc_cleanup()
global OLD_ZODB
OLD_ZODB = h in (
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
if OLD_ZODB:
Connection.tpc_finish = tpc_finish
elif hasattr(Connection, '_handle_serial'): # merged upstream ?
assert hasattr(Connection, '_warn_about_returned_serial')
# IStorage implementations usually need to provide a "network barrier",
# at least for NEO & ZEO, to make sure we have an up-to-date view of
# the storage. It's unclear whether sync() is a good place to do this
# because a round-trip to the server introduces latency and we prefer
# it's not done when it's not useful.
# For example, we know we are up-to-date after a successful commit,
# so this should not be done in afterCompletion(), and anyway, we don't
# know any legitimate use of DB access outside a transaction.
# sync() is used to provide a "network barrier", which is required for
# NEO & ZEO to make sure our view of the storage includes all changes done
# so far by other clients. But a round-trip to the server introduces
# latency so it must not be done when it's not useful. Note also that a
# successful commit (which ends with a response from the master) already
# acts as a "network barrier".
# BBB: What this monkey-patch does has been merged in ZODB5.
if not hasattr(Connection, '_flush_invalidations'):
return
_check(Connection.afterCompletion,
assert H(Connection.afterCompletion) in (
'cd3a080b80fd957190ff3bb867149448', # Python 2.7
)
......@@ -81,3 +84,7 @@ if 1:
# PATCH: do not call sync()
self._flush_invalidations()
Connection.afterCompletion = afterCompletion
patch()
import app # set up signal handers early enough to do it in the main thread
......@@ -24,7 +24,9 @@ from functools import partial
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from . import OLD_ZODB
if OLD_ZODB:
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
......@@ -108,7 +110,6 @@ class Application(ThreadedApplication):
self._loading_oid = None
self.new_oid_list = ()
self.last_oid = '\0' * 8
self.last_tid = None
self.storage_event_handler = storage.StorageEventHandler(self)
self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
self.storage_handler = storage.StorageAnswersHandler(self)
......@@ -136,7 +137,11 @@ class Application(ThreadedApplication):
self.compress = compress
def __getattr__(self, attr):
if attr == 'pt':
if attr in ('last_tid', 'pt'):
if self._connecting_to_master_node.locked():
if attr == 'last_tid':
return
else:
self._getMasterConnection()
return self.__getattribute__(attr)
......@@ -603,7 +608,7 @@ class Application(ThreadedApplication):
logging.error('tpc_store failed')
raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set:
append((oid, ResolvedSerial))
append((oid, ResolvedSerial) if OLD_ZODB else oid)
return result
def tpc_vote(self, transaction, tryToResolveConflict):
......@@ -950,9 +955,8 @@ class Application(ThreadedApplication):
from .iterator import iterator
def lastTransaction(self):
self._askPrimary(Packets.AskLastTransaction())
return self.last_tid
def sync(self):
self._askPrimary(Packets.Ping())
def pack(self, t):
tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw()
......
......@@ -99,7 +99,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
app = self.app
ltid = packet.decode()[0]
if app.last_tid != ltid:
if app.master_conn is None:
# Either we're connecting or we already know the last tid
# via invalidations.
assert app.master_conn is None, app.master_conn
if 1:
app._cache_lock_acquire()
try:
if app.last_tid < ltid:
......
......@@ -63,7 +63,7 @@ def iterator(app, start=None, stop=None):
"""NEO transaction iterator"""
if start is None:
start = ZERO_TID
stop = min(stop or MAX_TID, app.lastTransaction())
stop = min(stop or MAX_TID, app.last_tid)
while 1:
max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
if not chunk:
......
......@@ -226,8 +226,9 @@ class MTEventHandler(EventHandler):
def packetReceived(self, conn, packet, kw={}):
"""Redirect all received packet to dispatcher thread."""
if packet.isResponse() and type(packet) is not Packets.Pong:
if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
if packet.isResponse():
if not (self.dispatcher.dispatch(conn, packet.getId(), packet, kw)
or type(packet) is Packets.Pong):
raise ProtocolError('Unexpected response packet from %r: %r'
% (conn, packet))
else:
......
......@@ -38,6 +38,7 @@ class Dispatcher(object):
def _getMasterConnection(self):
if self.master_conn is None:
self.last_tid = None
self.uuid = 1 + (UUID_NAMESPACES[NodeTypes.CLIENT] << 24)
self.num_partitions = 10
self.num_replicas = 1
......
......@@ -335,7 +335,7 @@ class ClientTests(NEOFunctionalTest):
t3.description = 'desc'
st3.tpc_begin(t3)
# retreive the last revision
data, serial = st3.load(oid, '')
data, serial = st3.load(oid)
# try to store again, should not be delayed
st3.store(oid, serial, data, '', t3)
# the vote should not timeout
......
......@@ -632,7 +632,8 @@ class Test(NEOThreadedTest):
t.begin()
s0.stop() # force client to ask s1
self.assertEqual(sorted(c.root()), [1])
t0, t1 = c._storage.iterator()
self.tic()
t0, t1 = c.db().storage.iterator()
finally:
cluster.stop()
......@@ -884,7 +885,7 @@ class Test(NEOThreadedTest):
# Now test cache invalidation during a load from a storage
ll = LockLock()
def _loadFromStorage(orig, *args):
def break_after(orig, *args):
try:
return orig(*args)
finally:
......@@ -892,7 +893,7 @@ class Test(NEOThreadedTest):
x2._p_deactivate()
# Remove last version of x from cache
cache._remove(cache._oid_dict[x2._p_oid].pop())
with ll, Patch(cluster.client, _loadFromStorage=_loadFromStorage):
with ll, Patch(cluster.client, _loadFromStorage=break_after):
t = self.newThread(x2._p_activate)
ll()
# At this point, x could not be found the cache and the result
......@@ -910,16 +911,18 @@ class Test(NEOThreadedTest):
self.assertEqual(x2.value, 1)
self.assertEqual(x1.value, 0)
# l1 is acquired and l2 is released
def invalidations(conn):
try:
return conn._storage._invalidations
except AttributeError: # BBB: ZODB < 5
return conn._invalidated
# Change x again from 0 to 1, while the checking connection c1
# is suspended at the beginning of the transaction t1,
# between Storage.sync() and flush of invalidations.
def _flush_invalidations(orig):
ll()
orig()
x1._p_deactivate()
t1.abort()
with ll, Patch(c1, _flush_invalidations=_flush_invalidations):
with ll, Patch(c1._storage, sync=break_after):
t = self.newThread(t1.begin)
ll()
txn = transaction.Transaction()
......@@ -927,10 +930,14 @@ class Test(NEOThreadedTest):
client.store(x2._p_oid, tid, y, '', txn)
tid = client.tpc_finish(txn, None)
client.close()
self.assertEqual(invalidations(c1), {x1._p_oid})
t.join()
# A transaction really begins when it acquires the lock to flush
# invalidations. The previous lastTransaction() only does a ping
# to make sure we have a recent enough view of the DB.
# A transaction really begins when it gets the last tid from the
# storage, just before flushing invalidations (on ZODB < 5, it's
# when it acquires the lock to flush invalidations). The previous
# call to sync() only does a ping to make sure we have a recent
# enough view of the DB.
self.assertFalse(invalidations(c1))
self.assertEqual(x1.value, 1)
finally:
......@@ -969,7 +976,7 @@ class Test(NEOThreadedTest):
# transaction before the last one, and clearing the cache before
# reloading x.
c1._storage.load(x._p_oid)
t0, t1, t2 = c1._storage.iterator()
t0, t1, t2 = c1.db().storage.iterator()
self.assertEqual(map(u64, t0.oid_list), [0])
self.assertEqual(map(u64, t1.oid_list), [0, 1])
# Check oid 1 is part of transaction metadata.
......@@ -1282,6 +1289,7 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction()
m2c, = cluster.master.getConnectionList(cluster.client)
cluster.client._cache.clear()
c.cacheMinimize()
with cluster.client.filterConnection(cluster.storage) as c2s:
c2s.add(disconnect)
# Storages are currently notified of clients that get
......
......@@ -404,14 +404,14 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
s1.stop()
cluster.join((s1,))
t0, t1, t2 = c._storage.iterator()
t0, t1, t2 = c.db().storage.iterator()
s1.resetNode()
s1.start()
self.tic()
self.assertEqual([], cluster.getOutdatedCells())
s0.stop()
cluster.join((s0,))
t0, t1, t2 = c._storage.iterator()
t0, t1, t2 = c.db().storage.iterator()
finally:
cluster.stop()
......
......@@ -43,7 +43,11 @@ class ZODBTestCase(TestCase):
def _tearDown(self, success):
self._storage.cleanup()
try:
self.neo.stop()
except Exception:
if success:
raise
del self.neo, self._storage
super(ZODBTestCase, self)._tearDown(success)
......
......@@ -28,6 +28,14 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
checkTransactionalUndoAfterPack = expectedFailure()(
TransactionalUndoStorage.checkTransactionalUndoAfterPack)
for x in ('checkUndoMultipleConflictResolution',
'checkUndoMultipleConflictResolutionReversed'):
try:
setattr(UndoTests, x,
expectedFailure(KeyError)(getattr(TransactionalUndoStorage, x)))
except AttributeError:
pass
if __name__ == "__main__":
suite = unittest.makeSuite(UndoTests, 'check')
unittest.main(defaultTest='suite')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment