Commit d6bce47b authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity: Implement node families.

The intent is to be able to tell that an independently-defined group of
activity nodes may execute given activity, and no other node.
This allows more flexible parallelism control than serialization_tag.
parent 949c393e
......@@ -70,6 +70,8 @@ class ActiveObject(ExtensionClass.Base):
- "": no node preference
- None (default): let CMFActivity decide between the above 2 choice
(see ActivityTool.activateObject)
- the name of a family: Only processing nodes member of that family
may execute this activity.
at_date -- request execution date for this activate call
(default: date of commit)
......
......@@ -82,7 +82,8 @@ class Queue(object):
activity_tool.deferredDeleteMessage(self, m)
m.is_deleted = 1
def dequeueMessage(self, activity_tool, processing_node):
def dequeueMessage(self, activity_tool, processing_node,
node_family_id_list):
raise NotImplementedError
def distribute(self, activity_tool, node_count):
......@@ -187,7 +188,7 @@ class Queue(object):
"""
pass
def getPriority(self, activity_tool, node):
def getPriority(self, activity_tool, processing_node, node_set):
"""
Get priority from this queue.
Lower number means higher priority value.
......
......@@ -278,8 +278,8 @@ CREATE TABLE %s (
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1")
def getPriority(self, activity_tool, node=None):
if node is None:
def getPriority(self, activity_tool, processing_node, node_set=None):
if node_set is None:
q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table)
......@@ -287,17 +287,19 @@ CREATE TABLE %s (
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % node
q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
node = 'node=%s' % processing_node
# "ALL" on all but one, to incur deduplication cost only once
q = ("SELECT * FROM (%s UNION ALL %s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set)) if node_set else 0),
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, node)
return Queue.getPriority(self, activity_tool, processing_node, node_set)
def _retryOnLockError(self, method, args=(), kw={}):
while True:
......@@ -416,7 +418,7 @@ CREATE TABLE %s (
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None, node=None):
group_method_id=None, node_set=None):
"""
Get and reserve a list of messages.
limit
......@@ -436,7 +438,7 @@ CREATE TABLE %s (
# for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of
# time).
if node is None:
if node_set is None:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
......@@ -447,13 +449,15 @@ CREATE TABLE %s (
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % node
node = 'node=%s' % processing_node
result = Results(query(
"SELECT * FROM (%s UNION ALL %s UNION %s) as t"
# "ALL" on all but one, to incur deduplication cost only once
"SELECT * FROM (%s UNION ALL %s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set)) if node_set else 0),
limit), 0))
if result:
# Reserve messages.
......@@ -478,7 +482,8 @@ CREATE TABLE %s (
return m, uid, ()
return load
def getProcessableMessageList(self, activity_tool, processing_node):
def getProcessableMessageList(self, activity_tool, processing_node,
node_family_id_list):
"""
Always true:
For each reserved message, delete redundant messages when it gets
......@@ -523,7 +528,7 @@ CREATE TABLE %s (
result = Results(result)
else:
result = self.getReservedMessageList(db, now_date, processing_node,
1, node=processing_node)
1, node_set=node_family_id_list)
if not result:
break
load = self.getProcessableMessageLoader(db, processing_node)
......@@ -552,7 +557,7 @@ CREATE TABLE %s (
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id, processing_node))
limit, group_method_id, node_family_id_list))
for line in result:
if line.uid in uid_to_duplicate_uid_list_dict:
continue
......@@ -597,9 +602,11 @@ CREATE TABLE %s (
raise
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def dequeueMessage(self, activity_tool, processing_node,
node_family_id_list):
message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
self.getProcessableMessageList(activity_tool, processing_node,
node_family_id_list)
if message_list:
# Remove group_id parameter from group_method_id
if group_method_id is not None:
......
......@@ -179,10 +179,10 @@ CREATE TABLE %s (
return None, original_uid, [uid]
return load
def getPriority(self, activity_tool, node):
return SQLDict.getPriority(self, activity_tool)
def getPriority(self, activity_tool, processing_node, node_set):
return SQLDict.getPriority(self, activity_tool, processing_node)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node=None):
limit=None, group_method_id=None, node_set=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)
......@@ -50,8 +50,9 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile
from Acquisition import aq_base, aq_inner, aq_parent
from ActivityBuffer import ActivityBuffer
from ActivityRuntimeEnvironment import BaseMessage
from zExceptions import ExceptionFormatter
from zExceptions import ExceptionFormatter, Redirect
from BTrees.OIBTree import OIBTree
from BTrees.OOBTree import OOBTree
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
from zope.site.hooks import setSite
......@@ -650,6 +651,8 @@ class ActivityTool (BaseTool):
distributingNode = ''
_nodes = ()
_family_list = ()
_node_family_dict = None
activity_creation_trace = False
activity_tracking = False
activity_timing_log = False
......@@ -904,6 +907,203 @@ class ActivityTool (BaseTool):
self._nodes = nodes = new_nodes
return nodes
def _getNodeFamilyIdDict(self):
result = self._node_family_dict
if result is None:
result = self._node_family_dict = OOBTree()
return result
security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyIdSet')
def getCurrentNodeFamilyIdSet(self):
"""
Returns the tuple of family ids current node is member of.
"""
return self._getNodeFamilyIdDict().get(getCurrentNode(), ())
security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyNameSet')
def getCurrentNodeFamilyNameSet(self):
"""
Returns the tuple of family names current node is member of.
"""
return [
self._family_list[-x - 1]
for x in self._getNodeFamilyIdDict().get(getCurrentNode(), ())
]
security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyId')
def getFamilyId(self, name):
"""
Raises ValueError for unknown family names.
"""
# First family is -1, second is -2, etc.
return -self._family_list.index(name) - 1
security.declareProtected(CMFCorePermissions.ManagePortal, 'addNodeToFamily')
def addNodeToFamily(self, node_id, family_name):
"""
Silently does nothing if node is already a member of family_name.
"""
family_id = self.getFamilyId(family_name)
node_family_id_dict = self._getNodeFamilyIdDict()
family_id_list = node_family_id_dict.get(node_id, ())
if family_id not in family_id_list:
node_family_id_dict[node_id] = family_id_list + (family_id, )
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addNodeSetToFamily')
def manage_addNodeSetToFamily(self, family_new_node_list, REQUEST):
"""
Add selected nodes to family.
"""
family_name = REQUEST['manage_addNodeSetToFamily']
if isinstance(family_new_node_list, basestring):
family_new_node_list = [family_new_node_list]
for node_id in family_new_node_list:
self.addNodeToFamily(node_id, family_name)
REQUEST.RESPONSE.redirect(
REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote('Nodes added to family.'),
)
security.declareProtected(CMFCorePermissions.ManagePortal, 'removeNodeFromFamily')
def removeNodeFromFamily(self, node_id, family_name):
"""
Silently does nothing if node is not member of family_name.
"""
family_id = self.getFamilyId(family_name)
node_family_id_dict = self._getNodeFamilyIdDict()
family_id_list = node_family_id_dict.get(node_id, ())
if family_id in family_id_list:
node_family_id_dict[node_id] = tuple(
x
for x in family_id_list
if x != family_id
)
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeNodeSetFromFamily')
def manage_removeNodeSetFromFamily(self, REQUEST):
"""
Remove selected nodes from family.
"""
family_name = REQUEST['manage_removeNodeSetFromFamily']
node_to_remove_list = REQUEST['family_member_set_' + family_name]
if isinstance(node_to_remove_list, basestring):
node_to_remove_list = [node_to_remove_list]
for node_id in node_to_remove_list:
self.removeNodeFromFamily(node_id, family_name)
REQUEST.RESPONSE.redirect(
REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
urllib.quote('Nodes removed from family.'),
)
def _checkFamilyName(self, name):
if not isinstance(name, basestring):
raise TypeError('Name must be a string')
if name in self._family_list:
raise ValueError('Already in use')
if name in ('', 'same'):
raise ValueError('Reserved family name')
security.declareProtected(CMFCorePermissions.ManagePortal, 'createFamily')
def createFamily(self, name):
"""
Raises ValueError if family already exists.
"""
self._checkFamilyName(name)
new_family_list = []
for existing_name in self._family_list:
if existing_name is None and name is not None:
new_family_list.append(name)
name = None
else:
new_family_list.append(existing_name)
if name is None:
# A free spot has been recycled.
self._family_list = tuple(new_family_list)
else:
# No free spot, append.
self._family_list += (name, )
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_createFamily')
def manage_createFamily(self, new_family_name, family_new_node_list, REQUEST):
"""Create a family"""
redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
if isinstance(family_new_node_list, basestring):
family_new_node_list = [family_new_node_list]
try:
self.createFamily(new_family_name)
for node_id in family_new_node_list:
self.addNodeToFamily(node_id, new_family_name)
except ValueError as exc:
raise Redirect(redirect_url + urllib.quote(str(exc)))
REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family created.'))
security.declareProtected(CMFCorePermissions.ManagePortal, 'renameFamily')
def renameFamily(self, old_name, new_name):
"""
Raises ValueError if old_name does not exist.
"""
self._checkFamilyName(new_name)
family_list = self._family_list
if old_name not in family_list:
raise ValueError('Unknown family')
self._family_list = tuple(
new_name if x == old_name else x
for x in family_list
)
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_renameFamily')
def manage_renameFamily(self, REQUEST):
"""Rename a family"""
redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
old_family_name = REQUEST['manage_renameFamily']
new_family_name = REQUEST['family_new_name_' + old_family_name]
try:
self.renameFamily(old_family_name, new_family_name)
except ValueError as exc:
raise Redirect(redirect_url + urllib.quote(str(exc)))
REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family renamed.'))
security.declareProtected(CMFCorePermissions.ManagePortal, 'deleteFamily')
def deleteFamily(self, name):
"""
Raises ValueError if name does not exist.
"""
for node_id in self._getNodeFamilyIdDict():
self.removeNodeFromFamily(node_id, name)
self._family_list = tuple(
None if x == name else x
for x in self._family_list
)
security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_deleteFamily')
def manage_deleteFamily(self, REQUEST):
"""Delete families"""
redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
family_name = REQUEST['manage_deleteFamily']
try:
self.deleteFamily(family_name)
except ValueError as exc:
raise Redirect(redirect_url + urllib.quote(str(exc)))
REQUEST.RESPONSE.redirect(redirect_url + urllib.quote('Family deleted'))
security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNameList')
def getFamilyNameList(self):
"""
Return the list of existing family names.
"""
return [x for x in self._family_list if x is not None]
def getFamilyNodeList(self, family_name):
"""
Return the list of node names in given family.
"""
family_id = self.getFamilyId(family_name)
return [
x
for x, y in self._getNodeFamilyIdDict().items()
if family_id in y
]
def registerNode(self, node):
node_dict = self.getNodeDict()
if node not in node_dict:
......@@ -1117,13 +1317,17 @@ class ActivityTool (BaseTool):
# getPriority does not see messages dequeueMessage would process.
activity_list = activity_dict.values()
def sort_key(activity):
return activity.getPriority(self, processing_node)
return activity.getPriority(self, processing_node,
node_family_id_set)
while is_running_lock.acquire(0):
# May have changed since previous iteration.
node_family_id_set = self.getCurrentNodeFamilyIdSet()
try:
activity_list.sort(key=sort_key) # stable sort
for i, activity in enumerate(activity_list):
# Transaction processing is the responsability of the activity
if not activity.dequeueMessage(inner_self, processing_node):
if not activity.dequeueMessage(inner_self, processing_node,
node_family_id_set):
activity_list.append(activity_list.pop(i))
break
else:
......@@ -1219,15 +1423,19 @@ class ActivityTool (BaseTool):
# sure that this node won't overprioritize too many activities.
or kw.get('group_method_id', '') != ''):
break
elif node == '':
else:
node = 'same'
if node == '':
break
elif node != 'same':
raise ValueError("Invalid node argument %r" % node)
try:
kw['node'] = 1 + self.getNodeList(
role=ROLE_PROCESSING).index(getCurrentNode())
except ValueError:
pass
elif node == 'same':
try:
node = 1 + self.getNodeList(
role=ROLE_PROCESSING).index(getCurrentNode())
except ValueError:
break
else:
node = self.getFamilyId(node)
kw['node'] = node
break
return ActiveWrapper(self, url, oid, activity,
active_process, active_process_uid, kw,
......
......@@ -217,6 +217,47 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<td>&nbsp;</td>
<td>&nbsp;</td>
</tr>
<tr class="list-header">
<td align="left" valign="top" colspan=2>
<div class="form-label">Node Families</div>
</td>
</tr>
<tr>
<td colspan=2>
<div style="float: left;">
Available Nodes:<br />
<select name="family_new_node_list" size="10" multiple="multiple" style="width: 100%;">
<dtml-in getNodeList prefix="node">
<option value="<dtml-var node_item>"><dtml-var node_item></option>
</dtml-in>
</select><br/>
<input name="new_family_name" />
<button type="submit" class="form-element" name="manage_createFamily:method">Create Family</button>
</div>
<dtml-in getFamilyNameList prefix="family">
<div style="float: left; margin-left: 1em;">
<input value="<dtml-var family_item>" name="family_new_name_<dtml-var family_item>" />(#<dtml-var expr="getFamilyId(family_item)">)<br />
<select name="family_member_set_<dtml-var family_item>" size="10" multiple="multiple" style="width: 100%;">
<dtml-in expr="getFamilyNodeList(family_item) or ['(no members)']" prefix="node">
<option value="<dtml-var node_item>"><dtml-var node_item></option>
</dtml-in>
</select><br />
<button type="submit" class="form-element" name="manage_removeNodeSetFromFamily:method" value="<dtml-var family_item>">Remove Nodes</button>
<button type="submit" class="form-element" name="manage_addNodeSetToFamily:method" value="<dtml-var family_item>">Add Nodes</button>
<div style="border: 1px solid #F00;">
Danger zone:<br />
<button type="submit" class="form-element" name="manage_renameFamily:method" value="<dtml-var family_item>">Rename</button>
<button type="submit" class="form-element" name="manage_deleteFamily:method" value="<dtml-var family_item>">Delete</button>
</div>
</div>
</dtml-in>
<div style="clear: both"></div>
</td>
</tr>
<tr>
<td>&nbsp;</td>
<td>&nbsp;</td>
</tr>
<tr class="list-header">
<td align="left" valign="top" colspan=2>
<div class="form-label">Subscribe/Unsubscribe from Timer Service</div>
......
......@@ -2499,6 +2499,108 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(o.getTitle(), title, (activities, expected))
self.assertFalse(activity_tool.getMessageList())
def test_nodeFamilies(self):
"""
Test node families, i.e. 'node' parameter of activate() beyond "", "same"
and None.
"""
activity_tool = self.portal.portal_activities
node_id, = activity_tool.getNodeDict()
other = 'boo'
member = 'foo'
non_member = 'bar'
does_not_exist = 'baz'
# Family declaration API
self.assertItemsEqual(activity_tool.getFamilyNameList(), [])
self.assertRaises(
ValueError,
activity_tool.createFamily, 'same', # Reserved name
)
self.assertRaises(
TypeError,
activity_tool.createFamily, -5, # Not a string
)
activity_tool.createFamily(other)
self.assertRaises(
ValueError,
activity_tool.createFamily, other, # Exists
)
activity_tool.createFamily(member)
self.assertRaises(
ValueError,
activity_tool.renameFamily, other, member, # New name exists
)
self.assertRaises(
ValueError,
activity_tool.renameFamily, does_not_exist, member, # Old name does not exist
)
self.assertRaises(
TypeError,
activity_tool.renameFamily, other, -4, # New name not a string
)
activity_tool.deleteFamily(member)
# Silent success
activity_tool.deleteFamily(member)
activity_tool.createFamily(non_member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
# API for node a-/di-ssociation with/from families
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [])
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.addNodeToFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, non_member])
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.createFamily(does_not_exist)
activity_tool.addNodeToFamily(node_id, does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, does_not_exist])
activity_tool.deleteFamily(does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
activity_tool.renameFamily(other, member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
activity_tool.createFamily(other)
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member, other])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member, other])
activity_tool.deleteFamily(other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
o = self.getOrganisation()
for activity in 'SQLDict', 'SQLQueue':
# Sanity check.
self.assertEqual(self.getMessageList(activity), [])
self.assertRaises(
ValueError,
o.activate, activity=activity, node=does_not_exist,
)
for node, expected in (member, '1'), (non_member, '0'), ('', '1'), ('same', '1'):
o._setTitle('0')
o.activate(activity=activity, node=node)._setTitle('1')
self.commit()
self.ticOnce()
self.assertEqual(
o.getTitle(),
expected,
(activity, o.getTitle(), expected),
)
if expected == '0':
# The activity must still exist, waiting for a node of the
# appropriate family.
result = self.getMessageList(activity)
self.assertEqual(len(result), 1)
self.deleteMessageList(activity, result)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment