diff --git a/product/ERP5Type/CachePlugins/DistributedRamCache.py b/product/ERP5Type/CachePlugins/DistributedRamCache.py
index 76fc79cc16f538e2bdbebdfc88c7af079c837b2f..0d16531db9b5d5f1c5e1ebd15d326829b45373f6 100644
--- a/product/ERP5Type/CachePlugins/DistributedRamCache.py
+++ b/product/ERP5Type/CachePlugins/DistributedRamCache.py
@@ -30,7 +30,7 @@
 """
 Memcached based cache plugin.
 """
-from thread import get_ident
+from threading import local
 from zLOG import LOG, WARNING
 from BaseCache import BaseCache
 from BaseCache import CacheEntry
@@ -40,11 +40,12 @@ from base64 import encodestring
 
 try:
   import memcache
+  from Products.ERP5Type.Tool.MemcachedTool import MemcachedDict
 except ImportError:
   LOG('DistributedRamCache', 0, 'unable to import memcache')
 
 ## global ditionary containing connection objects
-connection_pool = {}
+connection_pool = local()
 
 _MARKER = []
 
@@ -74,35 +75,14 @@ class DistributedRamCache(BaseCache):
     ## "MemCached: while expecting 'STORED', got unexpected response 'END'"
     ## messages in log files and can sometimes can block the thread.
     ## For the moment we create a new conn object for every thread.
-    global connection_pool
-    thread_id = get_ident()
-
-    memcache_conn = connection_pool.get(thread_id, None)
-    if memcache_conn is not None:
-      try:
-        stats = memcache_conn.get_stats()
-      except IndexError:
-        stats = ()
-      if not len(stats) or not len(stats[0][1]):
-        # create a new connection if the existing connection seems
-        # dead.
-        # XXX Since python-memcached does not raise an exception in such
-        # a case, we check here by calling get_stats(), but it will take
-        # a bit more time for each getCacheStorage() call.
-        LOG('DistributedRamCache', WARNING, 'the existing connection seems dead. a new connection will be created.')
-        memcache_conn.disconnect_all()
-        memcache_conn = None
-    if memcache_conn is None:
-      ## we don't have memcache_conn for this thread
-      memcache_conn = memcache.Client(self._servers.split('\n'),
-                                      debug=self._debug_level,
-                                      server_max_key_length=self._server_max_key_length,
-                                      server_max_value_length=self._server_max_value_length)
-      connection_pool[thread_id] = memcache_conn
-      return memcache_conn
-    else:
-      ## we have memcache_conn for this thread
-      return memcache_conn
+    try:
+      dictionary = connection_pool.memcached_dict
+    except AttributeError:
+      dictionary = MemcachedDict(self._servers.split('\n'),
+                                 server_max_key_length=self._server_max_key_length,
+                                 server_max_value_length=self._server_max_value_length)
+      connection_pool.memcached_dict = dictionary
+    return dictionary
 
   def checkAndFixCacheId(self, cache_id, scope):
     ## memcached doesn't support namespaces (cache scopes) so to "emmulate"
@@ -138,7 +118,7 @@ class DistributedRamCache(BaseCache):
     cache_storage = self.getCacheStorage()
     cache_id = self.checkAndFixCacheId(cache_id, scope)
     cache_entry = CacheEntry(value, cache_duration, calculation_time)
-    cache_storage.set(cache_id, cache_entry, cache_duration)
+    cache_storage.set(cache_id, cache_entry)
     self.markCacheMiss()
 
   def expireOldCacheEntries(self, forceCheck = False):
@@ -155,7 +135,7 @@ class DistributedRamCache(BaseCache):
   def delete(self, cache_id, scope):
     cache_storage = self.getCacheStorage()
     cache_id = self.checkAndFixCacheId(cache_id, scope)
-    cache_storage.delete(cache_id)
+    del cache_storage[cache_id]
 
   def has_key(self, cache_id, scope):
     cache_storage = self.getCacheStorage()
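
Note for reviewers: the patch replaces the module-level dict keyed by thread.get_ident(), together with its get_stats() liveness probe, with a threading.local object that lazily holds one MemcachedDict per thread. The sketch below only illustrates that lazy per-thread initialization pattern; make_connection, get_cache_storage and the plain dict they deal with are hypothetical stand-ins for MemcachedDict and the real getCacheStorage method, not part of this change.

import threading

# Thread-local container: each thread sees only its own attributes,
# so no explicit bookkeeping with thread identifiers is needed.
connection_pool = threading.local()

def make_connection(servers):
    # Hypothetical factory; in the patch this role is played by
    # MemcachedDict(...), i.e. a per-thread resource that must not
    # be shared between threads.
    return {'servers': servers, 'data': {}}

def get_cache_storage(servers='127.0.0.1:11211'):
    try:
        # Fast path: this thread already created its storage object.
        return connection_pool.storage
    except AttributeError:
        # First call from this thread: build the object and memoize it
        # as an attribute of the thread-local container.
        storage = make_connection(servers)
        connection_pool.storage = storage
        return storage

Because the attribute lookup itself is per-thread, two threads calling get_cache_storage() concurrently each end up with their own object, which is what the old get_ident()-keyed dict achieved by hand; the defensive get_stats() check disappears from this code path (presumably handled inside MemcachedTool). The switch from cache_storage.delete(cache_id) to del cache_storage[cache_id] in delete(), and the dropped cache_duration argument to set(), likewise follow the dict-style interface of MemcachedDict.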