Commit a13d9764 authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity: Implement node families.

The intent is to be able to tell that an independently-defined group of
activity nodes may execute given activity, and no other node.
This allows more flexible parallelism control than serialization_tag.
parent 949c393e
......@@ -70,6 +70,8 @@ class ActiveObject(ExtensionClass.Base):
- "": no node preference
- None (default): let CMFActivity decide between the above 2 choice
(see ActivityTool.activateObject)
- the name of a family: Only processing nodes member of that family
may execute this activity.
at_date -- request execution date for this activate call
(default: date of commit)
......
......@@ -82,7 +82,8 @@ class Queue(object):
activity_tool.deferredDeleteMessage(self, m)
m.is_deleted = 1
def dequeueMessage(self, activity_tool, processing_node):
def dequeueMessage(self, activity_tool, processing_node,
node_family_id_list):
raise NotImplementedError
def distribute(self, activity_tool, node_count):
......@@ -187,7 +188,7 @@ class Queue(object):
"""
pass
def getPriority(self, activity_tool, node):
def getPriority(self, activity_tool, processing_node, node_set):
"""
Get priority from this queue.
Lower number means higher priority value.
......
......@@ -278,8 +278,8 @@ CREATE TABLE %s (
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1")
def getPriority(self, activity_tool, node=None):
if node is None:
def getPriority(self, activity_tool, processing_node, node_set=None):
if node_set is None:
q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table)
......@@ -287,17 +287,20 @@ CREATE TABLE %s (
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % node
q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
node = 'node=%s' % processing_node
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, node)
return Queue.getPriority(self, activity_tool, processing_node, node_set)
def _retryOnLockError(self, method, args=(), kw={}):
while True:
......@@ -416,7 +419,7 @@ CREATE TABLE %s (
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None, node=None):
group_method_id=None, node_set=None):
"""
Get and reserve a list of messages.
limit
......@@ -436,7 +439,7 @@ CREATE TABLE %s (
# for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of
# time).
if node is None:
if node_set is None:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
......@@ -447,13 +450,16 @@ CREATE TABLE %s (
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % node
node = 'node=%s' % processing_node
result = Results(query(
"SELECT * FROM (%s UNION ALL %s UNION %s) as t"
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
limit), 0))
if result:
# Reserve messages.
......@@ -478,7 +484,8 @@ CREATE TABLE %s (
return m, uid, ()
return load
def getProcessableMessageList(self, activity_tool, processing_node):
def getProcessableMessageList(self, activity_tool, processing_node,
node_family_id_list):
"""
Always true:
For each reserved message, delete redundant messages when it gets
......@@ -523,7 +530,7 @@ CREATE TABLE %s (
result = Results(result)
else:
result = self.getReservedMessageList(db, now_date, processing_node,
1, node=processing_node)
1, node_set=node_family_id_list)
if not result:
break
load = self.getProcessableMessageLoader(db, processing_node)
......@@ -552,7 +559,7 @@ CREATE TABLE %s (
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id, processing_node))
limit, group_method_id, node_family_id_list))
for line in result:
if line.uid in uid_to_duplicate_uid_list_dict:
continue
......@@ -597,9 +604,11 @@ CREATE TABLE %s (
raise
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def dequeueMessage(self, activity_tool, processing_node,
node_family_id_list):
message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
self.getProcessableMessageList(activity_tool, processing_node,
node_family_id_list)
if message_list:
# Remove group_id parameter from group_method_id
if group_method_id is not None:
......
......@@ -179,10 +179,10 @@ CREATE TABLE %s (
return None, original_uid, [uid]
return load
def getPriority(self, activity_tool, node):
return SQLDict.getPriority(self, activity_tool)
def getPriority(self, activity_tool, processing_node, node_set):
return SQLDict.getPriority(self, activity_tool, processing_node)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node=None):
limit=None, group_method_id=None, node_set=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)
This diff is collapsed.
......@@ -217,6 +217,47 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<td>&nbsp;</td>
<td>&nbsp;</td>
</tr>
<tr class="list-header">
<td align="left" valign="top" colspan=2>
<div class="form-label">Node Families</div>
</td>
</tr>
<tr>
<td colspan=2>
<div style="float: left;">
Available Nodes:<br />
<select name="family_new_node_list" size="10" multiple="multiple" style="width: 100%;">
<dtml-in getNodeList prefix="node">
<option value="<dtml-var node_item>"><dtml-var node_item></option>
</dtml-in>
</select><br/>
<input name="new_family_name" />
<button type="submit" class="form-element" name="manage_createFamily:method">Create Family</button>
</div>
<dtml-in getFamilyNameList prefix="family">
<div style="float: left; margin-left: 1em;">
<input value="<dtml-var family_item>" name="family_new_name_<dtml-var family_item>" />(#<dtml-var expr="getFamilyId(family_item)">)<br />
<select name="family_member_set_<dtml-var family_item>" size="10" multiple="multiple" style="width: 100%;">
<dtml-in expr="getFamilyNodeList(family_item) or ['(no members)']" prefix="node">
<option value="<dtml-var node_item>"><dtml-var node_item></option>
</dtml-in>
</select><br />
<button type="submit" class="form-element" name="manage_removeNodeSetFromFamily:method" value="<dtml-var family_item>">Remove Nodes</button>
<button type="submit" class="form-element" name="manage_addNodeSetToFamily:method" value="<dtml-var family_item>">Add Nodes</button>
<div style="border: 1px solid #F00;">
Danger zone:<br />
<button type="submit" class="form-element" name="manage_renameFamily:method" value="<dtml-var family_item>">Rename</button>
<button type="submit" class="form-element" name="manage_deleteFamily:method" value="<dtml-var family_item>">Delete</button>
</div>
</div>
</dtml-in>
<div style="clear: both"></div>
</td>
</tr>
<tr>
<td>&nbsp;</td>
<td>&nbsp;</td>
</tr>
<tr class="list-header">
<td align="left" valign="top" colspan=2>
<div class="form-label">Subscribe/Unsubscribe from Timer Service</div>
......
......@@ -2499,6 +2499,108 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(o.getTitle(), title, (activities, expected))
self.assertFalse(activity_tool.getMessageList())
def test_nodeFamilies(self):
"""
Test node families, i.e. 'node' parameter of activate() beyond "", "same"
and None.
"""
activity_tool = self.portal.portal_activities
node_id, = activity_tool.getNodeDict()
other = 'boo'
member = 'foo'
non_member = 'bar'
does_not_exist = 'baz'
# Family declaration API
self.assertItemsEqual(activity_tool.getFamilyNameList(), [])
self.assertRaises(
ValueError,
activity_tool.createFamily, 'same', # Reserved name
)
self.assertRaises(
TypeError,
activity_tool.createFamily, -5, # Not a string
)
activity_tool.createFamily(other)
self.assertRaises(
ValueError,
activity_tool.createFamily, other, # Exists
)
activity_tool.createFamily(member)
self.assertRaises(
ValueError,
activity_tool.renameFamily, other, member, # New name exists
)
self.assertRaises(
ValueError,
activity_tool.renameFamily, does_not_exist, member, # Old name does not exist
)
self.assertRaises(
TypeError,
activity_tool.renameFamily, other, -4, # New name not a string
)
activity_tool.deleteFamily(member)
# Silent success
activity_tool.deleteFamily(member)
activity_tool.createFamily(non_member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
# API for node a-/di-ssociation with/from families
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [])
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.addNodeToFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, non_member])
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.createFamily(does_not_exist)
activity_tool.addNodeToFamily(node_id, does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, does_not_exist])
activity_tool.deleteFamily(does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
activity_tool.renameFamily(other, member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
activity_tool.createFamily(other)
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member, other])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member, other])
activity_tool.deleteFamily(other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
o = self.getOrganisation()
for activity in 'SQLDict', 'SQLQueue':
# Sanity check.
self.assertEqual(self.getMessageList(activity), [])
self.assertRaises(
ValueError,
o.activate, activity=activity, node=does_not_exist,
)
for node, expected in (member, '1'), (non_member, '0'), ('', '1'), ('same', '1'):
o._setTitle('0')
o.activate(activity=activity, node=node)._setTitle('1')
self.commit()
self.ticOnce()
self.assertEqual(
o.getTitle(),
expected,
(activity, o.getTitle(), expected),
)
if expected == '0':
# The activity must still exist, waiting for a node of the
# appropriate family.
result = self.getMessageList(activity)
self.assertEqual(len(result), 1)
self.deleteMessageList(activity, result)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment