Commit 52480993 authored by Julien Muchembled's avatar Julien Muchembled

ZMySQLDA: review API to upgrade schema of table, with locking to avoid race conditions

Race conditions are likely to happen with CMFActivity between message tables
are automatically upgraded during bootstrap.

Most code is moved from DA patch to ZMySQLDA.
parent d84d3fab
...@@ -28,8 +28,6 @@ ...@@ -28,8 +28,6 @@
import sys import sys
import transaction import transaction
from _mysql_exceptions import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
from DateTime import DateTime from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
...@@ -103,25 +101,19 @@ class SQLBase(Queue): ...@@ -103,25 +101,19 @@ class SQLBase(Queue):
return return
if clear: if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table) folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable(table=self.sql_table)
else: else:
column_list = [] src = createMessageTable._upgradeSchema(create_if_not_exists=1,
try: initialize=self._initialize,
src = createMessageTable._upgradeSchema(added_list=column_list, table=self.sql_table)
modified_list=column_list, if src:
table=self.sql_table) LOG('CMFActivity', INFO, "%r table upgraded\n%s"
except ProgrammingError, e: % (self.sql_table, src))
if e[0] != NO_SUCH_TABLE:
raise def _initialize(self, db, column_list):
else: LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
if column_list and self._getMessageList(activity_tool, count=1): " The following added columns could not be initialized: %s"
LOG('CMFActivity', ERROR, "Non-empty %r table upgraded." % (self.sql_table, ", ".join(column_list)))
" The following added columns could not be initialized: %s\n%s"
% (self.sql_table, ", ".join(column_list), src))
elif src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
return
createMessageTable(table=self.sql_table)
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered] registered_message_list = [m for m in message_list if m.is_registered]
......
...@@ -50,8 +50,6 @@ from Products.ERP5Type.Utils import sqlquote ...@@ -50,8 +50,6 @@ from Products.ERP5Type.Utils import sqlquote
import warnings import warnings
from zLOG import LOG, PROBLEM, WARNING, INFO from zLOG import LOG, PROBLEM, WARNING, INFO
from _mysql_exceptions import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
ACQUIRE_PERMISSION_VALUE = [] ACQUIRE_PERMISSION_VALUE = []
DYNAMIC_METHOD_NAME = 'z_related_' DYNAMIC_METHOD_NAME = 'z_related_'
...@@ -1023,20 +1021,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -1023,20 +1021,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
catalog = self.getSQLCatalog(sql_catalog_id) catalog = self.getSQLCatalog(sql_catalog_id)
connection_id = catalog.z_create_catalog.connection_id connection_id = catalog.z_create_catalog.connection_id
src = [] src = []
for clear_method in catalog.sql_clear_catalog: db = self.getPortalObject()[connection_id]()
clear_method = catalog[clear_method] with db.lock():
try: for clear_method in catalog.sql_clear_catalog:
r = clear_method._upgradeSchema(connection_id, src__=1) r = catalog[clear_method]._upgradeSchema(
except ProgrammingError, e: connection_id, create_if_not_exists=1, src__=1)
if e[0] != NO_SUCH_TABLE: if r:
raise src.append(r)
r = clear_method(src__=1) if not src__:
if r: for r in src:
src.append(r) db.query(r)
if src and not src__:
query = self.getPortalObject()[connection_id]().query
for r in src:
query(r)
return src return src
......
...@@ -254,92 +254,11 @@ def DA__call__(self, REQUEST=None, __ick__=None, src__=0, test__=0, **kw): ...@@ -254,92 +254,11 @@ def DA__call__(self, REQUEST=None, __ick__=None, src__=0, test__=0, **kw):
return result return result
def _getTableSchema(query, name, def DA_upgradeSchema(self, connection_id=None, create_if_not_exists=False,
create_lstrip = re.compile(r"[^(]+\(\s*").sub, initialize=None, src__=0, **kw):
create_rmatch = re.compile(r"(.*\S)\s*\)[^)]+\s" return self.getPortalObject()[connection_id or self.connection_id]() \
"(DEFAULT(\s+(CHARSET|COLLATE)=\S+)+).*$", re.DOTALL).match, .upgradeSchema(self(src__=1, **kw), create_if_not_exists,
create_split = re.compile(r",\n\s*").split, initialize, src__)
column_match = re.compile(r"`(\w+)`\s+(.+)").match,
):
(_, schema), = query("SHOW CREATE TABLE " + name)[1]
column_list = []
key_set = set()
m = create_rmatch(create_lstrip("", schema, 1))
for spec in create_split(m.group(1)):
if "KEY" in spec:
key_set.add(spec)
else:
column_list.append(column_match(spec).groups())
return column_list, key_set, m.group(2)
_create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+', re.I).search
_key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search
def DA_upgradeSchema(self, connection_id=None, added_list=None,
modified_list=None, src__=0, **kw):
query = self.getPortalObject()[connection_id or self.connection_id]().query
src = self(src__=1, **kw)
m = _create_search(src)
if m is None:
return
name = m.group(2)
old_list, old_set, old_default = _getTableSchema(query, name)
name_new = '_%s_new' % name
query('CREATE TEMPORARY TABLE %s %s' % (name_new, src[m.end():]))
try:
new_list, new_set, new_default = _getTableSchema(query, name_new)
finally:
query("DROP TEMPORARY TABLE " + name_new)
src = []
q = src.append
if old_default != new_default:
q(new_default)
old_dict = {}
new = {column[0] for column in new_list}
pos = 0
for column, spec in old_list:
if column in new:
old_dict[column] = pos, spec
pos += 1
else:
q("DROP COLUMN " + column)
for key in old_set - new_set:
if "PRIMARY" in key:
q("DROP PRIMARY KEY")
else:
q("DROP KEY " + _key_search(key).group(1))
added = str if added_list is None else added_list.append
modified = str if modified_list is None else modified_list.append
pos = 0
where = "FIRST"
for column, spec in new_list:
try:
old = old_dict[column]
except KeyError:
q("ADD COLUMN %s %s %s" % (column, spec, where))
added(column)
else:
if old != (pos, spec):
q("MODIFY COLUMN %s %s %s" % (column, spec, where))
if old[1] != spec:
modified(column)
pos += 1
where = "AFTER " + column
for key in new_set - old_set:
q("ADD " + key)
if src:
src = "ALTER TABLE %s%s" % (name, ','.join("\n " + q for q in src))
if not src__:
query(src)
return src
DA.__call__ = DA__call__ DA.__call__ = DA__call__
DA.fromFile = DA_fromFile DA.fromFile = DA_fromFile
......
...@@ -87,9 +87,11 @@ ...@@ -87,9 +87,11 @@
__version__='$Revision: 1.20 $'[11:-2] __version__='$Revision: 1.20 $'[11:-2]
import os import os
import re
import _mysql import _mysql
import MySQLdb import MySQLdb
import warnings import warnings
from contextlib import contextmanager, nested
from _mysql_exceptions import OperationalError, NotSupportedError, ProgrammingError from _mysql_exceptions import OperationalError, NotSupportedError, ProgrammingError
MySQLdb_version_required = (0,9,2) MySQLdb_version_required = (0,9,2)
...@@ -434,6 +436,117 @@ class DB(TM): ...@@ -434,6 +436,117 @@ class DB(TM):
else: else:
LOG('ZMySQLDA', ERROR, "aborting when non-transactional") LOG('ZMySQLDA', ERROR, "aborting when non-transactional")
@contextmanager
def lock(self):
"""Lock for the connected DB"""
db = self._kw_args.get('db', '')
lock = "SELECT GET_LOCK('ZMySQLDA(%s)', 5)" % db
unlock = "SELECT RELEASE_LOCK('ZMySQLDA(%s)')" % db
try:
while not self.query(lock, 0)[1][0][0]: pass
yield
finally:
self.query(unlock, 0)
def _getTableSchema(self, name,
create_lstrip = re.compile(r"[^(]+\(\s*").sub,
create_rmatch = re.compile(r"(.*\S)\s*\)[^)]+\s"
"(DEFAULT(\s+(CHARSET|COLLATE)=\S+)+).*$", re.DOTALL).match,
create_split = re.compile(r",\n\s*").split,
column_match = re.compile(r"`(\w+)`\s+(.+)").match,
):
(_, schema), = self.query("SHOW CREATE TABLE " + name)[1]
column_list = []
key_set = set()
m = create_rmatch(create_lstrip("", schema, 1))
for spec in create_split(m.group(1)):
if "KEY" in spec:
key_set.add(spec)
else:
column_list.append(column_match(spec).groups())
return column_list, key_set, m.group(2)
_create_search = re.compile(r'\bCREATE\s+TABLE\s+(`?)(\w+)\1\s+',
re.I).search
_key_search = re.compile(r'\bKEY\s+(`[^`]+`)\s+(.+)').search
def upgradeSchema(self, create_sql, create_if_not_exists=False,
initialize=None, src__=0):
m = self._create_search(create_sql)
if m is None:
return
name = m.group(2)
# Lock automaticaly unless src__ is True, because the caller may have
# already done it (in case that it plans to execute the returned query).
with (nested if src__ else self.lock)():
try:
old_list, old_set, old_default = self._getTableSchema(name)
except ProgrammingError, e:
if e[0] != ER.NO_SUCH_TABLE or not create_if_not_exists:
raise
if not src__:
self.query(create_sql)
return create_sql
name_new = '_%s_new' % name
self.query('CREATE TEMPORARY TABLE %s %s'
% (name_new, create_sql[m.end():]))
try:
new_list, new_set, new_default = self._getTableSchema(name_new)
finally:
self.query("DROP TEMPORARY TABLE " + name_new)
src = []
q = src.append
if old_default != new_default:
q(new_default)
old_dict = {}
new = {column[0] for column in new_list}
pos = 0
for column, spec in old_list:
if column in new:
old_dict[column] = pos, spec
pos += 1
else:
q("DROP COLUMN " + column)
for key in old_set - new_set:
if "PRIMARY" in key:
q("DROP PRIMARY KEY")
else:
q("DROP KEY " + self._key_search(key).group(1))
column_list = []
pos = 0
where = "FIRST"
for column, spec in new_list:
try:
old = old_dict[column]
except KeyError:
q("ADD COLUMN %s %s %s" % (column, spec, where))
column_list.append(column)
else:
if old != (pos, spec):
q("MODIFY COLUMN %s %s %s" % (column, spec, where))
if old[1] != spec:
column_list.append(column)
pos += 1
where = "AFTER " + column
for key in new_set - old_set:
q("ADD " + key)
if src:
src = "ALTER TABLE %s%s" % (name, ','.join("\n " + q
for q in src))
if not src__:
self.query(src)
if column_list and initialize and self.query(
"SELECT 1 FROM " + name, 1)[1]:
initialize(self, column_list)
return src
class DeferredDB(DB): class DeferredDB(DB):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment