Commit 6dba0a9e authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: new activate() parameter to prefer executing on the same node

The goal is to make better use of the ZODB Storage cache. It is common to do
processing on a data set in several sequential transactions: in such case, by
continuing execution of these messages on the same node, data is loaded from
ZODB only once. Without this, and if there are many other messages to process,
processing always continue on a random node, causing much more load from ZODB.

To prevent nodes from having too much work to do, or too little compared to
other nodes, this new parameter is only a hint for CMFActivity. It remains
possible for a node to execute a message that was intended for another node.

Before this commit, a processing node selects the first message(s) according to
the following ordering:

  priority, date

and now:

  priority, node_preference, date

where node_preference is:

  -1 -> same node
   0 -> no preferred node
   1 -> another node

The implementation is tricky for 2 reasons:
- MariaDB can't order this way in a single simple query, so we have 1
  subquery for each case, potentially getting 3 times the wanted maximum of
  messages, then order/filter on the resulting union.
- MariaDB also can't filter efficiently messages for other nodes, so the 3rd
  subquery returns messages for any node, potentially duplicating results from
  the first 2 subqueries. This works because they'll be ordered last.
  Unfortunately, this requires extra indices.

In any case, message reservation must be very efficient, or MariaDB deadlocks
quickly happen, and locking an activity table during reservation reduces
parallelism too much.

In addition to better cache efficiency, this new feature can be used as a
workaround for a bug affecting serialiation_tag, causing IntegrityError when
reindexing many new objects. If you have 2 recursive reindexations for both a
document and one of its lines, and if you have so many messages than grouping
is split between these 2 messages, then you end up with 2 nodes indexing the
same line in parallel: for some tables, the pattern DELETE+INSERT conflicts
since InnoDB does not take any lock when deleting a non-existent row.

If you have many activities creating such documents, you can combine with
grouping and appropriate priority to make sure that such pair of messages won't
be executed on different nodes, except maybe at the end (when there's no
document to create anymore; then activity reexecution may be enough).
For example:

  from Products.CMFActivity.ActivityTool import getCurrentNode
  portal.setPlacelessDefaultReindexParameters(
    activate_kw={'node': 'same', 'priority': priority},
    group_id=getCurrentNode())

where `priority` is the same as the activity containing the above code, which
can also use grouping without increasing the probability of IntegrityError.
parent 594d262e
......@@ -60,9 +60,29 @@ class ActiveObject(ExtensionClass.Base):
activate_kw=None, REQUEST=None, **kw):
"""Returns an active wrapper for this object.
Reserved Optional parameters:
priority -- any integer between -128 and 127 included
(default: 1)
node -- SQLDict & SQLQueue only;
can be one of the following values:
- "same": prefer execution on this node, to make
better use of the ZODB Storage cache
- "": no node preference
at_date -- request execution date for this activate call
(default: date of commit)
Messages are executed according to the following ordering:
priority, node_preference, date
where node_preference is:
-1 -> same node
0 -> no preferred node
1 -> another node
Validation parameters:
after_method_id -- never validate message if after_method_id
is in the list of methods which are
......
......@@ -187,11 +187,11 @@ class Queue(object):
"""
pass
def getPriority(self, activity_tool):
def getPriority(self, activity_tool, node):
"""
Get priority from this queue.
Lower number means higher priority value.
Legal value range is [-128, 127].
Legal value range is [-385, 382].
Values out of this range might work, but are non-standard.
"""
return 128,
return 384,
......@@ -134,6 +134,7 @@ CREATE TABLE %s (
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`node` SMALLINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
......@@ -141,7 +142,9 @@ CREATE TABLE %s (
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
......@@ -172,7 +175,7 @@ CREATE TABLE %s (
_insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, serialization_tag,"
" priority, node, group_method_id, tag, serialization_tag,"
" message) VALUES\n(%s)")
_insert_separator = "),\n("
......@@ -216,6 +219,7 @@ CREATE TABLE %s (
quote(m.method_id),
'0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)),
str(m.activity_kw.get('node', 0)),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('serialization_tag', '')),
......@@ -274,12 +278,26 @@ CREATE TABLE %s (
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1")
def getPriority(self, activity_tool):
result = activity_tool.getSQLConnection().query(
"SELECT priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table, 0)[1]
return result[0] if result else Queue.getPriority(self, activity_tool)
def getPriority(self, activity_tool, node=None):
if node is None:
q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table)
else:
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % node
q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, node)
def _retryOnLockError(self, method, args=(), kw={}):
while True:
......@@ -398,7 +416,7 @@ CREATE TABLE %s (
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None):
group_method_id=None, node=None):
"""
Get and reserve a list of messages.
limit
......@@ -418,10 +436,25 @@ CREATE TABLE %s (
# for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of
# time).
if 1:
if node is None:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
else:
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % node
result = Results(query(
"SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
limit), 0))
if result:
# Reserve messages.
uid_list = [x.uid for x in result]
......@@ -490,7 +523,7 @@ CREATE TABLE %s (
result = Results(result)
else:
result = self.getReservedMessageList(db, now_date, processing_node,
1)
1, node=processing_node)
if not result:
break
load = self.getProcessableMessageLoader(db, processing_node)
......@@ -519,7 +552,7 @@ CREATE TABLE %s (
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id))
limit, group_method_id, processing_node))
for line in result:
if line.uid in uid_to_duplicate_uid_list_dict:
continue
......
......@@ -178,3 +178,11 @@ CREATE TABLE %s (
# earlier.
return None, original_uid, [uid]
return load
def getPriority(self, activity_tool, node):
return SQLDict.getPriority(self, activity_tool)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)
......@@ -1112,7 +1112,7 @@ class ActivityTool (BaseTool):
# getPriority does not see messages dequeueMessage would process.
activity_list = activity_dict.values()
def sort_key(activity):
return activity.getPriority(self)
return activity.getPriority(self, processing_node)
while is_running_lock.acquire(0):
try:
activity_list.sort(key=sort_key) # stable sort
......@@ -1181,7 +1181,9 @@ class ActivityTool (BaseTool):
thread_activity_buffer[my_thread_key] = buffer
return buffer
def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
def activateObject(self, object, activity=DEFAULT_ACTIVITY,
active_process=None, serialization_tag=None,
node=None, **kw):
if active_process is None:
active_process_uid = None
elif isinstance(active_process, str):
......@@ -1201,8 +1203,16 @@ class ActivityTool (BaseTool):
except AttributeError:
pass
url = object.getPhysicalPath()
if kw.get('serialization_tag', False) is None:
del kw['serialization_tag']
if serialization_tag is not None:
kw['serialization_tag'] = serialization_tag
if node is not None:
if node != 'same':
raise ValueError("Invalid node argument %r" % node)
try:
kw['node'] = 1 + self.getNodeList(
role=ROLE_PROCESSING).index(getCurrentNode())
except ValueError:
pass
return ActiveWrapper(self, url, oid, activity,
active_process, active_process_uid, kw,
getattr(self, 'REQUEST', None))
......
......@@ -29,6 +29,7 @@
import inspect
import unittest
from functools import wraps
from itertools import product
from Products.ERP5Type.tests.utils import LogInterceptor
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
......@@ -81,6 +82,14 @@ def registerFailingTransactionManager(*args, **kw):
pass
dummy_tm()._register()
class LockOnce(object):
def __init__(self):
self.acquire = threading.Lock().acquire
def release(self):
pass
class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Different variables used for this test
......@@ -149,6 +158,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user)
def ticOnce(self, *args, **kw):
is_running_lock = ActivityTool.is_running_lock
try:
ActivityTool.is_running_lock = LockOnce()
self.portal.portal_activities.tic(*args, **kw)
finally:
ActivityTool.is_running_lock = is_running_lock
@for_each_activity
def testInvokeAndCancelActivity(self, activity):
"""
......@@ -2453,6 +2470,45 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_address = self.portal.portal_activities.getServerAddress()
self.assertEqual(activity_address, server_address)
def test_nodePreference(self):
"""
Test node preference, i.e. 'node' parameter of activate()
An object is activated by 2 different nodes and the 2 messages are
processed by the node that created the newest one:
- without node preference: they're ordered by date
- with node preference: they're executed in reverse order (the
processing node executes its message first even if it's newer)
Correct ordering of queues is also checked, by including scenarios
in which one message is in SQLDict and the other in SQLQueue.
"""
activity_tool = self.portal.portal_activities
o = self.getOrganisation()
node_dict = dict(activity_tool.getNodeDict())
assert len(node_dict) == 1 and '' not in node_dict, node_dict
before = DateTime() - 1
activities = 'SQLDict', 'SQLQueue'
for activities in product(activities, activities):
for node, expected in (None, '21'), ("same", '12'):
o._setTitle('0')
# The dance around getNodeDict is to simulate the creation of
# activities from 2 different nodes. We also change title in 2
# different ways, so that SQLDict does not merge them.
o.activate(activity=activities[0], node=node)._setTitle('1')
activity_tool.getNodeDict = lambda: node_dict
node_dict[''] = ActivityTool.ROLE_PROCESSING
o.activate(activity=activities[1], node=node, at_date=before
)._setProperty('title', '2')
del node_dict['']
activity_tool._p_invalidate()
self.commit()
for title in expected:
self.ticOnce()
self.assertEqual(o.getTitle(), title, (activities, expected))
self.assertFalse(activity_tool.getMessageList())
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment