Commit bb9935fb authored by Jérome Perrin's avatar Jérome Perrin

ERP5Catalog: add support for multiples databases in upgradeSchema

This is needed to support datawarehouse on another database.
parent 0c2c42f0
...@@ -42,6 +42,7 @@ from Acquisition import aq_base, aq_inner, aq_parent, ImplicitAcquisitionWrapper ...@@ -42,6 +42,7 @@ from Acquisition import aq_base, aq_inner, aq_parent, ImplicitAcquisitionWrapper
from Products.CMFActivity.ActiveObject import ActiveObject from Products.CMFActivity.ActiveObject import ActiveObject
from Products.CMFActivity.ActivityTool import GroupedMessage from Products.CMFActivity.ActivityTool import GroupedMessage
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.ZMySQLDA.DA import DeferredConnection
from AccessControl.PermissionRole import rolesForPermissionOn from AccessControl.PermissionRole import rolesForPermissionOn
...@@ -1360,20 +1361,45 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -1360,20 +1361,45 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
security.declareProtected(Permissions.ManagePortal, 'upgradeSchema') security.declareProtected(Permissions.ManagePortal, 'upgradeSchema')
def upgradeSchema(self, sql_catalog_id=None, src__=0): def upgradeSchema(self, sql_catalog_id=None, src__=0):
"""Upgrade all catalog tables, with ALTER or CREATE queries""" """Upgrade all catalog tables, with ALTER or CREATE queries"""
portal = self.getPortalObject()
catalog = self.getSQLCatalog(sql_catalog_id) catalog = self.getSQLCatalog(sql_catalog_id)
connection_id = catalog.z_create_catalog.connection_id
src = [] # group methods by connection
db = self.getPortalObject()[connection_id]() method_list_by_connection_id = defaultdict(list)
with db.lock(): for method_id in catalog.sql_clear_catalog:
for clear_method in catalog.sql_clear_catalog: method = catalog[method_id]
r = catalog[clear_method]._upgradeSchema( method_list_by_connection_id[method.connection_id].append(method)
connection_id, create_if_not_exists=1, src__=1)
if r: # Because we cannot select on deferred connections, _upgradeSchema
src.append(r) # cannot be used on SQL methods using a deferred connection.
if not src__: # We try to find a "non deferred" connection using the same connection
for r in src: # string and we'll use it instead.
db.query(r) connection_by_connection_id = dict()
return src for connection_id in method_list_by_connection_id:
connection = portal[connection_id]
connection_string = connection.connection_string
connection_by_connection_id[connection_id] = connection
if isinstance(connection, DeferredConnection):
for other_connection in portal.objectValues(
spec=('Z MySQL Database Connection',)):
if connection_string == other_connection.connection_string:
connection_by_connection_id[connection_id] = other_connection
break
queries_by_connection_id = defaultdict(list)
for connection_id, method_list in method_list_by_connection_id.items():
connection = connection_by_connection_id[connection_id]
db = connection()
with db.lock():
for method in method_list:
query = method._upgradeSchema(connection.getId(), create_if_not_exists=1, src__=1)
if query:
queries_by_connection_id[connection_id].append(query)
if not src__:
for query in queries_by_connection_id[connection_id]:
db.query(query)
return sum(queries_by_connection_id.values(), [])
security.declarePublic('getDocumentValueList') security.declarePublic('getDocumentValueList')
def getDocumentValueList(self, sql_catalog_id=None, def getDocumentValueList(self, sql_catalog_id=None,
......
...@@ -34,6 +34,7 @@ import httplib ...@@ -34,6 +34,7 @@ import httplib
from AccessControl import getSecurityManager from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime from DateTime import DateTime
from _mysql_exceptions import ProgrammingError
from OFS.ObjectManager import ObjectManager from OFS.ObjectManager import ObjectManager
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList
...@@ -3842,8 +3843,9 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase): ...@@ -3842,8 +3843,9 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
db1, db2 = getExtraSqlConnectionStringList()[:2] db1, db2 = getExtraSqlConnectionStringList()[:2]
addConnection = self.portal.manage_addProduct[ addConnection = self.portal.manage_addProduct[
"ZMySQLDA"].manage_addZMySQLConnection "ZMySQLDA"].manage_addZMySQLConnection
addConnection("erp5_test_connection_1", '', db1) addConnection("erp5_test_connection_1", "", db1)
addConnection("erp5_test_connection_2", '', db2) addConnection("erp5_test_connection_2", "", db2)
addConnection("erp5_test_connection_deferred_2", "", db2, deferred=True)
self.catalog_tool = self.portal.portal_catalog self.catalog_tool = self.portal.portal_catalog
self.catalog = self.catalog_tool.newContent(portal_type="Catalog") self.catalog = self.catalog_tool.newContent(portal_type="Catalog")
...@@ -3862,8 +3864,10 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase): ...@@ -3862,8 +3864,10 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
self.query_connection_1("DROP TABLE IF EXISTS `%s`" % table) self.query_connection_1("DROP TABLE IF EXISTS `%s`" % table)
for table in self._db2_table_list: for table in self._db2_table_list:
self.query_connection_2("DROP TABLE IF EXISTS `%s`" % table) self.query_connection_2("DROP TABLE IF EXISTS `%s`" % table)
self.portal.manage_delObjects( self.portal.manage_delObjects([
["erp5_test_connection_1", "erp5_test_connection_2"]) "erp5_test_connection_1",
"erp5_test_connection_2",
"erp5_test_connection_deferred_2"])
self.commit() self.commit()
def query_connection_1(self, q): def query_connection_1(self, q):
...@@ -3911,3 +3915,50 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase): ...@@ -3911,3 +3915,50 @@ class CatalogToolUpgradeSchemaTestCase(ERP5TypeTestCase):
self.upgradeSchema() self.upgradeSchema()
self.commit() self.commit()
self.query_connection_1("SELECT b from altered_table") self.query_connection_1("SELECT b from altered_table")
def test_upgradeSchema_multi_connections(self):
# Check that we can upgrade tables on more than one connection,
# like when using an external datawarehouse. This is a reproduction
# for https://nexedi.erp5.net/bug_module/20170426-A3962E
# In this test we use both "normal" and deferred connections,
# which is what happens in default erp5 catalog.
self._db1_table_list.append("table1")
self.query_connection_1("CREATE TABLE table1 (a int)")
self._db2_table_list.extend(("table2", "table_deferred2"))
self.query_connection_2("CREATE TABLE table2 (a int)")
self.query_connection_2("CREATE TABLE table_deferred2 (a int)")
self.commit()
method1 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_1",
src="CREATE TABLE table1 (a int, b int)")
method2 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_2",
src="CREATE TABLE table2 (a int, b int)")
method_deferred2 = self.catalog.newContent(
portal_type="SQL Method",
connection_id="erp5_test_connection_deferred_2",
src="CREATE TABLE table_deferred2 (a int, b int)")
self.catalog.setSqlClearCatalogList(
[method1.getId(),
method2.getId(),
method_deferred2.getId()])
self.commit()
self.upgradeSchema()
self.commit()
self.query_connection_1("SELECT b from table1")
self.query_connection_2("SELECT b from table2")
self.query_connection_2("SELECT b from table_deferred2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table2' doesn't exist"):
self.query_connection_1("SELECT b from table2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table_deferred2' doesn't exist"):
self.query_connection_1("SELECT b from table_deferred2")
with self.assertRaisesRegexp(ProgrammingError,
r"Table '.*\.table1' doesn't exist"):
self.query_connection_2("SELECT b from table1")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment