Commit 694c27f4 authored by Julien Muchembled's avatar Julien Muchembled

client: fix invalidation issues when reconnecting to the master

parent dd74d662
...@@ -332,10 +332,7 @@ class Application(ThreadedApplication): ...@@ -332,10 +332,7 @@ class Application(ThreadedApplication):
acquire = self._cache_lock_acquire acquire = self._cache_lock_acquire
release = self._cache_lock_release release = self._cache_lock_release
# XXX: Is it possible this giant lock ? # XXX: Consider using a more fine-grained lock.
# See commit b77c946d67c9d7cc1e9ee9b15437568dee144aa4
# for a way to invalidate cache properly when several loads
# are done simultaneously.
self._load_lock_acquire() self._load_lock_acquire()
try: try:
acquire() acquire()
...@@ -356,14 +353,20 @@ class Application(ThreadedApplication): ...@@ -356,14 +353,20 @@ class Application(ThreadedApplication):
data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid) data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
acquire() acquire()
try: try:
result = data, tid, (next_tid if self._loading_oid or next_tid if self._loading_oid:
else self._loading_invalidated) # Common case (no race condition).
self._cache.store(oid, *result) self._cache.store(oid, data, tid, next_tid)
return result elif self._loading_invalidated:
# oid has just been invalidated.
if not next_tid:
next_tid = self._loading_invalidated
self._cache.store(oid, data, tid, next_tid)
# Else, we just reconnected to the master.
finally: finally:
release() release()
finally: finally:
self._load_lock_release() self._load_lock_release()
return data, tid, next_tid
def _loadFromStorage(self, oid, at_tid, before_tid): def _loadFromStorage(self, oid, at_tid, before_tid):
packet = Packets.AskObject(oid, at_tid, before_tid) packet = Packets.AskObject(oid, at_tid, before_tid)
......
...@@ -264,7 +264,6 @@ class ClientCache(object): ...@@ -264,7 +264,6 @@ class ClientCache(object):
assert item.next_tid <= tid, (item, oid, tid) assert item.next_tid <= tid, (item, oid, tid)
def clear_current(self): def clear_current(self):
oid_list = []
for oid, item_list in self._oid_dict.items(): for oid, item_list in self._oid_dict.items():
item = item_list[-1] item = item_list[-1]
if item.next_tid is None: if item.next_tid is None:
...@@ -276,8 +275,6 @@ class ClientCache(object): ...@@ -276,8 +275,6 @@ class ClientCache(object):
# probably not worth it. # probably not worth it.
if not item_list: if not item_list:
del self._oid_dict[oid] del self._oid_dict[oid]
oid_list.append(oid)
return oid_list
def test(self): def test(self):
...@@ -295,11 +292,11 @@ def test(self): ...@@ -295,11 +292,11 @@ def test(self):
data = '15', 15, None data = '15', 15, None
cache.store(1, *data) cache.store(1, *data)
self.assertEqual(cache.load(1, None), data) self.assertEqual(cache.load(1, None), data)
self.assertEqual(cache.clear_current(), [1]) cache.clear_current()
self.assertEqual(cache.load(1, None), None) self.assertEqual(cache.load(1, None), None)
cache.store(1, *data) cache.store(1, *data)
cache.invalidate(1, 20) cache.invalidate(1, 20)
self.assertEqual(cache.clear_current(), []) cache.clear_current()
self.assertEqual(cache.load(1, 20), ('15', 15, 20)) self.assertEqual(cache.load(1, 20), ('15', 15, 20))
cache.store(1, '10', 10, 15) cache.store(1, '10', 10, 15)
cache.store(1, '20', 20, 21) cache.store(1, '20', 20, 21)
......
...@@ -102,19 +102,24 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -102,19 +102,24 @@ class PrimaryNotificationsHandler(MTEventHandler):
if app.master_conn is None: if app.master_conn is None:
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
db = app.getDB()
if app.last_tid < ltid: if app.last_tid < ltid:
oid_list = app._cache.clear_current() app._cache.clear_current()
db is None or db.invalidate( # In the past, we tried not to invalidate the
app.last_tid and add64(app.last_tid, 1), # Connection caches entirely, using the list of
oid_list) # oids that are invalidated by clear_current.
# This was wrong because these caches may have
# entries that are not in the NEO cache anymore.
else: else:
# The DB was truncated. It happens so # The DB was truncated. It happens so
# rarely that we don't need to optimize. # rarely that we don't need to optimize.
app._cache.clear() app._cache.clear()
db is None or db.invalidateCache() # Make sure a parallel load won't refill the cache
# with garbage.
app._loading_oid = app._loading_invalidated = None
finally: finally:
app._cache_lock_release() app._cache_lock_release()
db = app.getDB()
db is None or db.invalidateCache()
app.last_tid = ltid app.last_tid = ltid
elif type(packet) is Packets.AnswerTransactionFinished: elif type(packet) is Packets.AnswerTransactionFinished:
app = self.app app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment