Commit 2420c89f authored by Arnaud Fontaine

py3: _mysql.string_literal() returns bytes().

The _mysql/MySQLdb API (_mysql.connection.query()) likewise converts the query
string to bytes() (and cursor.execute(QUERY, ARGS) calls query() after
converting everything to bytes(), too).
parent 8024ba17
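
To make the change concrete, below is a minimal sketch of the behaviour this commit adapts to, assuming the mysqlclient _mysql C-API wrapper (connection parameters and the lock name are placeholders; recent mysqlclient releases expose the module as MySQLdb._mysql). On Python 3, string_literal() returns bytes, so any query template that embeds its result must itself be a bytes literal, which is why the diff switches its format strings to b'...'.

import _mysql  # with recent mysqlclient: from MySQLdb import _mysql

# Placeholder credentials, for illustration only.
db = _mysql.connect(host="localhost", user="user", passwd="secret", db="test")

lock_name = db.string_literal("my_lock")  # Python 2: str, Python 3: bytes
timeout = 60.0

# Python 2 style -- broken on Python 3, because interpolating bytes into a str
# template yields "GET_LOCK(b'my_lock', ...)", which is invalid SQL:
#   query = 'SELECT GET_LOCK(%s, %f)' % (lock_name, timeout)

# Python 3 style used throughout this commit: build the whole statement as bytes.
query = b'SELECT GET_LOCK(%s, %f)' % (lock_name, timeout)
db.query(query)
((acquired,),) = db.store_result().fetch_row()
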
@@ -27,6 +27,8 @@ from __future__ import absolute_import
#
##############################################################################
from six.moves import xrange
from Products.ERP5Type.Utils import ensure_list
from collections import defaultdict
from contextlib import contextmanager
from itertools import product
@@ -100,14 +102,14 @@ def SQLLock(db, lock_name, timeout):
"""
lock_name = db.string_literal(lock_name)
query = db.query
(_, ((acquired, ), )) = query('SELECT GET_LOCK(%s, %f)' % (lock_name, timeout))
(_, ((acquired, ), )) = query(b'SELECT GET_LOCK(%s, %f)' % (lock_name, timeout))
if acquired is None:
raise ValueError('Error acquiring lock')
try:
yield acquired
finally:
if acquired:
query('SELECT RELEASE_LOCK(%s)' % (lock_name, ))
query(b'SELECT RELEASE_LOCK(%s)' % (lock_name, ))
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
@@ -115,23 +117,23 @@ def sqltest_dict():
def _(name, column=None, op="="):
if column is None:
column = name
column_op = "%s %s " % (column, op)
column_op = ("%s %s " % (column, op)).encode()
def render(value, render_string):
if isinstance(value, _SQLTEST_NO_QUOTE_TYPE_SET):
return column_op + str(value)
return column_op + str(value).encode()
if isinstance(value, DateTime):
value = render_datetime(value)
if isinstance(value, basestring):
return column_op + render_string(value)
assert op == "=", value
if value is None: # XXX: see comment in SQLBase._getMessageList
return column + " IS NULL"
return column + b" IS NULL"
for x in value:
return "%s IN (%s)" % (column, ', '.join(map(
return b"%s IN (%s)" % (column, ', '.join(map(
str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
render_datetime if isinstance(x, DateTime) else
render_string, value)))
return "0"
render_string, value)).encode())
return b"0"
sqltest_dict[name] = render
_('active_process_uid')
_('group_method_id')
@@ -151,13 +153,13 @@ def sqltest_dict():
assert isinstance(priority, _SQLTEST_NO_QUOTE_TYPE_SET)
assert isinstance(uid, _SQLTEST_NO_QUOTE_TYPE_SET)
return (
'(priority>%(priority)s OR (priority=%(priority)s AND '
'(date>%(date)s OR (date=%(date)s AND uid>%(uid)s))'
'))' % {
'priority': priority,
b'(priority>%(priority)d OR (priority=%(priority)d AND '
b'(date>%(date)s OR (date=%(date)s AND uid>%(uid)d))'
b'))' % {
b'priority': priority,
# render_datetime raises if "date" lacks date API, so no need to check
'date': render_string(render_datetime(date)),
'uid': uid,
b'date': render_string(render_datetime(date)),
b'uid': uid,
}
)
sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
@@ -168,7 +170,7 @@ def _validate_after_path_and_method_id(value, render_string):
path, method_id = value
return (
sqltest_dict['method_id'](method_id, render_string) +
' AND ' +
b' AND ' +
sqltest_dict['path'](path, render_string)
)
@@ -176,7 +178,7 @@ def _validate_after_tag_and_method_id(value, render_string):
tag, method_id = value
return (
sqltest_dict['method_id'](method_id, render_string) +
' AND ' +
b' AND ' +
sqltest_dict['tag'](tag, render_string)
)
@@ -280,18 +282,18 @@ CREATE TABLE %s (
% (self.sql_table, src))
self._insert_max_payload = (db.getMaxAllowedPacket()
+ len(self._insert_separator)
- len(self._insert_template % (self.sql_table, '')))
- len(self._insert_template % (self.sql_table.encode(), b'')))
def _initialize(self, db, column_list):
LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
" The following added columns could not be initialized: %s"
% (self.sql_table, ", ".join(column_list)))
_insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, node, group_method_id, tag, serialization_tag,"
" message) VALUES\n(%s)")
_insert_separator = "),\n("
_insert_template = (b"INSERT INTO %s (uid,"
b" path, active_process_uid, date, method_id, processing_node,"
b" priority, node, group_method_id, tag, serialization_tag,"
b" message) VALUES\n(%s)")
_insert_separator = b"),\n("
def _hasDependency(self, message):
get = message.activity_kw.get
@@ -310,10 +312,11 @@ CREATE TABLE %s (
if reset_uid:
reset_uid = False
# Overflow will result in IntegrityError.
db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
db.query(b"SET @uid := %d" % getrandbits(UID_SAFE_BITSIZE))
try:
db.query(self._insert_template % (self.sql_table, values))
except MySQLdb.IntegrityError, (code, _):
db.query(self._insert_template % (self.sql_table.encode(), values))
except MySQLdb.IntegrityError as xxx_todo_changeme:
(code, _) = xxx_todo_changeme.args
if code != DUP_ENTRY:
raise
reset_uid = True
@@ -331,18 +334,18 @@ CREATE TABLE %s (
if m.is_registered:
active_process_uid = m.active_process_uid
date = m.activity_kw.get('at_date')
row = ','.join((
'@uid+%s' % i,
row = b','.join((
b'@uid+%d' % i,
quote('/'.join(m.object_path)),
'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
b'NULL' if active_process_uid is None else str(active_process_uid).encode(),
b"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id),
'-1' if hasDependency(m) else '0',
str(m.activity_kw.get('priority', 1)),
str(m.activity_kw.get('node', 0)),
b'-1' if hasDependency(m) else b'0',
str(m.activity_kw.get('priority', 1)).encode(),
str(m.activity_kw.get('node', 0)).encode(),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('serialization_tag', '')),
quote(m.activity_kw.get('tag', b'')),
quote(m.activity_kw.get('serialization_tag', b'')),
quote(Message.dump(m))))
i += 1
n = sep_len + len(row)
@@ -363,11 +366,11 @@ CREATE TABLE %s (
# value should be ignored, instead of trying to render them
# (with comparisons with NULL).
q = db.string_literal
sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
self.sql_table,
sql and '\nWHERE ' + sql,
'' if count is None else '\nLIMIT %d' % count,
sql = b'\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
sql = b"SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
self.sql_table.encode(),
sql and b'\nWHERE ' + sql,
b'' if count is None else b'\nLIMIT %d' % count,
)
return sql if src__ else Results(db.query(sql, max_rows=0))
@@ -392,17 +395,17 @@ CREATE TABLE %s (
def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
where = [sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v]
if only_valid:
where.append('processing_node > %d' % INVOKE_ERROR_STATE)
where.append(b'processing_node > %d' % INVOKE_ERROR_STATE)
if only_invalid:
where.append('processing_node <= %d' % INVOKE_ERROR_STATE)
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1")
where.append(b'processing_node <= %d' % INVOKE_ERROR_STATE)
return b"SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table.encode(), b" AND ".join(where) or b"1")
def getPriority(self, activity_tool, processing_node, node_set=None):
if node_set is None:
q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table)
q = (b"SELECT 3*priority, date FROM %s"
b" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date LIMIT 1" % self.sql_table.encode())
else:
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
@@ -410,12 +413,12 @@ CREATE TABLE %s (
node = 'node=%s' % processing_node
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
q = (b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node).encode(),
subquery('', 'node=0').encode(),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0').encode(),
b' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))).encode() if node_set else b'',
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
@@ -588,18 +591,18 @@ CREATE TABLE %s (
if len(column_list) == 1 else
_IDENTITY
)
base_sql_suffix = ' WHERE processing_node > %i AND (%%s) LIMIT 1)' % (
base_sql_suffix = b' WHERE processing_node > %i AND (%%s) LIMIT 1)' % (
min_processing_node,
)
sql_suffix_list = [
base_sql_suffix % to_sql(dependency_value, quote)
for dependency_value in dependency_value_dict
]
base_sql_prefix = '(SELECT %s FROM ' % (
','.join(column_list),
base_sql_prefix = b'(SELECT %s FROM ' % (
b','.join([ c.encode() for c in column_list ]),
)
subquery_list = [
base_sql_prefix + table_name + sql_suffix
base_sql_prefix + table_name.encode() + sql_suffix
for table_name in table_name_list
for sql_suffix in sql_suffix_list
]
@@ -610,7 +613,7 @@ CREATE TABLE %s (
# by the number of activity tables: it is also proportional to the
# number of distinct values being looked for in the current column.
for row in db.query(
' UNION '.join(subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]),
b' UNION '.join(subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]),
max_rows=0,
)[1]:
# Each row is a value which blocks some activities.
@@ -684,9 +687,9 @@ CREATE TABLE %s (
assert limit
quote = db.string_literal
query = db.query
args = (self.sql_table, sqltest_dict['to_date'](date, quote),
' AND group_method_id=' + quote(group_method_id)
if group_method_id else '' , limit)
args = (self.sql_table.encode(), sqltest_dict['to_date'](date, quote),
b' AND group_method_id=' + quote(group_method_id)
if group_method_id else b'' , limit)
# Note: Not all write accesses to our table are protected by this lock.
# This lock is not here for data consistency reasons, but to avoid wasting
@@ -715,25 +718,25 @@ CREATE TABLE %s (
# time).
if node_set is None:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
b"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d FOR UPDATE" % args, 0))
else:
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % processing_node
subquery = (b"(SELECT *, 3*priority%%s as effective_priority FROM %s"
b" WHERE %%s AND processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d FOR UPDATE)" % args)
node = b'node=%d' % processing_node
result = Results(query(
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
b"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b" ORDER BY effective_priority, date LIMIT %d"% (
subquery % (b'-1', node),
subquery % (b'', b'node=0'),
subquery % (b'+IF(node, IF(%s, -1, 1), 0)' % node, b'node>=0'),
b' UNION ALL ' + subquery % (str(-1), b'node IN (%s)' % b','.join(map(str, node_set)).encode()) if node_set else b'',
limit), 0))
if result:
# Reserve messages.
@@ -796,9 +799,9 @@ CREATE TABLE %s (
# To minimize the probability of deadlocks, we also COMMIT so that a
# new transaction starts on the first 'FOR UPDATE' query, which is all
# the more important as the current one started with getPriority().
result = db.query("SELECT * FROM %s WHERE processing_node=%s"
" ORDER BY priority, date LIMIT 1\0COMMIT" % (
self.sql_table, processing_node), 0)
result = db.query(b"SELECT * FROM %s WHERE processing_node=%d"
b" ORDER BY priority, date LIMIT 1\0COMMIT" % (
self.sql_table.encode(), processing_node), 0)
already_assigned = result[1]
if already_assigned:
result = Results(result)
@@ -827,10 +830,10 @@ CREATE TABLE %s (
cost *= count
# Retrieve objects which have the same group method.
result = iter(already_assigned
and Results(db.query("SELECT * FROM %s"
" WHERE processing_node=%s AND group_method_id=%s"
" ORDER BY priority, date LIMIT %s" % (
self.sql_table, processing_node,
and Results(db.query(b"SELECT * FROM %s"
b" WHERE processing_node=%d AND group_method_id=%s"
b" ORDER BY priority, date LIMIT %d" % (
self.sql_table.encode(), processing_node,
db.string_literal(group_method_id), limit), 0))
# Do not optimize rare case: keep the code simple by not
# adding more results from getReservedMessageList if the
@@ -857,7 +860,7 @@ CREATE TABLE %s (
except:
self._log(WARNING, 'Exception while reserving messages.')
if uid_to_duplicate_uid_list_dict:
to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
to_free_uid_list = ensure_list(uid_to_duplicate_uid_list_dict.keys())
for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
to_free_uid_list += uid_list
try:
@@ -85,7 +85,7 @@ class SQLDict(SQLBase):
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
sql_method_id = " AND method_id = %s AND group_method_id = %s" % (
sql_method_id = b" AND method_id = %s AND group_method_id = %s" % (
quote(method_id), quote(line.group_method_id))
m = Message.load(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent')
@@ -102,11 +102,11 @@ class SQLDict(SQLBase):
uid_list = []
if path_list:
# Select parent messages.
result = Results(db.query("SELECT * FROM message"
" WHERE processing_node IN (0, %s) AND path IN (%s)%s"
" ORDER BY path LIMIT 1 FOR UPDATE" % (
result = Results(db.query(b"SELECT * FROM message"
b" WHERE processing_node IN (0, %d) AND path IN (%s)%s"
b" ORDER BY path LIMIT 1 FOR UPDATE" % (
processing_node,
','.join(map(quote, path_list)),
b','.join(map(quote, path_list)),
sql_method_id,
), 0))
if result: # found a parent
@@ -119,11 +119,11 @@ class SQLDict(SQLBase):
m = Message.load(line.message, uid=uid, line=line)
# return unreserved similar children
path = line.path
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
"%s FOR UPDATE" % (
result = db.query(b"SELECT uid FROM message"
b" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
b"%s FOR UPDATE" % (
quote(path), quote(path.replace('_', r'\_') + '/%'),
sql_method_id,
sql_method_id.encode(),
), 0)[1]
reserve_uid_list = [x for x, in result]
uid_list += reserve_uid_list
@@ -132,8 +132,8 @@ class SQLDict(SQLBase):
reserve_uid_list.append(uid)
else:
# Select duplicates.
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
result = db.query(b"SELECT uid FROM message"
b" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
quote(path), sql_method_id,
), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
@@ -27,6 +27,7 @@ from __future__ import absolute_import
#
##############################################################################
from six import string_types as basestring
from Products.ERP5Type.Utils import ensure_list
import socket
from six.moves import urllib
@@ -1356,7 +1357,7 @@ class ActivityTool (BaseTool):
# use a round-robin algorithm.
# XXX: We always finish by iterating over all queues, in case that
# getPriority does not see messages dequeueMessage would process.
activity_list = activity_dict.values()
activity_list = ensure_list(activity_dict.values())
def sort_key(activity):
return activity.getPriority(self, processing_node,
node_family_id_set)
@@ -1390,7 +1391,7 @@ class ActivityTool (BaseTool):
path = None if obj is None else '/'.join(obj.getPhysicalPath())
db = self.getSQLConnection()
quote = db.string_literal
return bool(db.query("(%s)" % ") UNION ALL (".join(
return bool(db.query(b"(%s)" % b") UNION ALL (".join(
activity.hasActivitySQL(quote, path=path, **kw)
for activity in activity_dict.itervalues()))[1])
@@ -111,6 +111,7 @@ from Shared.DC.ZRDB.TM import TM
from DateTime import DateTime
from zLOG import LOG, ERROR, WARNING
from ZODB.POSException import ConflictError
from Products.ERP5Type.Utils import str2bytes
hosed_connection = (
CR.SERVER_GONE_ERROR,
@@ -203,7 +204,7 @@ def ord_or_None(s):
return ord(s)
match_select = re.compile(
r'(?:SET\s+STATEMENT\s+(.+?)\s+FOR\s+)?SELECT\s+(.+)',
rb'(?:SET\s+STATEMENT\s+(.+?)\s+FOR\s+)?SELECT\s+(.+)',
re.IGNORECASE | re.DOTALL,
).match
@@ -417,12 +418,14 @@ class DB(TM):
"""Execute 'query_string' and return at most 'max_rows'."""
self._use_TM and self._register()
desc = None
if not isinstance(query_string, bytes):
query_string = str2bytes(query_string)
# XXX deal with a typical mistake that the user appends
# an unnecessary and rather harmful semicolon at the end.
# Unfortunately, MySQLdb does not want to be graceful.
if query_string[-1:] == ';':
if query_string[-1:] == b';':
query_string = query_string[:-1]
for qs in query_string.split('\0'):
for qs in query_string.split(b'\0'):
qs = qs.strip()
if qs:
select_match = match_select(qs)
@@ -431,12 +434,12 @@ class DB(TM):
if query_timeout is not None:
statement, select = select_match.groups()
if statement:
statement += ", max_statement_time=%f" % query_timeout
statement += b", max_statement_time=%f" % query_timeout
else:
statement = "max_statement_time=%f" % query_timeout
qs = "SET STATEMENT %s FOR SELECT %s" % (statement, select)
statement = b"max_statement_time=%f" % query_timeout
qs = b"SET STATEMENT %s FOR SELECT %s" % (statement, select)
if max_rows:
qs = "%s LIMIT %d" % (qs, max_rows)
qs = b"%s LIMIT %d" % (qs, max_rows)
c = self._query(qs)
if c:
if desc is not None is not c.describe():
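
The DB.query() hunk above normalizes the incoming query text once and then works purely in bytes: coerce str to bytes, drop a trailing semicolon, split multi-statement strings on NUL bytes, and rewrite SELECTs to inject max_statement_time and LIMIT. Below is a standalone sketch of that normalization under illustrative names (str2bytes is approximated with encode(), the timeout is passed as an argument, and the actual round trip to MySQL is omitted).

import re

# Same pattern as in the hunk: an optional SET STATEMENT prefix, then the SELECT body.
match_select = re.compile(
    rb'(?:SET\s+STATEMENT\s+(.+?)\s+FOR\s+)?SELECT\s+(.+)',
    re.IGNORECASE | re.DOTALL,
).match

def normalize(query_string, max_rows=0, query_timeout=None):
    """Yield the byte strings that would be sent to MySQL, one per statement."""
    if not isinstance(query_string, bytes):
        query_string = query_string.encode('utf-8')  # stand-in for str2bytes()
    # Drop a trailing semicolon, which MySQLdb does not accept.
    if query_string[-1:] == b';':
        query_string = query_string[:-1]
    # Several statements may be packed into one string, separated by NUL bytes.
    for qs in query_string.split(b'\0'):
        qs = qs.strip()
        if not qs:
            continue
        select_match = match_select(qs)
        if select_match:
            if query_timeout is not None:
                statement, select = select_match.groups()
                if statement:
                    statement += b", max_statement_time=%f" % query_timeout
                else:
                    statement = b"max_statement_time=%f" % query_timeout
                qs = b"SET STATEMENT %s FOR SELECT %s" % (statement, select)
            if max_rows:
                qs = b"%s LIMIT %d" % (qs, max_rows)
        yield qs

# Example: a SELECT and a COMMIT packed into a single call, as done above.
for statement in normalize("SELECT * FROM message WHERE processing_node=1"
                           " ORDER BY priority, date LIMIT 1\0COMMIT"):
    print(statement)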