Commit 4cbc4eea authored by Vincent Pelletier's avatar Vincent Pelletier

Products.CMFActivity: Distribute all activities pending distribution

Activity distribution (actually: validation) is fundamentally a bottleneck
of the current CMFActivity design: there can be only one distribution/
validation node, and it must check every single activity which express a
dependency over any activity, until these dependencies are satisfied.
As a result, distribution/validation is in the critical path between an
activity being spawned and it being executed, and this work cannot be
parallelised. So care should be taken to waste as little time as possible,
in order to reduce the activity execution latency.

Before this change, CMFActivity would distribute at most 1000 activities
(MAX_VALIDATED_LIMIT) per queue per timerserver wake-up.
In a typical ERP5 setup, timerserver ticks once per second, which means
CMFActivity was unable to validate more than 1000 activities per second
per queue. Maybe there are more activities which are possible to validate
but still the code was forcing the node to sleep until the next wake-up,
which is a tremendous waste of time.

This change fixes that issue by having ActivityTool.distribute keep looping
until there has been an iteration over all activity queues which
simultaneously did not find any activity they could validate. This lets
Zope yield CPU control when it would be better spent processing activities
(those which are preventing the validation of any activities remaining to
validate) without imposing a maximum effective validation-per-second
hard limit.
parent 966fb6a0
...@@ -684,7 +684,7 @@ CREATE TABLE %s ( ...@@ -684,7 +684,7 @@ CREATE TABLE %s (
result = self._getMessageList(db, **where_kw) result = self._getMessageList(db, **where_kw)
if not result: if not result:
transaction.commit() transaction.commit()
return return False
message_list = [Message.load(line.message, uid=line.uid, line=line) message_list = [Message.load(line.message, uid=line.uid, line=line)
for line in result] for line in result]
message_set = self._getExecutableMessageSet(activity_tool, db, message_list) message_set = self._getExecutableMessageSet(activity_tool, db, message_list)
...@@ -715,7 +715,7 @@ CREATE TABLE %s ( ...@@ -715,7 +715,7 @@ CREATE TABLE %s (
validated_count += distributable_count validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT: if validated_count >= MAX_VALIDATED_LIMIT:
transaction.commit() transaction.commit()
return return True
line = result[-1] line = result[-1]
where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid) where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)
......
...@@ -1347,9 +1347,20 @@ class ActivityTool (BaseTool): ...@@ -1347,9 +1347,20 @@ class ActivityTool (BaseTool):
""" """
Distribute load Distribute load
""" """
inner_self = aq_inner(self)
while is_running_lock.acquire(0):
try:
# Note: "has_more_to_distribute" is to be taken in a lose sense, we
# do not positively know there is more, just that distribute returned
# before it could confirm there is nothing left to do.
has_more_to_distribute = False
# Call distribute on each queue # Call distribute on each queue
for activity in six.itervalues(activity_dict): for activity in six.itervalues(activity_dict):
activity.distribute(aq_inner(self), node_count) has_more_to_distribute |= activity.distribute(inner_self, node_count)
if not has_more_to_distribute:
break
finally:
is_running_lock.release()
security.declarePublic('tic') security.declarePublic('tic')
def tic(self, processing_node=1, force=0): def tic(self, processing_node=1, force=0):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment