Commit 966cb758 authored by Sebastien Robin's avatar Sebastien Robin

RAMQueue activities are now stored by each cmfsite, and corrected infinite loop


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@720 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 6e895eeb
...@@ -56,19 +56,19 @@ class RAMDict(Queue): ...@@ -56,19 +56,19 @@ class RAMDict(Queue):
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
setattr(activity_buffer, '_%s_message_list' % class_name, []) setattr(activity_buffer, '_%s_message_list' % class_name, [])
setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id)) return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
getattr(activity_buffer, '_%s_message_list' % class_name).append(m) getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1 getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1
m.is_registered = 1 m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.dict.keys()) is 0: if len(self.dict.keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
...@@ -76,6 +76,7 @@ class RAMDict(Queue): ...@@ -76,6 +76,7 @@ class RAMDict(Queue):
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) activity_tool.invoke(m)
del self.dict[key] del self.dict[key]
get_transaction().commit()
return 0 return 0
return 1 return 1
...@@ -89,7 +90,7 @@ class RAMDict(Queue): ...@@ -89,7 +90,7 @@ class RAMDict(Queue):
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {} method_dict = {}
......
...@@ -60,13 +60,14 @@ class RAMQueue(Queue): ...@@ -60,13 +60,14 @@ class RAMQueue(Queue):
del queue[i] del queue[i]
return return
i = i + 1 i = i + 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.getQueue(activity_tool)) is 0: if len(self.getQueue(activity_tool)) is 0:
return 1 # Go to sleep return 1 # Go to sleep
m = self.getQueue(activity_tool)[0] m = self.getQueue(activity_tool)[0]
activity_tool.invoke(m) activity_tool.invoke(m)
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
get_transaction().commit()
return 0 # Keep on ticking return 0 # Keep on ticking
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment