Unverified Commit b5895a5c authored by Julien Muchembled's avatar Julien Muchembled Committed by GitHub

mvccadapter: fix race with invalidations when starting a new transaction (#291)

Fixes #290
parent 12b52a71
......@@ -5,6 +5,11 @@
5.6.0 (unreleased)
==================
- Fix race with invalidations when starting a new transaction. The bug
affected Storage implementations that rely on mvccadapter, and could result
in data corruption (oid loaded at wrong serial after a concurrent commit).
See `issue 290 <https://github.com/zopefoundation/ZODB/issues/290>`_.
- Improve volatile attribute ``_v_`` documentation.
- Make repozo's recover mode atomic by recovering the backup in a
......
......@@ -113,15 +113,15 @@ def pack_with_repeated_blob_records():
fixed by the time you read this, but there might still be
transactions in the wild that have duplicate records.
>>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs')
>>> db = ZODB.DB(fs)
>>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs'))
>>> conn = db.open()
>>> conn.root()[1] = ZODB.blob.Blob()
>>> transaction.commit()
>>> tm = transaction.TransactionManager()
>>> oid = conn.root()[1]._p_oid
>>> from ZODB.utils import load_current
>>> blob_record, oldserial = load_current(fs, oid)
>>> fs = db._mvcc_storage.new_instance()
>>> _ = fs.poll_invalidations()
>>> blob_record, oldserial = fs.load(oid)
Now, create a transaction with multiple saves:
......
......@@ -49,6 +49,7 @@ class MVCCAdapter(Base):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
instance._lastTransaction()
return instance
def before_instance(self, before=None):
......@@ -77,13 +78,13 @@ class MVCCAdapter(Base):
def invalidate(self, transaction_id, oids):
with self._lock:
for instance in self._instances:
instance._invalidate(oids)
instance._invalidate(transaction_id, oids)
def _invalidate_finish(self, oids, committing_instance):
def _invalidate_finish(self, tid, oids, committing_instance):
with self._lock:
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
instance._invalidate(tid, oids)
references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
......@@ -98,14 +99,26 @@ class MVCCAdapterInstance(Base):
'checkCurrentSerialInTransaction', 'tpc_abort',
)
_start = None # Transaction start time
_ltid = None # Last storage transaction id
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)
def _lastTransaction(self):
ltid = self._storage.lastTransaction()
# At this precise moment, a transaction may be
# committed and we have already received the new tid.
with self._lock:
# So make sure we won't override with a smaller value.
if self._ltid is None:
# Calling lastTransaction() here could result in a deadlock.
self._ltid = ltid
def release(self):
self._base._release(self)
......@@ -115,8 +128,9 @@ class MVCCAdapterInstance(Base):
with self._lock:
self._invalidations = None
def _invalidate(self, oids):
def _invalidate(self, tid, oids):
with self._lock:
self._ltid = tid
try:
self._invalidations.update(oids)
except AttributeError:
......@@ -128,8 +142,8 @@ class MVCCAdapterInstance(Base):
self._sync()
def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
self._start = p64(u64(self._ltid) + 1)
if self._invalidations is None:
self._invalidations = set()
return None
......@@ -175,7 +189,8 @@ class MVCCAdapterInstance(Base):
self._modified = None
def invalidate_finish(tid):
self._base._invalidate_finish(modified, self)
self._base._invalidate_finish(tid, modified, self)
self._ltid = tid
func(tid)
return self._storage.tpc_finish(transaction, invalidate_finish)
......@@ -260,7 +275,7 @@ class UndoAdapterInstance(Base):
def tpc_finish(self, transaction, func = lambda tid: None):
def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
self._base._invalidate_finish(tid, self._undone, None)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
......@@ -14,6 +14,7 @@
"""Unit tests for the Connection class."""
from __future__ import print_function
from contextlib import contextmanager
import doctest
import re
import six
......@@ -535,13 +536,13 @@ class InvalidationTests(unittest.TestCase):
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
Transaction start times are based on storage's last transaction,
which is known from invalidations. (Previousely, they were
based on the first invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
>>> mvcc_instance._start == p64(2)
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
......@@ -570,6 +571,36 @@ class InvalidationTests(unittest.TestCase):
>>> db.close()
"""
def test_mvccadapterNewTransactionVsInvalidations(self):
"""
Check that polled invalidations are consistent with the TID at which
the transaction operates. Otherwise, it's like we miss invalidations.
"""
db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
try:
t1 = transaction.TransactionManager()
c1 = db.open(t1)
r1 = c1.root()
r1['a'] = 1
t1.commit()
t2 = transaction.TransactionManager()
c2 = db.open(t2)
c2.root()['b'] = 1
s1 = c1._storage
l1 = s1._lock
@contextmanager
def beforeLock1():
s1._lock = l1
t2.commit()
with l1:
yield
s1._lock = beforeLock1()
t1.begin()
self.assertIs(s1._lock, l1)
self.assertIn('b', r1)
finally:
db.close()
def doctest_invalidateCache():
"""The invalidateCache method invalidates a connection's cache.
......@@ -1395,4 +1426,5 @@ def test_suite():
s.addTest(doctest.DocTestSuite(checker=checker))
s.addTest(unittest.makeSuite(TestConnection))
s.addTest(unittest.makeSuite(EstimatedSizeTests))
s.addTest(unittest.makeSuite(InvalidationTests))
return s
......@@ -85,13 +85,14 @@ storage has seen.
>>> cn = db.open()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> ltid = u64(st.lastTransaction())
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
A connection's high-water mark is set to the transaction id taken from
......@@ -105,7 +106,7 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations.
>>> cn.sync()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(ltid + 201)
True
Basic functionality
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment