Commit 1138c067 authored by Jean-Paul Smets's avatar Jean-Paul Smets

bugfixes by JPS

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@678 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 01e04353
...@@ -154,15 +154,19 @@ class Queue: ...@@ -154,15 +154,19 @@ class Queue:
# Transaction Management # Transaction Management
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
# Called to prepare transaction commit for queued messages
pass pass
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
# Called to commit queued messages
pass pass
def prepareDequeueMessage(self, activity_tool, m): def prepareDequeueMessage(self, activity_tool, m):
# Called to prepare transaction commit for deleted messages
pass pass
def finishDequeueMessage(self, activity_tool, m): def finishDequeueMessage(self, activity_tool, m):
# Called to commit deleted messages
pass pass
# Registration Management # Registration Management
......
...@@ -113,7 +113,7 @@ class RAMDict(Queue): ...@@ -113,7 +113,7 @@ class RAMDict(Queue):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.deleteMessage(m) self.deleteMessage(activity_tool, m)
else: else:
pass pass
#LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path)) #LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path))
......
...@@ -59,7 +59,7 @@ class RAMQueue(Queue): ...@@ -59,7 +59,7 @@ class RAMQueue(Queue):
return 1 # Go to sleep return 1 # Go to sleep
m = self.queue[0] m = self.queue[0]
activity_tool.invoke(m) activity_tool.invoke(m)
self.deleteMessage(m) self.deleteMessage(activity_tool, m)
return 0 # Keep on ticking return 0 # Keep on ticking
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
......
...@@ -201,7 +201,7 @@ class SQLDict(RAMDict): ...@@ -201,7 +201,7 @@ class SQLDict(RAMDict):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(m) self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
......
...@@ -148,7 +148,7 @@ class SQLQueue(RAMQueue): ...@@ -148,7 +148,7 @@ class SQLQueue(RAMQueue):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(m) self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
......
...@@ -112,10 +112,9 @@ class Message: ...@@ -112,10 +112,9 @@ class Message:
object = activity_tool.unrestrictedTraverse(self.object_path) object = activity_tool.unrestrictedTraverse(self.object_path)
# Change user if required (TO BE DONE) # Change user if required (TO BE DONE)
# self.activity_kw # self.activity_kw
REQUEST = get_request() activity_tool._v_active_process = self.active_process # Store the active_process as volatile thread variable
REQUEST.active_process = self.active_process
result = getattr(object, self.method_id)(*self.args, **self.kw) result = getattr(object, self.method_id)(*self.args, **self.kw)
if REQUEST.active_process is not None: if activity_tool._v_active_process is not None:
active_process = activity_tool.getActiveProcess() active_process = activity_tool.getActiveProcess()
active_process.activateResult(Result(object,self.method_id,result)) # XXX Allow other method_id in future active_process.activateResult(Result(object,self.method_id,result)) # XXX Allow other method_id in future
self.is_executed = 1 self.is_executed = 1
...@@ -410,9 +409,9 @@ class ActivityTool (Folder, UniqueObject): ...@@ -410,9 +409,9 @@ class ActivityTool (Folder, UniqueObject):
self.immediateReindexObject() self.immediateReindexObject()
def getActiveProcess(self): def getActiveProcess(self):
REQUEST = get_request() active_process = getattr(self, '_v_active_process')
if REQUEST.active_process: if active_process:
return self.unrestrictedTraverse(REQUEST.active_process) return self.unrestrictedTraverse(active_process)
return None return None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment