Commit 586cd764 authored by Julien Muchembled's avatar Julien Muchembled

client: simplify previous commit since all loads are serialized

parent 04ab2a4c
...@@ -18,7 +18,6 @@ from cPickle import dumps, loads ...@@ -18,7 +18,6 @@ from cPickle import dumps, loads
from zlib import compress as real_compress, decompress from zlib import compress as real_compress, decompress
from neo.lib.locking import Empty from neo.lib.locking import Empty
from random import shuffle from random import shuffle
from thread import get_ident
import heapq import heapq
import time import time
import os import os
...@@ -98,7 +97,7 @@ class Application(object): ...@@ -98,7 +97,7 @@ class Application(object):
# no self-assigned UUID, primary master will supply us one # no self-assigned UUID, primary master will supply us one
self.uuid = None self.uuid = None
self._cache = ClientCache() self._cache = ClientCache()
self._loading = {} self._loading_oid = None
self.new_oid_list = [] self.new_oid_list = []
self.last_oid = '\0' * 8 self.last_oid = '\0' * 8
self.storage_event_handler = storage.StorageEventHandler(self) self.storage_event_handler = storage.StorageEventHandler(self)
...@@ -412,6 +411,10 @@ class Application(object): ...@@ -412,6 +411,10 @@ class Application(object):
acquire = self._cache_lock_acquire acquire = self._cache_lock_acquire
release = self._cache_lock_release release = self._cache_lock_release
# XXX: Is it possible this giant lock ?
# See commit b77c946d67c9d7cc1e9ee9b15437568dee144aa4
# for a way to invalidate cache properly when several loads
# are done simultaneously.
self._load_lock_acquire() self._load_lock_acquire()
try: try:
acquire() acquire()
...@@ -419,16 +422,14 @@ class Application(object): ...@@ -419,16 +422,14 @@ class Application(object):
result = self._loadFromCache(oid, tid, before_tid) result = self._loadFromCache(oid, tid, before_tid)
if result: if result:
return result return result
loading_key = oid, get_ident() self._loading_oid = oid
self._loading[loading_key] = None finally:
release() release()
try: result = self._loadFromStorage(oid, tid, before_tid)
result = self._loadFromStorage(oid, tid, before_tid) acquire()
finally: try:
acquire() if not (self._loading_oid or result[2]):
invalidated = self._loading.pop(loading_key) result = result[0], result[1], self._loading_invalidated
if invalidated and not result[2]:
result = result[0], result[1], invalidated
self._cache.store(oid, *result) self._cache.store(oid, *result)
return result return result
finally: finally:
...@@ -782,7 +783,6 @@ class Application(object): ...@@ -782,7 +783,6 @@ class Application(object):
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
cache = self._cache cache = self._cache
loading = self._loading
for oid, data in cache_dict.iteritems(): for oid, data in cache_dict.iteritems():
if data is CHECKED_SERIAL: if data is CHECKED_SERIAL:
# this is just a remain of # this is just a remain of
...@@ -793,9 +793,7 @@ class Application(object): ...@@ -793,9 +793,7 @@ class Application(object):
try: try:
cache.invalidate(oid, tid) cache.invalidate(oid, tid)
except KeyError: except KeyError:
for k in loading: pass
if k[0] == oid and not loading[k]:
loading[k] = tid
if data is not None: if data is not None:
# Store in cache with no next_tid # Store in cache with no next_tid
cache.store(oid, data, tid, None) cache.store(oid, data, tid, None)
......
...@@ -238,6 +238,7 @@ def test(self): ...@@ -238,6 +238,7 @@ def test(self):
self.assertRaises(KeyError, cache.invalidate, 1, 10) self.assertRaises(KeyError, cache.invalidate, 1, 10)
data = 'foo', 5, 10 data = 'foo', 5, 10
# 2 identical stores happens if 2 threads got a cache miss at the same time # 2 identical stores happens if 2 threads got a cache miss at the same time
# (which currently never happens in NEO, due to a lock)
cache.store(1, *data) cache.store(1, *data)
cache.store(1, *data) cache.store(1, *data)
self.assertEqual(cache.load(1, 10), data) self.assertEqual(cache.load(1, 10), data)
......
...@@ -113,14 +113,14 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -113,14 +113,14 @@ class PrimaryNotificationsHandler(BaseHandler):
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
invalidate = app._cache.invalidate invalidate = app._cache.invalidate
loading = app._loading loading = app._loading_oid
for oid in oid_list: for oid in oid_list:
try: try:
invalidate(oid, tid) invalidate(oid, tid)
except KeyError: except KeyError:
for k in loading: if oid == loading:
if k[0] == oid and not loading[k]: app._loading_oid = None
loading[k] = tid app._loading_invalidated = tid
db = app.getDB() db = app.getDB()
if db is not None: if db is not None:
db.invalidate(tid, oid_list) db.invalidate(tid, oid_list)
......
...@@ -511,12 +511,6 @@ class Test(NEOThreadedTest): ...@@ -511,12 +511,6 @@ class Test(NEOThreadedTest):
l1.release() l1.release()
l2.acquire() l2.acquire()
orig(conn, packet, kw, handler) orig(conn, packet, kw, handler)
def _loadFromStorage(orig, *args):
try:
return orig(*args)
finally:
l1.release()
l2.acquire()
cluster = NEOCluster() cluster = NEOCluster()
try: try:
cluster.start() cluster.start()
...@@ -537,26 +531,6 @@ class Test(NEOThreadedTest): ...@@ -537,26 +531,6 @@ class Test(NEOThreadedTest):
l2.release() l2.release()
t.join() t.join()
self.assertEqual(x2.value, 1) self.assertEqual(x2.value, 1)
return # Following is disabled due to deadlock
# caused by client load lock
t1.begin()
x1.value = 0
x2._p_deactivate()
cluster.client._cache.clear()
p = Patch(cluster.client, _loadFromStorage=_loadFromStorage)
try:
t = self.newThread(x2._p_activate)
l1.acquire()
t1.commit()
t1.begin()
finally:
del p
l2.release()
t.join()
x1._p_deactivate()
self.assertEqual(x2.value, 1)
self.assertEqual(x1.value, 0)
finally: finally:
cluster.stop() cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment