Commit 597f1823 authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: validate a message with a single SQL request per queue

As shown in the following example, on a big catalog table,
MariaDB is able to use several indices at the same time
('...' are obfuscated unique values):

  > analyze select SQL_NO_CACHE uid, relative_url from catalog where reference='...' OR relative_url='...';
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  | id   | select_type | table   | type        | possible_keys          | key                    | key_len | ref  | rows | r_rows | filtered | r_filtered | Extra                                            |
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  |    1 | SIMPLE      | catalog | index_merge | Reference,relative_url | Reference,relative_url | 768,767 | NULL |    2 |   2.00 |   100.00 |     100.00 | Using union(Reference,relative_url); Using where |
  +------+-------------+---------+-------------+------------------------+------------------------+---------+------+------+--------+----------+------------+--------------------------------------------------+
  1 row in set (0.00 sec)

So mixing different dependency types with OR should be fine
(no need to split into more subqueries and join with UNION).
parent c45ee148
...@@ -113,7 +113,7 @@ class Queue(object): ...@@ -113,7 +113,7 @@ class Queue(object):
cached_result = validation_text_dict.get(message.order_validation_text) cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None: if cached_result is None:
message_list = activity_tool.getDependentMessageList(message) message_list = activity_tool.getDependentMessageList(message, self)
transaction.commit() # Release locks. transaction.commit() # Release locks.
if message_list: if message_list:
# The result is not empty, so this message is not executable. # The result is not empty, so this message is not executable.
...@@ -146,7 +146,7 @@ class Queue(object): ...@@ -146,7 +146,7 @@ class Queue(object):
key_list = message.activity_kw.keys() key_list = message.activity_kw.keys()
key_list.sort() key_list.sort()
for key in key_list: for key in key_list:
method_id = "_validate_%s" % key method_id = "_validate_" + key
if getattr(self, method_id, None) is not None: if getattr(self, method_id, None) is not None:
order_validation_item_list.append((key, message.activity_kw[key])) order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0: if len(order_validation_item_list) == 0:
......
...@@ -254,25 +254,28 @@ class SQLBase(Queue): ...@@ -254,25 +254,28 @@ class SQLBase(Queue):
LOG('SQLBase', INFO, 'Got a lock error, retrying...') LOG('SQLBase', INFO, 'Got a lock error, retrying...')
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None, def getDependentMessageList(self, db, activate_kw, same_queue):
serialization_tag=None): q = db.string_literal
if isinstance(method_id, str): validate_list = []
method_id = [method_id] for k, v in activate_kw.iteritems():
if isinstance(path, str): if v is not None:
path = [path] try:
if isinstance(tag, str): method = getattr(self, '_validate_' + k, None)
tag = [tag] if method:
validate_list.append(' AND '.join(method(v, q)))
if method_id or message_uid or path or tag or serialization_tag: except Exception:
result = activity_tool.SQLBase_validateMessageList(table=self.sql_table, LOG('CMFActivity', WARNING, 'invalid %s value: %r' % (k, v),
method_id=method_id, error=True)
message_uid=message_uid, # Prevent validation by depending on anything, at least itself.
path=path, validate_list = '1',
tag=tag, same_queue = False
count=False, break
serialization_tag=serialization_tag) if validate_list:
message_list = [] message_list = []
for line in result: for line in Results(db.query(
"SELECT * FROM %s WHERE processing_node > -10 AND (%s) LIMIT %s" % (
self.sql_table, ' OR '.join(validate_list),
READ_MESSAGE_LIMIT if same_queue else 1), 0)):
m = Message.load(line.message, m = Message.load(line.message,
line=line, line=line,
uid=line.uid, uid=line.uid,
...@@ -282,36 +285,32 @@ class SQLBase(Queue): ...@@ -282,36 +285,32 @@ class SQLBase(Queue):
m.order_validation_text = self.getOrderValidationText(m) m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m) message_list.append(m)
return message_list return message_list
return ()
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, *args):
return self._validate(activity_tool, method_id=value) return sqltest_dict['method_id'](*args),
def _validate_after_path(self, activity_tool, message, value): def _validate_after_path(self, *args):
return self._validate(activity_tool, path=value) return sqltest_dict['path'](*args),
def _validate_after_message_uid(self, activity_tool, message, value): def _validate_after_message_uid(self, *args):
return self._validate(activity_tool, message_uid=value) return sqltest_dict['uid'](*args),
def _validate_after_path_and_method_id(self, activity_tool, message, value): def _validate_after_path_and_method_id(self, value, quote):
if not (isinstance(value, (tuple, list)) and len(value) == 2): path, method_id = value
LOG('CMFActivity', WARNING, return (sqltest_dict['method_id'](method_id, quote),
'unable to recognize value for after_path_and_method_id: %r' % (value,)) sqltest_dict['path'](path, quote))
return []
return self._validate(activity_tool, path=value[0], method_id=value[1])
def _validate_after_tag(self, activity_tool, message, value): def _validate_after_tag(self, *args):
return self._validate(activity_tool, tag=value) return sqltest_dict['tag'](*args),
def _validate_after_tag_and_method_id(self, activity_tool, message, value): def _validate_after_tag_and_method_id(self, value, quote):
# Count number of occurances of tag and method_id tag, method_id = value
if not (isinstance(value, (tuple, list)) and len(value) == 2): return (sqltest_dict['method_id'](method_id, quote),
LOG('CMFActivity', WARNING, sqltest_dict['tag'](tag, quote))
'unable to recognize value for after_tag_and_method_id: %r' % (value,))
return []
return self._validate(activity_tool, tag=value[0], method_id=value[1])
def _validate_serialization_tag(self, activity_tool, message, value): def _validate_serialization_tag(self, *args):
return self._validate(activity_tool, serialization_tag=value) return 'processing_node > -1', sqltest_dict['serialization_tag'](*args)
def _log(self, severity, summary): def _log(self, severity, summary):
LOG(self.__class__.__name__, severity, summary, LOG(self.__class__.__name__, severity, summary,
......
...@@ -1549,17 +1549,13 @@ class ActivityTool (BaseTool): ...@@ -1549,17 +1549,13 @@ class ActivityTool (BaseTool):
return obj return obj
security.declarePrivate('getDependentMessageList') security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message): def getDependentMessageList(self, message, validating_queue=None):
message_list = [] activity_kw = message.activity_kw
for validator_id, validation_value in message.activity_kw.iteritems(): db = self.getSQLConnection()
method_id = "_validate_" + validator_id return [(activity, m)
for activity in activity_dict.itervalues(): for activity in activity_dict.itervalues()
method = getattr(activity, method_id, None) for m in activity.getDependentMessageList(
if method is not None: db, activity_kw, activity is validating_queue)]
result = method(self, message, validation_value)
if result:
message_list += [(activity, m) for m in result]
return message_list
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, delay): def timeShift(self, delay):
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
method_id
message_uid
path
tag
count
serialization_tag
</params>
SELECT
<dtml-if expr="count">
COUNT(*) AS uid_count
<dtml-else>
*
</dtml-if>
FROM
<dtml-var table>
WHERE
processing_node > -10
<dtml-if expr="method_id">
AND method_id IN (
<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="message_uid is not None">AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if expr="path">
AND path IN (
<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="tag">
AND tag IN (
<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="serialization_tag is not None">
AND processing_node > -1
AND serialization_tag = <dtml-sqlvar serialization_tag type="string">
</dtml-if>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment