Commit 132af7e5 authored by Ivan Tyagov's avatar Ivan Tyagov

Initial import of Cache Plugins.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@11034 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 4d5abcc3
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Base Cache plugin.
"""
import time
class CachedMethodError(Exception):
pass
class CacheEntry(object):
""" Cachable entry. Used as a wrapper around real values stored in cache.
value
cache_duration
stored_at
cache_hits
calculation_time
TODO: Based on above data we can have a different invalidation policy
"""
def __init__(self, value, cache_duration=None, calculation_time=0):
self.value = value
self.cache_duration = cache_duration
self.stored_at = int(time.time())
self.cache_hits = 0
self.calculation_time = calculation_time
def isExpired(self):
""" Check whether this cache entry has expired. """
if self.cache_duration is None or self.cache_duration == 0:
## cache entry can stay in cache forever until Zope restarts
return False
now = int(time.time())
return now > (self.stored_at + int(self.cache_duration))
def markCacheHit(self, delta=1):
""" mark a read to this cache entry """
self.cache_hits = self.cache_hits + delta
def getValue(self):
""" return cached value """
return getattr(self, 'value', None)
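## Usage sketch (editor's illustration, using only the class above): a CacheEntry
## wraps the computed value together with its expiration bookkeeping.
##
##   entry = CacheEntry('some result', cache_duration=30, calculation_time=0.5)
##   entry.getValue()    ## -> 'some result'
##   entry.markCacheHit()
##   entry.isExpired()   ## -> False until 30 seconds after it was stored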
class BaseCache(object):
""" Base Cache class """
## Time interval (s) to check for expired objects
cache_expire_check_interval = 60
def __init__(self, params={}):
self._last_cache_expire_check_at = time.time()
self._cache_hits = 0
self._cache_misses = 0
def markCacheHit(self, delta=1):
""" Mark a read operation from cache """
self._cache_hits = self._cache_hits + delta
def markCacheMiss(self, delta=1):
""" Mark a write operation to cache """
self._cache_misses = self._cache_misses + delta
def getCacheHits(self):
""" get cache hits """
return self._cache_hits
def getCacheMisses(self):
""" get cache missess """
return self._cache_misses
def clearCache(self):
""" Clear cache """
self._cache_hits = 0
self._cache_misses = 0
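## Sketch of how a concrete plugin is expected to use the counters above
## (editor's illustration; 'MyCache' is a hypothetical subclass, the real
## plugins below follow the same pattern):
##
##   class MyCache(BaseCache):
##       def get(self, cache_id, scope, default=None):
##           entry = ...  ## look the entry up in the real storage
##           if entry is not None:
##               self.markCacheHit()
##               return entry
##           return default
##
##   my_cache = MyCache()
##   my_cache.getCacheHits()    ## -> number of reads served from cache
##   my_cache.getCacheMisses()  ## -> number of values that had to be calculated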
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Memcached based cache plugin.
"""
from BaseCache import *
from time import time
try:
import memcache
except ImportError:
raise CachedMethodError, "Memcache module is not available"
MEMCACHED_SERVER_MAX_KEY_LENGTH = memcache.SERVER_MAX_KEY_LENGTH
## number of seconds before creating a new connection to memcached server
##KEEP_ALIVE_MEMCACHED_CONNECTION_INTERVAL = 30
class DistributedRamCache(BaseCache):
""" Memcached based cache plugin. """
def __init__(self, params):
self._servers = params.get('server', '')
self._debugLevel = params.get('debugLevel', 7)
self._last_cache_conn_creation_time = time()
BaseCache.__init__(self)
def getCacheStorage(self):
## Using a single shared connection object causes "MemCached: while expecting 'STORED',
## got unexpected response 'END'" messages in log files and can sometimes block the thread.
## For the moment we create a new connection object for every memcache access, which in
## turn means another socket. See additionally the expireOldCacheEntries() comments
## about one versus many connections.
try:
from Products.ERP5Type.Utils import get_request
request = get_request()
except ImportError:
request = None
if request:
## Zope/ERP5 environment
memcache_conn = request.get('_erp5_memcache_connection', None)
if not memcache_conn:
## no memcache_conn for this request yet, create one
memcache_conn = memcache.Client(self._servers.split('\n'), debug=self._debugLevel)
request.set('_erp5_memcache_connection', memcache_conn)
return memcache_conn
else:
## reuse the memcache_conn already created for this request
return memcache_conn
else:
## run from unit tests
return memcache.Client(self._servers.split('\n'), debug=self._debugLevel)
def checkAndFixCacheId(self, cache_id, scope):
## memcached doesn't support namespaces (cache scopes), so to emulate
## this behaviour we prepend the scope when constructing the cache_id
cache_id = "%s.%s" %(scope, cache_id)
## memcached will fail to store a cache_id longer than MEMCACHED_SERVER_MAX_KEY_LENGTH
if len(cache_id) > MEMCACHED_SERVER_MAX_KEY_LENGTH:
cache_id = cache_id[:MEMCACHED_SERVER_MAX_KEY_LENGTH]
return cache_id
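## Illustration (editor's note): with scope 'mary' and cache_id 'getTitle' the key
## actually sent to memcached is 'mary.getTitle'; keys longer than
## MEMCACHED_SERVER_MAX_KEY_LENGTH (250 by default in python-memcached) are truncated.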
def get(self, cache_id, scope, default=None):
cache_storage = self.getCacheStorage()
cache_id = self.checkAndFixCacheId(cache_id, scope)
cache_entry = cache_storage.get(cache_id)
if cache_entry is None:
## nothing stored in memcached under this cache_id
return default
self.markCacheHit()
return cache_entry
def set(self, cache_id, scope, value, cache_duration= None, calculation_time=0):
cache_storage = self.getCacheStorage()
cache_id = self.checkAndFixCacheId(cache_id, scope)
if not cache_duration:
## a cache_duration of None means "cache forever"; memcached needs a numeric
## expiration time, so use a large value of 100 hours (360000 seconds)
cache_duration = 360000
cache_entry = CacheEntry(value, cache_duration, calculation_time)
cache_storage.set(cache_id, cache_entry, cache_duration)
self.markCacheMiss()
def expireOldCacheEntries(self, forceCheck = False):
""" Memcache has its own built in expire policy """
## we can not use one connection to memcached server for time being of DistributedRamCache
## because if memcached is restarted for any reason our connection object will have its socket
## to memcached server closed.
## The workaround of this problem is to create a new connection for every cache access
## but that's too much overhead or create a new connection when cache is to be expired.
## This way we can catch memcached server failures. BTW: This hack is forced by the lack functionality in python-memcached
#self._cache = memcache.Client(self._servers.split('\n'), debug=self._debugLevel)
pass
def delete(self, cache_id, scope):
cache_storage = self.getCacheStorage()
cache_id = self.checkAndFixCacheId(cache_id, scope)
cache_storage.delete(cache_id)
def has_key(self, cache_id, scope):
return self.get(cache_id, scope) is not None
def getScopeList(self):
## memcached supports neither namespaces (cache scopes) nor listing cached keys
return []
def getScopeKeyList(self, scope):
## memcached supports neither namespaces (cache scopes) nor listing cached keys
return []
def clearCache(self):
BaseCache.clearCache(self)
cache_storage = self.getCacheStorage()
cache_storage.flush_all()
def clearCacheForScope(self, scope):
## memcached supports neither namespaces (cache scopes) nor listing cached keys
pass
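## Usage sketch (editor's illustration; the server address is an example, the
## parameter names are the ones read in __init__ above):
##
##   params = {'server': '127.0.0.1:11211', 'debugLevel': 0}
##   cache = DistributedRamCache(params)
##   cache.set('getTitle', 'mary', 'Mary', cache_duration=60)
##   entry = cache.get('getTitle', 'mary')
##   if entry is not None:
##       value = entry.getValue()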
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"Dummy (no cache)"
from BaseCache import *
import time
class DummyCache(BaseCache):
""" Dummy cache plugin. """
def __init__(self, params):
BaseCache.__init__(self)
def __call__(self, callable_object, cache_id, cache_duration=None, *args, **kwd):
## Just calculate and return result - no caching
return callable_object(*args, **kwd)
def getCacheStorage(self):
pass
def get(self, cache_id, scope, default=None):
pass
def set(self, cache_id, scope, value, cache_duration= None, calculation_time=0):
pass
def expireOldCacheEntries(self, forceCheck = False):
pass
def delete(self, cache_id, scope):
pass
def has_key(self, cache_id, scope):
pass
def getScopeList(self):
pass
def getScopeKeyList(self, scope):
pass
def clearCache(self):
pass
def clearCacheForScope(self, scope):
pass
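## Usage sketch (editor's illustration; 'some_function' is hypothetical):
## DummyCache is callable and simply evaluates the wrapped callable on every
## call, so it can stand in for a real plugin when caching must be disabled.
##
##   dummy = DummyCache({})
##   dummy(some_function, 'some_cache_id', None, 1, 2)  ## == some_function(1, 2)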
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Local RAM based cache plugin.
"""
from BaseCache import *
import time
class RamCache(BaseCache):
""" RAM based cache plugin."""
_cache_dict = {}
cache_expire_check_interval = 300
def __init__(self, params={}):
BaseCache.__init__(self)
def getCacheStorage(self):
return self._cache_dict
def get(self, cache_id, scope, default=None):
cache = self.getCacheStorage()
if self.has_key(cache_id, scope):
cache_entry = cache[scope].get(cache_id, default)
cache_entry.markCacheHit()
self.markCacheHit()
return cache_entry
else:
return default
def set(self, cache_id, scope, value, cache_duration=None, calculation_time=0):
cache = self.getCacheStorage()
if not cache.has_key(scope):
## cache scope not initialized
cache[scope] = {}
cache[scope][cache_id] = CacheEntry(value, cache_duration, calculation_time)
self.markCacheMiss()
def expireOldCacheEntries(self, forceCheck = False):
now = time.time()
if forceCheck or (now > (self._last_cache_expire_check_at + self.cache_expire_check_interval)):
## time to check for expired cache items
#print "EXPIRE ", self, self.cache_expire_check_interval
self._last_cache_expire_check_at = now
cache = self.getCacheStorage()
for scope in cache.keys():
for (cache_id, cache_item) in cache[scope].items():
if cache_item.isExpired():
del cache[scope][cache_id]
def delete(self, cache_id, scope):
try:
del self.getCacheStorage()[scope][cache_id]
except KeyError:
pass
def has_key(self, cache_id, scope):
cache = self.getCacheStorage()
if not cache.has_key(scope):
## cache scope not initialized
cache[scope] = {}
return cache[scope].has_key(cache_id)
def getScopeList(self):
scope_list = []
## some cache scopes in RamCache can exist even though they contain no cache_ids.
## To keep behaviour consistent with the SQLCache plugin, where a cache scope
## cannot exist without its cache_ids, we filter out the empty scopes here.
for scope, item in self.getCacheStorage().items():
if item != {}:
scope_list.append(scope)
return scope_list
def getScopeKeyList(self, scope):
return self.getCacheStorage()[scope].keys()
def clearCache(self):
BaseCache.clearCache(self)
self._cache_dict = {}
def clearCacheForScope(self, scope):
try:
self.getCacheStorage()[scope] = {}
except KeyError:
pass
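## Usage sketch (editor's illustration): RamCache keeps entries in a plain
## dictionary keyed first by scope and then by cache_id.
##
##   ram_cache = RamCache()
##   ram_cache.set('getTitle', 'mary', 'Mary', cache_duration=300)
##   ram_cache.has_key('getTitle', 'mary')          ## -> True
##   ram_cache.get('getTitle', 'mary').getValue()   ## -> 'Mary'
##   ram_cache.clearCacheForScope('mary')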
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
SQL (MySQL) based cache plugin.
"""
from BaseCache import *
import time, base64
try:
import cPickle as pickle
except ImportError:
import pickle
try:
import MySQLdb
except ImportError:
raise CachedMethodError, "MySQLdb module is not available"
class SQLCache(BaseCache):
""" SQL based cache plugin. """
cache_expire_check_interval = 3600
create_table_sql = '''CREATE TABLE %s(cache_id VARCHAR(970) NOT NULL,
value LONGTEXT,
scope VARCHAR(20),
stored_at INT,
cache_duration INT DEFAULT 0,
calculation_time FLOAT,
UNIQUE(cache_id, scope))
'''
insert_key_sql = '''INSERT INTO %s (cache_id, value, scope, stored_at, cache_duration, calculation_time)
VALUES("%s", "%s", "%s", %s, %s, %s)
'''
has_key_sql = '''SELECT count(*)
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
get_key_sql = '''SELECT value, cache_duration, calculation_time
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
delete_key_sql = '''DELETE
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
delete_all_keys_sql = '''DELETE
FROM %s
'''
delete_all_keys_for_scope_sql = '''DELETE
FROM %s
WHERE scope="%s"
'''
delete_expired_keys_sql = '''DELETE
FROM %s
WHERE cache_duration + stored_at < %s and cache_duration!=0
'''
get_scope_list_sql = '''SELECT scope
FROM %s
GROUP BY scope
'''
get_scope_key_list_sql = '''SELECT cache_id
FROM %s
WHERE scope="%s"
'''
def __init__(self, params):
BaseCache.__init__(self)
self._dbConn = None
self._db_server = params.get('server', '')
self._db_user = params.get('user', '')
self._db_passwd = params.get('passwd', '')
self._db_name = params.get('db', '')
self._db_cache_table_name = params.get('cache_table_name')
## since SQL cache is persistent check for expired objects
#self.expireOldCacheEntries(forceCheck=True)
def getCacheStorage(self):
"""
Return the current DB connection or create a new one for this thread.
See http://sourceforge.net/docman/display_doc.php?docid=32071&group_id=22307,
especially the thread-safety part, for why we create a new MySQL db connection object every time.
"""
try:
from Products.ERP5Type.Utils import get_request
request = get_request()
except ImportError:
request = None
if request:
## Zope/ERP5 environment
dbConn = request.get('_erp5_dbcache_connection', None)
if not dbConn:
## no dbConn for this request yet, create one
dbConn = MySQLdb.connect(host=self._db_server, \
user=self._db_user,\
passwd=self._db_passwd, \
db=self._db_name)
request.set('_erp5_dbcache_connection', dbConn)
return dbConn
else:
## reuse the dbConn already created for this request
return dbConn
else:
## run from unit tests
dbConn = MySQLdb.connect(host=self._db_server, \
user=self._db_user,\
passwd=self._db_passwd, \
db=self._db_name)
return dbConn
def get(self, cache_id, scope, default=None):
sql_query = self.get_key_sql %(self._db_cache_table_name, cache_id, scope)
cursor = self.execSQLQuery(sql_query)
if cursor:
## (cache_id, scope) is unique, so at most one row is returned
result = cursor.fetchall()
if 0 < len(result):
## we found results
result = result[0]
decoded_result = pickle.loads(base64.decodestring(result[0]))
self.markCacheHit()
cache_entry = CacheEntry(decoded_result, result[1], result[2])
return cache_entry
else:
## no such cache_id in DB
return None
else:
## DB not available
return None
def set(self, cache_id, scope, value, cache_duration=None, calculation_time=0):
value = base64.encodestring(pickle.dumps(value,2))
if not cache_duration:
## should live forever ==> setting cache_duration = 0 will make it live forever
cache_duration = 0
else:
## a finite cache_duration is defined; make sure it is an integer number of seconds
cache_duration = int(cache_duration)
## Set key in DB
stored_at = int(time.time())
sql_query = self.insert_key_sql %(self._db_cache_table_name, cache_id, value, scope, stored_at, cache_duration, calculation_time)
self.execSQLQuery(sql_query)
self.markCacheMiss()
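## Editor's note on the encoding used above: values are pickled and then base64
## encoded before being interpolated into the INSERT statement; get() reverses
## both steps, e.g.
##   encoded = base64.encodestring(pickle.dumps({'a': 1}, 2))
##   pickle.loads(base64.decodestring(encoded))   ## -> {'a': 1}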
def expireOldCacheEntries(self, forceCheck = False):
now = time.time()
if forceCheck or (now > (self._last_cache_expire_check_at + self.cache_expire_check_interval)):
## time to check for expired cache items
self._last_cache_expire_check_at = now
my_query = self.delete_expired_keys_sql %(self._db_cache_table_name, now)
self.execSQLQuery(my_query)
def delete(self, cache_id, scope):
my_query = self.delete_key_sql %(self._db_cache_table_name, cache_id, scope)
self.execSQLQuery(my_query)
def has_key(self, cache_id, scope):
my_query = self.has_key_sql %(self._db_cache_table_name, cache_id, scope)
cursor = self.execSQLQuery(my_query)
if cursor:
## count() SQL function will return one row only
result = cursor.fetchall()
result = result[0][0]
if result == 0:
## no such key in DB
return False
elif result==1:
## we have this key in DB
return True
else:
## something is wrong with the DB model
raise CachedMethodError, "Invalid cache table relation format. (cache_id, scope) MUST be unique!"
else:
## DB not available
return False
def getScopeList(self):
rl = []
my_query = self.get_scope_list_sql %(self._db_cache_table_name)
cursor = self.execSQLQuery(my_query)
results = cursor.fetchall()
for result in results:
rl.append(result[0])
return rl
def getScopeKeyList(self, scope):
rl = []
my_query = self.get_scope_key_list_sql %(self._db_cache_table_name, scope)
cursor = self.execSQLQuery(my_query)
results = cursor.fetchall()
for result in results:
rl.append(result[0])
return rl
def clearCache(self):
BaseCache.clearCache(self)
## SQL cache is persistent storage; instead of deleting all entries we could
## merely expire them, e.g.:
## self.expireOldCacheEntries(forceCheck=True)
my_query = self.delete_all_keys_sql %(self._db_cache_table_name)
self.execSQLQuery(my_query)
def clearCacheForScope(self, scope):
my_query = self.delete_all_keys_for_scope_sql %(self._db_cache_table_name, scope)
self.execSQLQuery(my_query)
def execSQLQuery(self, sql_query):
"""
Try to execute the SQL query.
Return the cursor object because some queries return results.
"""
dbConn = self.getCacheStorage()
cursor = dbConn.cursor()
cursor.execute(sql_query)
return cursor
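## Usage sketch (editor's illustration; connection values are examples, the
## parameter names are the ones read in __init__ above, and the cache table is
## expected to exist, e.g. created from create_table_sql):
##
##   params = {'server': 'localhost', 'user': 'erp5user', 'passwd': 'secret',
##             'db': 'erp5_cache', 'cache_table_name': 'cache'}
##   sql_cache = SQLCache(params)
##   sql_cache.execSQLQuery(sql_cache.create_table_sql % sql_cache._db_cache_table_name)
##   sql_cache.set('getTitle', 'mary', 'Mary', cache_duration=3600)
##   entry = sql_cache.get('getTitle', 'mary')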
"""
Cache plugin classes.
"""