Commit c5b1fbb6 authored by Sebastien Robin's avatar Sebastien Robin

make RAMQueue working


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@635 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent f9c08522
...@@ -57,12 +57,12 @@ class RAMQueue(Queue): ...@@ -57,12 +57,12 @@ class RAMQueue(Queue):
return 1 return 1
return 0 return 0
def flush(self, activity_tool, object_path, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
new_queue = [] new_queue = []
for m in self.queue: for m in self.queue:
if m.object_path == object_path: if m.object_path == object_path:
activity_tool.invoke(m) if invoke:
del self.dict[key] activity_tool.invoke(m)
else: else:
new_queue.append(m) new_queue.append(m)
self.queue = new_queue self.queue = new_queue
......
...@@ -103,7 +103,8 @@ class Message: ...@@ -103,7 +103,8 @@ class Message:
# Store REQUEST Info ? # Store REQUEST Info ?
def __call__(self, activity_tool): def __call__(self, activity_tool):
try: #try:
if 1:
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Trying to call method %s on object %s' % (self.method_id, self.object_path)) 'Trying to call method %s on object %s' % (self.method_id, self.object_path))
object = activity_tool.unrestrictedTraverse(self.object_path) object = activity_tool.unrestrictedTraverse(self.object_path)
...@@ -116,7 +117,8 @@ class Message: ...@@ -116,7 +117,8 @@ class Message:
active_process = activity_tool.getActiveProcess() active_process = activity_tool.getActiveProcess()
active_process.activateResult(Result(object,self.method_id,result)) # XXX Allow other method_id in future active_process.activateResult(Result(object,self.method_id,result)) # XXX Allow other method_id in future
self.is_executed = 1 self.is_executed = 1
except: else:
#except:
self.is_executed = 0 self.is_executed = 0
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (self.method_id, self.object_path)) 'Could not call method %s on object %s' % (self.method_id, self.object_path))
...@@ -214,7 +216,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -214,7 +216,7 @@ class ActivityTool (Folder, UniqueObject):
def initialize(self): def initialize(self):
global is_initialized global is_initialized
from Activity import RAMQueue, RAMDict, SQLDict, SQLQueue from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
# Initialize each queue # Initialize each queue
for activity in activity_list: for activity in activity_list:
activity.initialize(self) activity.initialize(self)
...@@ -230,11 +232,11 @@ class ActivityTool (Folder, UniqueObject): ...@@ -230,11 +232,11 @@ class ActivityTool (Folder, UniqueObject):
# Call distribute on each queue # Call distribute on each queue
for activity in activity_list: for activity in activity_list:
try: #try:
#if 1: if 1:
activity.distribute(self, node_count) activity.distribute(self, node_count)
except: #except:
#else: else:
LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity) LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity)
security.declarePublic('tic') security.declarePublic('tic')
...@@ -262,9 +264,11 @@ class ActivityTool (Folder, UniqueObject): ...@@ -262,9 +264,11 @@ class ActivityTool (Folder, UniqueObject):
# Wakeup each queue # Wakeup each queue
for activity in activity_list: for activity in activity_list:
try: if 1:
#try:
activity.wakeup(self, processing_node) activity.wakeup(self, processing_node)
except: else:
#except:
LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity) LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
# Process messages on each queue in round robin # Process messages on each queue in round robin
...@@ -272,12 +276,12 @@ class ActivityTool (Folder, UniqueObject): ...@@ -272,12 +276,12 @@ class ActivityTool (Folder, UniqueObject):
while has_awake_activity: while has_awake_activity:
has_awake_activity = 0 has_awake_activity = 0
for activity in activity_list: for activity in activity_list:
try: #try:
#if 1: if 1:
activity.tic(self, processing_node) # Transaction processing is the responsability of the activity activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node) has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
except: #except:
#else: else:
LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity) LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity)
# decrease the number of active_threads # decrease the number of active_threads
...@@ -340,7 +344,10 @@ class ActivityTool (Folder, UniqueObject): ...@@ -340,7 +344,10 @@ class ActivityTool (Folder, UniqueObject):
if type(object_path) is type(''): if type(object_path) is type(''):
object_path = tuple(object_path.split('/')) object_path = tuple(object_path.split('/'))
for activity in activity_list: for activity in activity_list:
activity.flush(self, object_path, method_id=method_id, invoke=1) try:
activity.flush(self, object_path, method_id=method_id, invoke=1)
except AttributeError:
LOG('CMFActivity.manageCancel, Warning, could not flush activity on:',0,activity)
if REQUEST is not None: if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities')) return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))
...@@ -351,7 +358,10 @@ class ActivityTool (Folder, UniqueObject): ...@@ -351,7 +358,10 @@ class ActivityTool (Folder, UniqueObject):
if type(object_path) is type(''): if type(object_path) is type(''):
object_path = tuple(object_path.split('/')) object_path = tuple(object_path.split('/'))
for activity in activity_list: for activity in activity_list:
activity.flush(self, object_path, method_id=method_id, invoke=0) try:
activity.flush(self, object_path, method_id=method_id, invoke=0)
except AttributeError:
LOG('CMFActivity.manageCancel, Warning, could not flush activity on:',0,activity)
if REQUEST is not None: if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities')) return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))
...@@ -365,7 +375,10 @@ class ActivityTool (Folder, UniqueObject): ...@@ -365,7 +375,10 @@ class ActivityTool (Folder, UniqueObject):
message_list = [] message_list = []
for activity in activity_list: for activity in activity_list:
message_list += activity.getMessageList(self) try:
message_list += activity.getMessageList(self)
except AttributeError:
LOG('getMessageList, could not get message from Activity:',0,activity)
return message_list return message_list
security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' ) security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment