Commit 7e1db13f authored by Julien Muchembled's avatar Julien Muchembled

Switch all storages to the new commit protocol

parent bfae36df
...@@ -106,6 +106,9 @@ class BaseStorage(UndoLogCompatible): ...@@ -106,6 +106,9 @@ class BaseStorage(UndoLogCompatible):
self._oid = z64 self._oid = z64
else: else:
self._oid = oid self._oid = oid
# In case that conflicts are resolved during store,
# this collects oids to be returned by tpc_vote.
self._resolved = []
def sortKey(self): def sortKey(self):
"""Return a string that can be used to sort storage instances. """Return a string that can be used to sort storage instances.
...@@ -205,6 +208,7 @@ class BaseStorage(UndoLogCompatible): ...@@ -205,6 +208,7 @@ class BaseStorage(UndoLogCompatible):
self._ts = TimeStamp(tid) self._ts = TimeStamp(tid)
self._tid = tid self._tid = tid
del self._resolved[:]
self._tstatus = status self._tstatus = status
self._begin(self._tid, user, desc, ext) self._begin(self._tid, user, desc, ext)
...@@ -226,7 +230,7 @@ class BaseStorage(UndoLogCompatible): ...@@ -226,7 +230,7 @@ class BaseStorage(UndoLogCompatible):
def _vote(self): def _vote(self):
"""Subclasses should redefine this to supply transaction vote actions. """Subclasses should redefine this to supply transaction vote actions.
""" """
pass return self._resolved
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass # It's important that the storage calls the function we pass
...@@ -249,6 +253,7 @@ class BaseStorage(UndoLogCompatible): ...@@ -249,6 +253,7 @@ class BaseStorage(UndoLogCompatible):
self._ude = None self._ude = None
self._transaction = None self._transaction = None
self._commit_lock.release() self._commit_lock.release()
return self._tid
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
"""Subclasses should redefine this to supply transaction finish actions """Subclasses should redefine this to supply transaction finish actions
......
...@@ -32,7 +32,7 @@ import ZODB.POSException ...@@ -32,7 +32,7 @@ import ZODB.POSException
import ZODB.utils import ZODB.utils
import zope.interface import zope.interface
from .ConflictResolution import ConflictResolvingStorage, ResolvedSerial from .ConflictResolution import ConflictResolvingStorage
from .utils import load_current, maxtid from .utils import load_current, maxtid
@zope.interface.implementer( @zope.interface.implementer(
...@@ -308,10 +308,9 @@ class DemoStorage(ConflictResolvingStorage): ...@@ -308,10 +308,9 @@ class DemoStorage(ConflictResolvingStorage):
if old != serial: if old != serial:
rdata = self.tryToResolveConflict(oid, old, serial, data) rdata = self.tryToResolveConflict(oid, old, serial, data)
self.changes.store(oid, old, rdata, '', transaction) self.changes.store(oid, old, rdata, '', transaction, True)
return ResolvedSerial else:
self.changes.store(oid, serial, data, '', transaction)
return self.changes.store(oid, serial, data, '', transaction)
def storeBlob(self, oid, oldserial, data, blobfilename, version, def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction): transaction):
...@@ -324,11 +323,11 @@ class DemoStorage(ConflictResolvingStorage): ...@@ -324,11 +323,11 @@ class DemoStorage(ConflictResolvingStorage):
self._stored_oids.add(oid) self._stored_oids.add(oid)
try: try:
return self.changes.storeBlob( self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction) oid, oldserial, data, blobfilename, '', transaction)
except AttributeError: except AttributeError:
if self._blobify(): if self._blobify():
return self.changes.storeBlob( self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction) oid, oldserial, data, blobfilename, '', transaction)
raise raise
...@@ -374,8 +373,9 @@ class DemoStorage(ConflictResolvingStorage): ...@@ -374,8 +373,9 @@ class DemoStorage(ConflictResolvingStorage):
self._issued_oids.difference_update(self._stored_oids) self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set() self._stored_oids = set()
self._transaction = None self._transaction = None
self.changes.tpc_finish(transaction, func) tid = self.changes.tpc_finish(transaction, func)
self._commit_lock.release() self._commit_lock.release()
return tid
_temporary_blobdirs = {} _temporary_blobdirs = {}
def cleanup_temporary_blobdir( def cleanup_temporary_blobdir(
......
...@@ -40,7 +40,6 @@ from ZODB.BaseStorage import BaseStorage ...@@ -40,7 +40,6 @@ from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord as _DataRecord from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.ConflictResolution import ConflictResolvingStorage from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.FileStorage.format import CorruptedDataError from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DATA_HDR from ZODB.FileStorage.format import DATA_HDR
...@@ -536,9 +535,7 @@ class FileStorage( ...@@ -536,9 +535,7 @@ class FileStorage(
"The storage quota has been exceeded.") "The storage quota has been exceeded.")
if old and oldserial != committed_tid: if old and oldserial != committed_tid:
return ResolvedSerial self._resolved.append(oid)
else:
return self._tid
def deleteObject(self, oid, oldserial, transaction): def deleteObject(self, oid, oldserial, transaction):
if self._is_read_only: if self._is_read_only:
...@@ -731,6 +728,7 @@ class FileStorage( ...@@ -731,6 +728,7 @@ class FileStorage(
self._files.flush() self._files.flush()
raise raise
self._nextpos = self._pos + (tl + 8) self._nextpos = self._pos + (tl + 8)
return self._resolved
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
with self._files.write_lock(): with self._files.write_lock():
...@@ -739,15 +737,16 @@ class FileStorage( ...@@ -739,15 +737,16 @@ class FileStorage(
raise StorageTransactionError( raise StorageTransactionError(
"tpc_finish called with wrong transaction") "tpc_finish called with wrong transaction")
try: try:
tid = self._tid
if f is not None: if f is not None:
f(self._tid) f(tid)
u, d, e = self._ude self._finish(tid, *self._ude)
self._finish(self._tid, u, d, e)
self._clear_temp() self._clear_temp()
finally: finally:
self._ude = None self._ude = None
self._transaction = None self._transaction = None
self._commit_lock.release() self._commit_lock.release()
return tid
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any # If self._nextpos is 0, then the transaction didn't write any
......
...@@ -128,10 +128,10 @@ def pack_with_repeated_blob_records(): ...@@ -128,10 +128,10 @@ def pack_with_repeated_blob_records():
>>> fs.tpc_begin(trans) >>> fs.tpc_begin(trans)
>>> with open('ablob', 'w') as file: >>> with open('ablob', 'w') as file:
... _ = file.write('some data') ... _ = file.write('some data')
>>> _ = fs.store(oid, oldserial, blob_record, '', trans) >>> fs.store(oid, oldserial, blob_record, '', trans)
>>> _ = fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans) >>> fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans)
>>> fs.tpc_vote(trans) >>> _ = fs.tpc_vote(trans)
>>> fs.tpc_finish(trans) >>> _ = fs.tpc_finish(trans)
>>> time.sleep(.01) >>> time.sleep(.01)
>>> db.pack() >>> db.pack()
...@@ -156,9 +156,9 @@ _save_index can fail for large indexes. ...@@ -156,9 +156,9 @@ _save_index can fail for large indexes.
>>> oid = 0 >>> oid = 0
>>> for i in range(5000): >>> for i in range(5000):
... oid += (1<<16) ... oid += (1<<16)
... _ = fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t) ... fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t)
>>> fs.tpc_vote(t) >>> _ = fs.tpc_vote(t)
>>> fs.tpc_finish(t) >>> _ = fs.tpc_finish(t)
>>> import sys >>> import sys
>>> old_limit = sys.getrecursionlimit() >>> old_limit = sys.getrecursionlimit()
......
...@@ -43,6 +43,7 @@ class MappingStorage(object): ...@@ -43,6 +43,7 @@ class MappingStorage(object):
self._commit_lock = ZODB.utils.Lock() self._commit_lock = ZODB.utils.Lock()
self._opened = True self._opened = True
self._transaction = None self._transaction = None
self._resolved = []
self._oid = 0 self._oid = 0
###################################################################### ######################################################################
...@@ -232,7 +233,7 @@ class MappingStorage(object): ...@@ -232,7 +233,7 @@ class MappingStorage(object):
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction, resolved=False):
assert not version, "Versions are not supported" assert not version, "Versions are not supported"
if transaction is not self._transaction: if transaction is not self._transaction:
raise ZODB.POSException.StorageTransactionError(self, transaction) raise ZODB.POSException.StorageTransactionError(self, transaction)
...@@ -246,8 +247,8 @@ class MappingStorage(object): ...@@ -246,8 +247,8 @@ class MappingStorage(object):
oid=oid, serials=(old_tid, serial), data=data) oid=oid, serials=(old_tid, serial), data=data)
self._tdata[oid] = data self._tdata[oid] = data
if resolved:
return self._tid self._resolved.append(oid)
checkCurrentSerialInTransaction = ( checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction) ZODB.BaseStorage.checkCurrentSerialInTransaction)
...@@ -283,6 +284,7 @@ class MappingStorage(object): ...@@ -283,6 +284,7 @@ class MappingStorage(object):
old_tid = None old_tid = None
tid = ZODB.utils.newTid(old_tid) tid = ZODB.utils.newTid(old_tid)
self._tid = tid self._tid = tid
del self._resolved[:]
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
...@@ -307,6 +309,7 @@ class MappingStorage(object): ...@@ -307,6 +309,7 @@ class MappingStorage(object):
self._transaction = None self._transaction = None
del self._tdata del self._tdata
self._commit_lock.release() self._commit_lock.release()
return tid
# ZEO.interfaces.IServeable # ZEO.interfaces.IServeable
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
...@@ -318,6 +321,7 @@ class MappingStorage(object): ...@@ -318,6 +321,7 @@ class MappingStorage(object):
if transaction is not self._transaction: if transaction is not self._transaction:
raise ZODB.POSException.StorageTransactionError( raise ZODB.POSException.StorageTransactionError(
"tpc_vote called with wrong transaction") "tpc_vote called with wrong transaction")
return self._resolved
class TransactionRecord: class TransactionRecord:
......
...@@ -45,9 +45,10 @@ transaction ourselves. ...@@ -45,9 +45,10 @@ transaction ourselves.
>>> storage.tpc_begin(txn) >>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid0, s0, txn) >>> storage.deleteObject(oid0, s0, txn)
>>> storage.deleteObject(oid1, s1, txn) >>> storage.deleteObject(oid1, s1, txn)
>>> storage.tpc_vote(txn) >>> _ = storage.tpc_vote(txn)
>>> storage.tpc_finish(txn) >>> tid = storage.tpc_finish(txn)
>>> tid = storage.lastTransaction() >>> tid == storage.lastTransaction()
True
Now if we try to load data for the objects, we get a POSKeyError: Now if we try to load data for the objects, we get a POSKeyError:
......
...@@ -118,9 +118,7 @@ def handle_all_serials(oid, *args): ...@@ -118,9 +118,7 @@ def handle_all_serials(oid, *args):
for arg in args: for arg in args:
if isinstance(arg, bytes): if isinstance(arg, bytes):
d[oid] = arg d[oid] = arg
elif arg is None: elif arg:
pass
else:
for oid, serial in arg: for oid, serial in arg:
if not isinstance(serial, bytes): if not isinstance(serial, bytes):
raise serial # error from ZEO server raise serial # error from ZEO server
......
...@@ -381,16 +381,17 @@ stored are discarded. ...@@ -381,16 +381,17 @@ stored are discarded.
>>> blob_storage.tpc_begin(t) >>> blob_storage.tpc_begin(t)
>>> with open('blobfile', 'wb') as file: >>> with open('blobfile', 'wb') as file:
... _ = file.write(b'This data should go away') ... _ = file.write(b'This data should go away')
>>> s1 = blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile', >>> blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
... '', t) ... '', t)
>>> new_oid = blob_storage.new_oid() >>> new_oid = blob_storage.new_oid()
>>> with open('blobfile2', 'wb') as file: >>> with open('blobfile2', 'wb') as file:
... _ = file.write(b'This data should go away too') ... _ = file.write(b'This data should go away too')
>>> s2 = blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2', >>> blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
... '', t) ... '', t)
>>> bool(blob_storage.tpc_vote(t))
>>> serials = blob_storage.tpc_vote(t) False
>>> oldserial < blob_storage._tid
True
>>> blob_storage.tpc_abort(t) >>> blob_storage.tpc_abort(t)
Now, the serial for the existing blob should be the same: Now, the serial for the existing blob should be the same:
......
...@@ -42,42 +42,6 @@ from ZODB.utils import load_current ...@@ -42,42 +42,6 @@ from ZODB.utils import load_current
from zope.testing import renormalizing from zope.testing import renormalizing
# With the following monkey-patch, we can test the different ways
# to update _p_changed/_p_serial status of committed oids.
from ZODB.ConflictResolution import ResolvedSerial
class DemoStorage(ZODB.DemoStorage.DemoStorage):
def tpc_begin(self, *args):
super(DemoStorage, self).tpc_begin(*args)
self.__stored = []
def store(self, oid, *args):
s = super(DemoStorage, self).store(oid, *args)
if s != ResolvedSerial:
assert type(s) is bytes, s
return
self.__stored.append(oid)
tpc_vote = property(lambda self: self._tpc_vote, lambda *_: None)
def _tpc_vote(self, transaction):
s = self.changes.tpc_vote(transaction)
assert s is None, s
return self.__stored
def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]
ZODB.DemoStorage.DemoStorage = DemoStorage
class DemoStorageTests( class DemoStorageTests(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage, BasicStorage.BasicStorage,
......
...@@ -36,7 +36,6 @@ from ZODB.tests.StorageTestBase import MinPO, zodb_pickle ...@@ -36,7 +36,6 @@ from ZODB.tests.StorageTestBase import MinPO, zodb_pickle
from ZODB._compat import dump, dumps, _protocol from ZODB._compat import dump, dumps, _protocol
from . import util from . import util
from .. import multicommitadapter
class FileStorageTests( class FileStorageTests(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
...@@ -324,12 +323,6 @@ class FileStorageHexTests(FileStorageTests): ...@@ -324,12 +323,6 @@ class FileStorageHexTests(FileStorageTests):
self._storage = ZODB.tests.hexstorage.HexStorage( self._storage = ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage('FileStorageTests.fs',**kwargs)) ZODB.FileStorage.FileStorage('FileStorageTests.fs',**kwargs))
class MultiFileStorageTests(FileStorageTests):
def open(self, **kwargs):
self._storage = multicommitadapter.MultiCommitAdapter(
ZODB.FileStorage.FileStorage('FileStorageTests.fs', **kwargs))
class FileStorageTestsWithBlobsEnabled(FileStorageTests): class FileStorageTestsWithBlobsEnabled(FileStorageTests):
...@@ -350,15 +343,6 @@ class FileStorageHexTestsWithBlobsEnabled(FileStorageTests): ...@@ -350,15 +343,6 @@ class FileStorageHexTestsWithBlobsEnabled(FileStorageTests):
self._storage = ZODB.tests.hexstorage.HexStorage(self._storage) self._storage = ZODB.tests.hexstorage.HexStorage(self._storage)
class MultiFileStorageTestsWithBlobsEnabled(MultiFileStorageTests):
def open(self, **kwargs):
if 'blob_dir' not in kwargs:
kwargs = kwargs.copy()
kwargs['blob_dir'] = 'blobs'
MultiFileStorageTests.open(self, **kwargs)
class FileStorageRecoveryTest( class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage, RecoveryStorage.RecoveryStorage,
...@@ -432,33 +416,25 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase): ...@@ -432,33 +416,25 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
module.Broken = Broken module.Broken = Broken
oids = [[self._storage.new_oid(), None] for i in range(3)] oids = [[self._storage.new_oid(), None] for i in range(3)]
def store(i, data):
oid, revid = oids[i]
self._storage.store(oid, revid, data, "", t)
for i in range(2): for i in range(2):
t = transaction.Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
# sometimes data is in this format # sometimes data is in this format
j = 0 store(0, dumps(OOBTree, _protocol))
oid, revid = oids[j]
serial = self._storage.store(
oid, revid, dumps(OOBTree, _protocol), "", t)
oids[j][1] = serial
# and it could be from a broken module # and it could be from a broken module
j = 1 store(1, dumps(Broken, _protocol))
oid, revid = oids[j]
serial = self._storage.store(
oid, revid, dumps(Broken, _protocol), "", t)
oids[j][1] = serial
# but mostly it looks like this # but mostly it looks like this
j = 2 store(2, zodb_pickle(MinPO(2)))
o = MinPO(j)
oid, revid = oids[j]
serial = self._storage.store(oid, revid, zodb_pickle(o), "", t)
oids[j][1] = serial
self._storage.tpc_vote(t) self._storage.tpc_vote(t)
self._storage.tpc_finish(t) tid = self._storage.tpc_finish(t)
for oid_revid in oids:
oid_revid[1] = tid
# now break the import of the Broken class # now break the import of the Broken class
del sys.modules[module_name] del sys.modules[module_name]
...@@ -721,7 +697,6 @@ def test_suite(): ...@@ -721,7 +697,6 @@ def test_suite():
FileStorageNoRestoreRecoveryTest, FileStorageNoRestoreRecoveryTest,
FileStorageTestsWithBlobsEnabled, FileStorageHexTestsWithBlobsEnabled, FileStorageTestsWithBlobsEnabled, FileStorageHexTestsWithBlobsEnabled,
AnalyzeDotPyTest, AnalyzeDotPyTest,
MultiFileStorageTests, MultiFileStorageTestsWithBlobsEnabled,
]: ]:
suite.addTest(unittest.makeSuite(klass, "check")) suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite( suite.addTest(doctest.DocTestSuite(
...@@ -743,14 +718,6 @@ def test_suite(): ...@@ -743,14 +718,6 @@ def test_suite():
test_blob_storage_recovery=True, test_blob_storage_recovery=True,
test_packing=True, test_packing=True,
)) ))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobMultiFileStorage',
lambda name, blob_dir:
multicommitadapter.MultiCommitAdapter(
ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir)),
test_blob_storage_recovery=True,
test_packing=True,
))
suite.addTest(PackableStorage.IExternalGC_suite( suite.addTest(PackableStorage.IExternalGC_suite(
lambda : ZODB.FileStorage.FileStorage( lambda : ZODB.FileStorage.FileStorage(
'data.fs', blob_dir='blobs', pack_gc=False))) 'data.fs', blob_dir='blobs', pack_gc=False)))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment