diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 1d92601129fee1eb9a5789aa387a19a87c5f8adf..4868174c879129e8ea4136917481d149a1f9674d 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -170,6 +170,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing) def getMessageList(self, activity_tool, processing_node=None): + # YO: reading all lines might cause a deadlock message_list = [] result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) for line in result: diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 3ce44547ab76cab8e167e624db7f4b30cb1bc313..bf8a61087f128431df971ec4cecde9c60035d90b 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -127,6 +127,7 @@ class SQLQueue(RAMQueue): NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible """ + return # Do nothing here to precent overlocking path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) if invoke: @@ -167,17 +168,16 @@ class SQLQueue(RAMQueue): def distribute(self, activity_tool, node_count): processing_node = 1 result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages - get_transaction().commit() # Release locks before starting a potentially long calculation - path_dict = {} - for line in result: - path = line.path - if not path_dict.has_key(path): - # Only assign once (it would be different for a queue) - path_dict[path] = 1 - activity_tool.SQLQueue_assignMessage(path=path, processing_node=processing_node) - get_transaction().commit() # Release locks immediately to allow processing of messages - processing_node = processing_node + 1 - if processing_node > node_count: - processing_node = 1 # Round robin + #LOG('distribute count',0,str(len(result)) ) + #LOG('distribute count',0,str(map(lambda x:x.uid, result))) + #get_transaction().commit() # Release locks before starting a potentially long calculation + uid_list = map(lambda x:x.uid, result)[0:100] + for uid in uid_list: + #LOG("distribute", 0, "assign %s" % uid) + activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) + #get_transaction().commit() # Release locks immediately to allow processing of messages + processing_node = processing_node + 1 + if processing_node > node_count: + processing_node = 1 # Round robin registerActivity(SQLQueue)