Commit d2e731ce authored by Vincent Pelletier's avatar Vincent Pelletier

Save last used processing node in a global variable. This avoids the case...

Save last used processing node in a global variable. This avoids the case where all activities are assigned to first node when there is just one activity to distribute per distribute call.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@16828 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent bb09358d
...@@ -54,6 +54,7 @@ priority_weight = \ ...@@ -54,6 +54,7 @@ priority_weight = \
[4] * 5 + \ [4] * 5 + \
[5] * 1 [5] * 1
LAST_PROCESSING_NODE = 1
class SQLDict(RAMDict): class SQLDict(RAMDict):
""" """
...@@ -470,6 +471,7 @@ class SQLDict(RAMDict): ...@@ -470,6 +471,7 @@ class SQLDict(RAMDict):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
global LAST_PROCESSING_NODE
now_date = DateTime() now_date = DateTime()
result = readMessageList(path=None, method_id=None, processing_node=-1, result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0) to_date=now_date, include_processing=0)
...@@ -486,7 +488,7 @@ class SQLDict(RAMDict): ...@@ -486,7 +488,7 @@ class SQLDict(RAMDict):
# XXX probably this below can be optimized by assigning multiple messages at a time. # XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {} path_dict = {}
assignMessage = activity_tool.SQLDict_assignMessage assignMessage = activity_tool.SQLDict_assignMessage
processing_node = 1 processing_node = LAST_PROCESSING_NODE
id_tool = activity_tool.getPortalObject().portal_ids id_tool = activity_tool.getPortalObject().portal_ids
for message in message_dict.itervalues(): for message in message_dict.itervalues():
path = '/'.join(message.object_path) path = '/'.join(message.object_path)
...@@ -535,6 +537,7 @@ class SQLDict(RAMDict): ...@@ -535,6 +537,7 @@ class SQLDict(RAMDict):
assignMessage(processing_node=node, uid=[message.uid], broadcast=0) assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages get_transaction().commit() # Release locks immediately to allow processing of messages
LAST_PROCESSING_NODE = processing_node
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
......
...@@ -53,6 +53,8 @@ priority_weight = \ ...@@ -53,6 +53,8 @@ priority_weight = \
[4] * 5 + \ [4] * 5 + \
[5] * 1 [5] * 1
LAST_PROCESSING_NODE = 1
class SQLQueue(RAMQueue): class SQLQueue(RAMQueue):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
...@@ -302,6 +304,7 @@ class SQLQueue(RAMQueue): ...@@ -302,6 +304,7 @@ class SQLQueue(RAMQueue):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
global LAST_PROCESSING_NODE
now_date = DateTime() now_date = DateTime()
result = readMessageList(path=None, method_id=None, result = readMessageList(path=None, method_id=None,
processing_node=-1, to_date=now_date) processing_node=-1, to_date=now_date)
...@@ -318,7 +321,7 @@ class SQLQueue(RAMQueue): ...@@ -318,7 +321,7 @@ class SQLQueue(RAMQueue):
# XXX probably this below can be optimized by assigning multiple messages at a time. # XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {} path_dict = {}
assignMessage = activity_tool.SQLQueue_assignMessage assignMessage = activity_tool.SQLQueue_assignMessage
processing_node = 1 processing_node = LAST_PROCESSING_NODE
id_tool = activity_tool.getPortalObject().portal_ids id_tool = activity_tool.getPortalObject().portal_ids
for message in message_dict.itervalues(): for message in message_dict.itervalues():
path = '/'.join(message.object_path) path = '/'.join(message.object_path)
...@@ -358,6 +361,7 @@ class SQLQueue(RAMQueue): ...@@ -358,6 +361,7 @@ class SQLQueue(RAMQueue):
assignMessage(processing_node=node, uid=message.uid, broadcast=0) assignMessage(processing_node=node, uid=message.uid, broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages get_transaction().commit() # Release locks immediately to allow processing of messages
LAST_PROCESSING_NODE = processing_node
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment