Commit 0c2c42f0 authored by Jérome Perrin's avatar Jérome Perrin

ERP5Catalog: tests for upgradeSchema

parent f50a68df
......@@ -3829,7 +3829,85 @@ VALUES
# but a proper page
self.assertIn('<title>Catalog Tool - portal_catalog', ret.getBody())
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestERP5Catalog))
return suite
class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
"""Tests for "upgrade schema" feature of ERP5 Catalog.
"""
def getBusinessTemplateList(self):
return ("erp5_full_text_mroonga_catalog",)
def afterSetUp(self):
# Add two connections
db1, db2 = getExtraSqlConnectionStringList()[:2]
addConnection = self.portal.manage_addProduct[
"ZMySQLDA"].manage_addZMySQLConnection
addConnection("erp5_test_connection_1", '', db1)
addConnection("erp5_test_connection_2", '', db2)
self.catalog_tool = self.portal.portal_catalog
self.catalog = self.catalog_tool.newContent(portal_type="Catalog")
self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_1",
id="z_create_catalog",
src="CREATE TABLE dummy_catalog (uid int)")
# These will be cleaned up at tear down
self._db1_table_list = ["dummy_catalog"]
self._db2_table_list = []
def beforeTearDown(self):
for table in self._db1_table_list:
self.query_connection_1("DROP TABLE IF EXISTS `%s`" % table)
for table in self._db2_table_list:
self.query_connection_2("DROP TABLE IF EXISTS `%s`" % table)
self.portal.manage_delObjects(
["erp5_test_connection_1", "erp5_test_connection_2"])
self.commit()
def query_connection_1(self, q):
return self.portal.erp5_test_connection_1().query(q)
def query_connection_2(self, q):
return self.portal.erp5_test_connection_2().query(q)
def upgradeSchema(self):
self.assertTrue(
self.catalog_tool.upgradeSchema(
sql_catalog_id=self.catalog.getId(), src__=True))
self.catalog_tool.upgradeSchema(sql_catalog_id=self.catalog.getId())
self.assertFalse(
self.catalog_tool.upgradeSchema(
sql_catalog_id=self.catalog.getId(), src__=True))
def test_upgradeSchema_add_table(self):
self._db1_table_list.append("add_table")
method = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_1",
id=self.id(),
src="CREATE TABLE add_table (a int)")
self.catalog.setSqlClearCatalogList([method.getId()])
self.commit()
self.upgradeSchema()
self.commit()
self.query_connection_1("SELECT a from add_table")
def test_upgradeSchema_alter_table(self):
self._db1_table_list.append("altered_table")
self.query_connection_1("CREATE TABLE altered_table (a int)")
self.commit()
method = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_1",
id=self.id(),
src="CREATE TABLE altered_table (a int, b int)")
self.catalog.setSqlClearCatalogList([method.getId()])
self.commit()
self.upgradeSchema()
self.commit()
self.query_connection_1("SELECT b from altered_table")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment