From b44daf40abc80a815d902541a4a497091c32a3dc Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Tue, 12 Mar 2019 21:40:43 +0100 Subject: [PATCH 1/6] qa: comment testExternalInvalidation2 --- neo/tests/threaded/test.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/neo/tests/threaded/test.py b/neo/tests/threaded/test.py index b39bcbb1..a4be35ac 100644 --- a/neo/tests/threaded/test.py +++ b/neo/tests/threaded/test.py @@ -1008,13 +1008,27 @@ class Test(NEOThreadedTest): client.sync() with cluster.master.filterConnection(client) as mc2: mc2.delayInvalidateObjects() + # A first client node (C1) modifies an oid whereas + # invalidations to the other node (C2) are delayed. x._p_changed = 1 t.commit() tid2 = x._p_serial + # C2 loads the most recent revision of this oid (last_tid=tid1). self.assertEqual((tid1, tid2), client.load(x._p_oid)[1:]) + # C2 poll thread is frozen just before processing invalidation + # packet for tid2. C1 modifies something else -> tid3 r._p_changed = 1 t.commit() + self.assertEqual(tid1, client.last_tid) with Patch(client, _cache_lock_release=_cache_lock_release): + # 1. Just after having found nothing in cache, the worker + # thread asks the poll thread to get notified about + # invalidations for the loading oid. + # + # 2. Both invalidations are processed. -> last_tid=tid3 + # + # 3. The worker thread loads before tid3+1. + # The poll thread notified [tid2], which must be ignored. self.assertEqual((tid2, None), client.load(x._p_oid)[1:]) self.assertEqual(nonlocal_, [2, 0]) -- 2.30.9 From ef42ecc1cfbd4dd111d52532ee82543206fe25ed Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Wed, 13 Mar 2019 16:06:44 +0100 Subject: [PATCH 2/6] qa: check cache in testExternalInvalidation Both testExternalInvalidation & testExternalInvalidation2 check processing of invalidations for an oid being loaded, the first one when the next tid needs to be corrected, and the second when it's really None. --- neo/tests/threaded/test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neo/tests/threaded/test.py b/neo/tests/threaded/test.py index a4be35ac..ec493356 100644 --- a/neo/tests/threaded/test.py +++ b/neo/tests/threaded/test.py @@ -949,6 +949,8 @@ class Test(NEOThreadedTest): t.join() self.assertEqual(x2.value, 1) self.assertEqual(x1.value, 0) + self.assertEqual((x2._p_serial, x1._p_serial), + cluster.client._cache.load(x1._p_oid, x1._p_serial)[1:]) def invalidations(conn): try: -- 2.30.9 From 0cc593aec3a67bb704af42341d91e3fae68b4b05 Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Fri, 8 Mar 2019 11:54:10 +0100 Subject: [PATCH 3/6] client: remove load lock in tpc_finish --- neo/client/app.py | 7 +------ neo/client/handlers/master.py | 6 +++++- neo/tests/threaded/test.py | 21 +++++++++++++++++++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/neo/client/app.py b/neo/client/app.py index 880346b8..e9cb492a 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -725,10 +725,7 @@ class Application(ThreadedApplication): txn_container = self._txn_container if not txn_container.get(transaction).voted: self.tpc_vote(transaction) - checked_list = [] - self._load_lock_acquire() - try: - # Call finish on master + if 1: txn_context = txn_container.pop(transaction) cache_dict = txn_context.cache_dict checked_list = [oid for oid, data in cache_dict.iteritems() @@ -745,8 +742,6 @@ class Application(ThreadedApplication): if not tid: raise return tid - finally: - self._load_lock_release() def _getFinalTID(self, ttid): try: diff --git a/neo/client/handlers/master.py b/neo/client/handlers/master.py index b6e7122d..790f9f9a 100644 --- a/neo/client/handlers/master.py +++ b/neo/client/handlers/master.py @@ -82,9 +82,13 @@ class PrimaryNotificationsHandler(MTEventHandler): cache = app._cache app._cache_lock_acquire() try: + invalidate = app._cache.invalidate + loading = app._loading_oid for oid, data in cache_dict.iteritems(): # Update ex-latest value in cache - cache.invalidate(oid, tid) + invalidate(oid, tid) + if oid == loading: + app._loading_invalidated.append(tid) if data is not None: # Store in cache with no next_tid cache.store(oid, data, tid, None) diff --git a/neo/tests/threaded/test.py b/neo/tests/threaded/test.py index ec493356..583d00b4 100644 --- a/neo/tests/threaded/test.py +++ b/neo/tests/threaded/test.py @@ -862,6 +862,27 @@ class Test(NEOThreadedTest): self.assertEqual(c.root()['1'].value, 1) self.assertNotIn('2', c.root()) + @with_cluster() + def testLoadVsFinish(self, cluster): + t1, c1 = cluster.getTransaction() + c1.root()['x'] = x1 = PCounter() + t1.commit() + t1.begin() + x1.value = 1 + t2, c2 = cluster.getTransaction() + x2 = c2.root()['x'] + cluster.client._cache.clear() + def _loadFromStorage(orig, *args): + r = orig(*args) + ll() + return r + with LockLock() as ll, Patch(cluster.client, + _loadFromStorage=_loadFromStorage): + t = self.newThread(x2._p_activate) + ll() + t1.commit() + t.join() + @with_cluster() def testInternalInvalidation(self, cluster): def _handlePacket(orig, conn, packet, kw={}, handler=None): -- 2.30.9 From c7cdcf87df2d44964e795f9021ba41e37ef2588c Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Fri, 8 Mar 2019 12:04:37 +0100 Subject: [PATCH 4/6] client: unindent code --- neo/client/app.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/neo/client/app.py b/neo/client/app.py index e9cb492a..692d12fb 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -725,23 +725,22 @@ class Application(ThreadedApplication): txn_container = self._txn_container if not txn_container.get(transaction).voted: self.tpc_vote(transaction) - if 1: - txn_context = txn_container.pop(transaction) - cache_dict = txn_context.cache_dict - checked_list = [oid for oid, data in cache_dict.iteritems() - if data is CHECKED_SERIAL] - for oid in checked_list: - del cache_dict[oid] - ttid = txn_context.ttid - p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list) - try: - tid = self._askPrimary(p, cache_dict=cache_dict, callback=f) - assert tid - except ConnectionClosed: - tid = self._getFinalTID(ttid) - if not tid: - raise - return tid + txn_context = txn_container.pop(transaction) + cache_dict = txn_context.cache_dict + checked_list = [oid for oid, data in cache_dict.iteritems() + if data is CHECKED_SERIAL] + for oid in checked_list: + del cache_dict[oid] + ttid = txn_context.ttid + p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list) + try: + tid = self._askPrimary(p, cache_dict=cache_dict, callback=f) + assert tid + except ConnectionClosed: + tid = self._getFinalTID(ttid) + if not tid: + raise + return tid def _getFinalTID(self, ttid): try: -- 2.30.9 From 87eca1e073b87fcb6c124ee3cfcfdbaf33b0ea9b Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Sat, 9 Mar 2019 07:08:41 +0100 Subject: [PATCH 5/6] client: replace global load lock by a per-oid one --- neo/client/app.py | 72 +++++++++++++++++----------------- neo/client/handlers/master.py | 31 ++++++--------- neo/debug.py | 5 +-- neo/tests/threaded/__init__.py | 14 ++++--- neo/tests/threaded/test.py | 53 +++++++++++++++++++------ 5 files changed, 100 insertions(+), 75 deletions(-) diff --git a/neo/client/app.py b/neo/client/app.py index 692d12fb..47b9a78b 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -17,7 +17,7 @@ import heapq import random import time - +from collections import defaultdict try: from ZODB._compat import dumps, loads, _protocol except ImportError: @@ -79,7 +79,7 @@ class Application(ThreadedApplication): # no self-assigned UUID, primary master will supply us one self._cache = ClientCache() if cache_size is None else \ ClientCache(max_size=cache_size) - self._loading_oid = None + self._loading = defaultdict(lambda: (Lock(), [])) self.new_oid_list = () self.last_oid = '\0' * 8 self.storage_event_handler = storage.StorageEventHandler(self) @@ -90,19 +90,13 @@ class Application(ThreadedApplication): self.notifications_handler = master.PrimaryNotificationsHandler( self) self._txn_container = TransactionContainer() # Lock definition : - # _load_lock is used to make loading and storing atomic - lock = Lock() - self._load_lock_acquire = lock.acquire - self._load_lock_release = lock.release # _oid_lock is used in order to not call multiple oid # generation at the same time lock = Lock() self._oid_lock_acquire = lock.acquire self._oid_lock_release = lock.release - lock = Lock() # _cache_lock is used for the client cache - self._cache_lock_acquire = lock.acquire - self._cache_lock_release = lock.release + self._cache_lock = Lock() # _connecting_to_master_node is used to prevent simultaneous master # node connection attempts self._connecting_to_master_node = Lock() @@ -398,21 +392,28 @@ class Application(ThreadedApplication): """ # TODO: # - rename parameters (here? and in handlers & packet definitions) - - acquire = self._cache_lock_acquire - release = self._cache_lock_release - # XXX: Consider using a more fine-grained lock. - self._load_lock_acquire() + acquired = False + lock = self._cache_lock try: - acquire() - try: - result = self._loadFromCache(oid, tid, before_tid) - if result: - return result - self._loading_oid = oid - self._loading_invalidated = [] - finally: - release() + while 1: + with lock: + result = self._loadFromCache(oid, tid, before_tid) + if result: + return result + load_lock = self._loading[oid][0] + acquired = load_lock.acquire(0) + # Several concurrent cache misses for the same oid are probably + # for the same tid so we use a per-oid lock to avoid asking the + # same data to the storage node. + if acquired: + # The first thread does load from storage, + # and fills cache with the response. + break + # The other threads wait for the first one to complete and + # loop, possibly resulting in a new cache miss if a different + # tid is actually wanted or if the data was too big. + with load_lock: + pass # While the cache lock is released, an arbitrary number of # invalidations may be processed, for this oid or not. And at this # precise moment, if both tid and before_tid are None (which is @@ -428,20 +429,24 @@ class Application(ThreadedApplication): # we got from master. before_tid = p64(u64(self.last_tid) + 1) data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid) - acquire() - try: - if self._loading_oid: + with lock: + loading = self._loading.pop(oid, None) + if loading: + assert loading[0] is load_lock if not next_tid: - for t in self._loading_invalidated: + for t in loading[1]: if tid < t: next_tid = t break self._cache.store(oid, data, tid, next_tid) # Else, we just reconnected to the master. - finally: - release() - finally: - self._load_lock_release() + load_lock.release() + except: + if acquired: + with lock: + self._loading.pop(oid, None) + load_lock.release() + raise return data, tid, next_tid def _loadFromStorage(self, oid, at_tid, before_tid): @@ -986,11 +991,8 @@ class Application(ThreadedApplication): # It should not be otherwise required (clients should be free to load # old data as long as it is available in cache, event if it was pruned # by a pack), so don't bother invalidating on other clients. - self._cache_lock_acquire() - try: + with self._cache_lock: self._cache.clear() - finally: - self._cache_lock_release() def getLastTID(self, oid): return self.load(oid)[1] diff --git a/neo/client/handlers/master.py b/neo/client/handlers/master.py index 790f9f9a..773ba7f4 100644 --- a/neo/client/handlers/master.py +++ b/neo/client/handlers/master.py @@ -52,8 +52,7 @@ class PrimaryNotificationsHandler(MTEventHandler): # Either we're connecting or we already know the last tid # via invalidations. assert app.master_conn is None, app.master_conn - app._cache_lock_acquire() - try: + with app._cache_lock: if app_last_tid < ltid: app._cache.clear_current() # In the past, we tried not to invalidate the @@ -67,9 +66,7 @@ class PrimaryNotificationsHandler(MTEventHandler): app._cache.clear() # Make sure a parallel load won't refill the cache # with garbage. - app._loading_oid = app._loading_invalidated = None - finally: - app._cache_lock_release() + app._loading.clear() db = app.getDB() db is None or db.invalidateCache() app.last_tid = ltid @@ -80,22 +77,20 @@ class PrimaryNotificationsHandler(MTEventHandler): app.last_tid = tid # Update cache cache = app._cache - app._cache_lock_acquire() - try: + with app._cache_lock: invalidate = app._cache.invalidate - loading = app._loading_oid + loading_get = app._loading.get for oid, data in cache_dict.iteritems(): # Update ex-latest value in cache invalidate(oid, tid) - if oid == loading: - app._loading_invalidated.append(tid) + loading = loading_get(oid) + if loading: + loading[1].append(tid) if data is not None: # Store in cache with no next_tid cache.store(oid, data, tid, None) if callback is not None: callback(tid) - finally: - app._cache_lock_release() def connectionClosed(self, conn): app = self.app @@ -124,19 +119,17 @@ class PrimaryNotificationsHandler(MTEventHandler): if app.ignore_invalidations: return app.last_tid = tid - app._cache_lock_acquire() - try: + with app._cache_lock: invalidate = app._cache.invalidate - loading = app._loading_oid + loading_get = app._loading.get for oid in oid_list: invalidate(oid, tid) - if oid == loading: - app._loading_invalidated.append(tid) + loading = loading_get(oid) + if loading: + loading[1].append(tid) db = app.getDB() if db is not None: db.invalidate(tid, oid_list) - finally: - app._cache_lock_release() def notifyPartitionChanges(self, conn, ptid, cell_list): if self.app.pt.filled(): diff --git a/neo/debug.py b/neo/debug.py index 0722e71b..7846a77a 100644 --- a/neo/debug.py +++ b/neo/debug.py @@ -197,8 +197,7 @@ elif IF == 'trace-cache': @defer def profile(app): - app._cache_lock_acquire() - try: + with app._cache_lock: cache = app._cache if type(cache) is ClientCache: app._cache = CacheTracer(cache, '%s-%s.neo-cache-trace' % @@ -206,5 +205,3 @@ elif IF == 'trace-cache': app._cache.clear() else: app._cache = cache.close() - finally: - app._cache_lock_release() diff --git a/neo/tests/threaded/__init__.py b/neo/tests/threaded/__init__.py index 085bcba0..9d47044b 100644 --- a/neo/tests/threaded/__init__.py +++ b/neo/tests/threaded/__init__.py @@ -1072,8 +1072,7 @@ class NEOThreadedTest(NeoTestBase): def run(self): try: - apply(*self.__target) - self.__exc_info = None + self.__result = apply(*self.__target) except: self.__exc_info = sys.exc_info() if self.__exc_info[0] is NEOThreadedTest.failureException: @@ -1081,10 +1080,13 @@ class NEOThreadedTest(NeoTestBase): def join(self, timeout=None): threading.Thread.join(self, timeout) - if not self.is_alive() and self.__exc_info: - etype, value, tb = self.__exc_info - del self.__exc_info - raise etype, value, tb + if not self.is_alive(): + try: + return self.__result + except AttributeError: + etype, value, tb = self.__exc_info + del self.__exc_info + raise etype, value, tb class newThread(newPausedThread): diff --git a/neo/tests/threaded/test.py b/neo/tests/threaded/test.py index 583d00b4..94a64d26 100644 --- a/neo/tests/threaded/test.py +++ b/neo/tests/threaded/test.py @@ -1009,7 +1009,7 @@ class Test(NEOThreadedTest): x = r[''] = PCounter() t.commit() tid1 = x._p_serial - nonlocal_ = [0, 1] + nonlocal_ = [0, 0, 0] l1 = threading.Lock(); l1.acquire() l2 = threading.Lock(); l2.acquire() def invalidateObjects(orig, *args): @@ -1019,12 +1019,26 @@ class Test(NEOThreadedTest): nonlocal_[0] += 1 if nonlocal_[0] == 2: l2.release() - def _cache_lock_release(orig): - orig() - if nonlocal_[1]: - nonlocal_[1] = 0 + class CacheLock(object): + def __init__(self, client): + self._lock = client._cache_lock + def __enter__(self): + self._lock.acquire() + def __exit__(self, t, v, tb): + count = nonlocal_[1] + nonlocal_[1] = count + 1 + self._lock.release() + if count == 0: + load_same.start() + l2.acquire() + elif count == 1: + load_other.start() + def _loadFromStorage(orig, *args): + count = nonlocal_[2] + nonlocal_[2] = count + 1 + if not count: l1.release() - l2.acquire() + return orig(*args) with cluster.newClient() as client, \ Patch(client.notifications_handler, invalidateObjects=invalidateObjects): @@ -1043,17 +1057,34 @@ class Test(NEOThreadedTest): r._p_changed = 1 t.commit() self.assertEqual(tid1, client.last_tid) - with Patch(client, _cache_lock_release=_cache_lock_release): + load_same = self.newPausedThread(client.load, x._p_oid) + load_other = self.newPausedThread(client.load, r._p_oid) + with Patch(client, _cache_lock=CacheLock(client)), \ + Patch(client, _loadFromStorage=_loadFromStorage): # 1. Just after having found nothing in cache, the worker # thread asks the poll thread to get notified about # invalidations for the loading oid. - # + # (l1) # 2. Both invalidations are processed. -> last_tid=tid3 - # + # (l2) # 3. The worker thread loads before tid3+1. # The poll thread notified [tid2], which must be ignored. - self.assertEqual((tid2, None), client.load(x._p_oid)[1:]) - self.assertEqual(nonlocal_, [2, 0]) + # In parallel, 2 other loads are done (both cache misses): + # - one for the same oid, which waits for first load to + # complete and in particular fill cache, in order to + # avoid asking the same data to the storage node + # - another for a different oid, which doesn't wait, as shown + # by the fact that it returns an old record (i.e. before any + # invalidation packet is processed) + loaded = client.load(x._p_oid) + self.assertEqual((tid2, None), loaded[1:]) + self.assertEqual(loaded, load_same.join()) + self.assertEqual((tid1, r._p_serial), load_other.join()[1:]) + # To summary: + # - 3 concurrent loads starting with cache misses + # - 2 loads from storage + # - 1 load ending with a cache hit + self.assertEqual(nonlocal_, [2, 8, 2]) @with_cluster(storage_count=2, partitions=2) def testReadVerifyingStorage(self, cluster): -- 2.30.9 From a33c624c31086cb0938ca58aecb72554419515da Mon Sep 17 00:00:00 2001 From: Julien Muchembled Date: Wed, 13 Mar 2019 19:35:50 +0100 Subject: [PATCH 6/6] client: inline Application._loadFromCache --- neo/client/app.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/neo/client/app.py b/neo/client/app.py index 47b9a78b..178ee017 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -397,7 +397,11 @@ class Application(ThreadedApplication): try: while 1: with lock: - result = self._loadFromCache(oid, tid, before_tid) + if tid: + result = self._cache.load(oid, tid + '*') + assert not result or result[1] == tid + else: + result = self._cache.load(oid, before_tid) if result: return result load_lock = self._loading[oid][0] @@ -465,16 +469,6 @@ class Application(ThreadedApplication): Packets.AskObject(oid, at_tid, before_tid), askStorage) - def _loadFromCache(self, oid, at_tid=None, before_tid=None): - """ - Load from local cache, return None if not found. - """ - if at_tid: - result = self._cache.load(oid, at_tid + '*') - assert not result or result[1] == at_tid - return result - return self._cache.load(oid, before_tid) - def tpc_begin(self, storage, transaction, tid=None, status=' '): """Begin a new transaction.""" # First get a transaction, only one is allowed at a time -- 2.30.9