Commit 7856af05 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #82 from zopefoundation/handle_serial

Switch all storages to the new commit protocol

Thanks!
parents ae956ad2 4d0512f1
......@@ -10,13 +10,12 @@ See 4.4.2.
5.0.0a3 (2016-07-01)
====================
Added IMultiCommitStorage to directly represent the changes in the 4.4.0
release and to make complient storages introspectable.
See 4.4.1.
5.0.0a2 (2016-07-01)
====================
See the 4.4.x releases.
See 4.4.0.
5.0.0a1 (2016-06-20)
====================
......@@ -38,7 +37,7 @@ Concurrency Control (MVCC) implementation:
==================
Better support of the new commit protocol. This fixes issues with blobs and
undo. See https://github.com/zopefoundation/ZODB/pull/77
undo. See pull requests #77, #80, #83
4.4.1 (2016-07-01)
==================
......
......@@ -106,6 +106,9 @@ class BaseStorage(UndoLogCompatible):
self._oid = z64
else:
self._oid = oid
# In case that conflicts are resolved during store,
# this collects oids to be returned by tpc_vote.
self._resolved = []
def sortKey(self):
"""Return a string that can be used to sort storage instances.
......@@ -205,6 +208,7 @@ class BaseStorage(UndoLogCompatible):
self._ts = TimeStamp(tid)
self._tid = tid
del self._resolved[:]
self._tstatus = status
self._begin(self._tid, user, desc, ext)
......@@ -226,7 +230,7 @@ class BaseStorage(UndoLogCompatible):
def _vote(self):
"""Subclasses should redefine this to supply transaction vote actions.
"""
pass
return self._resolved
def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
......@@ -249,6 +253,7 @@ class BaseStorage(UndoLogCompatible):
self._ude = None
self._transaction = None
self._commit_lock.release()
return self._tid
def _finish(self, tid, u, d, e):
"""Subclasses should redefine this to supply transaction finish actions
......
......@@ -28,7 +28,7 @@ from pickle import PicklingError
logger = logging.getLogger('ZODB.ConflictResolution')
ResolvedSerial = b'rs' # deprecated: store/tpc_finish should just use True
ResolvedSerial = b'rs' # deprecated: see IMultiCommitStorage.tpc_vote
class BadClassName(Exception):
pass
......
......@@ -589,7 +589,7 @@ class Connection(ExportImport, object):
self._handle_serial(oid, s)
def _handle_serial(self, oid, serial=True, change=True):
def _handle_serial(self, oid, serial=ResolvedSerial, change=True):
# if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this.
......@@ -597,10 +597,7 @@ class Connection(ExportImport, object):
if not serial:
return
if serial is True:
serial = ResolvedSerial
elif not isinstance(serial, bytes):
raise serial
assert isinstance(serial, bytes), serial
obj = self._cache.get(oid, None)
if obj is None:
return
......
......@@ -32,7 +32,7 @@ import ZODB.POSException
import ZODB.utils
import zope.interface
from .ConflictResolution import ConflictResolvingStorage, ResolvedSerial
from .ConflictResolution import ConflictResolvingStorage
from .utils import load_current, maxtid
@zope.interface.implementer(
......@@ -73,6 +73,7 @@ class DemoStorage(ConflictResolvingStorage):
self._issued_oids = set()
self._stored_oids = set()
self._resolved = []
self._commit_lock = ZODB.utils.Lock()
self._transaction = None
......@@ -116,7 +117,7 @@ class DemoStorage(ConflictResolvingStorage):
for meth in (
'_lock',
'getSize', 'isReadOnly',
'sortKey', 'tpc_transaction', 'tpc_vote',
'sortKey', 'tpc_transaction',
):
setattr(self, meth, getattr(changes, meth))
......@@ -309,9 +310,9 @@ class DemoStorage(ConflictResolvingStorage):
if old != serial:
rdata = self.tryToResolveConflict(oid, old, serial, data)
self.changes.store(oid, old, rdata, '', transaction)
return ResolvedSerial
return self.changes.store(oid, serial, data, '', transaction)
self._resolved.append(oid)
else:
self.changes.store(oid, serial, data, '', transaction)
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
......@@ -324,11 +325,11 @@ class DemoStorage(ConflictResolvingStorage):
self._stored_oids.add(oid)
try:
return self.changes.storeBlob(
self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction)
except AttributeError:
if self._blobify():
return self.changes.storeBlob(
self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction)
raise
......@@ -365,6 +366,13 @@ class DemoStorage(ConflictResolvingStorage):
self.changes.tpc_begin(transaction, *a, **k)
self._transaction = transaction
self._stored_oids = set()
del self._resolved[:]
def tpc_vote(self, *a, **k):
if self.changes.tpc_vote(*a, **k):
raise ZODB.POSException.StorageTransactionError(
"Unexpected resolved conflicts")
return self._resolved
def tpc_finish(self, transaction, func = lambda tid: None):
with self._lock:
......@@ -374,8 +382,9 @@ class DemoStorage(ConflictResolvingStorage):
self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set()
self._transaction = None
self.changes.tpc_finish(transaction, func)
tid = self.changes.tpc_finish(transaction, func)
self._commit_lock.release()
return tid
_temporary_blobdirs = {}
def cleanup_temporary_blobdir(
......
......@@ -40,7 +40,6 @@ from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DATA_HDR
......@@ -521,6 +520,7 @@ class FileStorage(
if oldserial != committed_tid:
data = self.tryToResolveConflict(oid, committed_tid,
oldserial, data)
self._resolved.append(oid)
pos = self._pos
here = pos + self._tfile.tell() + self._thl
......@@ -535,11 +535,6 @@ class FileStorage(
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
if old and oldserial != committed_tid:
return ResolvedSerial
else:
return self._tid
def deleteObject(self, oid, oldserial, transaction):
if self._is_read_only:
raise ReadOnlyError()
......@@ -731,6 +726,7 @@ class FileStorage(
self._files.flush()
raise
self._nextpos = self._pos + (tl + 8)
return self._resolved
def tpc_finish(self, transaction, f=None):
with self._files.write_lock():
......@@ -739,15 +735,16 @@ class FileStorage(
raise StorageTransactionError(
"tpc_finish called with wrong transaction")
try:
tid = self._tid
if f is not None:
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
f(tid)
self._finish(tid, *self._ude)
self._clear_temp()
finally:
self._ude = None
self._transaction = None
self._commit_lock.release()
return tid
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
......
......@@ -128,10 +128,10 @@ def pack_with_repeated_blob_records():
>>> fs.tpc_begin(trans)
>>> with open('ablob', 'w') as file:
... _ = file.write('some data')
>>> _ = fs.store(oid, oldserial, blob_record, '', trans)
>>> _ = fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans)
>>> fs.tpc_vote(trans)
>>> fs.tpc_finish(trans)
>>> fs.store(oid, oldserial, blob_record, '', trans)
>>> fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans)
>>> _ = fs.tpc_vote(trans)
>>> _ = fs.tpc_finish(trans)
>>> time.sleep(.01)
>>> db.pack()
......@@ -156,9 +156,9 @@ _save_index can fail for large indexes.
>>> oid = 0
>>> for i in range(5000):
... oid += (1<<16)
... _ = fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t)
>>> fs.tpc_vote(t)
>>> fs.tpc_finish(t)
... fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t)
>>> _ = fs.tpc_vote(t)
>>> _ = fs.tpc_finish(t)
>>> import sys
>>> old_limit = sys.getrecursionlimit()
......
......@@ -247,8 +247,6 @@ class MappingStorage(object):
self._tdata[oid] = data
return self._tid
checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)
......@@ -307,6 +305,7 @@ class MappingStorage(object):
self._transaction = None
del self._tdata
self._commit_lock.release()
return tid
# ZEO.interfaces.IServeable
@ZODB.utils.locked(opened)
......
......@@ -45,9 +45,10 @@ transaction ourselves.
>>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid0, s0, txn)
>>> storage.deleteObject(oid1, s1, txn)
>>> storage.tpc_vote(txn)
>>> storage.tpc_finish(txn)
>>> tid = storage.lastTransaction()
>>> _ = storage.tpc_vote(txn)
>>> tid = storage.tpc_finish(txn)
>>> tid == storage.lastTransaction()
True
Now if we try to load data for the objects, we get a POSKeyError:
......
......@@ -118,9 +118,7 @@ def handle_all_serials(oid, *args):
for arg in args:
if isinstance(arg, bytes):
d[oid] = arg
elif arg is None:
pass
else:
elif arg:
for oid, serial in arg:
if not isinstance(serial, bytes):
raise serial # error from ZEO server
......
......@@ -381,16 +381,15 @@ stored are discarded.
>>> blob_storage.tpc_begin(t)
>>> with open('blobfile', 'wb') as file:
... _ = file.write(b'This data should go away')
>>> s1 = blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
>>> blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
... '', t)
>>> new_oid = blob_storage.new_oid()
>>> with open('blobfile2', 'wb') as file:
... _ = file.write(b'This data should go away too')
>>> s2 = blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
>>> blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
... '', t)
>>> serials = blob_storage.tpc_vote(t)
>>> bool(blob_storage.tpc_vote(t))
False
>>> blob_storage.tpc_abort(t)
Now, the serial for the existing blob should be the same:
......
......@@ -42,46 +42,6 @@ from ZODB.utils import load_current
from zope.testing import renormalizing
# With the following monkey-patch, we can test the different ways
# to update _p_changed/_p_serial status of committed oids.
from ZODB.ConflictResolution import ResolvedSerial
class DemoStorage(ZODB.DemoStorage.DemoStorage):
delayed_store = False
def tpc_begin(self, *args):
super(DemoStorage, self).tpc_begin(*args)
self.__stored = []
def store(self, oid, *args):
s = super(DemoStorage, self).store(oid, *args)
if s != ResolvedSerial:
assert type(s) is bytes, s
return
if not self.delayed_store:
return True
self.__stored.append(oid)
tpc_vote = property(lambda self: self._tpc_vote, lambda *_: None)
def _tpc_vote(self, transaction):
s = self.changes.tpc_vote(transaction)
assert s is None, s
return self.__stored if self.delayed_store else s
def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]
ZODB.DemoStorage.DemoStorage = DemoStorage
class DemoStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
......@@ -146,11 +106,6 @@ class DemoStorageTests(
self._checkHistory(base_and_changes())
self._storage = self._storage.pop()
def checkResolveLate(self):
self._storage.delayed_store = True
self.checkResolve()
class DemoStorageHexTests(DemoStorageTests):
def setUp(self):
......
......@@ -36,7 +36,6 @@ from ZODB.tests.StorageTestBase import MinPO, zodb_pickle
from ZODB._compat import dump, dumps, _protocol
from . import util
from .. import multicommitadapter
class FileStorageTests(
StorageTestBase.StorageTestBase,
......@@ -324,12 +323,6 @@ class FileStorageHexTests(FileStorageTests):
self._storage = ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage('FileStorageTests.fs',**kwargs))
class MultiFileStorageTests(FileStorageTests):
def open(self, **kwargs):
self._storage = multicommitadapter.MultiCommitAdapter(
ZODB.FileStorage.FileStorage('FileStorageTests.fs', **kwargs))
class FileStorageTestsWithBlobsEnabled(FileStorageTests):
......@@ -350,15 +343,6 @@ class FileStorageHexTestsWithBlobsEnabled(FileStorageTests):
self._storage = ZODB.tests.hexstorage.HexStorage(self._storage)
class MultiFileStorageTestsWithBlobsEnabled(MultiFileStorageTests):
def open(self, **kwargs):
if 'blob_dir' not in kwargs:
kwargs = kwargs.copy()
kwargs['blob_dir'] = 'blobs'
MultiFileStorageTests.open(self, **kwargs)
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
......@@ -432,33 +416,25 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
module.Broken = Broken
oids = [[self._storage.new_oid(), None] for i in range(3)]
def store(i, data):
oid, revid = oids[i]
self._storage.store(oid, revid, data, "", t)
for i in range(2):
t = transaction.Transaction()
self._storage.tpc_begin(t)
# sometimes data is in this format
j = 0
oid, revid = oids[j]
serial = self._storage.store(
oid, revid, dumps(OOBTree, _protocol), "", t)
oids[j][1] = serial
store(0, dumps(OOBTree, _protocol))
# and it could be from a broken module
j = 1
oid, revid = oids[j]
serial = self._storage.store(
oid, revid, dumps(Broken, _protocol), "", t)
oids[j][1] = serial
store(1, dumps(Broken, _protocol))
# but mostly it looks like this
j = 2
o = MinPO(j)
oid, revid = oids[j]
serial = self._storage.store(oid, revid, zodb_pickle(o), "", t)
oids[j][1] = serial
store(2, zodb_pickle(MinPO(2)))
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
tid = self._storage.tpc_finish(t)
for oid_revid in oids:
oid_revid[1] = tid
# now break the import of the Broken class
del sys.modules[module_name]
......@@ -721,7 +697,6 @@ def test_suite():
FileStorageNoRestoreRecoveryTest,
FileStorageTestsWithBlobsEnabled, FileStorageHexTestsWithBlobsEnabled,
AnalyzeDotPyTest,
MultiFileStorageTests, MultiFileStorageTestsWithBlobsEnabled,
]:
suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite(
......@@ -743,14 +718,6 @@ def test_suite():
test_blob_storage_recovery=True,
test_packing=True,
))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobMultiFileStorage',
lambda name, blob_dir:
multicommitadapter.MultiCommitAdapter(
ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir)),
test_blob_storage_recovery=True,
test_packing=True,
))
suite.addTest(PackableStorage.IExternalGC_suite(
lambda : ZODB.FileStorage.FileStorage(
'data.fs', blob_dir='blobs', pack_gc=False)))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment