# testCache.py
# Committed by Ivan Tyagov.
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
#                     Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import random
import unittest
import time
import base64, md5
from ERP5Cache.CachePlugins.RamCache import RamCache
from ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Cache.CachePlugins.SQLCache import SQLCache
from ERP5Cache.CachePlugins.BaseCache import CacheEntry


class Foo:
  """ Minimal non-primitive value used as a cache payload by the tests.

  prepareValues() stores instances of it and generaltestSetGet() compares
  ``my_field`` of the instance read back from the cache.
  """
  # constant tuple shared by every instance (class attribute)
  my_field = (1, 2, 3, 4, 5)

class TestRamCache(unittest.TestCase):
    
  def setUp(self):
    self.cache_plugins = (RamCache(), 
                          DistributedRamCache({'servers': '127.0.0.1:11211',
                                                 'debugLevel': 7,}),
                          SQLCache( {'server': '',
                                     'user': '',
                                     'passwd': '',
                                     'db': 'test',
                                     'cache_table_name': 'cache',
                                      }),
                        )

  def testScope(self):
    """ test scope functions """
    ## create some sample scopes
    iterations = 10
    test_scopes = []
    for i in range(0, iterations):
        test_scopes.append("my_scope_%s" %i)
    test_scopes.sort()
    
    ## remove DistributedRamCache since it's a flat storage
    filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
    
    for cache_plugin in filtered_cache_plugins:
      print "TESTING (scope): ", cache_plugin

      ## clear cache for this plugin
      cache_plugin.clearCache()
      
      ## should exists no scopes in cache
      self.assertEqual([], cache_plugin.getScopeList())
      
      ## set some sample values 
      for scope in test_scopes:
        cache_id = '%s_cache_id' %scope
        cache_plugin.set(cache_id, scope, scope*10)
        
        ## we set ONLY one value per scope -> check if we get the same cache_id
        self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
        print "\t", cache_id, scope, "\t\tOK"
      
      ## get list of scopes which must be the same as test_scopes since we clear cache initially
      scopes_from_cache = cache_plugin.getScopeList()
      scopes_from_cache.sort()  
      self.assertEqual(test_scopes, scopes_from_cache)
      
      ## remove scope one by one
      count = 1 
      for scope in test_scopes:
        cache_plugin.clearCacheForScope(scope)
        ## .. and check that  we should have 1 less cache scope 
        scopes_from_cache = cache_plugin.getScopeList()
        self.assertEqual(iterations - count, len(scopes_from_cache))
        count = count + 1
        
      ## .. we shouldn't have any cache scopes 
      scopes_from_cache = cache_plugin.getScopeList()
      self.assertEqual([], scopes_from_cache)

      
  def testSetGet(self):
    """ set value to cache and then get it back """
    for cache_plugin in self.cache_plugins:
      self.generaltestSetGet(cache_plugin, 100)
    
  def testExpire(self):
    """ Check expired by setting a key, wit for its timeout and check if in cache"""
    for cache_plugin in self.cache_plugins:
      self.generalExpire(cache_plugin, 2)

            
  def generalExpire(self, cache_plugin, iterations):
    print "TESTING (expire): ", cache_plugin
    base_timeout = 1
    values = self.prepareValues(iterations)
    scope = "peter"
    count = 0
    for value in values:
      count = count +1
      cache_timeout = base_timeout + random.random()*2
      cache_id = "mycache_id_to_expire_%s" %(count)
      print "\t", cache_id, " ==> timeout (s) = ", cache_timeout, 

      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_timeout)
        
      ## sleep for timeout +1
      time.sleep(cache_timeout + 1)
        
      ## should remove from cache expired cache entries 
      cache_plugin.expireOldCacheEntries(forceCheck=True)
        
      ##  check it, we MUST NOT have this key any more in cache
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
      print "\t\tOK"
     
  def generaltestSetGet(self, cache_plugin, iterations):
    print "TESTING (set/get/has/del): ", cache_plugin
    values = self.prepareValues(iterations)
    cache_duration = 30
    scope = "peter"
    count = 0
    for value in values:
      count = count +1
      cache_id = "mycache_id_to_set_get_has_del_%s" %(count)
        
      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_duration)
      print "\t", cache_id, 
        
      ## check has_key()
      self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
        
      ## check get()
      cache_entry = cache_plugin.get(cache_id, scope)
      if isinstance(value, Foo):
        ## when memcached or sql cached we have a new object created for user
        ## just compare one field from it
        self.assertEqual(value.my_field, cache_entry.getValue().my_field)
      else:
        ## primitive types, direct comparision
        self.assertEqual(value, cache_entry.getValue())
        
      ## is returned result proper cache entry?
      self.assertEqual(True, isinstance(cache_entry, CacheEntry))
        
      ## is returned result proper type?
      self.assertEqual(type(value), type(cache_entry.getValue()))
        
      ## check delete(), key should be removed from there
      cache_plugin.delete(cache_id, scope)
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
        
      print "\t\tOK"
    
  def prepareValues(self, iterations):
    """ generate a big list of values """
    values = []
    my_text = "".join(map(chr, range(50,200))) * 10 ## long string (150*x)
    for i in range(0, iterations):
      values.append(random.random()*i)
      values.append(random.random()*i/1000)
      values.append(my_text)
      values.append(Foo())
    return values

## run every test of this module when the file is executed as a script
if __name__ == "__main__":
  unittest.main()