Commit a7d101ec authored by Julien Muchembled's avatar Julien Muchembled

client: fix race with invalidations when starting a new transaction on ZODB 5

This requires ZODB >= 5.6.0
parent fc58c089
# -*- coding: utf-8 -*-
#
# Copyright (C) 2006-2019 Nexedi SA
#
......@@ -67,7 +68,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app
app.last_tid = tid
cache = app._cache
invalidate = cache.invalidate
loading_get = app._loading.get
......@@ -83,6 +83,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
cache.store(oid, data, tid, None)
if callback is not None:
callback(tid)
app.last_tid = tid # see comment in invalidateObjects
def connectionClosed(self, conn):
app = self.app
......@@ -110,7 +111,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
app = self.app
if app.ignore_invalidations:
return
app.last_tid = tid
with app._cache_lock:
invalidate = app._cache.invalidate
loading_get = app._loading.get
......@@ -122,6 +122,13 @@ class PrimaryNotificationsHandler(MTEventHandler):
db = app.getDB()
if db is not None:
db.invalidate(tid, oid_list)
# ZODB<5: Update before releasing the lock so that app.load
# asks the last serial (with respect to already processed
# invalidations by Connection._setstate).
# ZODB≥5: Update after db.invalidate because the MVCC
# adapter starts at the greatest TID between
# IStorage.lastTransaction and processed invalidations.
app.last_tid = tid
def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
pt = self.app.pt = object.__new__(PartitionTable)
......
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2019 Nexedi SA
#
......@@ -945,6 +946,72 @@ class Test(NEOThreadedTest):
t.join()
self.assertEqual(x2.value, 1)
@with_cluster()
def testInternalInvalidation2(self, cluster):
# same as testExternalInvalidation3 but with internal invalidations
t, c = cluster.getTransaction()
x = c.root()[''] = PCounter()
t.commit()
l1 = threading.Lock(); l1.acquire()
l2 = threading.Lock(); l2.acquire()
def sync(orig):
orig()
l2.release()
l1.acquire()
def raceBeforeInvalidateZODB(orig, transaction, f):
def callback(tid):
l1.release()
begin1.join()
f(tid)
return orig(transaction, callback)
def raceAfterInvalidateZODB(orig, transaction, f):
def callback(tid):
f(tid)
l1.release()
begin1.join()
return orig(transaction, callback)
class CacheLock(object):
def __init__(self):
self._lock = client._cache_lock
def __enter__(self):
self._lock.acquire()
def __exit__(self, t, v, tb):
self._lock.release()
p.revert()
load1.start()
l1.acquire()
def _loadFromStorage(orig, *args):
l1.release()
return orig(*args)
client = cluster.client
t2, c2 = cluster.getTransaction()
x2 = c2.root()['']
x2.value = 1
with Patch(client, tpc_finish=raceBeforeInvalidateZODB):
with Patch(client, sync=sync):
begin1 = self.newThread(t.begin)
l2.acquire()
t2.commit()
self.assertEqual(x.value, 0)
x._p_deactivate()
# On ZODB≥5, the following check would fail
# if tpc_finish updated app.last_tid earlier.
self.assertEqual(x.value, 0)
t.begin()
self.assertEqual(x.value, 1)
x2.value = big = 'x' * cluster.cache_size # force load from storage
with Patch(client, _cache_lock=CacheLock()) as p, \
Patch(client, _loadFromStorage=_loadFromStorage), \
Patch(client, tpc_finish=raceAfterInvalidateZODB):
with Patch(client, sync=sync):
begin1 = self.newThread(t.begin)
l2.acquire()
load1 = self.newPausedThread(lambda: x.value)
t2.commit()
# On ZODB<5, the following check would fail
# if tpc_finish updated app.last_tid later.
self.assertEqual(load1.join(), big)
@with_cluster()
def testExternalInvalidation(self, cluster):
# Initialize objects
......@@ -1126,6 +1193,70 @@ class Test(NEOThreadedTest):
# - 1 load ending with a cache hit
self.assertEqual(nonlocal_, [2, 8, 2])
@with_cluster(serialized=False)
def testExternalInvalidation3(self, cluster):
# same as testInternalInvalidation2 but with external invalidations
t, c = cluster.getTransaction()
x = c.root()[''] = PCounter()
t.commit()
def sync(orig):
orig()
ll_sync()
def raceBeforeInvalidateZODB(orig, *args):
ll_inv()
orig(*args)
def raceAfterInvalidateZODB(orig, *args):
orig(*args)
ll_inv()
l1 = threading.Lock(); l1.acquire()
l2 = threading.Lock(); l2.acquire()
class CacheLock(object):
def __init__(self):
self._lock = client._cache_lock
def __enter__(self):
self._lock.acquire()
def __exit__(self, t, v, tb):
self._lock.release()
l1.release()
l2.acquire()
def _loadFromStorage(orig, *args):
l2.release()
return orig(*args)
client = cluster.client
with cluster.newClient(1) as db:
t2, c2 = cluster.getTransaction(db)
x2 = c2.root()['']
x2.value = 1
with Patch(client._db, invalidate=raceBeforeInvalidateZODB), \
LockLock() as ll_inv:
with Patch(client, sync=sync), LockLock() as ll_sync:
begin1 = self.newThread(t.begin)
ll_sync()
t2.commit()
ll_inv()
begin1.join()
self.assertEqual(x.value, 0)
x._p_deactivate()
# On ZODB≥5, the following check would fail if
# invalidateObjects updated app.last_tid earlier.
self.assertEqual(x.value, 0)
t.begin()
self.assertEqual(x.value, 1)
x2.value = 2
with Patch(client, _cache_lock=CacheLock()), \
Patch(client._db, invalidate=raceAfterInvalidateZODB), \
LockLock() as ll_inv:
with Patch(client, sync=sync), LockLock() as ll_sync:
begin1 = self.newThread(t.begin)
ll_sync()
t2.commit()
ll_inv()
begin1.join()
with Patch(client, _loadFromStorage=_loadFromStorage):
# On ZODB<5, the following check would fail if
# invalidateObjects updated app.last_tid later.
self.assertEqual(x.value, 2)
@with_cluster(storage_count=2, partitions=2)
def testReadVerifyingStorage(self, cluster):
s1, s2 = cluster.sortStorageList()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment