Commit f17a5136 authored by Jean-Paul Smets's avatar Jean-Paul Smets

registration API completed

more tests needed


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@662 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 13e9eaa2
...@@ -81,17 +81,11 @@ class Queue: ...@@ -81,17 +81,11 @@ class Queue:
def queueMessage(self, activity_tool, m): def queueMessage(self, activity_tool, m):
activity_tool.deferredQueueMessage(self, m) activity_tool.deferredQueueMessage(self, m)
m.is_queued = 1
def deleteMessage(self, activity_tool, m): def deleteMessage(self, activity_tool, m):
if not self.isMessageDeleted(activity_tool, m):
activity_tool.deferredDeleteMessage(self, m) activity_tool.deferredDeleteMessage(self, m)
m.is_deleted = 1 # We must never deleted twice
def isDeleted(self, m):
return m.is_deleted
def isQueued(self, m):
return m.is_queued
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
pass pass
...@@ -172,3 +166,26 @@ class Queue: ...@@ -172,3 +166,26 @@ class Queue:
def finishDequeueMessage(self, activity_tool, m): def finishDequeueMessage(self, activity_tool, m):
pass pass
# Registration Management
def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__
if not hasattr(activity_buffer, '_%s_message_list' % class_name):
setattr(activity_buffer, '_%s_message_list' % class_name, [])
def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
return m in getattr(activity_buffer, '_%s_message_list' % class_name)
def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
m.is_registered = 1
def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0
def getRegisteredMessageList(self, activity_buffer, activity_tool):
class_name = self.__class__.__name__
return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
\ No newline at end of file
...@@ -46,6 +46,7 @@ class RAMDict(Queue): ...@@ -46,6 +46,7 @@ class RAMDict(Queue):
self.dict = {} self.dict = {}
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
if m.is_registered:
self.dict[(m.object_path, m.method_id)] = m self.dict[(m.object_path, m.method_id)] = m
def finishDeleteMessage(self, activity_tool, message): def finishDeleteMessage(self, activity_tool, message):
...@@ -53,6 +54,24 @@ class RAMDict(Queue): ...@@ -53,6 +54,24 @@ class RAMDict(Queue):
if m.object_path == message.object_path and m.method_id == message.method_id: if m.object_path == message.object_path and m.method_id == message.method_id:
del self.dict[(m.object_path, m.method_id)] del self.dict[(m.object_path, m.method_id)]
def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__
if not hasattr(activity_buffer, '_%s_message_list' % class_name):
setattr(activity_buffer, '_%s_message_list' % class_name, [])
setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__
self.registerActivityBuffer(activity_buffer)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1
m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.dict.keys()) is 0: if len(self.dict.keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
...@@ -71,9 +90,30 @@ class RAMDict(Queue): ...@@ -71,9 +90,30 @@ class RAMDict(Queue):
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {}
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
self.unregisterMessage(m)
if not method_dict.has_key(method_id):
if invoke:
# First Validate
if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Parse each message in RAM dict
for key, m in self.dict.items(): for key, m in self.dict.items():
if not m.is_deleted: if not m.is_deleted:
if m.object_path == object_path: if object_path == m.object_path and (method_id is None or method_id == m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.deleteMessage(m) self.deleteMessage(m)
......
...@@ -34,21 +34,22 @@ class RAMQueue(Queue): ...@@ -34,21 +34,22 @@ class RAMQueue(Queue):
""" """
A simple RAM based queue A simple RAM based queue
""" """
message_queue_id = 0
def __init__(self): def __init__(self):
Queue.__init__(self) Queue.__init__(self)
self.queue = [] self.queue = []
self.last_uid = 0
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
self.message_queue_id = self.message_queue_id + 1 if m.is_registered:
m.message_queue_id = self.message_queue_id # XXX - Some lock is required on this section
self.last_uid = self.last_uid + 1
m.uid = self.last_uid
self.queue.append(m) self.queue.append(m)
def finishDeleteMessage(self, activity_tool, m): def finishDeleteMessage(self, activity_tool, m):
i = 0 i = 0
for my_message in self.queue: for my_message in self.queue:
if my_message.message_queue_id == m.message_queue_id: if my_message.uid == m.uid:
del self.queue[i] del self.queue[i]
return return
i = i + 1 i = i + 1
...@@ -69,9 +70,14 @@ class RAMQueue(Queue): ...@@ -69,9 +70,14 @@ class RAMQueue(Queue):
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m)
self.unregisterMessage(m)
# Parse each message in queue
for m in self.queue: for m in self.queue:
if not m.is_deleted: if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.object_path == object_path:
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.deleteMessage(m) self.deleteMessage(m)
......
...@@ -51,8 +51,9 @@ class SQLDict(RAMDict): ...@@ -51,8 +51,9 @@ class SQLDict(RAMDict):
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
because use of OOBTree. because use of OOBTree.
""" """
# Transaction commit methods
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
if m.is_registered:
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
...@@ -65,6 +66,32 @@ class SQLDict(RAMDict): ...@@ -65,6 +66,32 @@ class SQLDict(RAMDict):
uid_list = map(lambda x:x.uid, uid_list) uid_list = map(lambda x:x.uid, uid_list)
activity_tool.SQLDict_delMessage(uid = uid_list) activity_tool.SQLDict_delMessage(uid = uid_list)
# Registration management
def registerActivityBuffer(self, activity_buffer):
if not hasattr(activity_buffer, '_sqldict_uid_dict'):
activity_buffer._sqldict_uid_dict = {}
activity_buffer._sqldict_message_list = []
def isMessageRegistered(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
m.is_registered = 1
activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1
activity_buffer._sqldict_message_list.append(m)
def unregisterMessage(self, activity_buffer, activity_tool, m):
self.registerActivityBuffer(activity_buffer)
m.is_registered = 0 # This prevents from inserting deleted messages into the queue
if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)):
del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)]
def getRegisteredMessageList(self, activity_buffer, activity_tool):
return filter(lambda m: m.is_registered, activity_buffer._sqldict_message_list)
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level # Try to find a message at given priority level
...@@ -84,7 +111,7 @@ class SQLDict(RAMDict): ...@@ -84,7 +111,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
m = self.loadMessage(line.message) m = self.loadMessage(line.message, uid = line.uid)
# Make sure object exists # Make sure object exists
if not m.validate(self, activity_tool): if not m.validate(self, activity_tool):
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
...@@ -151,16 +178,33 @@ class SQLDict(RAMDict): ...@@ -151,16 +178,33 @@ class SQLDict(RAMDict):
""" """
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
method_dict = {} method_dict = {}
# Parse each message # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
self.unregisterMessage(m)
if not method_dict.has_key(method_id):
if invoke:
# First Validate
if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
if not method_dict.has_key(method_id): if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(m) self.deleteMessage(m)
if invoke: if invoke:
# First Validate # First Validate
...@@ -188,7 +232,7 @@ class SQLDict(RAMDict): ...@@ -188,7 +232,7 @@ class SQLDict(RAMDict):
message_list = [] message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
for line in result: for line in result:
m = self.loadMessage(line.message) m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node m.processing_node = line.processing_node
m.priority = line.priority m.priority = line.priority
message_list.append(m) message_list.append(m)
......
...@@ -53,6 +53,7 @@ class SQLQueue(RAMQueue): ...@@ -53,6 +53,7 @@ class SQLQueue(RAMQueue):
""" """
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
if m.is_registered:
activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
...@@ -131,10 +132,15 @@ class SQLQueue(RAMQueue): ...@@ -131,10 +132,15 @@ class SQLQueue(RAMQueue):
""" """
return # Do nothing here to precent overlocking return # Do nothing here to precent overlocking
path = '/'.join(object_path) path = '/'.join(object_path)
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m)
self.unregisterMessage(m)
# Parse each message in SQL queue
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
method_dict = {} method_dict = {}
# Parse each message
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
......
...@@ -46,7 +46,6 @@ class ActivityBuffer(TM): ...@@ -46,7 +46,6 @@ class ActivityBuffer(TM):
self._tthread = get_ident() self._tthread = get_ident()
self.requires_prepare = 1 self.requires_prepare = 1
try: try:
LOG("_begin", 0, '')
self.queued_activity = [] self.queued_activity = []
self.flushed_activity = [] self.flushed_activity = []
except: except:
...@@ -101,10 +100,15 @@ class ActivityBuffer(TM): ...@@ -101,10 +100,15 @@ class ActivityBuffer(TM):
def deferredQueueMessage(self, activity_tool, activity, message): def deferredQueueMessage(self, activity_tool, activity, message):
self._register() self._register()
LOG("deferredQueueMessage", 0, '') # Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message):
self.queued_activity.append((activity, activity_tool, message)) self.queued_activity.append((activity, activity_tool, message))
# We register queued messages so that we can
# unregister them
activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message): def deferredDeleteMessage(self, activity_tool, activity, message):
self._register() self._register()
LOG("deferredDeleteMessage", 0, '')
self.flushed_activity.append((activity, activity_tool, message)) self.flushed_activity.append((activity, activity_tool, message))
...@@ -84,9 +84,6 @@ allow_class(Result) ...@@ -84,9 +84,6 @@ allow_class(Result)
class Message: class Message:
is_deleted = 0
is_queued = 0
def __init__(self, object, active_process, activity_kw, method_id, args, kw): def __init__(self, object, active_process, activity_kw, method_id, args, kw):
if type(object) is type('a'): if type(object) is type('a'):
self.object_path = object.split('/') self.object_path = object.split('/')
...@@ -318,6 +315,9 @@ class ActivityTool (Folder, UniqueObject): ...@@ -318,6 +315,9 @@ class ActivityTool (Folder, UniqueObject):
def deferredDeleteMessage(self, activity, message): def deferredDeleteMessage(self, activity, message):
self._v_activity_buffer.deferredDeleteMessage(self, activity, message) self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity):
return activity.getRegisteredMessageList(self, self._v_activity_buffer)
def flush(self, object, invoke=0, **kw): def flush(self, object, invoke=0, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment