1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import random
import unittest
import time
import base64, md5
from ERP5Cache.CachePlugins.RamCache import RamCache
from ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Cache.CachePlugins.SQLCache import SQLCache
from ERP5Cache.CachePlugins.BaseCache import CacheEntry
class Foo:
    """Minimal user-defined object used as a non-primitive value to cache.

    The set/get test only compares ``my_field`` of the object it gets back,
    because a plugin may hand back a new object instead of the stored one.
    """
    ## NOTE: deliberately left without an explicit base class; the tests
    ## compare type(value) of stored and returned objects.
    my_field = (1, 2, 3, 4, 5)
class TestRamCache(unittest.TestCase):
def setUp(self):
self.cache_plugins = (RamCache(),
DistributedRamCache({'servers': '127.0.0.1:11211',
'debugLevel': 7,}),
SQLCache( {'server': '',
'user': '',
'passwd': '',
'db': 'test',
'cache_table_name': 'cache',
}),
)
def testScope(self):
""" test scope functions """
## create some sample scopes
iterations = 10
test_scopes = []
for i in range(0, iterations):
test_scopes.append("my_scope_%s" %i)
test_scopes.sort()
## remove DistributedRamCache since it's a flat storage
filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
for cache_plugin in filtered_cache_plugins:
print "TESTING (scope): ", cache_plugin
## clear cache for this plugin
cache_plugin.clearCache()
## should exists no scopes in cache
self.assertEqual([], cache_plugin.getScopeList())
## set some sample values
for scope in test_scopes:
cache_id = '%s_cache_id' %scope
cache_plugin.set(cache_id, scope, scope*10)
## we set ONLY one value per scope -> check if we get the same cache_id
self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
print "\t", cache_id, scope, "\t\tOK"
## get list of scopes which must be the same as test_scopes since we clear cache initially
scopes_from_cache = cache_plugin.getScopeList()
scopes_from_cache.sort()
self.assertEqual(test_scopes, scopes_from_cache)
## remove scope one by one
count = 1
for scope in test_scopes:
cache_plugin.clearCacheForScope(scope)
## .. and check that we should have 1 less cache scope
scopes_from_cache = cache_plugin.getScopeList()
self.assertEqual(iterations - count, len(scopes_from_cache))
count = count + 1
## .. we shouldn't have any cache scopes
scopes_from_cache = cache_plugin.getScopeList()
self.assertEqual([], scopes_from_cache)
def testSetGet(self):
""" set value to cache and then get it back """
for cache_plugin in self.cache_plugins:
self.generaltestSetGet(cache_plugin, 100)
def testExpire(self):
""" Check expired by setting a key, wit for its timeout and check if in cache"""
for cache_plugin in self.cache_plugins:
self.generalExpire(cache_plugin, 2)
def generalExpire(self, cache_plugin, iterations):
print "TESTING (expire): ", cache_plugin
base_timeout = 1
values = self.prepareValues(iterations)
scope = "peter"
count = 0
for value in values:
count = count +1
cache_timeout = base_timeout + random.random()*2
cache_id = "mycache_id_to_expire_%s" %(count)
print "\t", cache_id, " ==> timeout (s) = ", cache_timeout,
## set to cache
cache_plugin.set(cache_id, scope, value, cache_timeout)
## sleep for timeout +1
time.sleep(cache_timeout + 1)
## should remove from cache expired cache entries
cache_plugin.expireOldCacheEntries(forceCheck=True)
## check it, we MUST NOT have this key any more in cache
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
def generaltestSetGet(self, cache_plugin, iterations):
print "TESTING (set/get/has/del): ", cache_plugin
values = self.prepareValues(iterations)
cache_duration = 30
scope = "peter"
count = 0
for value in values:
count = count +1
cache_id = "mycache_id_to_set_get_has_del_%s" %(count)
## set to cache
cache_plugin.set(cache_id, scope, value, cache_duration)
print "\t", cache_id,
## check has_key()
self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
## check get()
cache_entry = cache_plugin.get(cache_id, scope)
if isinstance(value, Foo):
## when memcached or sql cached we have a new object created for user
## just compare one field from it
self.assertEqual(value.my_field, cache_entry.getValue().my_field)
else:
## primitive types, direct comparision
self.assertEqual(value, cache_entry.getValue())
## is returned result proper cache entry?
self.assertEqual(True, isinstance(cache_entry, CacheEntry))
## is returned result proper type?
self.assertEqual(type(value), type(cache_entry.getValue()))
## check delete(), key should be removed from there
cache_plugin.delete(cache_id, scope)
self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
print "\t\tOK"
def prepareValues(self, iterations):
""" generate a big list of values """
values = []
my_text = "".join(map(chr, range(50,200))) * 10 ## long string (150*x)
for i in range(0, iterations):
values.append(random.random()*i)
values.append(random.random()*i/1000)
values.append(my_text)
values.append(Foo())
return values
# Run the test cases through unittest's runner when executed as a script.
if __name__ == '__main__':
    unittest.main()