Commit 5b96a405 authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity.Activity.SQLBase: Limit the number of subqueries per dependency lookup query.

parent c629ee70
...@@ -68,6 +68,12 @@ UID_SAFE_BITSIZE = 63 ...@@ -68,6 +68,12 @@ UID_SAFE_BITSIZE = 63
# enough while yielding one order of magnitude collision probability # enough while yielding one order of magnitude collision probability
# improvement. # improvement.
UID_ALLOCATION_TRY_COUNT = 10 UID_ALLOCATION_TRY_COUNT = 10
# Limit the number of UNION-joined subqueries per query when looking for
# blocking activities. Used to take a slice from a list.
# XXX: 5000 is known to work on a case on "after_tag" dependency, which fails
# at 5400 with:
# ProgrammingError: (1064, "memory exhausted near [...]")
_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT = -5000
def sort_message_key(message): def sort_message_key(message):
# same sort key as in SQLBase.getMessageList # same sort key as in SQLBase.getMessageList
...@@ -592,22 +598,30 @@ CREATE TABLE %s ( ...@@ -592,22 +598,30 @@ CREATE TABLE %s (
base_sql_prefix = '(SELECT %s FROM ' % ( base_sql_prefix = '(SELECT %s FROM ' % (
','.join(column_list), ','.join(column_list),
) )
for row in db.query( subquery_list = [
' UNION '.join( base_sql_prefix + table_name + sql_suffix
base_sql_prefix + table_name + sql_suffix for table_name in table_name_list
for table_name in table_name_list for sql_suffix in sql_suffix_list
for sql_suffix in sql_suffix_list ]
), while subquery_list:
max_rows=0, # Join queries with a UNION, to reduce per-query latency.
)[1]: # Also, limit the number of subqueries per query, as their number can
# Each row is a value which blocks some activities. # largely exceed the number of activities being considered multiplied
dependent_message_set = dependency_value_dict[row2key(row)] # by the number of activty tables: it is also proportional to the
# queue blocked messages for processing in the beginning of next # number of distinct values being looked for in the current column.
# outermost iteration. for row in db.query(
new_blocked_message_set.update(dependent_message_set) ' UNION '.join(subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]),
# ...but update result immediately, in case there is no next max_rows=0,
# outermost iteration. )[1]:
result.difference_update(dependent_message_set) # Each row is a value which blocks some activities.
dependent_message_set = dependency_value_dict[row2key(row)]
# queue blocked messages for processing in the beginning of next
# outermost iteration.
new_blocked_message_set.update(dependent_message_set)
# ...but update result immediately, in case there is no next
# outermost iteration.
result.difference_update(dependent_message_set)
del subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]
dependency_value_dict.clear() dependency_value_dict.clear()
return result return result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment