Commit f50a68df authored by Jérome Perrin's avatar Jérome Perrin

ZMySQLDA: escape when upgrading schema

Tables, columns and index names needs to be escaped for syntax to be
valid.
parent bf336faf
......@@ -530,7 +530,7 @@ class DB(TM):
# already done it (in case that it plans to execute the returned query).
with (nested if src__ else self.lock)():
try:
old_list, old_set, old_default = self._getTableSchema(name)
old_list, old_set, old_default = self._getTableSchema("`%s`" % name)
except ProgrammingError, e:
if e[0] != ER.NO_SUCH_TABLE or not create_if_not_exists:
raise
......@@ -538,7 +538,7 @@ class DB(TM):
self.query(create_sql)
return create_sql
name_new = '_%s_new' % name
name_new = '`_%s_new`' % name
self.query('CREATE TEMPORARY TABLE %s %s'
% (name_new, create_sql[m.end():]))
try:
......@@ -559,7 +559,7 @@ class DB(TM):
old_dict[column] = pos, spec
pos += 1
else:
q("DROP COLUMN " + column)
q("DROP COLUMN `%s`" % column)
for key in old_set - new_set:
if "PRIMARY" in key:
......@@ -574,26 +574,26 @@ class DB(TM):
try:
old = old_dict[column]
except KeyError:
q("ADD COLUMN %s %s %s" % (column, spec, where))
q("ADD COLUMN `%s` %s %s" % (column, spec, where))
column_list.append(column)
else:
if old != (pos, spec):
q("MODIFY COLUMN %s %s %s" % (column, spec, where))
q("MODIFY COLUMN `%s` %s %s" % (column, spec, where))
if old[1] != spec:
column_list.append(column)
pos += 1
where = "AFTER " + column
where = "AFTER `%s`" % column
for key in new_set - old_set:
q("ADD " + key)
if src:
src = "ALTER TABLE %s%s" % (name, ','.join("\n " + q
src = "ALTER TABLE `%s`%s" % (name, ','.join("\n " + q
for q in src))
if not src__:
self.query(src)
if column_list and initialize and self.query(
"SELECT 1 FROM " + name, 1)[1]:
"SELECT 1 FROM `%s`" % name, 1)[1]:
initialize(self, column_list)
return src
......
......@@ -39,12 +39,13 @@ class TestTableStructureMigrationTestCase(ERP5TypeTestCase):
def beforeTearDown(self):
self.portal.erp5_sql_connection().query('DROP table if exists X')
self.portal.erp5_sql_connection().query('DROP table if exists `table`')
self.commit()
def query(self, q):
return self.portal.erp5_sql_connection().query(q)
def check_upgrade_schema(self, previous_schema, new_schema):
def check_upgrade_schema(self, previous_schema, new_schema, table_name='X'):
self.query(previous_schema)
da = DA(
id=self.id(),
......@@ -55,7 +56,9 @@ class TestTableStructureMigrationTestCase(ERP5TypeTestCase):
self.assertTrue(da._upgradeSchema(src__=True))
da._upgradeSchema()
self.assertFalse(da._upgradeSchema(src__=True))
self.assertEqual(new_schema, self.query('SHOW CREATE TABLE `X`')[1][0][1])
self.assertEqual(
new_schema,
self.query('SHOW CREATE TABLE `%s`' % table_name)[1][0][1])
def test_add_column(self):
self.check_upgrade_schema(
......@@ -170,3 +173,22 @@ class TestTableStructureMigrationTestCase(ERP5TypeTestCase):
"Key 'idx_a' doesn't exist in table 'X'"):
self.query("SELECT * FROM X USE INDEX (`idx_a`)")
def test_escape(self):
self.check_upgrade_schema(
dedent(
"""\
CREATE TABLE `table` (
`drop` int(11) DEFAULT NULL,
`alter` int(11) DEFAULT NULL,
KEY `CASE` (`drop`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci"""),
dedent(
"""\
CREATE TABLE `table` (
`and` int(11) DEFAULT NULL,
`alter` varchar(255) COLLATE utf8_unicode_ci DEFAULT 'BETWEEN',
KEY `use` (`alter`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci"""),
table_name='table')
self.query(
"SELECT `alter`, `and` FROM `table` USE INDEX (`use`)")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment