Commit 04092164 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: allow activating an object from a before commit hook

This is required for workflow scripts that are run at the end of the transaction.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@42257 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent a7c48e44
...@@ -273,7 +273,7 @@ class Queue: ...@@ -273,7 +273,7 @@ class Queue:
return 0 return 0
# Transaction Management # Transaction Management
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessageList(self, activity_tool, message_list):
# Called to prepare transaction commit for queued messages # Called to prepare transaction commit for queued messages
pass pass
......
...@@ -33,108 +33,81 @@ class ActivityBuffer(TM): ...@@ -33,108 +33,81 @@ class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None _p_oid=_p_changed=_registered=None
def __init__(self, activity_tool=None): def __init__(self):
self.requires_prepare = 0 self.queued_activity = []
self.flushed_activity = []
def _getBuffer(self): self.message_list_dict = {}
buffer = self self.uid_set_dict = {}
# Create attributes only if they are not present.
if not hasattr(buffer, 'queued_activity'): def _clear(self):
buffer.queued_activity = [] del self.queued_activity[:]
buffer.flushed_activity = [] del self.flushed_activity[:]
buffer.message_list_dict = {} self.message_list_dict.clear()
buffer.uid_set_dict = {} self.uid_set_dict.clear()
return buffer self.activity_tool = None
def _clearBuffer(self):
buffer = self._getBuffer()
del buffer.queued_activity[:]
del buffer.flushed_activity[:]
buffer.message_list_dict.clear()
buffer.uid_set_dict.clear()
def getMessageList(self, activity): def getMessageList(self, activity):
buffer = self._getBuffer() return self.message_list_dict.setdefault(activity, [])
return buffer.message_list_dict.setdefault(activity, [])
def getUidSet(self, activity): def getUidSet(self, activity):
buffer = self._getBuffer() return self.uid_set_dict.setdefault(activity, set())
return buffer.uid_set_dict.setdefault(activity, set())
def _register(self, activity_tool): def _register(self, activity_tool):
if not self._registered: if not self._registered:
self._beginAndHook(activity_tool) self.activity_tool = activity_tool
self._activity_tool_path = activity_tool.getPhysicalPath()
TM._register(self) TM._register(self)
self._prepare_args = 0, 0
if self._prepare_args:
transaction.get().addBeforeCommitHook(self._prepare, self._prepare_args)
self._prepare_args = None
# Keeps a list of messages to add and remove # Keeps a list of messages to add and remove
# at end of transaction # at end of transaction
def _beginAndHook(self, activity_tool): def _begin(self):
# LOG('ActivityBuffer', 0, '_begin %r' % (self,)) # LOG('ActivityBuffer', 0, '_begin %r' % (self,))
from ActivityTool import activity_dict from ActivityTool import activity_dict
self.requires_prepare = 1
try: try:
# Reset registration for each transaction. # Reset registration for each transaction.
for activity in activity_dict.itervalues(): for activity in activity_dict.itervalues():
activity.registerActivityBuffer(self) activity.registerActivityBuffer(self)
# Notice: The operation below cannot fail silently, or we get errors late
# in the transaction that are very hard to understand.
transaction.get().addBeforeCommitHook(self.tpc_prepare,
(transaction,),
dict(activity_tool=activity_tool))
except: except:
LOG('ActivityBuffer', ERROR, "exception during _begin", LOG('ActivityBuffer', ERROR, "exception during _begin",
error=sys.exc_info()) error=sys.exc_info())
raise raise
def _finish(self, *ignored): def _finish(self):
# LOG('ActivityBuffer', 0, '_finish %r' % (self,)) # LOG('ActivityBuffer', 0, '_finish %r' % (self,))
try: try:
try: try:
# Try to push / delete all messages # Try to push / delete all messages
buffer = self._getBuffer() for activity, message in self.flushed_activity:
for (activity, message) in buffer.flushed_activity:
activity.finishDeleteMessage(self._activity_tool_path, message) activity.finishDeleteMessage(self._activity_tool_path, message)
for (activity, message) in buffer.queued_activity: for activity, message in self.queued_activity:
activity.finishQueueMessage(self._activity_tool_path, message) activity.finishQueueMessage(self._activity_tool_path, message)
except: except:
LOG('ActivityBuffer', ERROR, "exception during _finish", LOG('ActivityBuffer', ERROR, "exception during _finish",
error=sys.exc_info()) error=sys.exc_info())
raise raise
finally: finally:
self._clearBuffer() self._clear()
def _abort(self, *ignored):
self._clearBuffer()
def tpc_prepare(self, transaction, sub=None, activity_tool=None):
assert activity_tool is not None
self._activity_tool_path = activity_tool.getPhysicalPath()
# Do nothing if it is a subtransaction
if sub is not None:
return
if not self.requires_prepare: _abort = _clear
return
self.requires_prepare = 0 def _prepare(self, flushed, queued):
try: try:
activity_tool = self.activity_tool
# Try to push / delete all messages # Try to push / delete all messages
buffer = self._getBuffer() for activity, message in self.flushed_activity[flushed:]:
for (activity, message) in buffer.flushed_activity:
activity.prepareDeleteMessage(activity_tool, message) activity.prepareDeleteMessage(activity_tool, message)
activity_dict = {} activity_dict = {}
for (activity, message) in buffer.queued_activity: for activity, message in self.queued_activity[queued:]:
activity_dict.setdefault(activity, []).append(message) activity_dict.setdefault(activity, []).append(message)
for activity, message_list in activity_dict.iteritems(): for activity, message_list in activity_dict.iteritems():
if hasattr(activity, 'prepareQueueMessageList'):
activity.prepareQueueMessageList(activity_tool, message_list) activity.prepareQueueMessageList(activity_tool, message_list)
else: self._prepare_args = len(self.flushed_activity), len(self.queued_activity)
for message in message_list:
activity.prepareQueueMessage(activity_tool, message)
except: except:
LOG('ActivityBuffer', ERROR, "exception during tpc_prepare", LOG('ActivityBuffer', ERROR, "exception during _prepare",
error=sys.exc_info()) error=sys.exc_info())
raise raise
...@@ -143,16 +116,14 @@ class ActivityBuffer(TM): ...@@ -143,16 +116,14 @@ class ActivityBuffer(TM):
# Activity is called to prevent queuing some messages (useful for example # Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times) # to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message): if not activity.isMessageRegistered(self, activity_tool, message):
buffer = self._getBuffer() self.queued_activity.append((activity, message))
buffer.queued_activity.append((activity, message))
# We register queued messages so that we can # We register queued messages so that we can
# unregister them # unregister them
activity.registerMessage(self, activity_tool, message) activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message): def deferredDeleteMessage(self, activity_tool, activity, message):
self._register(activity_tool) self._register(activity_tool)
buffer = self._getBuffer() self.flushed_activity.append((activity, message))
buffer.flushed_activity.append((activity, message))
def sortKey(self, *ignored): def sortKey(self, *ignored):
"""Activities must be finished before databases commit transactions.""" """Activities must be finished before databases commit transactions."""
......
...@@ -1041,14 +1041,15 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1041,14 +1041,15 @@ class ActivityTool (Folder, UniqueObject):
finally: finally:
global_activity_buffer_lock.release() global_activity_buffer_lock.release()
thread_activity_buffer = global_activity_buffer[my_instance_key] thread_activity_buffer = global_activity_buffer[my_instance_key]
if my_thread_key not in thread_activity_buffer: try:
return thread_activity_buffer[my_thread_key]
except KeyError:
if create_if_not_found: if create_if_not_found:
buffer = ActivityBuffer(activity_tool=self) buffer = ActivityBuffer()
else: else:
buffer = None buffer = None
thread_activity_buffer[my_thread_key] = buffer thread_activity_buffer[my_thread_key] = buffer
activity_buffer = thread_activity_buffer[my_thread_key] return buffer
return activity_buffer
security.declarePrivate('activateObject') security.declarePrivate('activateObject')
def activateObject(self, object, activity, active_process, **kw): def activateObject(self, object, activity, active_process, **kw):
......
...@@ -3832,6 +3832,19 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3832,6 +3832,19 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally: finally:
del activity_tool.__class__.doSomething del activity_tool.__class__.doSomething
def test_126_beforeCommitHook(self):
"""
Check it is possible to activate an object from a before commit hook
"""
def doSomething(person):
person.activate(activity='SQLDict')._setFirstName('John')
person.activate(activity='SQLQueue')._setLastName('Smith')
person = self.portal.person_module.newContent()
transaction.get().addBeforeCommitHook(doSomething, (person,))
transaction.commit()
self.tic()
self.assertEqual(person.getTitle(), 'John Smith')
def test_connection_migration(self): def test_connection_migration(self):
""" """
Make sure the cmf_activity_sql_connection is automatically migrated from Make sure the cmf_activity_sql_connection is automatically migrated from
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment