testCache.py 7.02 KB
Newer Older
Ivan Tyagov's avatar
Ivan Tyagov committed

##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
#                     Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import random
import unittest
import time
import base64, md5
from ERP5Cache.CachePlugins.RamCache import RamCache
from ERP5Cache.CachePlugins.DistributedRamCache import DistributedRamCache
from ERP5Cache.CachePlugins.SQLCache import SQLCache
from ERP5Cache.CachePlugins.BaseCache import CacheEntry


class Foo:
  my_field = (1,2,3,4,5)

class TestRamCache(unittest.TestCase):
    
  def setUp(self):
    self.cache_plugins = (RamCache(), 
                          DistributedRamCache({'servers': '127.0.0.1:11211',
                                                 'debugLevel': 7,}),
                          SQLCache( {'server': '',
                                     'user': '',
                                     'passwd': '',
                                     'db': 'test',
                                     'cache_table_name': 'cache',
                                      }),
                        )

  def testScope(self):
    """ test scope functions """
    ## create some sample scopes
    iterations = 10
    test_scopes = []
    for i in range(0, iterations):
        test_scopes.append("my_scope_%s" %i)
    test_scopes.sort()
    
    ## remove DistributedRamCache since it's a flat storage
    filtered_cache_plugins = filter(lambda x: not isinstance(x, DistributedRamCache), self.cache_plugins)
    
    for cache_plugin in filtered_cache_plugins:
      print "TESTING (scope): ", cache_plugin

      ## clear cache for this plugin
      cache_plugin.clearCache()
      
      ## should exists no scopes in cache
      self.assertEqual([], cache_plugin.getScopeList())
      
      ## set some sample values 
      for scope in test_scopes:
        cache_id = '%s_cache_id' %scope
        cache_plugin.set(cache_id, scope, scope*10)
        
        ## we set ONLY one value per scope -> check if we get the same cache_id
        self.assertEqual([cache_id], cache_plugin.getScopeKeyList(scope))
        print "\t", cache_id, scope, "\t\tOK"
      
      ## get list of scopes which must be the same as test_scopes since we clear cache initially
      scopes_from_cache = cache_plugin.getScopeList()
      scopes_from_cache.sort()  
      self.assertEqual(test_scopes, scopes_from_cache)
      
      ## remove scope one by one
      count = 1 
      for scope in test_scopes:
        cache_plugin.clearCacheForScope(scope)
        ## .. and check that  we should have 1 less cache scope 
        scopes_from_cache = cache_plugin.getScopeList()
        self.assertEqual(iterations - count, len(scopes_from_cache))
        count = count + 1
        
      ## .. we shouldn't have any cache scopes 
      scopes_from_cache = cache_plugin.getScopeList()
      self.assertEqual([], scopes_from_cache)

      
  def testSetGet(self):
    """ set value to cache and then get it back """
    for cache_plugin in self.cache_plugins:
      self.generaltestSetGet(cache_plugin, 100)
    
  def testExpire(self):
    """ Check expired by setting a key, wit for its timeout and check if in cache"""
    for cache_plugin in self.cache_plugins:
      self.generalExpire(cache_plugin, 2)

            
  def generalExpire(self, cache_plugin, iterations):
    print "TESTING (expire): ", cache_plugin
    base_timeout = 1
    values = self.prepareValues(iterations)
    scope = "peter"
    count = 0
    for value in values:
      count = count +1
      cache_timeout = base_timeout + random.random()*2
      cache_id = "mycache_id_to_expire_%s" %(count)
      print "\t", cache_id, " ==> timeout (s) = ", cache_timeout, 

      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_timeout)
        
      ## sleep for timeout +1
      time.sleep(cache_timeout + 1)
        
      ## should remove from cache expired cache entries 
      cache_plugin.expireOldCacheEntries(forceCheck=True)
        
      ##  check it, we MUST NOT have this key any more in cache
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
      print "\t\tOK"
     
  def generaltestSetGet(self, cache_plugin, iterations):
    print "TESTING (set/get/has/del): ", cache_plugin
    values = self.prepareValues(iterations)
    cache_duration = 30
    scope = "peter"
    count = 0
    for value in values:
      count = count +1
      cache_id = "mycache_id_to_set_get_has_del_%s" %(count)
        
      ## set to cache
      cache_plugin.set(cache_id, scope, value, cache_duration)
      print "\t", cache_id, 
        
      ## check has_key()
      self.assertEqual(True, cache_plugin.has_key(cache_id, scope))
        
      ## check get()
      cache_entry = cache_plugin.get(cache_id, scope)
      if isinstance(value, Foo):
        ## when memcached or sql cached we have a new object created for user
        ## just compare one field from it
        self.assertEqual(value.my_field, cache_entry.getValue().my_field)
      else:
        ## primitive types, direct comparision
        self.assertEqual(value, cache_entry.getValue())
        
      ## is returned result proper cache entry?
      self.assertEqual(True, isinstance(cache_entry, CacheEntry))
        
      ## is returned result proper type?
      self.assertEqual(type(value), type(cache_entry.getValue()))
        
      ## check delete(), key should be removed from there
      cache_plugin.delete(cache_id, scope)
      self.assertEqual(False, cache_plugin.has_key(cache_id, scope))
        
      print "\t\tOK"
    
  def prepareValues(self, iterations):
    """ generate a big list of values """
    values = []
    my_text = "".join(map(chr, range(50,200))) * 10 ## long string (150*x)
    for i in range(0, iterations):
      values.append(random.random()*i)
      values.append(random.random()*i/1000)
      values.append(my_text)
      values.append(Foo())
    return values

if __name__ == '__main__':
  unittest.main()