Commit edb95fa2 authored by Sebastien Robin

first submission of the new cache stuff made by ivan@nexedi.com

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@10600 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 4eda5ae1
"""
Base Cache plugin.
"""
#from Products.ERP5Cache.interfaces import ICache
import time
class CachedMethodError(Exception):
pass
class CacheEntry(object):
""" Cachable entry. Used as a wrapper around real values stored in cache.
value
cache_duration
stored_at
cache_hits
calculation_time
TODO: Based on above data we can have a different invalidation policy
"""
def __init__(self, value, cache_duration=None, calculation_time=0):
self.value = value
self.cache_duration = cache_duration
self.stored_at = int(time.time())
self.cache_hits = 0
self.calculation_time = calculation_time
def isExpired(self):
""" check cache entry for expiration """
if self.cache_duration is None or self.cache_duration==0:
## cache entry can stay in cache forever until zope restarts
return False
now = int(time.time())
if now > (self.stored_at + int(self.cache_duration)):
return True
else:
return False
def markCacheHit(self, delta=1):
""" mark a read to this cache entry """
self.cache_hits = self.cache_hits + delta
def getValue(self):
""" return cached value """
return getattr(self, 'value', None)
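## Illustrative sketch (not part of the original file): how a CacheEntry
## behaves once its duration has elapsed.
##   entry = CacheEntry('some value', cache_duration=2)
##   entry.isExpired()    # False right after storing
##   time.sleep(3)
##   entry.isExpired()    # True: ~3s elapsed > 2s duration
##   entry.getValue()     # 'some value' (the value itself is kept)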
class BaseCache(object):
  """ Base Cache class """
  #__implements__ = (ICache,)

  ## Time interval (s) to check for expired objects
  cache_expire_check_interval = 60

  def __init__(self, params={}):
    self._last_cache_expire_check_at = time.time()
    self._cache_hits = 0
    self._cache_misses = 0

  def markCacheHit(self, delta=1):
    """ Mark a read operation from cache. """
    self._cache_hits = self._cache_hits + delta

  def markCacheMiss(self, delta=1):
    """ Mark a write operation to cache (a cache miss). """
    self._cache_misses = self._cache_misses + delta

  def getCacheHits(self):
    """ Get cache hits. """
    return self._cache_hits

  def getCacheMisses(self):
    """ Get cache misses. """
    return self._cache_misses

  def clearCache(self):
    """ Clear cache (reset hit/miss statistics). """
    self._cache_hits = 0
    self._cache_misses = 0
"""
Memcached based cache plugin.
"""
from BaseCache import *
from time import time
try:
import memcache
except ImportError:
raise CachedMethodError, "Memcache module is not available"
MEMCACHED_SERVER_MAX_KEY_LENGTH = memcache.SERVER_MAX_KEY_LENGTH
## number of seconds before creating a new connection to memcached server
KEEP_ALIVE_MEMCACHED_CONNECTION_INTERVAL = 30
class DistributedRamCache(BaseCache):
""" Memcached based cache plugin. """
def __init__(self, params):
self._servers = params.get('server', '')
self._debugLevel = params.get('debugLevel', 7)
self._cache = memcache.Client(self._servers.split('\n'), self._debugLevel)
self._last_cache_conn_creation_time = time()
BaseCache.__init__(self)
def getCacheStorage(self):
## if we use one connection object this causes "MemCached: while expecting 'STORED', got unexpected response 'END'"
## messages in log files and thus sometimes can block the thread. For the moment we create
## a new conn object for every memcache access which in turns cmeans another socket.
## See addiionaly expireOldCacheEntries() comments for one or many connections.
self._cache = memcache.Client(self._servers.split('\n'), debug=self._debugLevel)
return self._cache
  def checkAndFixCacheId(self, cache_id, scope):
    ## memcached doesn't support namespaces (cache scopes), so to "emulate"
    ## such behaviour we prepend the scope when constructing the cache_id
    cache_id = "%s.%s" % (scope, cache_id)
    ## memcached will fail to store a cache_id longer than MEMCACHED_SERVER_MAX_KEY_LENGTH.
    if len(cache_id) > MEMCACHED_SERVER_MAX_KEY_LENGTH:
      cache_id = cache_id[:MEMCACHED_SERVER_MAX_KEY_LENGTH]
    return cache_id
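  ## For example (illustrative; python-memcached's SERVER_MAX_KEY_LENGTH
  ## defaults to 250):
  ##   checkAndFixCacheId('my_method()', 'erp5_ui') --> 'erp5_ui.my_method()'
  ## A combined key longer than the limit is silently truncated, so two long
  ## keys sharing the same truncated prefix would collide.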
  def get(self, cache_id, scope, default=None):
    cache_storage = self.getCacheStorage()
    cache_id = self.checkAndFixCacheId(cache_id, scope)
    cache_entry = cache_storage.get(cache_id)
    self.markCacheHit()
    return cache_entry

  def set(self, cache_id, scope, value, cache_duration=None, calculation_time=0):
    cache_storage = self.getCacheStorage()
    cache_id = self.checkAndFixCacheId(cache_id, scope)
    if not cache_duration:
      ## what should the default cache_duration be when None is specified?
      ## currently 'None' means forever, so give it the big value of 100 hours
      cache_duration = 360000
    cache_entry = CacheEntry(value, cache_duration, calculation_time)
    cache_storage.set(cache_id, cache_entry, cache_duration)
    self.markCacheMiss()

  def expireOldCacheEntries(self, forceCheck=False):
    """ Memcached has its own built-in expiry policy, so nothing to do here. """
    ## We can not use one connection to the memcached server for the lifetime
    ## of DistributedRamCache, because if memcached is restarted for any reason
    ## our connection object will have its socket to the server closed.
    ## The workaround is to create a new connection for every cache access
    ## (too much overhead), or to create a new connection whenever the cache is
    ## to be expired. This way we can catch memcached server failures.
    ## BTW: this hack is forced by the lack of functionality in python-memcached.
    #self._cache = memcache.Client(self._servers.split('\n'), debug=self._debugLevel)
    pass
  def delete(self, cache_id, scope):
    cache_storage = self.getCacheStorage()
    cache_id = self.checkAndFixCacheId(cache_id, scope)
    cache_storage.delete(cache_id)

  def has_key(self, cache_id, scope):
    return self.get(cache_id, scope) is not None

  def getScopeList(self):
    ## memcached supports neither namespaces (cache scopes) nor listing cached keys
    return []

  def getScopeKeyList(self, scope):
    ## memcached supports neither namespaces (cache scopes) nor listing cached keys
    return []

  def clearCache(self):
    BaseCache.clearCache(self)
    cache_storage = self.getCacheStorage()
    cache_storage.flush_all()

  def clearCacheForScope(self, scope):
    ## memcached supports neither namespaces (cache scopes) nor listing cached keys
    pass
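## Illustrative usage sketch (not part of the original file); assumes a
## memcached server is already listening on 127.0.0.1:11211:
##   cache = DistributedRamCache({'server': '127.0.0.1:11211'})
##   cache.set('expensive_result', 'my_scope', 42, cache_duration=60)
##   entry = cache.get('expensive_result', 'my_scope')
##   if entry is not None:
##     print entry.getValue()   # 42, returned as a pickled/unpickled CacheEntry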
"Local RAM based cache"
from BaseCache import *
import time
class DummyCache(BaseCache):
""" Dummy cache plugin. """
def __init__(self, params):
BaseCache.__init__(self)
def __call__(self, callable_object, cache_id, cache_duration=None, *args, **kwd):
## Just calculate and return result - no caching
return callable_object(*args, **kwd)
def getCacheStorage(self):
pass
def get(self, cache_id, scope, default=None):
pass
def set(self, cache_id, scope, value, cache_duration= None, calculation_time=0):
pass
def expireOldCacheEntries(self, forceCheck = False):
pass
def delete(self, cache_id, scope):
pass
def has_key(self, cache_id, scope):
pass
def getScopeList(self):
pass
def getScopeKeyList(self, scope):
pass
def clearCache(self):
pass
def clearCacheForScope(self, scope):
pass
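## Illustrative sketch (not part of the original file): DummyCache simply
## forwards the call, which is useful for disabling caching, e.g. in tests.
##   dummy = DummyCache({})
##   dummy(lambda x: x * 2, 'any_cache_id', None, 21)   # returns 42, caches nothing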
"""
Local RAM based cache plugin.
"""
from BaseCache import *
import time
class RamCache(BaseCache):
""" RAM based cache plugin."""
_cache_dict = {}
cache_expire_check_interval = 300
def __init__(self, params={}):
BaseCache.__init__(self)
def getCacheStorage(self):
return self._cache_dict
def get(self, cache_id, scope, default=None):
cache = self.getCacheStorage()
if self.has_key(cache_id, scope):
cache_entry = cache[scope].get(cache_id, default)
cache_entry.markCacheHit()
self.markCacheHit()
return cache_entry
else:
return default
def set(self, cache_id, scope, value, cache_duration=None, calculation_time=0):
cache = self.getCacheStorage()
if not cache.has_key(scope):
## cache scope not initialized
cache[scope] = {}
cache[scope][cache_id] = CacheEntry(value, cache_duration, calculation_time)
self.markCacheMiss()
def expireOldCacheEntries(self, forceCheck = False):
now = time.time()
if forceCheck or (now > (self._last_cache_expire_check_at + self.cache_expire_check_interval)):
## time to check for expired cache items
#print "EXPIRE ", self, self.cache_expire_check_interval
self._last_cache_expire_check_at = now
cache = self.getCacheStorage()
for scope in cache.keys():
for (cache_id, cache_item) in cache[scope].items():
if cache_item.isExpired()==True:
del cache[scope][cache_id]
def delete(self, cache_id, scope):
try:
del self.getCacheStorage()[scope][cache_id]
except KeyError:
pass
def has_key(self, cache_id, scope):
cache = self.getCacheStorage()
if not cache.has_key(scope):
## cache scope not initialized
cache[scope] = {}
return cache[scope].has_key(cache_id)
def getScopeList(self):
scope_list = []
## some cache scopes in RAM Cache can have no cache_ids keys but
## they do exists. To have consistent behaviour with SQLCache plugin
## where cache scope will not exists without its cache_ids we filter them.
for scope, item in self.getCacheStorage().items():
if item!={}:
scope_list.append(scope)
return scope_list
def getScopeKeyList(self, scope):
return self.getCacheStorage()[scope].keys()
def clearCache(self):
BaseCache.clearCache(self)
self._cache_dict = {}
def clearCacheForScope(self, scope):
try:
self.getCacheStorage()[scope] = {}
except KeyError:
pass
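## Illustrative usage sketch (not part of the original file):
##   ram_cache = RamCache()
##   ram_cache.set('my_key', 'my_scope', 'my_value', cache_duration=30)
##   entry = ram_cache.get('my_key', 'my_scope')       # a CacheEntry instance
##   print entry.getValue()                            # 'my_value'
##   ram_cache.expireOldCacheEntries(forceCheck=True)  # drops expired entries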
"""
SQL (MySQL) based cache plugin.
"""
from BaseCache import *
import time, base64
try:
import cPickle as pickle
except ImportError:
import pickle
try:
import MySQLdb
except ImportError:
raise CachedMethodError, "MySQLdb module is not available"
class SQLCache(BaseCache):
""" SQL based cache plugin. """
cache_expire_check_interval = 3600
create_table_sql = '''CREATE TABLE %s(cache_id VARCHAR(970) NOT NULL,
value LONGTEXT,
scope VARCHAR(20),
stored_at INT,
cache_duration INT DEFAULT 0,
calculation_time FLOAT,
UNIQUE(cache_id, scope))
'''
insert_key_sql = '''INSERT INTO %s (cache_id, value, scope, stored_at, cache_duration, calculation_time)
VALUES("%s", "%s", "%s", %s, %s, %s)
'''
has_key_sql = '''SELECT count(*)
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
get_key_sql = '''SELECT value, cache_duration, calculation_time
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
delete_key_sql = '''DELETE
FROM %s
WHERE cache_id = "%s" and scope="%s"
'''
delete_all_keys_sql = '''DELETE
FROM %s
'''
delete_all_keys_for_scope_sql = '''DELETE
FROM %s
WHERE scope="%s"
'''
delete_expired_keys_sql = '''DELETE
FROM %s
WHERE cache_duration + stored_at < %s and cache_duration!=0
'''
get_scope_list_sql = '''SELECT scope
FROM %s
GROUP BY scope
'''
get_scope_key_list_sql = '''SELECT cache_id
FROM %s
WHERE scope="%s"
'''
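  ## For illustration (not part of the original file): rendering one of the
  ## templates above with the default table name 'cache', e.g.
  ##   get_key_sql % ('cache', 'my_cache_id', 'my_scope')
  ## produces:
  ##   SELECT value, cache_duration, calculation_time
  ##   FROM cache
  ##   WHERE cache_id = "my_cache_id" and scope = "my_scope"
  ## Note that values are interpolated directly rather than passed as bound
  ## query parameters, so a cache_id containing a double quote would break
  ## the statement.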
  def __init__(self, params):
    BaseCache.__init__(self)
    self._dbConn = None
    self._db_server = params.get('server', '')
    self._db_user = params.get('user', '')
    self._db_passwd = params.get('passwd', '')
    self._db_name = params.get('db', '')
    self._db_cache_table_name = params.get('cache_table_name')
    ## since SQL cache is persistent, check for expired objects
    #self.expireOldCacheEntries(forceCheck=True)

  def getCacheStorage(self):
    """
    Return current DB connection or create a new one.
    See http://sourceforge.net/docman/display_doc.php?docid=32071&group_id=22307
    (especially the thread-safety part) for why we create a new MySQL
    connection object every time.
    """
    dbConn = MySQLdb.connect(host=self._db_server,
                             user=self._db_user,
                             passwd=self._db_passwd,
                             db=self._db_name)
    return dbConn

  def get(self, cache_id, scope, default=None):
    sql_query = self.get_key_sql % (self._db_cache_table_name, cache_id, scope)
    cursor = self.execSQLQuery(sql_query)
    if cursor:
      ## the query returns at most one row, since (cache_id, scope) is UNIQUE
      result = cursor.fetchall()
      if 0 < len(result):
        ## we found results
        result = result[0]
        decoded_result = pickle.loads(base64.decodestring(result[0]))
        self.markCacheHit()
        cache_entry = CacheEntry(decoded_result, result[1], result[2])
        return cache_entry
      else:
        ## no such cache_id in DB
        return None
    else:
      ## DB not available
      return None

  def set(self, cache_id, scope, value, cache_duration=None, calculation_time=0):
    value = base64.encodestring(pickle.dumps(value, 2))
    if not cache_duration:
      ## should live forever ==> setting cache_duration = 0 will make it live forever
      cache_duration = 0
    else:
      ## we have a strict cache_duration defined, in seconds
      cache_duration = int(cache_duration)
    ## set key in DB; stored_at is seconds since start of epoch
    stored_at = int(time.time())
    sql_query = self.insert_key_sql % (self._db_cache_table_name, cache_id, value,
                                       scope, stored_at, cache_duration, calculation_time)
    self.execSQLQuery(sql_query)
    self.markCacheMiss()
  def expireOldCacheEntries(self, forceCheck=False):
    now = time.time()
    if forceCheck or (now > (self._last_cache_expire_check_at + self.cache_expire_check_interval)):
      ## time to check for expired cache items
      #print "EXPIRE", self, self.cache_expire_check_interval
      self._last_cache_expire_check_at = now
      my_query = self.delete_expired_keys_sql % (self._db_cache_table_name, now)
      self.execSQLQuery(my_query)

  def delete(self, cache_id, scope):
    my_query = self.delete_key_sql % (self._db_cache_table_name, cache_id, scope)
    self.execSQLQuery(my_query)

  def has_key(self, cache_id, scope):
    my_query = self.has_key_sql % (self._db_cache_table_name, cache_id, scope)
    cursor = self.execSQLQuery(my_query)
    if cursor:
      ## the count() SQL function returns exactly one row
      result = cursor.fetchall()
      result = result[0][0]
      if result == 0:
        ## no such key in DB
        return False
      elif result == 1:
        ## we have this key in DB
        return True
      else:
        ## something wrong in DB model
        raise CachedMethodError, "Invalid cache table relation format. cache_id MUST be unique!"
    else:
      ## DB not available
      return False

  def getScopeList(self):
    rl = []
    my_query = self.get_scope_list_sql % (self._db_cache_table_name)
    cursor = self.execSQLQuery(my_query)
    results = cursor.fetchall()
    for result in results:
      rl.append(result[0])
    return rl

  def getScopeKeyList(self, scope):
    rl = []
    my_query = self.get_scope_key_list_sql % (self._db_cache_table_name, scope)
    cursor = self.execSQLQuery(my_query)
    results = cursor.fetchall()
    for result in results:
      rl.append(result[0])
    return rl

  def clearCache(self):
    BaseCache.clearCache(self)
    ## SQL Cache is a persistent storage; rather than deleting all entries we
    ## could also just expire them:
    ##   self.expireOldCacheEntries(forceCheck=True)
    my_query = self.delete_all_keys_sql % (self._db_cache_table_name)
    self.execSQLQuery(my_query)

  def clearCacheForScope(self, scope):
    my_query = self.delete_all_keys_for_scope_sql % (self._db_cache_table_name, scope)
    self.execSQLQuery(my_query)

  def execSQLQuery(self, sql_query):
    """
    Try to execute an SQL query.
    Return the cursor object, because some queries can return results.
    """
    dbConn = self.getCacheStorage()
    cursor = dbConn.cursor()
    cursor.execute(sql_query)
    return cursor
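## Illustrative usage sketch (not part of the original file); assumes a MySQL
## database 'cache' on localhost with a 'cache' table already created from
## create_table_sql (see also CacheTool.createDBCacheTable):
##   sql_cache = SQLCache({'server': 'localhost',
##                         'user': 'zope',
##                         'passwd': 'zope_pass',
##                         'db': 'cache',
##                         'cache_table_name': 'cache'})
##   sql_cache.set('my_key', 'my_scope', {'a': 1}, cache_duration=3600)
##   entry = sql_cache.get('my_key', 'my_scope')
##   print entry.getValue()   # {'a': 1}, base64-unpickled from the value column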
##USER = 'USER'
##PORTAL = 'PORTAL'
##GLOBAL = 'GLOBAL'
##HOST = 'HOST'
##THREAD = 'THREAD'
##
##CACHE_SCOPES = (USER, PORTAL, GLOBAL, HOST, THREAD,)
##DEFAULT_CACHE_SCOPE = USER
##
##DEFAULT_CACHE_STRATEGY = ('quick_cache', 'persistent_cache',)
##
##
#### TODO: DistributedRamCache and SQLCache must be able to read their
#### configuration properties when CachingMethod is initialized.
#### Specifying them in config.py isn't the best way?!
##
##CACHE_PLUGINS_MAP = {## Local RAM based cache
##                     'quick_cache': {'className': 'RamCache',
##                                     'fieldName': 'ram_cache',
##                                     'params': {},
##                                    },
##
##                     ## Memcached
##                     'shared_cache': {'className': 'DistributedRamCache',
##                                      'fieldName': 'distributed_ram_cache',
##                                      'params': {'servers': '127.0.0.1:11211',
##                                                 'debugLevel': 7,
##                                                }
##                                     },
##
##                     ## MySQL cache
##                     'persistent_cache': {'className': 'SQLCache',
##                                          'fieldName': 'sql_cache',
##                                          'params': {'server': 'localhost',
##                                                     'user': 'zope',
##                                                     'passwd': 'zope_pass',
##                                                     'db': 'cache',
##                                                     'cache_table_name': 'cache',
##                                                    }
##                                         },
##
##                     ## Dummy (no cache)
##                     'dummy_cache': {'className': 'DummyCache',
##                                     'fieldName': 'dummy_cache',
##                                     'params': {},
##                                    },
##                    }
""" Cache Tool module for ERP5 """
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Globals import InitializeClass, DTMLFile, PersistentMapping
from Products.ERP5Cache import _dtmldir
from Products.ERP5Type.Cache import CachingMethod, CacheFactory
from Products.ERP5Cache.CachePlugins.RamCache import RamCache
from Products.ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from Products.ERP5Cache.CachePlugins.SQLCache import SQLCache
##try:
## from Products.TimerService import getTimerService
##except ImportError:
## def getTimerService(self):
## pass
class CacheTool(BaseTool):
""" Caches tool wrapper for ERP5 """
id = "portal_caches"
meta_type = "ERP5 Cache Tool"
portal_type = "Cache Tool"
security = ClassSecurityInfo()
manage_options = ({'label': 'Configure',
'action': 'cache_tool_configure',
},) + BaseTool.manage_options
security.declareProtected( Permissions.ManagePortal, 'cache_tool_configure')
cache_tool_configure = DTMLFile( 'cache_tool_configure', _dtmldir )
def __init__(self):
BaseTool.__init__(self)
security.declareProtected(Permissions.AccessContentsInformation, 'getCacheFactoryList')
def getCacheFactoryList(self):
""" Return available cache factories """
rd ={}
for cf in self.objectValues('ERP5 Cache Factory'):
cache_scope = cf.getId()
rd[cache_scope] = {}
rd[cache_scope]['cache_plugins'] = []
rd[cache_scope]['cache_params'] = {}
for cp in cf.getCachePluginList():
cp_meta_type = cp.meta_type
if cp_meta_type == 'ERP5 Ram Cache Plugin':
cache_obj = RamCache()
elif cp_meta_type == 'ERP5 Distributed Ram Cache Plugin':
cache_obj = DistributedRamCache({'server':cp.getServer()})
elif cp_meta_type == 'ERP5 SQL Cache Plugin':
## use connection details from 'erp5_sql_transactionless_connection' ZMySLQDA object
connection_string = self.erp5_sql_transactionless_connection.connection_string
kw = self.parseDBConnectionString(connection_string)
kw['cache_table_name'] = cp.getCacheTableName()
cache_obj = SQLCache(kw)
## set cache expire check interval
cache_obj.cache_expire_check_interval = cp.getCacheExpireCheckInterval()
rd[cache_scope]['cache_plugins'].append(cache_obj)
rd[cache_scope]['cache_params']['cache_duration'] = cf.getCacheDuration() #getattr(cf, 'cache_duration', None)
return rd
  ##
  ## DB structure
  ##
  security.declareProtected(Permissions.ModifyPortalContent, 'createDBCacheTable')
  def createDBCacheTable(self, cache_table_name="cache", REQUEST=None):
    """ Create the cache table in the MySQL DB. """
    my_query = SQLCache.create_table_sql % cache_table_name
    try:
      self.erp5_sql_transactionless_connection.manage_test("DROP TABLE %s" % cache_table_name)
    except:
      ## the table may not exist yet
      pass
    self.erp5_sql_transactionless_connection.manage_test(my_query)
    if REQUEST:
      self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache table successfully created.')
  security.declareProtected(Permissions.AccessContentsInformation, 'parseDBConnectionString')
  def parseDBConnectionString(self, connection_string):
    """ Parse the given connection string. Code "borrowed" from ZMySQLDA.db """
    kwargs = {}
    items = connection_string.split()
    if not items:
      return kwargs
    lockreq, items = items[0], items[1:]
    if lockreq[0] == "*":
      db_host, items = items[0], items[1:]
    else:
      db_host = lockreq
    if '@' in db_host:
      db, host = db_host.split('@', 1)
      kwargs['db'] = db
      if ':' in host:
        host, port = host.split(':', 1)
        kwargs['port'] = int(port)
      kwargs['host'] = host
    else:
      kwargs['db'] = db_host
    if kwargs['db'] and kwargs['db'][0] in ('+', '-'):
      kwargs['db'] = kwargs['db'][1:]
    if not kwargs['db']:
      del kwargs['db']
    if not items:
      return kwargs
    kwargs['user'], items = items[0], items[1:]
    if not items:
      return kwargs
    kwargs['passwd'], items = items[0], items[1:]
    if not items:
      return kwargs
    kwargs['unix_socket'], items = items[0], items[1:]
    return kwargs
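  ## Worked example (illustrative, not part of the original file). A typical
  ## ZMySQLDA connection string of the form "database@host:port user password"
  ## parses as:
  ##   parseDBConnectionString('cache@localhost:3306 zope zope_pass')
  ##   --> {'db': 'cache', 'host': 'localhost', 'port': 3306,
  ##        'user': 'zope', 'passwd': 'zope_pass'}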
  ##
  ## RAM cache structure
  ##
  security.declareProtected(Permissions.AccessContentsInformation, 'getRamCacheRoot')
  def getRamCacheRoot(self):
    """ Return RAM based cache root. """
    erp5_site_id = self.getPortalObject().getId()
    return CachingMethod.factories[erp5_site_id]

  security.declareProtected(Permissions.ModifyPortalContent, 'updateCache')
  def updateCache(self, REQUEST=None):
    """ Clear and update cache structure. """
    erp5_site_id = self.getPortalObject().getId()
    for cf in CachingMethod.factories[erp5_site_id]:
      for cp in CachingMethod.factories[erp5_site_id][cf].getCachePluginList():
        del cp
    CachingMethod.factories[erp5_site_id] = {}
    ## read configuration from ZODB
    for key, item in self.getCacheFactoryList().items():
      if len(item['cache_plugins']) != 0:
        CachingMethod.factories[erp5_site_id][key] = CacheFactory(item['cache_plugins'], item['cache_params'])
    if REQUEST:
      self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache updated.')

  security.declareProtected(Permissions.ModifyPortalContent, 'clearCache')
  def clearCache(self, REQUEST=None):
    """ Clear the whole cache structure. """
    ram_cache_root = self.getRamCacheRoot()
    for cf in ram_cache_root:
      for cp in ram_cache_root[cf].getCachePluginList():
        cp.clearCache()
    if REQUEST:
      self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache cleared.')

  security.declareProtected(Permissions.ModifyPortalContent, 'clearCacheFactory')
  def clearCacheFactory(self, cache_factory_id, REQUEST=None):
    """ Clear a single cache factory. """
    ram_cache_root = self.getRamCacheRoot()
    if ram_cache_root.has_key(cache_factory_id):
      ram_cache_root[cache_factory_id].clearCache()
    if REQUEST:
      self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache factory %s cleared.' % cache_factory_id)
  # Timer - checks for cache expiration, triggered by Zope's TimerService
##  def isSubscribed(self):
##    """
##    Return True if we are subscribed to TimerService,
##    otherwise return False.
##    """
##    service = getTimerService(self)
##    if not service:
##      LOG('CacheTool', INFO, 'TimerService not available')
##      return False
##
##    path = '/'.join(self.getPhysicalPath())
##    if path in service.lisSubscriptions():
##      return True
##    return False
##
##  security.declareProtected(Permissions.ManageProperties, 'subscribe')
##  def subscribe(self):
##    """
##    Subscribe to the global Timer Service.
##    """
##    service = getTimerService(self)
##    if not service:
##      LOG('CacheTool', INFO, 'TimerService not available')
##      return
##    service.subscribe(self)
##    return "Subscribed to Timer Service"
##
##  security.declareProtected(Permissions.ManageProperties, 'unsubscribe')
##  def unsubscribe(self):
##    """
##    Unsubscribe from the global Timer Service.
##    """
##    service = getTimerService(self)
##    if not service:
##      LOG('CacheTool', INFO, 'TimerService not available')
##      return
##    service.unsubscribe(self)
##    return "Unsubscribed from Timer Service"
##
##  def manage_beforeDelete(self, item, container):
##    self.unsubscribe()
##    BaseTool.inheritedAttribute('manage_beforeDelete')(self, item, container)
##
##  def manage_afterAdd(self, item, container):
##    self.subscribe()
##    BaseTool.inheritedAttribute('manage_afterAdd')(self, item, container)
##
##  security.declarePrivate('process_timer')
##  def process_timer(self, interval, tick, prev="", next=""):
##    """
##    This method is called by TimerService at the interval given in
##    zope.conf (the default is every 5 seconds). It tries to expire
##    cache entries.
##    """
##    ram_cache_root = self.getRamCacheRoot()
##    for cf_id, cf_obj in ram_cache_root.items():
##      cf_obj.expire()
from AccessControl import ClassSecurityInfo
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type import Permissions
from Products.ERP5Type import PropertySheet
from Products.ERP5Cache.PropertySheet import CacheFactory
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Cache import CachingMethod, CacheFactory

class CacheFactory(XMLObject):
  """
  CacheFactory is a collection of cache plugins. A CacheFactory is an
  object which lives in the ZODB.
  """

  meta_type = 'ERP5 Cache Factory'
  portal_type = 'Cache Factory'
  isPortalContent = 1
  isRADContent = 1
  allowed_types = ('ERP5 Ram Cache Plugin',
                   'ERP5 Distributed Ram Cache Plugin',
                   'ERP5 SQL Cache Plugin',
                  )

  security = ClassSecurityInfo()
  security.declareProtected(CMFCorePermissions.ManagePortal,
                            'manage_editProperties',
                            'manage_changeProperties',
                            'manage_propertiesForm',
                           )

  property_sheets = ( PropertySheet.Base
                    , PropertySheet.SimpleItem
                    , PropertySheet.Folder
                    , PropertySheet.CacheFactory
                    )

  def getCachePluginList(self):
    """ Get the ordered list of cache plugins installed in the ZODB. """
    cache_plugins = self.objectValues(self.allowed_types)
    cache_plugins = map(None, cache_plugins)
    cache_plugins.sort(lambda x, y: cmp(x.int_index, y.int_index))
    return cache_plugins

  security.declareProtected(Permissions.AccessContentsInformation, 'getRamCacheFactory')
  def getRamCacheFactory(self):
    """ Return the RAM based cache factory. """
    erp5_site_id = self.getPortalObject().getId()
    return CachingMethod.factories[erp5_site_id][self.cache_scope]

  security.declareProtected(Permissions.AccessContentsInformation, 'getRamCacheFactoryPluginList')
  def getRamCacheFactoryPluginList(self):
    """ Return the RAM based list of cache plugins for this factory. """
    return self.getRamCacheFactory().getCachePluginList()

  def clearCache(self):
    """ Clear cache for this cache factory. """
    for cp in self.getRamCacheFactory().getCachePluginList():
      cp.clearCache()
from AccessControl import ClassSecurityInfo
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import PropertySheet
from Products.ERP5Cache.PropertySheet.BaseCachePlugin import BaseCachePlugin
from Products.ERP5Cache.PropertySheet.DistributedRamCachePlugin import DistributedRamCachePlugin

class DistributedRamCachePlugin(XMLObject):
  """
  DistributedRamCachePlugin is a Zope (persistent) representation of
  the distributed RAM based real cache plugin object.
  """
  meta_type = 'ERP5 Distributed Ram Cache Plugin'
  portal_type = 'Distributed Ram Cache Plugin'
  isPortalContent = 1
  isRADContent = 1
  allowed_types = ()

  security = ClassSecurityInfo()
  security.declareProtected(CMFCorePermissions.ManagePortal,
                            'manage_editProperties',
                            'manage_changeProperties',
                            'manage_propertiesForm',
                           )

  property_sheets = ( PropertySheet.Base
                    , PropertySheet.SimpleItem
                    , PropertySheet.Folder
                    , BaseCachePlugin
                    , DistributedRamCachePlugin
                    )
from AccessControl import ClassSecurityInfo
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import PropertySheet
from Products.ERP5.PropertySheet.SortIndex import SortIndex
from Products.ERP5Cache.PropertySheet.BaseCachePlugin import BaseCachePlugin

class RamCachePlugin(XMLObject):
  """
  RamCachePlugin is a Zope (persistent) representation of
  the RAM based real cache plugin object.
  """
  meta_type = 'ERP5 Ram Cache Plugin'
  portal_type = 'Ram Cache Plugin'
  isPortalContent = 1
  isRADContent = 1
  allowed_types = ()

  security = ClassSecurityInfo()
  security.declareProtected(CMFCorePermissions.ManagePortal,
                            'manage_editProperties',
                            'manage_changeProperties',
                            'manage_propertiesForm',
                           )

  property_sheets = ( PropertySheet.Base
                    , PropertySheet.SimpleItem
                    , PropertySheet.Folder
                    , SortIndex
                    , BaseCachePlugin
                    )
from AccessControl import ClassSecurityInfo
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.Base import Base
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import PropertySheet
from Products.ERP5Cache.PropertySheet.BaseCachePlugin import BaseCachePlugin
from Products.ERP5Cache.PropertySheet.SQLCachePlugin import SQLCachePlugin

class SQLCachePlugin(XMLObject):
  """
  SQLCachePlugin is a Zope (persistent) representation of
  the SQL based real cache plugin object.
  """
  meta_type = 'ERP5 SQL Cache Plugin'
  portal_type = 'SQL Cache Plugin'
  isPortalContent = 1
  isRADContent = 1
  allowed_types = ()

  security = ClassSecurityInfo()
  security.declareProtected(CMFCorePermissions.ManagePortal,
                            'manage_editProperties',
                            'manage_changeProperties',
                            'manage_propertiesForm',
                           )

  property_sheets = ( PropertySheet.Base
                    , PropertySheet.SimpleItem
                    , PropertySheet.Folder
                    , BaseCachePlugin
                    , SQLCachePlugin
                    )
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Project SYSTEM "Project-3.7.dtd">
<!-- Project file for project ERP5Cache -->
<!-- Saved: 2006-10-03, 20:15:31 -->
<!-- Copyright (C) 2006 Ivan Tyagov, ivan.tyagov@brmtec.com -->
<Project version="3.7">
<ProgLanguage mixed="0">Python</ProgLanguage>
<UIType>Qt</UIType>
<Description></Description>
<Version>0.1</Version>
<Author>Ivan Tyagov</Author>
<Email>ivan.tyagov@brmtec.com</Email>
<Sources>
<Source>
<Name>CacheTool.py</Name>
</Source>
<Source>
<Name>__init__.py</Name>
</Source>
<Source>
<Name>interfaces.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>BaseCache.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>RamCache.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>DistributedRamCache.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>__init__.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>DummyCache.py</Name>
</Source>
<Source>
<Dir>CachePlugins</Dir>
<Name>SQLCache.py</Name>
</Source>
<Source>
<Dir>tests</Dir>
<Name>__init__.py</Name>
</Source>
<Source>
<Dir>tests</Dir>
<Name>testCache.py</Name>
</Source>
<Source>
<Name>INSTALL</Name>
</Source>
<Source>
<Dir>Document</Dir>
<Name>__init__.py</Name>
</Source>
<Source>
<Dir>PropertySheet</Dir>
<Name>__init__.py</Name>
</Source>
<Source>
<Dir>PropertySheet</Dir>
<Name>CacheFactory.py</Name>
</Source>
<Source>
<Dir>PropertySheet</Dir>
<Name>BaseCachePlugin.py</Name>
</Source>
<Source>
<Dir>Document</Dir>
<Name>CacheFactory.py</Name>
</Source>
<Source>
<Dir>Document</Dir>
<Name>RamCachePlugin.py</Name>
</Source>
<Source>
<Dir>PropertySheet</Dir>
<Name>DistributedRamCachePlugin.py</Name>
</Source>
<Source>
<Dir>PropertySheet</Dir>
<Name>SQLCachePlugin.py</Name>
</Source>
<Source>
<Dir>Document</Dir>
<Name>DistributedRamCachePlugin.py</Name>
</Source>
<Source>
<Dir>Document</Dir>
<Name>SQLCachePlugin.py</Name>
</Source>
<Source>
<Dir>dtml</Dir>
<Name>cache_tool_configure.dtml</Name>
</Source>
</Sources>
<Forms>
</Forms>
<Translations>
</Translations>
<Interfaces>
</Interfaces>
<Others>
<Other>
<Dir></Dir>
<Dir>home</Dir>
<Dir>ivan</Dir>
<Name>Eric3-doc</Name>
</Other>
</Others>
<Vcs>
<VcsType>Subversion</VcsType>
<VcsOptions>{'status': [''], 'log': [''], 'global': [''], 'update': [''], 'remove': [''], 'add': [''], 'tag': [''], 'export': [''], 'diff': [''], 'commit': [''], 'checkout': [''], 'history': ['']}</VcsOptions>
<VcsOtherData>{'standardLayout': True}</VcsOtherData>
</Vcs>
<Eric3Doc>
<Eric3DocParams>{'outputDirectory': u'/home/ivan/Eric3-doc'}</Eric3DocParams>
</Eric3Doc>
</Project>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Tasks SYSTEM "Tasks-3.7.dtd">
<!-- Tasks file for project ERP5Cache -->
<!-- Saved: 2006-10-05, 17:54:47 -->
<Tasks version="3.7">
<Task priority="1" completed="0">
<Description>TODO: move result file path generation to runBenchmark.</Description>
<Created>2006-09-12, 15:15:10</Created>
<Resource>
<Filename>
<Dir>tests</Dir>
<Name>createCheckPaymentSuite.py</Name>
</Filename>
<Linenumber>8</Linenumber>
</Resource>
</Task>
<Task priority="1" completed="0">
<Description>TODO: This was implemented this way to enforce the use of a given user on</Description>
<Created>2006-09-12, 15:15:10</Created>
<Resource>
<Filename>
<Dir>tests</Dir>
<Name>createCheckPaymentSuite.py</Name>
</Filename>
<Linenumber>49</Linenumber>
</Resource>
</Task>
<Task priority="1" completed="0">
<Description>TODO: DistributedRamCache and SQLCache must be able to read their</Description>
<Created>2006-09-26, 17:10:22</Created>
<Resource>
<Filename>
<Dir>CachePlugins</Dir>
<Name>config.py</Name>
</Filename>
<Linenumber>13</Linenumber>
</Resource>
</Task>
<Task priority="1" completed="0">
<Description>TODO: Based on above data we can have a different invalidation policy</Description>
<Created>2006-09-28, 13:08:57</Created>
<Resource>
<Filename>
<Dir>CachePlugins</Dir>
<Name>BaseCache.py</Name>
</Filename>
<Linenumber>19</Linenumber>
</Resource>
</Task>
<Task priority="1" completed="0">
<Description>TODO: make check not always but each 100 or n calls</Description>
<Created>2006-09-28, 13:08:57</Created>
<Resource>
<Filename>
<Dir>CachePlugins</Dir>
<Name>BaseCache.py</Name>
</Filename>
<Linenumber>64</Linenumber>
</Resource>
</Task>
<Task priority="1" completed="0">
<Description>TODO: check how to avoid problems with memcache whe using one connection to</Description>
<Created>2006-10-05, 16:42:13</Created>
<Resource>
<Filename>
<Dir>CachePlugins</Dir>
<Name>DistributedRamCache.py</Name>
</Filename>
<Linenumber>25</Linenumber>
</Resource>
</Task>
</Tasks>
Requirements
* MemCached server
  Note: please see "here":https://www.nexedi.org/workspaces/members/ivan/portal_cache_project/memcached/view
* MySQL
  Note: you need a separate database, or you can use an existing one. For database connection options please see
  'CachePlugins/config.py' --> "CACHE_PLUGINS_MAP['persistent_cache']['params']"
Install instructions
* untar ERP5Cache.tar.gz under 'Products/'
* replace 'ERP5Type/Cache.py' with the contents of 'mine_Cache.py' like so:
> mv Cache.py old_Cache.py
> ln -s mine_Cache.py Cache.py
* Install the cache tool ('portal_caches')
* Go to 'ERP5/Tool' and create a symbolic link like so:
> cd ERP5/Tool
> ln -s ../../ERP5Cache/CacheTool.py CacheTool.py
* Go to 'ERP5/' and edit '__init__.py' as follows:
=====================================
[...]
from Tool import <list of all Tools>, CacheTool
[...]
portal_tools = ( <list of all Tools>,
                 CacheTool.CacheTool,
               )
======================================
* Restart Zope and under ZMI add from standard 'ERP5 Tool' --> 'ERP5 Cache Tool'
* go to "http://localhost:9080/erp5/portal_caches/view"
Testing (unittest)
> cd ERP5Cache/tests
> python testCache.py
class BaseCachePlugin:
  """
  Properties common to all cache plugins.
  """
  _properties = (
    {'id'          : 'cache_expire_check_interval',
     'description' : 'Cache expire check interval',
     'type'        : 'int',
     'default'     : 360,
     'mode'        : 'w',
    },)

class CacheFactory:
  """
  Properties of a cache factory.
  """
  _properties = (
    {'id'          : 'cache_duration',
     'description' : 'Cache duration',
     'type'        : 'int',
     'default'     : 360,
     'mode'        : 'w',
    },
  )

class DistributedRamCachePlugin:
  """
  Properties of the distributed RAM (memcached) cache plugin.
  """
  _properties = (
    {'id'          : 'server',
     'description' : 'Memcached server address (you can specify multiple servers by separating them with a newline)',
     'type'        : 'string',
     'default'     : '127.0.0.1:11211',
    },
  )

class SQLCachePlugin:
  """
  Properties of the SQL cache plugin.
  """
  _properties = (
    {'id'          : 'cache_table_name',
     'description' : 'Cache table name',
     'type'        : 'string',
     'default'     : 'cache',
    },
  )
"""
Cache Documents' property sheets.
"""
""" Cache tool initializion moved to ERP/__init__"""
import sys, Permissions, os
from Globals import package_home
this_module = sys.modules[ __name__ ]
product_path = package_home( globals() )
this_module._dtmldir = os.path.join( product_path, 'dtml' )
from Products.ERP5Type.Utils import initializeProduct, updateGlobals
#import CacheTool
object_classes = ()
portal_tools = () #(CacheTool.CacheTool,)
portal_tools = ()
content_classes = ()
content_constructors = ()
document_classes = updateGlobals( this_module, globals(), permissions_module = Permissions)
def initialize( context ):
import Document
initializeProduct(context, this_module, globals(),
document_module = Document,
document_classes = document_classes,
object_classes = object_classes,
portal_tools = portal_tools,
content_constructors = content_constructors,
content_classes = content_classes)
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<b><br/>
<dtml-var expr="REQUEST.get('portal_status_message', '')">
</b>
<h3>Cache invalidation</h3>
<form action="clearCache" method="POST">
<input type="submit" value="Clear all cache factories"/>
</form>
<dtml-in expr="objectIds('ERP5 Cache Factory')">
<form action="clearCacheFactory" method="POST">
<input type="hidden" name="cache_factory_id" value="<dtml-var sequence-item>">
<input type="submit" value="Clear <dtml-var sequence-item>"/>
</form>
</dtml-in>
<h3>SQLCache configuration</h3>
<p>Create the SQL cache table. (Note: you will need to manually adjust each SQL Cache Plugin afterwards to use this cache table name. Generally it is a good idea to use the default table name.)</p>
<form action="createDBCacheTable" method="POST">
<input name="cache_table_name" value="cache">
<br/>
<input type="submit" value="Create (recreate) SQL cache table"/>
</form>
<dtml-var manage_page_footer>
from Interface import Interface

class ICache(Interface):
  """ Cache interface """

  def get(self, key, default=None):
    """ Get key from cache. """

  def set(self, key, value, timeout=None):
    """ Set key to cache. """

  def delete(self, key):
    """ Delete key from cache. """

  def has_key(self, key):
    """ Return True if the key is in the cache and has not expired. """

  def getScopeList(self):
    """ Get available cache scopes. """

  def getScopeKeyList(self, scope):
    """ Get keys for a cache scope. """

  def clearCache(self):
    """ Clear the whole cache. """

  def clearCacheForScope(self, scope):
    """ Clear cache for a scope. """
import random
import unittest
import time
import base64, md5

from ERP5Cache.CachePlugins.RamCache import RamCache
from ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Cache.CachePlugins.SQLCache import SQLCache
from ERP5Cache.CachePlugins.BaseCache import CacheEntry

class Foo:
  my_field = (1, 2, 3, 4, 5)

class TestRamCache(unittest.TestCase):

  def setUp(self):
    self.cache_plugins = (#RamCache(),
                          DistributedRamCache({'server': '127.0.0.1:11211',
                                               'debugLevel': 7,}),
                          #SQLCache({'server': '',
                          #          'user': '',
                          #          'passwd': '',
                          #          'db': 'test',
                          #          'cache_table_name': 'cache',
                          #         }),
                         )

  def testScope(self):
    """ Test scope functions. """
    ## create some sample scopes
    iterations = 10
    test_scopes = []
    for i in range(0, iterations):
      test_scopes.append("my_scope_%s" % i)
    test_scopes.sort()
    ## skip DistributedRamCache since it's a flat storage without scope support
    filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
    for cache_plugin in filtered_cache_plugins:
      print "TESTING (scope): ", cache_plugin
      ## clear cache for this plugin
      cache_plugin.clearCache()
      ## no scopes should exist in cache yet
      self.assertEqual([], cache_plugin.getScopeList())
      ## set some sample values
      for scope in test_scopes:
        cache_id = '%s_cache_id' % scope
        cache_plugin.set(cache_id, scope, scope * 10)
        ## we set ONLY one value per scope -> check that we get the same cache_id back
        self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
        print "\t", cache_id, scope, "\t\tOK"
      ## the list of scopes must equal test_scopes, since we cleared the cache initially
      scopes_from_cache = cache_plugin.getScopeList()
      scopes_from_cache.sort()
      self.assertEqual(test_scopes, scopes_from_cache)
      ## remove scopes one by one ...
      count = 1
      for scope in test_scopes:
        cache_plugin.clearCacheForScope(scope)
        ## ... and check that we have one cache scope less
        scopes_from_cache = cache_plugin.getScopeList()
        self.assertEqual(iterations - count, len(scopes_from_cache))
        count = count + 1
      ## ... finally we shouldn't have any cache scopes left
      scopes_from_cache = cache_plugin.getScopeList()
      self.assertEqual([], scopes_from_cache)

  def testSetGet(self):
    """ Set values to cache and then get them back. """
    for cache_plugin in self.cache_plugins:
      self.generaltestSetGet(cache_plugin, 1000)

##  def testExpire(self):
##    """ Check expiry by setting a key, waiting for its timeout and checking if it is still in cache. """
##    for cache_plugin in self.cache_plugins:
##      self.generalExpire(cache_plugin, 2)

  def generalExpire(self, cache_plugin, iterations):
    print "TESTING (expire): ", cache_plugin
    base_timeout = 1
    values = self.prepareValues(iterations)
    scope = "peter"
    count = 0
    for value in values:
      count = count + 1
      cache_timeout = base_timeout + random.random() * 2
      cache_id = "mycache_id_to_expire_%s" % (count)
      print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_timeout)
      ## sleep for timeout + 1
      time.sleep(cache_timeout + 1)
      ## expired cache entries should be removed from cache
      cache_plugin.expireOldCacheEntries(forceCheck=True)
      ## check it: we MUST NOT have this key in cache any more
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
      print "\t\tOK"

  def generaltestSetGet(self, cache_plugin, iterations):
    print "TESTING (set/get/has/del): ", cache_plugin
    values = self.prepareValues(iterations)
    cache_duration = 30
    scope = "peter"
    count = 0
    for value in values:
      count = count + 1
      cache_id = "mycache_id_to_set_get_has_del_%s" % (count)
      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_duration)
      print "\t", cache_id,
      ## check has_key()
      self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
      ## check get()
      cache_entry = cache_plugin.get(cache_id, scope)
      if isinstance(value, Foo):
        ## with memcached or SQL cache a new object is created for us,
        ## so just compare one field from it
        self.assertEqual(value.my_field, cache_entry.getValue().my_field)
      else:
        ## primitive types, direct comparison
        self.assertEqual(value, cache_entry.getValue())
      ## is the returned result a proper cache entry?
      self.assertEqual(True, isinstance(cache_entry, CacheEntry))
      ## is the returned result of the proper type?
      self.assertEqual(type(value), type(cache_entry.getValue()))
      ## check delete(): the key should be removed
      cache_plugin.delete(cache_id, scope)
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
      print "\t\tOK"

  def prepareValues(self, iterations):
    """ Generate a big list of values. """
    values = []
    my_text = "".join(map(chr, range(50, 200))) * 10  ## long string (150*x)
    for i in range(0, iterations):
      values.append(random.random() * i)
      values.append(random.random() * i / 1000)
      values.append(my_text)
      values.append(Foo())
    return values

if __name__ == '__main__':
  unittest.main()