Commit 2729f3a9 authored by Jim Fulton

Better IMVCCStorage support

The previous commit, made in anger, made a test pass, but wasn't
really the right fix.

This commit fixes MVCCMappingStorage's loadBefore implementation by
fixing handling of the internal _ltid variable so that it's updated
during poll_invalidations.  This allowed the base class version of
loadBefore to be used and, I'm 97% sure, has the right semantics.
Fixing this revealed a problem with the Connection changes.

Fixed Connection.py to poll for invalidations before computing
_txn_time by calling lastTransaction, so as to get a current value.
We still apply invalidations after computing _txn_time, so that
persistent classes can be loaded correctly when they are invalidated.
This was accomplished by weaving _flush_invalidations into
newTransaction.
parent f2922e4c
...@@ -490,61 +490,6 @@ class Connection(ExportImport, object): ...@@ -490,61 +490,6 @@ class Connection(ExportImport, object):
self._registered_objects = [] self._registered_objects = []
self._creating.clear() self._creating.clear()
# Process pending invalidations.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(list(self._cache.cache_data.keys()))
elif invalidated:
self._cache.invalidate(invalidated)
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage.
self._cache.incrgc()
def tpc_begin(self, transaction): def tpc_begin(self, transaction):
"""Begin commit of a transaction, starting the two-phase commit.""" """Begin commit of a transaction, starting the two-phase commit."""
self._modified = [] self._modified = []
...@@ -827,13 +772,62 @@ class Connection(ExportImport, object): ...@@ -827,13 +772,62 @@ class Connection(ExportImport, object):
def newTransaction(self, transaction=None): def newTransaction(self, transaction=None):
self._readCurrent.clear() self._readCurrent.clear()
getattr(self._storage, 'sync', noop)() getattr(self._storage, 'sync', noop)()
if self._mvcc_storage:
# Poll the storage for invalidations.
mvc_invalidated = self._storage.poll_invalidations()
if mvc_invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._invalidatedCache = True
else:
mvc_invalidated = None
with self._inv_lock:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
else:
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
if self.opened: if self.opened:
self._txn_time = p64(u64(self._storage.lastTransaction()) + 1) self._txn_time = p64(u64(self._storage.lastTransaction()) + 1)
# Nope that we flush invalidation *after* setting transaction if mvc_invalidated:
# time, because invalidating persistent classes causes data to self._cache.invalidate(mvc_invalidated)
# be loaded. self._cache.invalidate(invalidated)
self._flush_invalidations()
# Now is a good time to collect some garbage.
self._cache.incrgc()
afterCompletion = newTransaction afterCompletion = newTransaction
......
...@@ -46,7 +46,7 @@ class MVCCMappingStorage(MappingStorage): ...@@ -46,7 +46,7 @@ class MVCCMappingStorage(MappingStorage):
inst._commit_lock = self._commit_lock inst._commit_lock = self._commit_lock
inst.new_oid = self.new_oid inst.new_oid = self.new_oid
inst.pack = self.pack inst.pack = self.pack
inst.loadBefore = self.loadBefore inst._ltid = self._ltid
inst._main_lock_acquire = self._lock_acquire inst._main_lock_acquire = self._lock_acquire
inst._main_lock_release = self._lock_release inst._main_lock_release = self._lock_release
return inst return inst
...@@ -74,11 +74,10 @@ class MVCCMappingStorage(MappingStorage): ...@@ -74,11 +74,10 @@ class MVCCMappingStorage(MappingStorage):
# prevent changes to _transactions and _data during analysis # prevent changes to _transactions and _data during analysis
self._main_lock_acquire() self._main_lock_acquire()
try: try:
if self._transactions: if self._transactions:
new_tid = self._transactions.maxKey() new_tid = self._transactions.maxKey()
else: else:
new_tid = b'' new_tid = ZODB.utils.z64
# Copy the current data into a snapshot. This is obviously # Copy the current data into a snapshot. This is obviously
# very inefficient for large storages, but it's good for # very inefficient for large storages, but it's good for
...@@ -113,7 +112,7 @@ class MVCCMappingStorage(MappingStorage): ...@@ -113,7 +112,7 @@ class MVCCMappingStorage(MappingStorage):
finally: finally:
self._main_lock_release() self._main_lock_release()
self._polled_tid = new_tid self._polled_tid = self._ltid = new_tid
return list(changed_oids) return list(changed_oids)
def tpc_finish(self, transaction, func = lambda tid: None): def tpc_finish(self, transaction, func = lambda tid: None):
...@@ -131,10 +130,3 @@ class MVCCMappingStorage(MappingStorage): ...@@ -131,10 +130,3 @@ class MVCCMappingStorage(MappingStorage):
MappingStorage.pack(self, t, referencesf, gc) MappingStorage.pack(self, t, referencesf, gc)
finally: finally:
self._commit_lock.release() self._commit_lock.release()
@ZODB.utils.locked(MappingStorage.opened)
def lastTransaction(self):
if self._transactions:
return self._transactions.maxKey()
else:
return ZODB.utils.z64
...@@ -746,7 +746,6 @@ class ClientThread(TestThread): ...@@ -746,7 +746,6 @@ class ClientThread(TestThread):
def runtest(self): def runtest(self):
from random import choice from random import choice
conn = self.db.open() conn = self.db.open()
root = conn.root()
for j in range(self.loop_trip): for j in range(self.loop_trip):
assign_worked = False assign_worked = False
...@@ -755,7 +754,7 @@ class ClientThread(TestThread): ...@@ -755,7 +754,7 @@ class ClientThread(TestThread):
try: try:
index = choice(self.choices) index = choice(self.choices)
alist.extend([self.millis(), index]) alist.extend([self.millis(), index])
root[index].value = MinPO(j) conn.root()[index].value = MinPO(j)
assign_worked = True assign_worked = True
transaction.commit() transaction.commit()
alist.append(self.millis()) alist.append(self.millis())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment