Commit 551f61f7 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: remove some dead code

parent 0a1adf68
...@@ -90,9 +90,6 @@ class Queue(object): ...@@ -90,9 +90,6 @@ class Queue(object):
if not self.is_initialized: if not self.is_initialized:
self.is_initialized = 1 self.is_initialized = 1
def queueMessage(self, activity_tool, m):
activity_tool.deferredQueueMessage(self, m)
def deleteMessage(self, activity_tool, m): def deleteMessage(self, activity_tool, m):
if not getattr(m, 'is_deleted', 0): if not getattr(m, 'is_deleted', 0):
# We try not to delete twice # We try not to delete twice
...@@ -209,14 +206,6 @@ class Queue(object): ...@@ -209,14 +206,6 @@ class Queue(object):
def flush(self, activity_tool, object, **kw): def flush(self, activity_tool, object, **kw):
pass pass
def start(self, active_process=None):
# Start queue / activities in queue for given process
pass
def stop(self, active_process=None):
# Stop queue / activities in queue for given process
pass
def loadMessage(self, s, **kw): def loadMessage(self, s, **kw):
m = cPickle.load(StringIO(s)) m = cPickle.load(StringIO(s))
m.__dict__.update(kw) m.__dict__.update(kw)
...@@ -255,19 +244,7 @@ class Queue(object): ...@@ -255,19 +244,7 @@ class Queue(object):
# Transaction Management # Transaction Management
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
# Called to prepare transaction commit for queued messages # Called to prepare transaction commit for queued messages
pass raise NotImplementedError
def finishQueueMessage(self, activity_tool_path, m):
# Called to commit queued messages
pass
def prepareDeleteMessage(self, activity_tool, m):
# Called to prepare transaction commit for deleted messages
pass
def finishDeleteMessage(self, activity_tool_path, m):
# Called to commit deleted messages
pass
# Registration Management # Registration Management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
......
...@@ -92,16 +92,6 @@ class SQLDict(SQLBase): ...@@ -92,16 +92,6 @@ class SQLDict(SQLBase):
processing_node_list=None, processing_node_list=None,
order_validation_text_list = order_validation_text_list) order_validation_text_list = order_validation_text_list)
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
path = '/'.join(m.object_path)
order_validation_text = self.getOrderValidationText(m)
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = m.method_id,
order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list]
if len(uid_list)>0:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
def generateMessageUID(self, m): def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id')) return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
......
...@@ -88,11 +88,6 @@ class SQLQueue(SQLBase): ...@@ -88,11 +88,6 @@ class SQLQueue(SQLBase):
processing_node_list=None, processing_node_list=None,
serialization_tag_list=serialization_tag_list) serialization_tag_list=serialization_tag_list)
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=[m.uid])
def getDuplicateMessageUidList(self, activity_tool, line, processing_node): def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
""" """
Reserve unreserved messages matching given line. Reserve unreserved messages matching given line.
......
...@@ -35,13 +35,11 @@ class ActivityBuffer(TM): ...@@ -35,13 +35,11 @@ class ActivityBuffer(TM):
def __init__(self): def __init__(self):
self.queued_activity = [] self.queued_activity = []
self.flushed_activity = []
self.message_list_dict = {} self.message_list_dict = {}
self.uid_set_dict = {} self.uid_set_dict = {}
def _clear(self): def _clear(self):
del self.queued_activity[:] del self.queued_activity[:]
del self.flushed_activity[:]
self.message_list_dict.clear() self.message_list_dict.clear()
self.uid_set_dict.clear() self.uid_set_dict.clear()
self.activity_tool = None self.activity_tool = None
...@@ -57,7 +55,7 @@ class ActivityBuffer(TM): ...@@ -57,7 +55,7 @@ class ActivityBuffer(TM):
self.activity_tool = activity_tool self.activity_tool = activity_tool
self._activity_tool_path = activity_tool.getPhysicalPath() self._activity_tool_path = activity_tool.getPhysicalPath()
TM._register(self) TM._register(self)
self._prepare_args = 0, 0 self._prepare_args = 0,
if self._prepare_args: if self._prepare_args:
transaction.get().addBeforeCommitHook(self._prepare, self._prepare_args) transaction.get().addBeforeCommitHook(self._prepare, self._prepare_args)
self._prepare_args = None self._prepare_args = None
...@@ -76,35 +74,18 @@ class ActivityBuffer(TM): ...@@ -76,35 +74,18 @@ class ActivityBuffer(TM):
error=sys.exc_info()) error=sys.exc_info())
raise raise
def _finish(self): _abort = _finish = _clear
# LOG('ActivityBuffer', 0, '_finish %r' % (self,))
try:
# Try to push / delete all messages
for activity, message in self.flushed_activity:
activity.finishDeleteMessage(self._activity_tool_path, message)
for activity, message in self.queued_activity:
activity.finishQueueMessage(self._activity_tool_path, message)
except:
LOG('ActivityBuffer', ERROR, "exception during _finish",
error=sys.exc_info())
raise
finally:
self._clear()
_abort = _clear def _prepare(self, queued):
def _prepare(self, flushed, queued):
try: try:
activity_tool = self.activity_tool activity_tool = self.activity_tool
# Try to push / delete all messages # Try to push all messages
for activity, message in self.flushed_activity[flushed:]:
activity.prepareDeleteMessage(activity_tool, message)
activity_dict = {} activity_dict = {}
for activity, message in self.queued_activity[queued:]: for activity, message in self.queued_activity[queued:]:
activity_dict.setdefault(activity, []).append(message) activity_dict.setdefault(activity, []).append(message)
for activity, message_list in activity_dict.iteritems(): for activity, message_list in activity_dict.iteritems():
activity.prepareQueueMessageList(activity_tool, message_list) activity.prepareQueueMessageList(activity_tool, message_list)
self._prepare_args = len(self.flushed_activity), len(self.queued_activity) self._prepare_args = len(self.queued_activity),
except: except:
LOG('ActivityBuffer', ERROR, "exception during _prepare", LOG('ActivityBuffer', ERROR, "exception during _prepare",
error=sys.exc_info()) error=sys.exc_info())
...@@ -120,10 +101,6 @@ class ActivityBuffer(TM): ...@@ -120,10 +101,6 @@ class ActivityBuffer(TM):
# unregister them # unregister them
activity.registerMessage(self, activity_tool, message) activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message):
self._register(activity_tool)
self.flushed_activity.append((activity, message))
def sortKey(self, *ignored): def sortKey(self, *ignored):
"""Activities must be finished before databases commit transactions.""" """Activities must be finished before databases commit transactions."""
return -1 return -1
...@@ -429,7 +429,8 @@ class Method: ...@@ -429,7 +429,8 @@ class Method:
portal_activities = passive_self.getPortalObject().portal_activities portal_activities = passive_self.getPortalObject().portal_activities
if portal_activities.activity_tracking: if portal_activities.activity_tracking:
activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self.__activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name)) activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self.__activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
activity_dict[self.__activity].queueMessage(portal_activities, m) portal_activities.getActivityBuffer().deferredQueueMessage(
portal_activities, activity_dict[self.__activity], m)
allow_class(Method) allow_class(Method)
...@@ -1047,14 +1048,6 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1047,14 +1048,6 @@ class ActivityTool (Folder, UniqueObject):
active_process = self.unrestrictedTraverse(active_process) active_process = self.unrestrictedTraverse(active_process)
return ActiveWrapper(object, activity, active_process, kw) return ActiveWrapper(object, activity, active_process, kw)
def deferredQueueMessage(self, activity, message):
activity_buffer = self.getActivityBuffer()
activity_buffer.deferredQueueMessage(self, activity, message)
def deferredDeleteMessage(self, activity, message):
activity_buffer = self.getActivityBuffer()
activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity): def getRegisteredMessageList(self, activity):
activity_buffer = self.getActivityBuffer(create_if_not_found=False) activity_buffer = self.getActivityBuffer(create_if_not_found=False)
if activity_buffer is not None: if activity_buffer is not None:
...@@ -1080,18 +1073,6 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1080,18 +1073,6 @@ class ActivityTool (Folder, UniqueObject):
for activity in activity_dict.itervalues(): for activity in activity_dict.itervalues():
activity.flush(aq_inner(self), object_path, invoke=invoke, **kw) activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
def start(self, **kw):
if not is_initialized:
self.initialize()
for activity in activity_dict.itervalues():
activity.start(aq_inner(self), **kw)
def stop(self, **kw):
if not is_initialized:
self.initialize()
for activity in activity_dict.itervalues():
activity.stop(aq_inner(self), **kw)
def invoke(self, message): def invoke(self, message):
if self.activity_tracking: if self.activity_tracking:
activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name)) activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment