Commit 7e387bcb authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: validate a message with a single SQL request per queue

As shown in the following example, on a big catalog table,
MariaDB is able to use several indices at the same time
('...' are obfuscated unique values):

  > analyze select SQL_NO_CACHE uid, relative_url from catalog where reference='...' OR relative_url='...';
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  | id   | select_type | table   | type        | possible_keys          | key                    | key_len | ref  | rows | r_rows | filtered | r_filtered | Extra                                            |
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  |    1 | SIMPLE      | catalog | index_merge | Reference,relative_url | Reference,relative_url | 768,767 | NULL |    2 |   2.00 |   100.00 |     100.00 | Using union(Reference,relative_url); Using where |
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  1 row in set (0.00 sec)

So mixing different dependency types with OR should be fine
(no need to split into more subqueries and join with UNION).
parent 7b634f22
......@@ -113,7 +113,7 @@ class Queue(object):
cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None:
message_list = activity_tool.getDependentMessageList(message)
message_list = activity_tool.getDependentMessageList(message, self)
transaction.commit() # Release locks.
if message_list:
# The result is not empty, so this message is not executable.
......@@ -146,7 +146,7 @@ class Queue(object):
key_list = message.activity_kw.keys()
key_list.sort()
for key in key_list:
method_id = "_validate_%s" % key
method_id = "_validate_" + key
if getattr(self, method_id, None) is not None:
order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0:
......
......@@ -254,25 +254,28 @@ class SQLBase(Queue):
LOG('SQLBase', INFO, 'Got a lock error, retrying...')
# Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag or serialization_tag:
result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag,
count=False,
serialization_tag=serialization_tag)
def getDependentMessageList(self, db, activate_kw, same_queue):
q = db.string_literal
validate_list = []
for k, v in activate_kw.iteritems():
if v is not None:
try:
method = getattr(self, '_validate_' + k, None)
if method:
validate_list.append(' AND '.join(method(v, q)))
except Exception:
LOG('CMFActivity', WARNING, 'invalid %s value: %r' % (k, v),
error=True)
# Prevent validation by depending on anything, at least itself.
validate_list = '1',
same_queue = False
break
if validate_list:
message_list = []
for line in result:
for line in Results(db.query(
"SELECT * FROM %s WHERE processing_node > -10 AND (%s) LIMIT %s" % (
self.sql_table, ' OR '.join(validate_list),
READ_MESSAGE_LIMIT if same_queue else 1), 0)):
m = Message.load(line.message,
line=line,
uid=line.uid,
......@@ -282,36 +285,32 @@ class SQLBase(Queue):
m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m)
return message_list
return ()
def _validate_after_method_id(self, activity_tool, message, value):
return self._validate(activity_tool, method_id=value)
def _validate_after_method_id(self, *args):
return sqltest_dict['method_id'](*args),
def _validate_after_path(self, activity_tool, message, value):
return self._validate(activity_tool, path=value)
def _validate_after_path(self, *args):
return sqltest_dict['path'](*args),
def _validate_after_message_uid(self, activity_tool, message, value):
return self._validate(activity_tool, message_uid=value)
def _validate_after_message_uid(self, *args):
return sqltest_dict['uid'](*args),
def _validate_after_path_and_method_id(self, activity_tool, message, value):
if not (isinstance(value, (tuple, list)) and len(value) == 2):
LOG('CMFActivity', WARNING,
'unable to recognize value for after_path_and_method_id: %r' % (value,))
return []
return self._validate(activity_tool, path=value[0], method_id=value[1])
def _validate_after_path_and_method_id(self, value, quote):
path, method_id = value
return (sqltest_dict['method_id'](method_id, quote),
sqltest_dict['path'](path, quote))
def _validate_after_tag(self, activity_tool, message, value):
return self._validate(activity_tool, tag=value)
def _validate_after_tag(self, *args):
return sqltest_dict['tag'](*args),
def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id
if not (isinstance(value, (tuple, list)) and len(value) == 2):
LOG('CMFActivity', WARNING,
'unable to recognize value for after_tag_and_method_id: %r' % (value,))
return []
return self._validate(activity_tool, tag=value[0], method_id=value[1])
def _validate_after_tag_and_method_id(self, value, quote):
tag, method_id = value
return (sqltest_dict['method_id'](method_id, quote),
sqltest_dict['tag'](tag, quote))
def _validate_serialization_tag(self, activity_tool, message, value):
return self._validate(activity_tool, serialization_tag=value)
def _validate_serialization_tag(self, *args):
return 'processing_node > -1', sqltest_dict['serialization_tag'](*args)
def _log(self, severity, summary):
LOG(self.__class__.__name__, severity, summary,
......
......@@ -1549,17 +1549,13 @@ class ActivityTool (BaseTool):
return obj
security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message):
message_list = []
for validator_id, validation_value in message.activity_kw.iteritems():
method_id = "_validate_" + validator_id
for activity in activity_dict.itervalues():
method = getattr(activity, method_id, None)
if method is not None:
result = method(self, message, validation_value)
if result:
message_list += [(activity, m) for m in result]
return message_list
def getDependentMessageList(self, message, validating_queue=None):
activity_kw = message.activity_kw
db = self.getSQLConnection()
return [(activity, m)
for activity in activity_dict.itervalues()
for m in activity.getDependentMessageList(
db, activity_kw, activity is validating_queue)]
# Required for tests (time shift)
def timeShift(self, delay):
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
method_id
message_uid
path
tag
count
serialization_tag
</params>
SELECT
<dtml-if expr="count">
COUNT(*) AS uid_count
<dtml-else>
*
</dtml-if>
FROM
<dtml-var table>
WHERE
processing_node > -10
<dtml-if expr="method_id">
AND method_id IN (
<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="message_uid is not None">AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if expr="path">
AND path IN (
<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="tag">
AND tag IN (
<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="serialization_tag is not None">
AND processing_node > -1
AND serialization_tag = <dtml-sqlvar serialization_tag type="string">
</dtml-if>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment