Commit 49b94b4b authored by Ivan Tyagov's avatar Ivan Tyagov

Added Cache Tool

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@11028 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 9690dc60
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
""" Cache Tool module for ERP5 """
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Globals import InitializeClass, DTMLFile, PersistentMapping
from Products.ERP5 import _dtmldir
from Products.ERP5Type.Cache import CachingMethod, CacheFactory
from Products.ERP5Type.CachePlugins.RamCache import RamCache
from Products.ERP5Type.CachePlugins.DistributedRamCache import DistributedRamCache
from Products.ERP5Type.CachePlugins.SQLCache import SQLCache
class CacheTool(BaseTool):
""" Caches tool wrapper for ERP5 """
id = "portal_caches"
meta_type = "ERP5 Cache Tool"
portal_type = "Cache Tool"
security = ClassSecurityInfo()
manage_options = ({'label': 'Configure',
'action': 'cache_tool_configure',
},) + BaseTool.manage_options
security.declareProtected( Permissions.ManagePortal, 'cache_tool_configure')
cache_tool_configure = DTMLFile('cache_tool_configure', _dtmldir)
def __init__(self):
BaseTool.__init__(self)
security.declareProtected(Permissions.AccessContentsInformation, 'getCacheFactoryList')
def getCacheFactoryList(self):
""" Return available cache factories """
rd ={}
for cf in self.objectValues('ERP5 Cache Factory'):
cache_scope = cf.getId()
rd[cache_scope] = {}
rd[cache_scope]['cache_plugins'] = []
rd[cache_scope]['cache_params'] = {}
for cp in cf.getCachePluginList():
cp_meta_type = cp.meta_type
if cp_meta_type == 'ERP5 Ram Cache Plugin':
cache_obj = RamCache()
elif cp_meta_type == 'ERP5 Distributed Ram Cache Plugin':
cache_obj = DistributedRamCache({'server':cp.getServer()})
elif cp_meta_type == 'ERP5 SQL Cache Plugin':
## use connection details from 'erp5_sql_transactionless_connection' ZMySLQDA object
connection_string = self.erp5_sql_transactionless_connection.connection_string
kw = self.parseDBConnectionString(connection_string)
kw['cache_table_name'] = cp.getCacheTableName()
cache_obj = SQLCache(kw)
## set cache expire check interval
cache_obj.cache_expire_check_interval = cp.getCacheExpireCheckInterval()
rd[cache_scope]['cache_plugins'].append(cache_obj)
rd[cache_scope]['cache_params']['cache_duration'] = cf.getCacheDuration()
return rd
##
## DB structure
##
security.declareProtected(Permissions.ModifyPortalContent, 'createDBCacheTable')
def createDBCacheTable(self, cache_table_name="cache", REQUEST=None):
""" create in MySQL DB cache table """
my_query = SQLCache.create_table_sql %cache_table_name
try:
self.erp5_sql_transactionless_connection.manage_test("DROP TABLE %s" %cache_table_name)
except:
pass
self.erp5_sql_transactionless_connection.manage_test(my_query)
if REQUEST:
self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache table successfully created.')
security.declareProtected(Permissions.AccessContentsInformation, 'parseDBConnectionString')
def parseDBConnectionString(self, connection_string):
""" Parse given connection string. Code "borrowed" from ZMySLQDA.db """
kwargs = {}
items = connection_string.split()
if not items:
return kwargs
lockreq, items = items[0], items[1:]
if lockreq[0] == "*":
db_host, items = items[0], items[1:]
else:
db_host = lockreq
if '@' in db_host:
db, host = split(db_host,'@',1)
kwargs['db'] = db
if ':' in host:
host, port = split(host,':',1)
kwargs['port'] = int(port)
kwargs['host'] = host
else:
kwargs['db'] = db_host
if kwargs['db'] and kwargs['db'][0] in ('+', '-'):
kwargs['db'] = kwargs['db'][1:]
if not kwargs['db']:
del kwargs['db']
if not items:
return kwargs
kwargs['user'], items = items[0], items[1:]
if not items:
return kwargs
kwargs['passwd'], items = items[0], items[1:]
if not items:
return kwargs
kwargs['unix_socket'], items = items[0], items[1:]
return kwargs
##
## RAM cache structure
##
security.declareProtected(Permissions.AccessContentsInformation, 'getRamCacheRoot')
def getRamCacheRoot(self):
""" Return RAM based cache root """
return CachingMethod.factories
security.declareProtected(Permissions.ModifyPortalContent, 'updateCache')
def updateCache(self, REQUEST=None):
""" Clear and update cache structure """
#erp5_site_id = self.getPortalObject().getId()
for cf in CachingMethod.factories:
for cp in CachingMethod.factories[cf].getCachePluginList():
del cp
CachingMethod.factories = {}
## read configuration from ZODB
for key,item in self.getCacheFactoryList().items():
if len(item['cache_plugins'])!=0:
CachingMethod.factories[key] = CacheFactory(item['cache_plugins'], item['cache_params'])
if REQUEST:
self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache updated.')
security.declareProtected(Permissions.ModifyPortalContent, 'clearCache')
def clearCache(self, REQUEST=None):
""" Clear whole cache structure """
ram_cache_root = self.getRamCacheRoot()
for cf in ram_cache_root:
for cp in ram_cache_root[cf].getCachePluginList():
cp.clearCache()
if REQUEST:
self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache cleared.')
security.declareProtected(Permissions.ModifyPortalContent, 'clearCacheFactory')
def clearCacheFactory(self, cache_factory_id, REQUEST=None):
""" Clear only cache factory. """
ram_cache_root = self.getRamCacheRoot()
if ram_cache_root.has_key(cache_factory_id):
ram_cache_root[cache_factory_id].clearCache()
if REQUEST:
self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache factory %s cleared.' %cache_factory_id)
security.declareProtected(Permissions.ModifyPortalContent, 'clearCacheFactoryScope')
def clearCacheFactoryScope(self, cache_factory_id, scope, REQUEST=None):
""" Clear only cache factory. """
ram_cache_root = self.getRamCacheRoot()
if ram_cache_root.has_key(cache_factory_id):
ram_cache_root[cache_factory_id].clearCacheForScope(scope)
if REQUEST:
self.REQUEST.RESPONSE.redirect('cache_tool_configure?portal_status_message=Cache factory scope %s cleared.' %cache_factory_id)
...@@ -46,7 +46,7 @@ product_path = package_home( globals() ) ...@@ -46,7 +46,7 @@ product_path = package_home( globals() )
# Define object classes and tools # Define object classes and tools
from Tool import CategoryTool, SimulationTool, RuleTool, IdTool, TemplateTool,\ from Tool import CategoryTool, SimulationTool, RuleTool, IdTool, TemplateTool,\
TestTool, DomainTool, AlarmTool, OrderTool, DeliveryTool,\ TestTool, DomainTool, AlarmTool, OrderTool, DeliveryTool,\
TrashTool TrashTool, CacheTool
import ERP5Site import ERP5Site
object_classes = ( ERP5Site.ERP5Site, object_classes = ( ERP5Site.ERP5Site,
) )
...@@ -61,6 +61,7 @@ portal_tools = ( CategoryTool.CategoryTool, ...@@ -61,6 +61,7 @@ portal_tools = ( CategoryTool.CategoryTool,
OrderTool.OrderTool, OrderTool.OrderTool,
DeliveryTool.DeliveryTool, DeliveryTool.DeliveryTool,
TrashTool.TrashTool, TrashTool.TrashTool,
CacheTool.CacheTool,
) )
content_classes = () content_classes = ()
content_constructors = () content_constructors = ()
......
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<b><br/>
<dtml-var expr="REQUEST.get('portal_status_message', '')">
</b>
<h3>Cache invalidation</h3>
<form action="clearCache" method="POST">
<input type="submit" value="Clear all cache factories"/>
</form>
<dtml-in expr="objectIds('ERP5 Cache Factory')">
<form action="clearCacheFactory" method="POST">
<input type="hidden" name="cache_factory_id" value="<dtml-var sequence-item>">
<input type="submit" value="Clear <dtml-var sequence-item>"/>
</form>
</dtml-in>
<h3>SQLCache configuration</h3>
<p>Create SQL cache table(Note: you need to adjust later each SQLCache plugin to use this cache table name manually. Generally it is a good idea to use default sql cache table name)</p>
<form action="createDBCacheTable" method="POST">
<input name="cache_table_name" value="cache">
<br/>
<input type="submit" value="Create (Recreate) sql cache table"/>
</form>
<dtml-var manage_page_footer>
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import random
import unittest
import time
import base64, md5
from ERP5Cache.CachePlugins.RamCache import RamCache
from ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Cache.CachePlugins.SQLCache import SQLCache
from ERP5Cache.CachePlugins.BaseCache import CacheEntry
class Foo:
my_field = (1,2,3,4,5)
class TestRamCache(unittest.TestCase):
def setUp(self):
self.cache_plugins = (RamCache(),
DistributedRamCache({'servers': '127.0.0.1:11211',
'debugLevel': 7,}),
SQLCache( {'server': '',
'user': '',
'passwd': '',
'db': 'test',
'cache_table_name': 'cache',
}),
)
def testScope(self):
""" test scope functions """
## create some sample scopes
iterations = 10
test_scopes = []
for i in range(0, iterations):
test_scopes.append("my_scope_%s" %i)
test_scopes.sort()
## remove DistributedRamCache since it's a flat storage
filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
for cache_plugin in filtered_cache_plugins:
print "TESTING (scope): ", cache_plugin
## clear cache for this plugin
cache_plugin.clearCache()
## should exists no scopes in cache
self.assertEqual([], cache_plugin.getScopeList())
## set some sample values
for scope in test_scopes:
cache_id = '%s_cache_id' %scope
cache_plugin.set(cache_id, scope, scope*10)
## we set ONLY one value per scope -> check if we get the same cache_id
self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
print "\t", cache_id, scope, "\t\tOK"
## get list of scopes which must be the same as test_scopes since we clear cache initially
scopes_from_cache = cache_plugin.getScopeList()
scopes_from_cache.sort()
self.assertEqual(test_scopes, scopes_from_cache)
## remove scope one by one
count = 1
for scope in test_scopes:
cache_plugin.clearCacheForScope(scope)
## .. and check that we should have 1 less cache scope
scopes_from_cache = cache_plugin.getScopeList()
self.assertEqual(iterations - count, len(scopes_from_cache))
count = count + 1
## .. we shouldn't have any cache scopes
scopes_from_cache = cache_plugin.getScopeList()
self.assertEqual([], scopes_from_cache)
def testSetGet(self):
""" set value to cache and then get it back """
for cache_plugin in self.cache_plugins:
self.generaltestSetGet(cache_plugin, 100)
def testExpire(self):
""" Check expired by setting a key, wit for its timeout and check if in cache"""
for cache_plugin in self.cache_plugins:
self.generalExpire(cache_plugin, 2)
def generalExpire(self, cache_plugin, iterations):
print "TESTING (expire): ", cache_plugin
base_timeout = 1
values = self.prepareValues(iterations)
scope = "peter"
count = 0
for value in values:
count = count +1
cache_timeout = base_timeout + random.random()*2
cache_id = "mycache_id_to_expire_%s" %(count)
print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
## set to cache
cache_plugin.set(cache_id, scope, value, cache_timeout)
## sleep for timeout +1
time.sleep(cache_timeout + 1)
## should remove from cache expired cache entries
cache_plugin.expireOldCacheEntries(forceCheck=True)
## check it, we MUST NOT have this key any more in cache
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
def generaltestSetGet(self, cache_plugin, iterations):
print "TESTING (set/get/has/del): ", cache_plugin
values = self.prepareValues(iterations)
cache_duration = 30
scope = "peter"
count = 0
for value in values:
count = count +1
cache_id = "mycache_id_to_set_get_has_del_%s" %(count)
## set to cache
cache_plugin.set(cache_id, scope, value, cache_duration)
print "\t", cache_id,
## check has_key()
self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
## check get()
cache_entry = cache_plugin.get(cache_id, scope)
if isinstance(value, Foo):
## when memcached or sql cached we have a new object created for user
## just compare one field from it
self.assertEqual(value.my_field, cache_entry.getValue().my_field)
else:
## primitive types, direct comparision
self.assertEqual(value, cache_entry.getValue())
## is returned result proper cache entry?
self.assertEqual(True, isinstance(cache_entry, CacheEntry))
## is returned result proper type?
self.assertEqual(type(value), type(cache_entry.getValue()))
## check delete(), key should be removed from there
cache_plugin.delete(cache_id, scope)
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
def prepareValues(self, iterations):
""" generate a big list of values """
values = []
my_text = "".join(map(chr, range(50,200))) * 10 ## long string (150*x)
for i in range(0, iterations):
values.append(random.random()*i)
values.append(random.random()*i/1000)
values.append(my_text)
values.append(Foo())
return values
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from random import randint
from pprint import pprint
import os, sys
if __name__ == '__main__':
execfile(os.path.join(sys.path[0], 'framework.py'))
# Needed in order to have a log file inside the current folder
os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
os.environ['EVENT_LOG_SEVERITY'] = '-300'
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.Document.Organisation import Organisation
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
from DateTime import DateTime
from Acquisition import aq_base, aq_inner
from zLOG import LOG
import time
try:
from transaction import get as get_transaction
except ImportError:
pass
class TestCacheTool(ERP5TypeTestCase):
run_all_test = 1
def afterSetUp(self):
self.login()
def login(self, quiet=0, run=run_all_test):
uf = self.getPortal().acl_users
uf._doAddUser('seb', '', ['Manager'], [])
uf._doAddUser('ERP5TypeTestCase', '', ['Manager'], [])
user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user)
def test_01_CreateCacheTool(self, quiet=0, run=run_all_test):
if not run:
return
if not quiet:
message = '\nCreate CacheTool '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
addTool = portal.manage_addProduct['ERP5'].manage_addTool
addTool("ERP5 Cache Tool", None)
get_transaction().commit()
def test_02_CreatePortalTypes(self, quiet=0, run=run_all_test):
if not run:
return
if not quiet:
message = '\nCreate Portal Types'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
portal_types = portal.portal_types
typeinfo_names = ("ERP5Type: Cache Factory (ERP5 Cache Factory)",
"ERP5Type: Ram Cache Plugin (ERP5 Ram Cache Plugin)",
"ERP5Type: Distributed Ram Cache Plugin (ERP5 Distributed Ram Cache Plugin)",
"ERP5Type: SQL Cache Plugin (ERP5 SQL Cache Plugin)",
)
for typeinfo_name in typeinfo_names:
portal_types.manage_addTypeInformation(add_meta_type = "ERP5 Type Information",
id = "",
typeinfo_name = typeinfo_name)
get_transaction().commit()
def test_03_CreateCacheFactories(self, quiet=0, run=run_all_test):
if not run:
return
if not quiet:
message = '\nCreate Cache Tool Factories'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
portal_caches = portal.portal_caches
## Cache plugins are organised into 'Cache factories' so we create factories first
## ram_cache_factory (to test Ram Cache Plugin)
ram_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
id = 'ram_cache_factory',
container=portal_caches)
ram_cache_plugin = ram_cache_factory.newContent(portal_type="Ram Cache Plugin", container=ram_cache_factory)
ram_cache_plugin.setIntIndex(0)
## distributed_ram_cache_factory (to test Distributed Ram Cache Plugin)
dram_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
id = 'distributed_ram_cache_factory',
container=portal_caches)
dram_cache_plugin = dram_cache_factory.newContent(portal_type="Distributed Ram Cache Plugin", container=dram_cache_factory)
dram_cache_plugin.setIntIndex(0)
## sql_cache_factory (to test SQL Cache Plugin)
sql_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
id = 'sql_cache_factory',
container=portal_caches)
sql_cache_plugin = sql_cache_factory.newContent(portal_type="SQL Cache Plugin", container=sql_cache_factory)
sql_cache_plugin.setIntIndex(0)
## erp5_user_factory (to test a combination of all cache plugins)
erp5_user_factory = portal_caches.newContent(portal_type="Cache Factory",
id = "erp5_user_factory",
container=portal_caches)
ram_cache_plugin = erp5_user_factory.newContent(portal_type="Ram Cache Plugin", container=erp5_user_factory)
ram_cache_plugin.setIntIndex(0)
dram_cache_plugin = erp5_user_factory.newContent(portal_type="Distributed Ram Cache Plugin", container=erp5_user_factory)
dram_cache_plugin.setIntIndex(1)
sql_cache_plugin = erp5_user_factory.newContent(portal_type="SQL Cache Plugin", container=erp5_user_factory)
sql_cache_plugin.setIntIndex(2)
##
get_transaction().commit()
## update Ram Cache structure
portal_caches.updateCache()
from Products.ERP5Type.Cache import CachingMethod
## do we have cache enabled for this site?
erp5_site_id = portal.getId()
self.assert_(CachingMethod.factories.has_key(erp5_site_id))
## do we have the same structure we created above?
self.assert_('ram_cache_factory' in CachingMethod.factories[erp5_site_id])
self.assert_('distributed_ram_cache_factory' in CachingMethod.factories[erp5_site_id])
self.assert_('sql_cache_factory' in CachingMethod.factories[erp5_site_id])
self.assert_('erp5_user_factory' in CachingMethod.factories[erp5_site_id])
def test_04_CreateCachedMethod(self, quiet=0, run=run_all_test):
if not run:
return
if not quiet:
message = '\nCreate Cache Method (Python Script)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
## add test cached method
py_script_id = "testCachedMethod"
py_script_params = "value=10000, portal_path=('','erp5')"
py_script_body = """
def veryExpensiveMethod(value):
## do something expensive for some time
## no 'time.sleep()' available in Zope
## so concatenate strings
s = ""
for i in range(0, value):
s = str(value*value*value) + s
return value
result = veryExpensiveMethod(value)
return result
"""
portal.manage_addProduct['PythonScripts'].manage_addPythonScript(id=py_script_id)
py_script_obj = getattr(portal, py_script_id)
py_script_obj.ZPythonScript_edit(py_script_params, py_script_body)
get_transaction().commit()
def test_05_CacheFactoryOnePlugin(self, quiet=0, run=run_all_test):
""" Test cache factory containing only one cache plugin. """
if not run:
return
if not quiet:
message = '\nTest each type of cache plugin individually.'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
from Products.ERP5Type.Cache import CachingMethod
py_script_id = "testCachedMethod"
py_script_obj = getattr(portal, py_script_id)
for cf_name in ('ram_cache_factory', 'distributed_ram_cache_factory', 'sql_cache_factory'):
my_cache = CachingMethod(py_script_obj, 'py_script_obj', cache_factory=cf_name)
self._cacheFactoryInstanceTest(my_cache, cf_name)
print "OK."
def test_06_CacheFactoryMultiPlugins(self, quiet=0, run=run_all_test):
""" Test a cache factory containing multiple cache plugins. """
if not run:
return
if not quiet:
message = '\nTest combination of available cache plugins under a cache factory'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
from Products.ERP5Type.Cache import CachingMethod
py_script_id = "testCachedMethod"
py_script_obj = getattr(portal, py_script_id)
cf_name = 'erp5_user_factory'
my_cache = CachingMethod(py_script_obj, 'py_script_obj', cache_factory=cf_name)
self._cacheFactoryInstanceTest(my_cache, cf_name)
print "OK."
def _cacheFactoryInstanceTest(self, my_cache, cf_name):
portal = self.getPortal()
print
print "="*40
print "TESTING:", cf_name
portal.portal_caches.clearCacheFactory(cf_name)
## 1st call
start = time.time()
original = my_cache(20000, portal_path=('', portal.getId()))
end = time.time()
calculation_time = end-start
print "\n\tCalculation time (1st call)", calculation_time
## 2nd call - should be cached now
start = time.time()
cached = my_cache(20000, portal_path=('', portal.getId()))
end = time.time()
calculation_time = end-start
print "\n\tCalculation time (2nd call)", calculation_time
## check if cache works by getting calculation_time for last cache operation
## even remote cache must have access time less than a second :-)
## if it's greater than method wasn't previously cached and was calculated instead
self.assert_(1.0 > calculation_time)
## check if equal.
self.assertEquals(original, cached)
## OK so far let's clear cache
portal.portal_caches.clearCacheFactory(cf_name)
## 1st call
start = time.time()
original = my_cache(20000, portal_path=('', portal.getId()))
end = time.time()
calculation_time = end-start
print "\n\tCalculation time (after cache clear)", calculation_time
## Cache cleared shouldn't be previously cached
self.assert_(1.0 < calculation_time)
if __name__ == '__main__':
framework()
else:
import unittest
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCacheTool))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment