Commit 4accffef authored by Nicolas Wavrant's avatar Nicolas Wavrant

Add StoredSequence to run tests from an initial stored sequence

See merge request nexedi/erp5!1385
parents 1b706b6c 167beb9e
Pipeline #14817 failed with stage
##############################################################################
#
# Copyright (c) 2021 Nexedi SARL and Contributors. All Rights Reserved.
# Nicolas Wavrant <nicolas.wavrant@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList, StoredSequence
class TestStoredSequence(ERP5TypeTestCase):
def afterSetUp(self):
self.portal = self.getPortalObject()
self.portal.person_module.manage_delObjects(
ids=list(self.portal.person_module.objectIds())
)
for trashbin_value in self.portal.portal_trash.objectValues():
if trashbin_value.getId().startswith(self.__class__.__name__):
self.portal.portal_trash.manage_delObjects(ids=[trashbin_value.getId()])
self.tic()
registerSequenceString = ERP5TypeTestCase.registerSequenceString
def _getCleanupDict(self):
return {
"person_module": list(self.portal.person_module.objectIds()),
}
def stepLogin(self, sequence):
self.login()
def stepCreatePerson(self, sequence):
sequence['person'] = self.portal.person_module.newContent(
id="person",
title="Person",
)
def stepUpdatePerson1(self, sequence):
sequence['person'].setTitle(sequence['person'].getTitle() + " 1")
def stepUpdatePerson2(self, sequence):
sequence['person'].setTitle(sequence['person'].getTitle() + " 2")
def stepFillSequenceDict(self, sequence):
sequence["string"] = "a string"
sequence["int"] = 10
sequence["float"] = 3.14
sequence["erp5_document"] = self.portal.person_module.newContent(
portal_type="Person",
id="erp5_document_0",
)
sequence["list_of_int"] = [1, 2]
sequence["list_of_erp5_document"] = [
self.portal.person_module.newContent(
portal_type="Person",
id="erp5_document_%d" % i,
) for i in range(1, 3)
]
def test_storedSequenceCanRestoreAState(self):
sequence_id = "sequence_can_restore"
self.registerSequenceString(sequence_id, """
stepCreatePerson
""")
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepUpdatePerson1")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
self.assertEqual(self.portal.person_module.person.getTitle(), "Person 1")
trashbin_value = self.portal.portal_trash[sequence._getTrashBinId(self)]
self.assertEqual(trashbin_value.person_module.person.getTitle(), "Person")
self.assertEqual(
trashbin_value.getProperty("serialised_sequence"),
({"key": "person", "type": "erp5_object", "value": "person_module/person"},)
)
self.portal.person_module.manage_delObjects(ids=["person"])
# Run new sequence, with same base sequence.
# Update the title of the person document in the trashbin to be
# sure it has been restored from trash and not created
trashbin_value.person_module.person.setTitle("Trash Person")
self.tic()
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepUpdatePerson2")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
self.assertEqual(trashbin_value.person_module.person.getTitle(), "Trash Person")
self.assertEqual(self.portal.person_module.person.getTitle(), "Trash Person 2")
def test_serialisationOfSequenceDict(self):
sequence_id = "serialisation"
self.registerSequenceString(sequence_id, "stepFillSequenceDict")
sequence = StoredSequence(self, sequence_id)
sequence.setSequenceString("stepLogin")
sequence_list = SequenceList()
sequence_list.addSequence(sequence)
sequence_list.play(self)
sequence_dict = sequence._dict
# sequence._dict will be recalculated
sequence.deserialiseSequenceDict(
self.portal.portal_trash[sequence._getTrashBinId(self)].serialised_sequence
)
self.assertEqual(
sequence_dict,
sequence._dict,
)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestStoredSequence))
return suite
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testSequence</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testSequence</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -355,6 +355,32 @@ class TestTrashTool(ERP5TypeTestCase):
self.assertTrue(subcat is not None)
sequence.edit(subcat_path=subcat.getPath())
def stepDeleteBaseCategory(self, sequence=None, sequence_list=None, **kw):
pc = self.getCategoryTool()
pc.manage_delObjects(ids=[sequence.get('bc_id')])
def stepRestore(self, sequence=None, sequence_list=None, **kw):
trash_id = sequence.get('trash_id')
trash = self.getTrashTool()
trashbin = trash._getOb(trash_id, None)
bc_id = sequence.get('bc_id')
trash.restoreObject(trashbin, ['portal_categories_items'], bc_id)
def stepCheckRestore(self, sequence=None, sequence_list=None, **kw):
bc_id = sequence.get('bc_id')
bc = self.portal.portal_categories[bc_id]
self.assertTrue(
sorted(bc.objectIds()) == sorted(sequence.get('category_id_list'))
)
self.assertEqual(
len(
self.portal.portal_catalog(
portal_type='Category',
parent_uid=bc.getUid()
)
), 10
)
# tests
def test_01_checkTrashBinCreation(self, quiet=quiet, run=run_all_test):
if not run: return
......@@ -477,6 +503,30 @@ class TestTrashTool(ERP5TypeTestCase):
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_07_checkRestore(self, quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Test Check Backup Without Subobjects'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
sequence_list = SequenceList()
sequence_string = '\
CheckTrashToolExists \
CreateTrashBin \
AddBaseCategory \
AddCategories \
Tic \
BackupObjectsWithKeepingSubobjects \
Tic \
CheckObjectBackupWithSubObjects \
DeleteBaseCategory \
Tic \
Restore \
Tic \
CheckRestore \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_suite():
......
test.erp5.testAccessTab
test.erp5.testSequence
test.erp5.testActivityTool
test.erp5.testAlarm
test.erp5.testArrow
......
......@@ -34,6 +34,7 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.ERP5 import _dtmldir
from zExceptions import BadRequest
from zLOG import LOG, WARNING
from DateTime import DateTime
from Acquisition import aq_base
......@@ -189,6 +190,79 @@ class TrashTool(BaseTool):
)
return trashbin
security.declarePrivate('restoreObject')
def restoreObject(self, trashbin, container_path, object_id, pass_if_exist=True):
"""
Restore an object from the trash bin (copy it under portal)
"""
portal = self.getPortalObject()
# recreate path of the backup object if necessary
backup_object_container = portal
for path in container_path:
if path.endswith('_items'):
path = path[0:-len('_items')]
if path not in backup_object_container.objectIds():
if not hasattr(aq_base(backup_object_container), "newContent"):
backup_object_container.manage_addFolder(id=path,)
backup_object_container = backup_object_container._getOb(path)
else:
backup_object_container = backup_object_container.newContent(
portal_type='Folder',
id=path,
)
else:
backup_object_container = backup_object_container._getOb(path)
# backup the object
# here we choose export/import to copy because cut/paste
# do too many things and check for what we want to do
object_path = container_path + [object_id]
obj = trashbin.restrictedTraverse(object_path, None)
if obj is not None:
connection = obj._p_jar
o = obj
while connection is None:
o = o.aq_parent
connection=o._p_jar
if obj._p_oid is None:
LOG("Trash Tool backupObject", WARNING,
"Trying to backup uncommitted object %s" % object_path)
return {}
if isinstance(obj, Broken):
LOG("Trash Tool backupObject", WARNING,
"Can't backup broken object %s" % object_path)
klass = obj.__class__
if klass.__module__[:27] in ('Products.ERP5Type.Document.',
'erp5.portal_type'):
# meta_type is required so that a broken object
# can be removed properly from a BTreeFolder2
# (unfortunately, we can only guess it)
klass.meta_type = 'ERP5' + re.subn('(?=[A-Z])', ' ',
klass.__name__)[0]
return
copy = connection.exportFile(obj._p_oid)
# import object in trash
connection = backup_object_container._p_jar
o = backup_object_container
while connection is None:
o = o.aq_parent
connection=o._p_jar
copy.seek(0)
try:
backup = connection.importFile(copy)
if hasattr(aq_base(backup), 'isIndexable'):
del backup.isIndexable
backup_object_container._setObject(object_id, backup)
except (AttributeError, ImportError):
# XXX we can go here due to formulator because attribute
# field_added doesn't not exists on parent if it is a Trash
# Folder and not a Form, or a module for the old object is
# already removed, and we cannot backup the object
LOG("Trash Tool backupObject", WARNING,
"Can't backup object %s" % object_path)
except BadRequest:
if pass_if_exist:
pass
security.declareProtected(Permissions.ManagePortal, 'getTrashBinObjectsList')
def getTrashBinObjectsList(self, trashbin):
"""
......
......@@ -217,6 +217,10 @@ DateTime._parse_args = _parse_args
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
"""Mixin class for ERP5 based tests.
"""
def __init__(self, *args, **kw):
super(ERP5TypeTestCaseMixin, self).__init__(*args, **kw)
self.sequence_string_registry = {}
def dummy_test(self):
ZopeTestCase._print('All tests are skipped when --save option is passed '
'with --update_business_templates or without --load')
......@@ -864,6 +868,53 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
setup_once()
ZopeTestCase._print('done (%.3fs)\n' % (time.time() - start))
def _getCleanupDict(self):
"""
You must override this. Return the documents that should be
stored while saving/restoring a StoredSequence as a dict,
the keys being the module containing them, and the values
the list of ids of documents
"""
return {}
def registerSequenceString(self, sequence_title, sequence_string):
self.sequence_string_registry[sequence_title] = sequence_string
def getSequenceString(self, sequence_title):
return self.sequence_string_registry[sequence_title]
def stepStoreSequence(self, sequence):
sequence_title = "sequence_title"
document_dict = self._getCleanupDict()
if sequence_title in self.portal.portal_trash:
self.portal.portal_trash.manage_delObjects(ids=[sequence_title])
trashbin_value = self.portal.portal_trash.newContent(
portal_type="Trash Bin",
id=sequence_title,
title=sequence_title,
serialized_sequence=sequence.serializeSequenceDict(),
document_dict=document_dict,
)
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
self.portal.portal_trash.backupObject(
trashbin_value, [module_id], object_id, save=True, keep_subobjects=True
)
def stepRestoreSequence(self, sequence):
sequence_title = "sequence_title"
trashbin_value = self.portal.portal_trash[sequence_title]
document_dict = trashbin_value.getProperty('document_dict')
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
self.portal.portal_trash.restoreObject(
trashbin_value, [module_id], object_id, pass_if_exist=True
)
sequence.deserializeSequenceDict(
trashbin_value.getProperty("serialized_sequence"),
)
self.tic()
class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
__original_ZMySQLDA_connect = None
......
......@@ -175,6 +175,115 @@ class Sequence:
step = step[4:]
self.addStep(step)
class StoredSequence(Sequence):
"""A StoredSequence is a Sequence that can store an ERP5's state into
a Trash Bin and restore it before before being played. If the state is
not stored yet, then it will create it then store it.
This capability is interesting when multiple tests share a same initial
state, as the state needs to be generated only once and can be reused
for all of them.
"""
def __init__(self, context, id):
Sequence.__init__(self, context)
self._id = id
def serialiseSequenceDict(self):
def _serialise(key, value):
result_dict = {'key': key}
if (
isinstance(value, str) or
isinstance(value, int) or
isinstance(value, float) or
isinstance(value, dict) or
value is None
):
result_dict['type'] = "raw"
result_dict['value'] = value
elif isinstance(value, list):
result_dict['type'] = "list"
result_dict['value'] = [_serialise(key, x) for x in value]
else:
result_dict['type'] = "erp5_object"
result_dict['value'] = value.getRelativeUrl()
return result_dict
result_list = []
for key, value in self._dict.iteritems():
result_list.append(_serialise(key, value))
return result_list
def deserialiseSequenceDict(self, data):
portal = self._context.getPortalObject()
def _deserialise(serialised_dict):
if serialised_dict["type"] == "raw":
return serialised_dict["value"]
elif serialised_dict["type"] == "list":
return [_deserialise(x) for x in serialised_dict["value"]]
elif serialised_dict["type"] == "erp5_object":
return portal.restrictedTraverse(serialised_dict['value'])
else:
raise TypeError("Unknown serialised type %s", serialised_dict["type"])
for serialised_dict in data:
self._dict[serialised_dict['key']] = _deserialise(serialised_dict)
def _getTrashBinId(self, context):
if not context:
context = self.context
return "%s_%s" % (context.__class__.__name__, self._id)
def store(self, context):
context.login()
document_dict = context._getCleanupDict()
trashbin_id = self._getTrashBinId(context)
if trashbin_id in context.portal.portal_trash:
context.portal.portal_trash.manage_delObjects(ids=[self._id])
trashbin_value = context.portal.portal_trash.newContent(
portal_type="Trash Bin",
id=trashbin_id,
title=trashbin_id,
serialised_sequence=self.serialiseSequenceDict(),
document_dict=document_dict,
)
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
context.portal.portal_trash.backupObject(
trashbin_value, [module_id], object_id, save=True, keep_subobjects=True
)
context.tic()
context.logout()
def restore(self, context):
context.login()
trashbin_value = context.portal.portal_trash[self._getTrashBinId(context)]
document_dict = trashbin_value.getProperty('document_dict')
for module_id, object_id_list in document_dict.iteritems():
for object_id in object_id_list:
context.portal.portal_trash.restoreObject(
trashbin_value, [module_id], object_id, pass_if_exist=True
)
self.deserialiseSequenceDict(
trashbin_value.getProperty("serialised_sequence"),
)
context.tic()
context.logout()
def play(self, context, **kw):
portal = self._context.getPortal()
if getattr(portal.portal_trash, self._getTrashBinId(context), None) is None:
ZopeTestCase._print('\nRunning and saving stored sequence \"%s\" ...' % self._id)
sequence = Sequence()
sequence.setSequenceString(context.getSequenceString(self._id))
sequence.play(context)
self._dict = sequence._dict.copy()
self.store(context)
else:
ZopeTestCase._print('\nRestoring stored sequence \"%s\" ...' % self._id)
self.restore(context)
Sequence.play(self, context, **kw)
class SequenceList:
def __init__(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment