Commit 179530d5 authored by dieter's avatar dieter

satisfy `ZODB>=5.6` requirement that `lastTransaction` pnly changes after invalidation processing

parent e277ba09
...@@ -4,6 +4,12 @@ Changelog ...@@ -4,6 +4,12 @@ Changelog
5.2.3 (unreleased) 5.2.3 (unreleased)
------------------ ------------------
- Ensure ``ZEO`` satisfies the ``ZODB >= 5.6`` requirement that
``lastTransaction()`` changes only after invalidation processing.
Violating this requirement can lead to race conditions and
associated data corruption
`#166 <https://github.com/zopefoundation/ZEO/issues/166>`_.
- Fix data corruption due to race between load and external invalidations. - Fix data corruption due to race between load and external invalidations.
See `issue 155 <https://github.com/zopefoundation/ZEO/issues/155>`_. See `issue 155 <https://github.com/zopefoundation/ZEO/issues/155>`_.
......
...@@ -646,11 +646,31 @@ class Client(object): ...@@ -646,11 +646,31 @@ class Client(object):
try: try:
tid = yield self.protocol.fut('tpc_finish', tid) tid = yield self.protocol.fut('tpc_finish', tid)
cache = self.cache cache = self.cache
# The cache invalidation here and that in
# ``invalidateTransaction`` are both performed
# in the IO thread. Thus there is no interference.
# Other threads might observe a partially invalidated
# cache. However, regular loads will access
# object state before ``tid``; therefore,
# partial invalidation for ``tid`` should not harm.
for oid, data, resolved in updates: for oid, data, resolved in updates:
cache.invalidate(oid, tid) cache.invalidate(oid, tid)
if data and not resolved: if data and not resolved:
cache.store(oid, tid, None, data) cache.store(oid, tid, None, data)
# ZODB >= 5.6 requires that ``lastTransaction`` changes
# only after invalidation processing (performed in
# the ``f`` call below) (for ``ZEO``, ``lastTransaction``
# is implemented as ``cache.getLastTid()``).
# Some tests involve ``f`` in the verification that
# ``tpc_finish`` modifies ``lastTransaction`` and require
# that ``cache.setLastTid`` is called before ``f``.
# We use locking below to ensure that the
# effect of ``setLastTid`` is observable by other
# threads only after ``f`` has been called.
with cache._lock:
cache.setLastTid(tid) cache.setLastTid(tid)
f(tid)
future.set_result(tid)
except Exception as exc: except Exception as exc:
future.set_exception(exc) future.set_exception(exc)
...@@ -659,9 +679,6 @@ class Client(object): ...@@ -659,9 +679,6 @@ class Client(object):
# recovering to a consistent state. # recovering to a consistent state.
self.protocol.close() self.protocol.close()
self.disconnected(self.protocol) self.disconnected(self.protocol)
else:
f(tid)
future.set_result(tid)
else: else:
future.set_exception(ClientDisconnected()) future.set_exception(ClientDisconnected())
...@@ -671,6 +688,8 @@ class Client(object): ...@@ -671,6 +688,8 @@ class Client(object):
def invalidateTransaction(self, tid, oids): def invalidateTransaction(self, tid, oids):
if self.ready: if self.ready:
# see the cache related comment in ``tpc_finish_threadsafe``
# why we think that locking is not necessary at this place
for oid in oids: for oid in oids:
self.cache.invalidate(oid, tid) self.cache.invalidate(oid, tid)
self.client.invalidateTransaction(tid, oids) self.client.invalidateTransaction(tid, oids)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment