CMFActivity: new activate() parameter to prefer executing on the same node

This implements a special case of node specialization, to make better
use of the ZODB Storage cache. By default, a non-grouped message is
marked to be executed by the same node that created it, if the object
is not a tool and if it was not activated by path. This can be
overridden (either forced or prevented) using a new 'node' activate()
parameter. See message of the first merged commits for details, and
also ActiveObject.activate() docstring. For SQLDict & SQLQueue only.

In the future, the new 'node' argument could accept any other string
value that refers to a group of nodes. Groups would be defined on the
activity tool, and be assigned negative integers. Contrary to what is
implemented here, such specialization would be strict, in that a node
would never process a message for a group it does not belong.

......@@ -6,6 +6,7 @@ for document, root_document_path in zip(getPath, getRootDocumentPath):
serialization_tag='full_text_' + root_document_path,
......@@ -60,9 +60,31 @@ class ActiveObject(ExtensionClass.Base):
activate_kw=None, REQUEST=None, **kw):
"""Returns an active wrapper for this object.
Reserved Optional parameters:
priority -- any integer between -128 and 127 included
(default: 1)
node -- SQLDict & SQLQueue only;
can be one of the following values:
- "same": prefer execution on this node, to make
better use of the ZODB Storage cache
- "": no node preference
- None (default): let CMFActivity decide between the above 2 choice
(see ActivityTool.activateObject)
at_date -- request execution date for this activate call
(default: date of commit)
Messages are executed according to the following ordering:
priority, node_preference, date
where node_preference is:
-1 -> same node
0 -> no preferred node
1 -> another node
Validation parameters:
after_method_id -- never validate message if after_method_id
is in the list of methods which are
......@@ -187,11 +187,11 @@ class Queue(object):
def getPriority(self, activity_tool):
def getPriority(self, activity_tool, node):
Get priority from this queue.
Lower number means higher priority value.
Legal value range is [-128, 127].
Legal value range is [-385, 382].
Values out of this range might work, but are non-standard.
return 128,
return 384,
......@@ -134,6 +134,7 @@ CREATE TABLE %s (
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
......@@ -141,7 +142,9 @@ CREATE TABLE %s (
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
......@@ -172,7 +175,7 @@ CREATE TABLE %s (
_insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, serialization_tag,"
" priority, node, group_method_id, tag, serialization_tag,"
" message) VALUES\n(%s)")
_insert_separator = "),\n("
......@@ -216,6 +219,7 @@ CREATE TABLE %s (
'0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)),
str(m.activity_kw.get('node', 0)),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('serialization_tag', '')),
......@@ -274,12 +278,26 @@ CREATE TABLE %s (
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1")
def getPriority(self, activity_tool):
result = activity_tool.getSQLConnection().query(
"SELECT priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table, 0)[1]
return result[0] if result else Queue.getPriority(self, activity_tool)
def getPriority(self, activity_tool, node=None):
if node is None:
q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table)
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % node
q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, node)
def _retryOnLockError(self, method, args=(), kw={}):
while True:
......@@ -398,7 +416,7 @@ CREATE TABLE %s (
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None, node=None):
Get and reserve a list of messages.
......@@ -418,10 +436,25 @@ CREATE TABLE %s (
# for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of
# time).
if 1:
if node is None:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % node
result = Results(query(
"SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
limit), 0))
if result:
# Reserve messages.
uid_list = [x.uid for x in result]
......@@ -490,7 +523,7 @@ CREATE TABLE %s (
result = Results(result)
result = self.getReservedMessageList(db, now_date, processing_node,
1, node=processing_node)
if not result:
load = self.getProcessableMessageLoader(db, processing_node)
......@@ -519,7 +552,7 @@ CREATE TABLE %s (
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id))
limit, group_method_id, processing_node))
for line in result:
if line.uid in uid_to_duplicate_uid_list_dict:
......@@ -178,3 +178,11 @@ CREATE TABLE %s (
# earlier.
return None, original_uid, [uid]
return load
def getPriority(self, activity_tool, node):
return SQLDict.getPriority(self, activity_tool)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)
......@@ -91,6 +91,11 @@ _server_address = None
# Activating a path means we tried to avoid loading useless
# data in cache so there would be no gain to expect.
# And all nodes are likely to have tools already loaded.
# Logging channel definitions
import logging
# Main logging channel
......@@ -1112,7 +1117,7 @@ class ActivityTool (BaseTool):
# getPriority does not see messages dequeueMessage would process.
activity_list = activity_dict.values()
def sort_key(activity):
return activity.getPriority(self)
return activity.getPriority(self, processing_node)
while is_running_lock.acquire(0):
activity_list.sort(key=sort_key) # stable sort
......@@ -1181,7 +1186,9 @@ class ActivityTool (BaseTool):
thread_activity_buffer[my_thread_key] = buffer
return buffer
def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
def activateObject(self, object, activity=DEFAULT_ACTIVITY,
active_process=None, serialization_tag=None,
node=None, **kw):
if active_process is None:
active_process_uid = None
elif isinstance(active_process, str):
......@@ -1201,8 +1208,27 @@ class ActivityTool (BaseTool):
except AttributeError:
url = object.getPhysicalPath()
if kw.get('serialization_tag', False) is None:
del kw['serialization_tag']
if serialization_tag is not None:
kw['serialization_tag'] = serialization_tag
while 1: # not a loop
if node is None:
# The caller lets us decide whether we prefer to execute on same node
# (to increase the efficiency of the ZODB Storage cache).
if (isinstance(object, NO_DEFAULT_NODE_PREFERENCE)
# A grouped activity is the sign we may have many of them so make
# sure that this node won't overprioritize too many activities.
or kw.get('group_method_id', '') != ''):
elif node == '':
elif node != 'same':
raise ValueError("Invalid node argument %r" % node)
kw['node'] = 1 + self.getNodeList(
except ValueError:
return ActiveWrapper(self, url, oid, activity,
active_process, active_process_uid, kw,
getattr(self, 'REQUEST', None))
......@@ -29,6 +29,7 @@
import inspect
import unittest
from functools import wraps
from itertools import product
from Products.ERP5Type.tests.utils import LogInterceptor
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
......@@ -81,6 +82,14 @@ def registerFailingTransactionManager(*args, **kw):
class LockOnce(object):
def __init__(self):
self.acquire = threading.Lock().acquire
def release(self):
class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Different variables used for this test
......@@ -149,6 +158,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user)
def ticOnce(self, *args, **kw):
is_running_lock = ActivityTool.is_running_lock
ActivityTool.is_running_lock = LockOnce()
self.portal.portal_activities.tic(*args, **kw)
ActivityTool.is_running_lock = is_running_lock
def testInvokeAndCancelActivity(self, activity):
......@@ -2453,6 +2470,45 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_address = self.portal.portal_activities.getServerAddress()
self.assertEqual(activity_address, server_address)
def test_nodePreference(self):
Test node preference, i.e. 'node' parameter of activate()
An object is activated by 2 different nodes and the 2 messages are
processed by the node that created the newest one:
- without node preference: they're ordered by date
- with node preference: they're executed in reverse order (the
processing node executes its message first even if it's newer)
Correct ordering of queues is also checked, by including scenarios
in which one message is in SQLDict and the other in SQLQueue.
activity_tool = self.portal.portal_activities
o = self.getOrganisation()
node_dict = dict(activity_tool.getNodeDict())
assert len(node_dict) == 1 and '' not in node_dict, node_dict
before = DateTime() - 1
activities = 'SQLDict', 'SQLQueue'
for activities in product(activities, activities):
for node, expected in (None, '12'), ('', '21'), ('same', '12'):
# The dance around getNodeDict is to simulate the creation of
# activities from 2 different nodes. We also change title in 2
# different ways, so that SQLDict does not merge them.
o.activate(activity=activities[0], node=node)._setTitle('1')
activity_tool.getNodeDict = lambda: node_dict
node_dict[''] = ActivityTool.ROLE_PROCESSING
o.activate(activity=activities[1], node=node, at_date=before
)._setProperty('title', '2')
del node_dict['']
for title in expected:
self.assertEqual(o.getTitle(), title, (activities, expected))
def test_suite():
suite = unittest.TestSuite()
