diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 72071b0c7a86cb8b306750cf8c613736ef26ff82..56f092fa63a8a6459afb901aa726d627e891083a 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -79,19 +79,13 @@ class Queue: self.activity_tool = activity_tool self.is_initialized = 1 - def queueMessage(self, activity_tool, m): + def queueMessage(self, activity_tool, m): activity_tool.deferredQueueMessage(self, m) - m.is_queued = 1 def deleteMessage(self, activity_tool, m): - activity_tool.deferredDeleteMessage(self, m) - m.is_deleted = 1 - - def isDeleted(self, m): - return m.is_deleted - - def isQueued(self, m): - return m.is_queued + if not self.isMessageDeleted(activity_tool, m): + activity_tool.deferredDeleteMessage(self, m) + # We must never deleted twice def dequeueMessage(self, activity_tool, processing_node): pass @@ -137,7 +131,7 @@ class Queue: def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw): return 0 - def flush(self, activity_tool, object, **kw): + def flush(self, activity_tool, object, **kw): pass def start(self, active_process=None): @@ -171,4 +165,27 @@ class Queue: def finishDequeueMessage(self, activity_tool, m): pass - \ No newline at end of file + + # Registration Management + def registerActivityBuffer(self, activity_buffer): + class_name = self.__class__.__name__ + if not hasattr(activity_buffer, '_%s_message_list' % class_name): + setattr(activity_buffer, '_%s_message_list' % class_name, []) + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + return m in getattr(activity_buffer, '_%s_message_list' % class_name) + + def registerMessage(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + getattr(activity_buffer, '_%s_message_list' % class_name).append(m) + m.is_registered = 1 + + def unregisterMessage(self, activity_buffer, activity_tool, m): + m.is_registered = 0 + + def getRegisteredMessageList(self, activity_buffer, activity_tool): + class_name = self.__class__.__name__ + return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name)) \ No newline at end of file diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index 9b630540942803e3956316790e6ba4d29e23087e..dc5a9e6ef55c45ba481c9197a4efc86eb6ec28d5 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -46,13 +46,32 @@ class RAMDict(Queue): self.dict = {} def finishQueueMessage(self, activity_tool, m): - self.dict[(m.object_path, m.method_id)] = m + if m.is_registered: + self.dict[(m.object_path, m.method_id)] = m def finishDeleteMessage(self, activity_tool, message): for key, m in self.dict.items(): if m.object_path == message.object_path and m.method_id == message.method_id: del self.dict[(m.object_path, m.method_id)] + def registerActivityBuffer(self, activity_buffer): + class_name = self.__class__.__name__ + if not hasattr(activity_buffer, '_%s_message_list' % class_name): + setattr(activity_buffer, '_%s_message_list' % class_name, []) + setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id)) + + def registerMessage(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + getattr(activity_buffer, '_%s_message_list' % class_name).append(m) + getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1 + m.is_registered = 1 + def dequeueMessage(self, activity_tool, processing_node): if len(self.dict.keys()) is 0: return 1 # Go to sleep @@ -70,10 +89,31 @@ class RAMDict(Queue): return 1 return 0 - def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + path = '/'.join(object_path) + # LOG('Flush', 0, str((path, invoke, method_id))) + method_dict = {} + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + self.unregisterMessage(m) + if not method_dict.has_key(method_id): + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: + # The message no longer exists + raise ActivityFlushError, ( + 'The document %s does not exist' % path) + # Parse each message in RAM dict for key, m in self.dict.items(): if not m.is_deleted: - if m.object_path == object_path: + if object_path == m.object_path and (method_id is None or method_id == m.method_id): LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) if invoke: activity_tool.invoke(m) self.deleteMessage(m) diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index a63d622628e9c4f26d60036d511927c27e4878c0..46e95df4fe19c2d551979d0f88dbd75868d23e79 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -34,21 +34,22 @@ class RAMQueue(Queue): """ A simple RAM based queue """ - message_queue_id = 0 - def __init__(self): Queue.__init__(self) self.queue = [] - + self.last_uid = 0 + def finishQueueMessage(self, activity_tool, m): - self.message_queue_id = self.message_queue_id + 1 - m.message_queue_id = self.message_queue_id - self.queue.append(m) + if m.is_registered: + # XXX - Some lock is required on this section + self.last_uid = self.last_uid + 1 + m.uid = self.last_uid + self.queue.append(m) def finishDeleteMessage(self, activity_tool, m): i = 0 for my_message in self.queue: - if my_message.message_queue_id == m.message_queue_id: + if my_message.uid == m.uid: del self.queue[i] return i = i + 1 @@ -69,11 +70,16 @@ class RAMQueue(Queue): return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.unregisterMessage(m) + # Parse each message in queue for m in self.queue: - if not m.is_deleted: - if m.object_path == object_path: - if invoke: activity_tool.invoke(m) - self.deleteMessage(m) + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.deleteMessage(m) def getMessageList(self, activity_tool, processing_node=None): new_queue = [] diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 787f6c72ba18bd211b25bd1dbf62540fbd458aa8..c87826136584c4048729b3c331e14547e7ff1b93 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -51,13 +51,14 @@ class SQLDict(RAMDict): and provide sequentiality. Should not create conflict because use of OOBTree. """ - + # Transaction commit methods def prepareQueueMessage(self, activity_tool, m): - activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , - method_id = m.method_id, - priority = m.activity_kw.get('priority', 1), - message = self.dumpMessage(m)) - # Also store uid of activity + if m.is_registered: + activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , + method_id = m.method_id, + priority = m.activity_kw.get('priority', 1), + message = self.dumpMessage(m)) + # Also store uid of activity def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction @@ -65,6 +66,32 @@ class SQLDict(RAMDict): uid_list = map(lambda x:x.uid, uid_list) activity_tool.SQLDict_delMessage(uid = uid_list) + # Registration management + def registerActivityBuffer(self, activity_buffer): + if not hasattr(activity_buffer, '_sqldict_uid_dict'): + activity_buffer._sqldict_uid_dict = {} + activity_buffer._sqldict_message_list = [] + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)) + + def registerMessage(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + m.is_registered = 1 + activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1 + activity_buffer._sqldict_message_list.append(m) + + def unregisterMessage(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + m.is_registered = 0 # This prevents from inserting deleted messages into the queue + if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)): + del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] + + def getRegisteredMessageList(self, activity_buffer, activity_tool): + return filter(lambda m: m.is_registered, activity_buffer._sqldict_message_list) + + # Queue semantic def dequeueMessage(self, activity_tool, processing_node): priority = random.choice(priority_weight) # Try to find a message at given priority level @@ -84,7 +111,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_processMessage(uid = uid_list) get_transaction().commit() # Release locks before starting a potentially long calculation # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) # Make sure object exists if not m.validate(self, activity_tool): if line.priority > MAX_PRIORITY: @@ -151,16 +178,33 @@ class SQLDict(RAMDict): """ path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) - result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {} - # Parse each message + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + self.unregisterMessage(m) + if not method_dict.has_key(method_id): + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: + # The message no longer exists + raise ActivityFlushError, ( + 'The document %s does not exist' % path) + # Parse each message in SQL dict + result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) for line in result: path = line.path method_id = line.method_id if not method_dict.has_key(method_id): # Only invoke once (it would be different for a queue) method_dict[method_id] = 1 - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) self.deleteMessage(m) if invoke: # First Validate @@ -188,7 +232,7 @@ class SQLDict(RAMDict): message_list = [] result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) for line in result: - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) m.processing_node = line.processing_node m.priority = line.priority message_list.append(m) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index b4b12edc8d156dde75ea31c6e099a923b5758072..ac14baed9adffb690d6583f6be33206608977540 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -53,10 +53,11 @@ class SQLQueue(RAMQueue): """ def prepareQueueMessage(self, activity_tool, m): - activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , - method_id = m.method_id, - priority = m.activity_kw.get('priority', 1), - message = self.dumpMessage(m)) + if m.is_registered: + activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , + method_id = m.method_id, + priority = m.activity_kw.get('priority', 1), + message = self.dumpMessage(m)) def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction @@ -131,10 +132,15 @@ class SQLQueue(RAMQueue): """ return # Do nothing here to precent overlocking path = '/'.join(object_path) + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.unregisterMessage(m) + # Parse each message in SQL queue # LOG('Flush', 0, str((path, invoke, method_id))) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {} - # Parse each message for line in result: path = line.path method_id = line.method_id diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py index 039c177c1bbf0530e12b7e88953bea8d07cbee15..17917de8d64de9af43ec69a563258dc3e2a32e5c 100755 --- a/product/CMFActivity/ActivityBuffer.py +++ b/product/CMFActivity/ActivityBuffer.py @@ -46,7 +46,6 @@ class ActivityBuffer(TM): self._tthread = get_ident() self.requires_prepare = 1 try: - LOG("_begin", 0, '') self.queued_activity = [] self.flushed_activity = [] except: @@ -100,11 +99,16 @@ class ActivityBuffer(TM): raise def deferredQueueMessage(self, activity_tool, activity, message): - self._register() - LOG("deferredQueueMessage", 0, '') - self.queued_activity.append((activity, activity_tool, message)) + self._register() + # Activity is called to prevent queuing some messages (useful for example + # to prevent reindexing objects multiple times) + if not activity.isMessageRegistered(self, activity_tool, message): + self.queued_activity.append((activity, activity_tool, message)) + # We register queued messages so that we can + # unregister them + activity.registerMessage(self, activity_tool, message) def deferredDeleteMessage(self, activity_tool, activity, message): self._register() - LOG("deferredDeleteMessage", 0, '') self.flushed_activity.append((activity, activity_tool, message)) + diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 8189ba84c8e7e844cf15d71d9853298bf1fad41b..07c702186d7e88ef4efb1916db282a034e903f1e 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -84,9 +84,6 @@ allow_class(Result) class Message: - is_deleted = 0 - is_queued = 0 - def __init__(self, object, active_process, activity_kw, method_id, args, kw): if type(object) is type('a'): self.object_path = object.split('/') @@ -318,6 +315,9 @@ class ActivityTool (Folder, UniqueObject): def deferredDeleteMessage(self, activity, message): self._v_activity_buffer.deferredDeleteMessage(self, activity, message) + def getRegisteredMessageList(self, activity): + return activity.getRegisteredMessageList(self, self._v_activity_buffer) + def flush(self, object, invoke=0, **kw): global is_initialized if not is_initialized: self.initialize()