Commit 41b86818 authored by Jérome Perrin's avatar Jérome Perrin

core_test: modernize and extend testSessionTool

Stop using Sequence, use more high level assertion methods instead of
self.assert_, define the shared tests in a test case and make tests for each
cache plugins inherit from the test case.

Also add a few tests:
  - storing temp objects in Distributed cache
  - storing "recursive" temp objects (like a sale order with sale order lines)
parent 20f0f6dc
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
import unittest import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
from erp5.component.tool.SessionTool import SESSION_CACHE_FACTORY from erp5.component.tool.SessionTool import SESSION_CACHE_FACTORY
from string import letters as LETTERS from string import letters as LETTERS
from random import choice from random import choice
...@@ -42,10 +41,18 @@ primitives_kw = dict(attr_1 = ['list_item'], \ ...@@ -42,10 +41,18 @@ primitives_kw = dict(attr_1 = ['list_item'], \
attr_5 = {'some_key': 'some_value'}, \ attr_5 = {'some_key': 'some_value'}, \
attr_6 = 'string', ) attr_6 = 'string', )
class TestSessionTool(ERP5TypeTestCase):
class SessionToolTestCase(ERP5TypeTestCase):
session_id = "123456789" session_id = "123456789"
def afterSetUp(self):
super(SessionToolTestCase, self).afterSetUp()
self.setCachePlugin()
def setCachePlugin(self):
raise NotImplementedError()
def _changeCachePlugin(self, portal_type, storage_duration = 86400): def _changeCachePlugin(self, portal_type, storage_duration = 86400):
""" Change current cache plugin with new one. """ """ Change current cache plugin with new one. """
portal_caches = self.portal.portal_caches portal_caches = self.portal.portal_caches
...@@ -60,8 +67,7 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -60,8 +67,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.commit() self.commit()
portal_caches.updateCache() portal_caches.updateCache()
def stepTestSetGet(self, sequence=None, def test_set_get(self):
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
session.clear() session.clear()
session.update(primitives_kw) session.update(primitives_kw)
...@@ -69,15 +75,14 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -69,15 +75,14 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(primitives_kw, session) self.assertEqual(primitives_kw, session)
# API check # API check
self.assert_(self.portal.portal_sessions[self.session_id] == \ self.assertEqual(self.portal.portal_sessions[self.session_id],
self.portal.portal_sessions.getSession(self.session_id)) self.portal.portal_sessions.getSession(self.session_id))
session.clear() session.clear()
session.edit(**primitives_kw) session.edit(**primitives_kw)
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
self.assertEqual(primitives_kw, session) self.assertEqual(primitives_kw, session)
def stepTestAcquisitionRamSessionStorage(self, sequence=None, \ def test_store_temp_object(self):
sequence_list=None, **kw):
portal_sessions = self.portal.portal_sessions portal_sessions = self.portal.portal_sessions
session = portal_sessions.newContent( session = portal_sessions.newContent(
self.session_id, self.session_id,
...@@ -86,13 +91,24 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -86,13 +91,24 @@ class TestSessionTool(ERP5TypeTestCase):
## check temp (RAM based) attributes stored in session ## check temp (RAM based) attributes stored in session
for i in range (1, 3): for i in range (1, 3):
attr_name = 'attr_%s' %i attr_name = 'attr_%s' %i
self.assert_(attr_name in session.keys()) self.assertIn(attr_name, session.keys())
attr = session[attr_name] attr = session[attr_name]
self.assert_(str(i), attr.getId()) self.assertEqual(str(i), attr.getId())
self.assert_(0 == len(attr.objectIds())) self.assertEqual(0, len(attr.objectIds()))
def test_store_recursive_temp_object(self):
doc = self.portal.newContent(
temp_object=True, portal_type='Document', id='doc', title='Doc')
doc.newContent(
temp_object=True, portal_type='Document', id='sub_doc', title='Sub doc')
self.portal.portal_sessions.newContent(self.session_id, doc=doc)
self.commit()
doc = self.portal.portal_sessions[self.session_id]['doc']
self.assertEqual(doc.getTitle(), 'Doc')
self.assertEqual(doc.sub_doc.getTitle(), 'Sub doc')
self.assertEqual(len(doc.contentValues()), 1)
def stepModifySession(self, sequence=None, \ def test_modify_session(self):
sequence_list=None, **kw):
""" Modify session and check that modifications are updated in storage backend.""" """ Modify session and check that modifications are updated in storage backend."""
portal_sessions = self.portal.portal_sessions portal_sessions = self.portal.portal_sessions
session = portal_sessions.newContent(self.session_id, \ session = portal_sessions.newContent(self.session_id, \
...@@ -104,33 +120,33 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -104,33 +120,33 @@ class TestSessionTool(ERP5TypeTestCase):
# get again session object again and check that session value is updated # get again session object again and check that session value is updated
# (this makes sense for memcached) # (this makes sense for memcached)
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(not 'attr_1' in session.keys()) self.assertNotIn('attr_1', session.keys())
self.assert_(not 'attr_2' in session.keys()) self.assertNotIn('attr_2', session.keys())
session.update(**{'key_1':'value_1', session.update(**{'key_1': 'value_1',
'key_2':'value_2',}) 'key_2': 'value_2',})
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_('key_1' in session.keys()) self.assertIn('key_1', session.keys())
self.assert_(session['key_1'] == 'value_1') self.assertEqual(session['key_1'], 'value_1')
self.assert_('key_2' in session.keys()) self.assertIn('key_2', session.keys())
self.assert_(session['key_2'] == 'value_2') self.assertEqual(session['key_2'], 'value_2')
session.clear() session.clear()
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(session == {}) self.assertEqual(session, {})
session['pop_key'] = 'pop_value' session['pop_key'] = 'pop_value'
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(session['pop_key'] == 'pop_value') self.assertEqual(session['pop_key'], 'pop_value')
session.popitem() session.popitem()
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(session == {}) self.assertEqual(session, {})
session.setdefault('default', 'value') session.setdefault('default', 'value')
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(session['default'] == 'value') self.assertEqual(session['default'], 'value')
def stepDeleteClearSession(self, sequence=None, \ def test_clear_session(self, sequence=None, \
sequence_list=None, **kw): sequence_list=None, **kw):
""" Get session object and check keys stored in previous test. """ """ Get session object and check keys stored in previous test. """
portal_sessions = self.portal.portal_sessions portal_sessions = self.portal.portal_sessions
...@@ -139,19 +155,18 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -139,19 +155,18 @@ class TestSessionTool(ERP5TypeTestCase):
# delete it # delete it
portal_sessions.manage_delObjects(self.session_id) portal_sessions.manage_delObjects(self.session_id)
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_({} == session) self.assertEqual(session, {})
# clear it # clear it
session = portal_sessions.newContent( session = portal_sessions.newContent(
self.session_id, \ self.session_id, \
**primitives_kw) **primitives_kw)
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(primitives_kw == session) self.assertEqual(session, primitives_kw)
session.clear() session.clear()
session = portal_sessions[self.session_id] session = portal_sessions[self.session_id]
self.assert_(session == {}) self.assertEqual(session, {})
def stepTestSessionDictInterface(self, sequence=None, \ def test_session_dict_interface(self):
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
session.clear() session.clear()
# get / set # get / set
...@@ -182,9 +197,7 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -182,9 +197,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(('popitem', 'Bar'), session.popitem()) self.assertEqual(('popitem', 'Bar'), session.popitem())
self.assertRaises(KeyError, session.popitem) self.assertRaises(KeyError, session.popitem)
def test_session_getattr(self):
def stepTestSessionGetattr(self, sequence=None, \
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
session.clear() session.clear()
session['foo'] = 'Bar' session['foo'] = 'Bar'
...@@ -192,8 +205,7 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -192,8 +205,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual('Default', getattr(session, 'bar', 'Default')) self.assertEqual('Default', getattr(session, 'bar', 'Default'))
self.assertRaises(AttributeError, getattr, session, 'bar') self.assertRaises(AttributeError, getattr, session, 'bar')
def stepTestSessionBulkStorage(self, sequence=None, \ def test_session_bulk_storage(self):
sequence_list=None, **kw):
""" Test massive session sets which uses different cache plugin. """ """ Test massive session sets which uses different cache plugin. """
kw = {} kw = {}
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
...@@ -216,8 +228,7 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -216,8 +228,7 @@ class TestSessionTool(ERP5TypeTestCase):
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
self.assertEqual(kw, session) self.assertEqual(kw, session)
def stepTestSessionExpire(self, sequence=None, \ def test_session_expire(self):
sequence_list=None, **kw):
""" Test expire session which uses different cache plugin. """ """ Test expire session which uses different cache plugin. """
interval = 3 interval = 3
portal_sessions = self.portal.portal_sessions portal_sessions = self.portal.portal_sessions
...@@ -227,10 +238,9 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -227,10 +238,9 @@ class TestSessionTool(ERP5TypeTestCase):
time.sleep(interval+1) time.sleep(interval+1)
session = self.portal.portal_sessions.getSession(self.session_id) session = self.portal.portal_sessions.getSession(self.session_id)
# session should be an emty dic as it expired # session should be an emty dic as it expired
self.assert_(session == {}) self.assertEqual(session, {})
def stepTestCheckSessionAfterNewPerson(self, sequence=None, \ def test_check_session_after_new_person(self):
sequence_list=None, **kw):
""" Test if session still the same after create new person setting the """ Test if session still the same after create new person setting the
reference. """ reference. """
session = self.portal.portal_sessions[self.session_id] session = self.portal.portal_sessions[self.session_id]
...@@ -252,44 +262,19 @@ class TestSessionTool(ERP5TypeTestCase): ...@@ -252,44 +262,19 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(session.get('key'), 'value') self.assertEqual(session.get('key'), 'value')
self.abort() self.abort()
def test_01_CheckSessionTool(self):
""" Checks session tool is present """ class TestRAMCacheSessionTool(SessionToolTestCase):
self.assertNotEqual(None, getattr(self.portal, 'portal_sessions', None)) def setCachePlugin(self):
self._changeCachePlugin('Ram Cache')
def test_02_RamSession(self):
""" Test RamSession which uses local RAM based cache plugin. """
sequence_list = SequenceList() class TestDistributedCacheSessionTool(SessionToolTestCase):
sequence_string = 'stepTestSetGet \ def setCachePlugin(self):
stepTestAcquisitionRamSessionStorage \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
stepTestCheckSessionAfterNewPerson \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_03_MemcachedDistributedSession(self):
""" Test DistributedSession which uses memcached based cache plugin. """
# create memcached plugin and test
self._changeCachePlugin('Distributed Ram Cache') self._changeCachePlugin('Distributed Ram Cache')
sequence_list = SequenceList()
sequence_string = 'stepTestSetGet \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
stepTestCheckSessionAfterNewPerson \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestSessionTool)) suite.addTest(unittest.makeSuite(TestRAMCacheSessionTool))
suite.addTest(unittest.makeSuite(TestDistributedCacheSessionTool))
return suite return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment