Commit e02d03a6 authored by Jérome Perrin's avatar Jérome Perrin

testCache was not started due to import errors



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@11491 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 2f62eee8
......@@ -29,28 +29,34 @@
import random
import unittest
import time
import base64, md5
from ERP5Type.CachePlugins.RamCache import RamCache
from ERP5Type.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Type.CachePlugins.SQLCache import SQLCache
from ERP5Type.CachePlugins.BaseCache import CacheEntry
import os
from Products.ERP5Type.CachePlugins.RamCache import RamCache
from Products.ERP5Type.CachePlugins.DistributedRamCache import\
DistributedRamCache
from Products.ERP5Type.CachePlugins.SQLCache import SQLCache
from Products.ERP5Type.CachePlugins.BaseCache import CacheEntry
from Products.ERP5Type.Tool.CacheTool import CacheTool
class Foo:
my_field = (1,2,3,4,5)
class TestRamCache(unittest.TestCase):
quiet = 1
def setUp(self):
self.cache_plugins = (RamCache(),
# for SQLCache, get the connection string from runUnitTest.py parameters,
# and use parseDBConnectionString to make it usable by SQLCache
mysql_connection_string = os.environ.get(
'erp5_sql_connection_string', 'test test')
sql_cache_kw = CacheTool().parseDBConnectionString(mysql_connection_string)
sql_cache_kw['cache_table_name'] = 'cache'
self.cache_plugins = (RamCache(),
DistributedRamCache({'servers': '127.0.0.1:11211',
'debugLevel': 7,}),
SQLCache( {'server': '',
'user': '',
'passwd': '',
'db': 'test',
'cache_table_name': 'cache',
}),
SQLCache( sql_cache_kw ),
)
def testScope(self):
......@@ -63,10 +69,12 @@ class TestRamCache(unittest.TestCase):
test_scopes.sort()
## remove DistributedRamCache since it's a flat storage
filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
filtered_cache_plugins = filter(
lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
for cache_plugin in filtered_cache_plugins:
print "TESTING (scope): ", cache_plugin
if not self.quiet:
print "TESTING (scope): ", cache_plugin
## clear cache for this plugin
cache_plugin.clearCache()
......@@ -81,11 +89,12 @@ class TestRamCache(unittest.TestCase):
## we set ONLY one value per scope -> check if we get the same cache_id
self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
print "\t", cache_id, scope, "\t\tOK"
if not self.quiet:
print "\t", cache_id, scope, "\t\tOK"
## get list of scopes which must be the same as test_scopes since we clear cache initially
scopes_from_cache = cache_plugin.getScopeList()
scopes_from_cache.sort()
scopes_from_cache.sort()
self.assertEqual(test_scopes, scopes_from_cache)
## remove scope one by one
......@@ -108,13 +117,15 @@ class TestRamCache(unittest.TestCase):
self.generaltestSetGet(cache_plugin, 100)
def testExpire(self):
""" Check expired by setting a key, wit for its timeout and check if in cache"""
""" Check expired by setting a key, wit for its timeout and check if in
cache"""
for cache_plugin in self.cache_plugins:
self.generalExpire(cache_plugin, 2)
def generalExpire(self, cache_plugin, iterations):
print "TESTING (expire): ", cache_plugin
if not self.quiet:
print "TESTING (expire): ", cache_plugin
base_timeout = 1
values = self.prepareValues(iterations)
scope = "peter"
......@@ -123,7 +134,8 @@ class TestRamCache(unittest.TestCase):
count = count +1
cache_timeout = base_timeout + random.random()*2
cache_id = "mycache_id_to_expire_%s" %(count)
print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
if not self.quiet:
print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
## set to cache
cache_plugin.set(cache_id, scope, value, cache_timeout)
......@@ -136,10 +148,12 @@ class TestRamCache(unittest.TestCase):
## check it, we MUST NOT have this key any more in cache
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
if not self.quiet:
print "\t\tOK"
def generaltestSetGet(self, cache_plugin, iterations):
print "TESTING (set/get/has/del): ", cache_plugin
if not self.quiet:
print "TESTING (set/get/has/del): ", cache_plugin
values = self.prepareValues(iterations)
cache_duration = 30
scope = "peter"
......@@ -150,7 +164,8 @@ class TestRamCache(unittest.TestCase):
## set to cache
cache_plugin.set(cache_id, scope, value, cache_duration)
print "\t", cache_id,
if not self.quiet:
print "\t", cache_id,
## check has_key()
self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
......@@ -175,7 +190,8 @@ class TestRamCache(unittest.TestCase):
cache_plugin.delete(cache_id, scope)
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
if not self.quiet:
print "\t\tOK"
def prepareValues(self, iterations):
""" generate a big list of values """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment