Commit a42da4de authored by Vincent Pelletier

CMFActivity: Do not use offset for scanning messages to validate.

This was inefficient for two reasons:
- every message we validate during the current iteration shifts a message we
  have not considered yet into the range we just scanned. That message will
  not be considered until the validation node starts over and scans this same
  range again.
- the "LIMIT x,1000" pattern on >1000 messages causes a quickly growing number
  of extra rows to be scanned by the SQL database just to skip the first "x"
  rows: with 2000 rows present it must scan 1000 + 2000 = 3000 rows for a
  complete loop over all pending activities. With 3k rows it must scan
  1000 + 2000 + 3000 = 6k rows. With 4k rows, 10k rows.
  While this is an overestimation (some rows should be possible to
  validate, so these would be scanned once only), this overhead grows so
  large that this overestimation can become negligible.

Instead, use a range condition consistent with the query's "ORDER BY", which
is already efficiently materialised by an index: the SQL database just has to
dive into the existing index to start just above the last message from the
previous iteration, and resume scanning from there, solving both issues
listed above.
parent ea56fe72
...@@ -108,6 +108,9 @@ def sqltest_dict(): ...@@ -108,6 +108,9 @@ def sqltest_dict():
_('retry') _('retry')
_('to_date', column="date", op="<=") _('to_date', column="date", op="<=")
_('uid') _('uid')
_('from_date', column="date", op=">=")
_('from_priority', column="priority", op=">=")
_('above_uid', column="uid", op=">")
return sqltest_dict return sqltest_dict
sqltest_dict = sqltest_dict() sqltest_dict = sqltest_dict()
...@@ -192,19 +195,18 @@ class SQLBase(Queue): ...@@ -192,19 +195,18 @@ class SQLBase(Queue):
assert len(result[0]) == 1 assert len(result[0]) == 1
return result[0][0] return result[0][0]
def _getMessageList(self, activity_tool, offset=0, count=1000, src__=0, **kw): def _getMessageList(self, activity_tool, count=1000, src__=0, **kw):
# XXX: Because most columns have NOT NULL constraint, conditions with None # XXX: Because most columns have NOT NULL constraint, conditions with None
# value should be ignored, instead of trying to render them # value should be ignored, instead of trying to render them
# (with comparisons with NULL). # (with comparisons with NULL).
sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection
q = sql_connection.sql_quote__ q = sql_connection.sql_quote__
if offset:
limit = '\nLIMIT %d,%d' % (offset, sys.maxint if count is None else count)
else:
limit = '' if count is None else '\nLIMIT %d' % count
sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems()) sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % ( sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
self.sql_table, sql and '\nWHERE ' + sql, limit) self.sql_table,
sql and '\nWHERE ' + sql,
'' if count is None else '\nLIMIT %d' % count,
)
return sql if src__ else Results(sql_connection().query(sql, max_rows=0)) return sql if src__ else Results(sql_connection().query(sql, max_rows=0))
def getMessageList(self, *args, **kw): def getMessageList(self, *args, **kw):
...@@ -349,17 +351,15 @@ class SQLBase(Queue): ...@@ -349,17 +351,15 @@ class SQLBase(Queue):
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None) assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is None: if assignMessage is None:
return return
offset = 0
now_date = self.getNow(activity_tool) now_date = self.getNow(activity_tool)
where_kw = {
'processing_node': -1,
'to_date': now_date,
'count': READ_MESSAGE_LIMIT,
}
validated_count = 0 validated_count = 0
while 1: while 1:
result = self._getMessageList( result = self._getMessageList(activity_tool, **where_kw)
activity_tool,
processing_node=-1,
to_date=now_date,
offset=offset,
count=READ_MESSAGE_LIMIT,
)
if not result: if not result:
return return
transaction.commit() transaction.commit()
...@@ -399,7 +399,9 @@ class SQLBase(Queue): ...@@ -399,7 +399,9 @@ class SQLBase(Queue):
validated_count += distributable_count validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT: if validated_count >= MAX_VALIDATED_LIMIT:
return return
offset += READ_MESSAGE_LIMIT where_kw['from_priority'] = line.priority
where_kw['from_date'] = line.date
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, activity_tool, date, processing_node, def getReservedMessageList(self, activity_tool, date, processing_node,
limit=None, group_method_id=None): limit=None, group_method_id=None):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment