Commit 3fe33026 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Coramy Optimizations


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@442 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 9b2c1763
...@@ -170,6 +170,7 @@ class SQLDict(RAMDict): ...@@ -170,6 +170,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing) activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
# YO: reading all lines might cause a deadlock
message_list = [] message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
for line in result: for line in result:
......
...@@ -127,6 +127,7 @@ class SQLQueue(RAMQueue): ...@@ -127,6 +127,7 @@ class SQLQueue(RAMQueue):
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
return # Do nothing here to precent overlocking
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
if invoke: if invoke:
...@@ -167,17 +168,16 @@ class SQLQueue(RAMQueue): ...@@ -167,17 +168,16 @@ class SQLQueue(RAMQueue):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation #LOG('distribute count',0,str(len(result)) )
path_dict = {} #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
for line in result: #get_transaction().commit() # Release locks before starting a potentially long calculation
path = line.path uid_list = map(lambda x:x.uid, result)[0:100]
if not path_dict.has_key(path): for uid in uid_list:
# Only assign once (it would be different for a queue) #LOG("distribute", 0, "assign %s" % uid)
path_dict[path] = 1 activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
activity_tool.SQLQueue_assignMessage(path=path, processing_node=processing_node) #get_transaction().commit() # Release locks immediately to allow processing of messages
get_transaction().commit() # Release locks immediately to allow processing of messages processing_node = processing_node + 1
processing_node = processing_node + 1 if processing_node > node_count:
if processing_node > node_count: processing_node = 1 # Round robin
processing_node = 1 # Round robin
registerActivity(SQLQueue) registerActivity(SQLQueue)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment