Commit 74fba08a authored by Vincent Pelletier's avatar Vincent Pelletier Committed by Julien Muchembled

Allow serial to be returned as late as tpc_finish

This makes possible for storage to allocate serial inside tpc_finish,
removing the requirement to serialise 2PC's second phase phase (tpc_vote
to tpc_finish/tpc_abort).
Co-Authored-By: Julien Muchembled's avatarJulien Muchembled <jm@nexedi.com>
parent 097e74a2
...@@ -808,7 +808,16 @@ class Connection(ExportImport, object): ...@@ -808,7 +808,16 @@ class Connection(ExportImport, object):
# to be able to read any updated data until we've had a chance # to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other # to send an invalidation message to all of the other
# connections! # connections!
self._storage.tpc_finish(transaction, callback) serial = self._storage.tpc_finish(transaction, callback)
if serial is not None:
assert type(serial) is bytes, repr(serial)
for oid_iterator in self._modified, self._creating:
for oid in oid_iterator:
obj = self._cache.get(oid)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
self._tpc_cleanup() self._tpc_cleanup()
def sortKey(self): def sortKey(self):
......
...@@ -776,6 +776,10 @@ class IStorage(Interface): ...@@ -776,6 +776,10 @@ class IStorage(Interface):
called while the storage transaction lock is held. It takes called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction. the new transaction id generated by the transaction.
The return value can be either None or a serial giving new
serial for objects whose ids were passed to previous store calls
in the same transaction, and for which no serial was returned
from either store or tpc_vote for objects passed to store.
""" """
def tpc_vote(transaction): def tpc_vote(transaction):
...@@ -794,14 +798,14 @@ class IStorage(Interface): ...@@ -794,14 +798,14 @@ class IStorage(Interface):
The return value can be either None or a sequence of object-id The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction. passed to previous store calls in the same transaction.
After the tpc_vote call, new serials must have been returned,
either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to the special value ZODB.ConflictResolution.ResolvedSerial to
indicate that a conflict occured and that the object should be indicate that a conflict occured and that the object should be
invalidated. invalidated.
After the tpc_vote call, all solved conflicts must have been notified,
either from tpc_vote or store for objects passed to store.
""" """
......
...@@ -69,8 +69,10 @@ class BasicStorage: ...@@ -69,8 +69,10 @@ class BasicStorage:
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)), r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn) '', txn)
r2 = self._storage.tpc_vote(txn) r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn) serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2) newrevid = handle_serials(oid, r1, r2)
if newrevid is None and serial is not None:
newrevid = serial
data, revid = self._storage.load(oid, '') data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data) value = zodb_unpickle(data)
eq(value, MinPO(11)) eq(value, MinPO(11))
......
...@@ -152,10 +152,12 @@ class StorageClientThread(TestThread): ...@@ -152,10 +152,12 @@ class StorageClientThread(TestThread):
r2 = self.storage.tpc_vote(t) r2 = self.storage.tpc_vote(t)
self.pause() self.pause()
self.storage.tpc_finish(t) serial = self.storage.tpc_finish(t)
self.pause() self.pause()
revid = handle_serials(oid, r1, r2) revid = handle_serials(oid, r1, r2)
if serial is not None and revid is None:
revid = serial
self.oids[oid] = revid self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread): class ExtStorageClientThread(StorageClientThread):
......
...@@ -150,10 +150,12 @@ class RevisionStorage: ...@@ -150,10 +150,12 @@ class RevisionStorage:
# Finish the transaction # Finish the transaction
r2 = self._storage.tpc_vote(t) r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2) newrevid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
except: except:
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
raise raise
if serial is not None and newrevid is None:
newrevid = serial
return newrevid return newrevid
revid1 = helper(1, None, 1) revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2) revid2 = helper(2, revid1, 2)
......
...@@ -132,7 +132,7 @@ def handle_serials(oid, *args): ...@@ -132,7 +132,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials(). A helper for function _handle_all_serials().
""" """
return handle_all_serials(oid, *args)[oid] return handle_all_serials(oid, *args).get(oid)
def import_helper(name): def import_helper(name):
__import__(name) __import__(name)
...@@ -189,7 +189,9 @@ class StorageTestBase(ZODB.tests.util.TestCase): ...@@ -189,7 +189,9 @@ class StorageTestBase(ZODB.tests.util.TestCase):
# Finish the transaction # Finish the transaction
r2 = self._storage.tpc_vote(t) r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2) revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
if serial is not None and revid is None:
revid = serial
except: except:
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
raise raise
...@@ -209,8 +211,8 @@ class StorageTestBase(ZODB.tests.util.TestCase): ...@@ -209,8 +211,8 @@ class StorageTestBase(ZODB.tests.util.TestCase):
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t) undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t) vote_result = self._storage.tpc_vote(t)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
if expected_oids is not None: if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else [] oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ()) oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids)) self.assertEqual(len(oids), len(expected_oids), repr(oids))
......
...@@ -73,6 +73,12 @@ class TransactionalUndoStorage: ...@@ -73,6 +73,12 @@ class TransactionalUndoStorage:
def _transaction_newserial(self, oid): def _transaction_newserial(self, oid):
return self.__serials[oid] return self.__serials[oid]
def _transaction_finish(self, t, oid_list):
tid = self._storage.tpc_finish(t)
if tid is not None:
for oid in oid_list:
self.__serials[oid] = tid
def _multi_obj_transaction(self, objs): def _multi_obj_transaction(self, objs):
newrevs = {} newrevs = {}
t = Transaction() t = Transaction()
...@@ -82,7 +88,7 @@ class TransactionalUndoStorage: ...@@ -82,7 +88,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid, rev, data, '', t) self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None newrevs[oid] = None
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys(): for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid) newrevs[oid] = self._transaction_newserial(oid)
return newrevs return newrevs
...@@ -219,9 +225,9 @@ class TransactionalUndoStorage: ...@@ -219,9 +225,9 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p51, '', t) self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2) eq(revid1, revid2)
# Update those same two objects # Update those same two objects
t = Transaction() t = Transaction()
...@@ -231,9 +237,9 @@ class TransactionalUndoStorage: ...@@ -231,9 +237,9 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p52, '', t) self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2) eq(revid1, revid2)
# Make sure the objects have the current value # Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '') data, revid1 = self._storage.load(oid1, '')
...@@ -289,10 +295,11 @@ class TransactionalUndoStorage: ...@@ -289,10 +295,11 @@ class TransactionalUndoStorage:
tid1 = info[1]['id'] tid1 = info[1]['id']
t = Transaction() t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1) oids = self._begin_undos_vote(t, tid, tid1)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
# We may get the finalization stuff called an extra time, # We may get the finalization stuff called an extra time,
# depending on the implementation. # depending on the implementation.
self.assertEqual(set(oids), set((oid1, oid2))) if serial is None:
self.assertEqual(set(oids), {oid1, oid2})
data, revid1 = self._storage.load(oid1, '') data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30)) eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '') data, revid2 = self._storage.load(oid2, '')
...@@ -326,7 +333,7 @@ class TransactionalUndoStorage: ...@@ -326,7 +333,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p52, '', t) self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
...@@ -346,7 +353,7 @@ class TransactionalUndoStorage: ...@@ -346,7 +353,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p53, '', t) self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
...@@ -358,7 +365,8 @@ class TransactionalUndoStorage: ...@@ -358,7 +365,8 @@ class TransactionalUndoStorage:
tid = info[1]['id'] tid = info[1]['id']
t = Transaction() t = Transaction()
oids = self._begin_undos_vote(t, tid) oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
if serial is None:
eq(len(oids), 1) eq(len(oids), 1)
self.assertTrue(oid1 in oids) self.assertTrue(oid1 in oids)
self.assertTrue(not oid2 in oids) self.assertTrue(not oid2 in oids)
...@@ -398,7 +406,7 @@ class TransactionalUndoStorage: ...@@ -398,7 +406,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid1, revid1, p81, '', t) self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t) self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
......
...@@ -42,6 +42,8 @@ from zope.testing import renormalizing ...@@ -42,6 +42,8 @@ from zope.testing import renormalizing
# With the following monkey-patch, we can test the different ways # With the following monkey-patch, we can test the different ways
# to update _p_changed/_p_serial status of committed oids. # to update _p_changed/_p_serial status of committed oids.
from ZODB.ConflictResolution import ResolvedSerial
class DemoStorage(ZODB.DemoStorage.DemoStorage): class DemoStorage(ZODB.DemoStorage.DemoStorage):
delayed_store = False delayed_store = False
...@@ -52,6 +54,9 @@ class DemoStorage(ZODB.DemoStorage.DemoStorage): ...@@ -52,6 +54,9 @@ class DemoStorage(ZODB.DemoStorage.DemoStorage):
def store(self, oid, *args): def store(self, oid, *args):
s = super(DemoStorage, self).store(oid, *args) s = super(DemoStorage, self).store(oid, *args)
if s != ResolvedSerial:
assert type(s) is bytes, s
return
if not self.delayed_store: if not self.delayed_store:
return s return s
self.__stored.append((oid, s)) self.__stored.append((oid, s))
...@@ -63,6 +68,15 @@ class DemoStorage(ZODB.DemoStorage.DemoStorage): ...@@ -63,6 +68,15 @@ class DemoStorage(ZODB.DemoStorage.DemoStorage):
assert s is None, s assert s is None, s
return self.__stored if self.delayed_store else s return self.__stored if self.delayed_store else s
def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]
ZODB.DemoStorage.DemoStorage = DemoStorage ZODB.DemoStorage.DemoStorage = DemoStorage
class DemoStorageTests( class DemoStorageTests(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment