Commit ed2b7417 authored by Jim Fulton's avatar Jim Fulton

Added new APIs to check to make sure data read is up to date when used

to decide how to write other data.
parent f2653e8a
...@@ -134,6 +134,10 @@ class Connection(ExportImport, object): ...@@ -134,6 +134,10 @@ class Connection(ExportImport, object):
# of this list are either in _cache or in _added. # of this list are either in _cache or in _added.
self._registered_objects = [] self._registered_objects = []
# ids and serials of objects for which readCurrent was called
# in a transaction.
self._readCurrent = {}
# Dict of oid->obj added explicitly through add(). Used as a # Dict of oid->obj added explicitly through add(). Used as a
# preliminary cache until commit time when objects are all moved # preliminary cache until commit time when objects are all moved
# to the real _cache. The objects are moved to _creating at # to the real _cache. The objects are moved to _creating at
...@@ -552,6 +556,10 @@ class Connection(ExportImport, object): ...@@ -552,6 +556,10 @@ class Connection(ExportImport, object):
else: else:
self._commit(transaction) self._commit(transaction)
for oid, serial in self._readCurrent.iteritems():
self._storage.checkCurrentSerialInTransaction(
oid, serial, transaction)
def _commit(self, transaction): def _commit(self, transaction):
"""Commit changes to an object""" """Commit changes to an object"""
...@@ -674,6 +682,11 @@ class Connection(ExportImport, object): ...@@ -674,6 +682,11 @@ class Connection(ExportImport, object):
self._handle_serial(oid, s) self._handle_serial(oid, s)
def _handle_serial(self, oid, serial, change=True): def _handle_serial(self, oid, serial, change=True):
# if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this.
self._readCurrent.pop(oid, None)
if not serial: if not serial:
return return
if not isinstance(serial, str): if not isinstance(serial, str):
...@@ -782,6 +795,7 @@ class Connection(ExportImport, object): ...@@ -782,6 +795,7 @@ class Connection(ExportImport, object):
# pending invalidations regardless. Of course this should only be # pending invalidations regardless. Of course this should only be
# called at transaction boundaries. # called at transaction boundaries.
def _storage_sync(self, *ignored): def _storage_sync(self, *ignored):
self._readCurrent.clear()
sync = getattr(self._storage, 'sync', 0) sync = getattr(self._storage, 'sync', 0)
if sync: if sync:
sync() sync()
...@@ -949,6 +963,10 @@ class Connection(ExportImport, object): ...@@ -949,6 +963,10 @@ class Connection(ExportImport, object):
if obj is not None: if obj is not None:
self._registered_objects.append(obj) self._registered_objects.append(obj)
def readCurrent(self, ob):
assert ob._p_jar is self
assert ob._p_oid is not None and ob._p_serial is not None
self._readCurrent[ob._p_oid] = ob._p_serial
# persistent.interfaces.IPersistentDatamanager # persistent.interfaces.IPersistentDatamanager
########################################################################## ##########################################################################
......
...@@ -286,6 +286,14 @@ class IConnection(Interface): ...@@ -286,6 +286,14 @@ class IConnection(Interface):
begins or until the connection os reopned. begins or until the connection os reopned.
""" """
def readCurrent(obj):
"""Make sure an object being read is current
This is used when applications want to ensure a higher level
of consistency for some operations. This should be called when
an object is read and the information read is used to write a
separate object.
"""
class IStorageWrapper(Interface): class IStorageWrapper(Interface):
"""Storage wrapper interface """Storage wrapper interface
...@@ -1119,6 +1127,22 @@ class IExternalGC(IStorage): ...@@ -1119,6 +1127,22 @@ class IExternalGC(IStorage):
commit. commit.
""" """
class ReadVerifyingStorage(IStorage):
def checkCurrentSerialInTransaction(oid, serial, transaction):
"""Check whether the given serial number is current.
The method is called during the first phase of 2-phase commit
to verify that data read in a transaction is current.
The storage should raise a ReadConflictError if the serial is not
current, although it may raise the exception later, in a call
to store or in a call to tpc_vote.
If no exception is raised, then the serial must remain current
through the end of the transaction.
"""
class IBlob(Interface): class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB.""" """A BLOB supports efficient handling of large data within ZODB."""
......
...@@ -611,6 +611,150 @@ See https://bugs.launchpad.net/zodb/+bug/185066 ...@@ -611,6 +611,150 @@ See https://bugs.launchpad.net/zodb/+bug/185066
""" """
def readCurrent():
r"""
The connection's readCurrent method is called to provide a higher
level of consistency in cases where an object if read to compute an
update to a separate object. When this is used, the
checkCurrentSerialInTransaction method on the storage is called in
2-phase commit.
To demonstrate this, we'll create a storage and give it a test
implementation of checkCurrentSerialInTransaction.
>>> import ZODB.MappingStorage
>>> store = ZODB.MappingStorage.MappingStorage()
>>> from ZODB.POSException import ReadConflictError
>>> bad = set()
>>> def checkCurrentSerialInTransaction(oid, serial, trans):
... print 'checkCurrentSerialInTransaction', `oid`
... if not trans == transaction.get(): print 'oops'
... if oid in bad:
... raise ReadConflictError(oid=oid)
>>> store.checkCurrentSerialInTransaction = checkCurrentSerialInTransaction
Now, we'll use the storage as usual. checkCurrentSerialInTransaction
won't normally be called:
>>> db = ZODB.DB(store)
>>> conn = db.open()
>>> conn.root.a = ZODB.tests.util.P('a')
>>> conn.root.b = ZODB.tests.util.P('b')
>>> transaction.commit()
If we call readCurrent for an object and we modify another object,
then checkCurrentSerialInTransaction will be called for the object
readCurrent was called on.
>>> conn.readCurrent(conn.root.a)
>>> conn.root.b.x = 0
>>> transaction.commit()
checkCurrentSerialInTransaction '\x00\x00\x00\x00\x00\x00\x00\x01'
It doesn't matter how often we call readCurrent,
checkCurrentSerialInTransaction will be called only once:
>>> conn.readCurrent(conn.root.a)
>>> conn.readCurrent(conn.root.a)
>>> conn.readCurrent(conn.root.a)
>>> conn.readCurrent(conn.root.a)
>>> conn.root.b.x += 1
>>> transaction.commit()
checkCurrentSerialInTransaction '\x00\x00\x00\x00\x00\x00\x00\x01'
checkCurrentSerialInTransaction won't be called if another object
isn't modified:
>>> conn.readCurrent(conn.root.a)
>>> transaction.commit()
Or if the object it was called on is modified:
>>> conn.readCurrent(conn.root.a)
>>> conn.root.a.x = 0
>>> conn.root.b.x += 1
>>> transaction.commit()
If the storage raises a conflict error, it'll be propigated:
>>> bad.add(conn.root.a._p_oid)
>>> conn.readCurrent(conn.root.a)
>>> conn.root.b.x += 1
>>> transaction.commit()
Traceback (most recent call last):
...
ReadConflictError: database read conflict error (oid 0x01)
>>> transaction.abort()
The storage may raise it later:
>>> def checkCurrentSerialInTransaction(oid, serial, trans):
... if not trans == transaction.get(): print 'oops'
... print 'checkCurrentSerialInTransaction', `oid`
... store.badness = ReadConflictError(oid=oid)
>>> def tpc_vote(t):
... if store.badness:
... badness = store.badness
... store.badness = None
... raise badness
>>> store.checkCurrentSerialInTransaction = checkCurrentSerialInTransaction
>>> store.badness = None
>>> store.tpc_vote = tpc_vote
It will still be propigated:
>>> conn.readCurrent(conn.root.a)
>>> conn.root.b.x = +1
>>> transaction.commit()
Traceback (most recent call last):
...
ReadConflictError: database read conflict error (oid 0x01)
>>> transaction.abort()
Read checks don't leak accross transactions:
>>> conn.readCurrent(conn.root.a)
>>> transaction.commit()
>>> conn.root.b.x = +1
>>> transaction.commit()
Read checks to work accross savepoints.
>>> conn.readCurrent(conn.root.a)
>>> conn.root.b.x = +1
>>> _ = transaction.savepoint()
>>> transaction.commit()
Traceback (most recent call last):
...
ReadConflictError: database read conflict error (oid 0x01)
>>> transaction.abort()
>>> conn.readCurrent(conn.root.a)
>>> _ = transaction.savepoint()
>>> conn.root.b.x = +1
>>> transaction.commit()
Traceback (most recent call last):
...
ReadConflictError: database read conflict error (oid 0x01)
>>> transaction.abort()
"""
# check interaction w savepoint
# check call in read-only trans followed by write trans
class _PlayPersistent(Persistent): class _PlayPersistent(Persistent):
def setValueWithSize(self, size=0): self.value = size*' ' def setValueWithSize(self, size=0): self.value = size*' '
__init__ = setValueWithSize __init__ = setValueWithSize
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment