Commit fa610065 authored by Vincent Pelletier's avatar Vincent Pelletier

Remove usage from volatile parameters to access the ActivityBuffer on ActivityTool.

 - implies removing the global dict used in ActivityBuffer to avoid duplicated activity insertion.
 - implies that ActivityBuffer is boud to a thread (because of get_ident) and not to a connection any more (because volatile is bound to a connection), so persistent objects must not be held outside transaction scope (the maximum scope at which a connection is bound to a thread)
   - implies modifying hook registration so that portal_activities is passed as a parameter to tpc_prepare
     - implies overloading partially TM._register method to take activity_tool as a parameter


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17102 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent ea3cca1c
...@@ -37,16 +37,6 @@ except ImportError: ...@@ -37,16 +37,6 @@ except ImportError:
if not hasattr(globals()['__builtins__'], 'set'): if not hasattr(globals()['__builtins__'], 'set'):
from sets import Set as set from sets import Set as set
# This variable is used to store thread-local buffered information.
# This must be RAM-based, because the use of a volatile attribute does
# not guarantee that the information persists until the end of a
# transaction, but we need to assure that the information is accessible
# for flushing activities. So the approach here is that information is
# stored in RAM, and removed at _finish and _abort, so that the information
# would not span over transactions.
buffer_dict_lock = threading.Lock()
buffer_dict = {}
class ActivityBuffer(TM): class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None _p_oid=_p_changed=_registered=None
...@@ -54,28 +44,8 @@ class ActivityBuffer(TM): ...@@ -54,28 +44,8 @@ class ActivityBuffer(TM):
def __init__(self, activity_tool=None): def __init__(self, activity_tool=None):
self.requires_prepare = 0 self.requires_prepare = 0
# Directly store the activity tool as an attribute. At the beginning
# the activity tool was stored as a part of the key in queued_activity and
# in flushed_activity, but this is not nice because in that case we must
# use hash on it, and when there is no uid on activity tool, it is
# impossible to generate a new uid because acquisition is not available
# in the dictionary.
assert activity_tool is not None
self._activity_tool = activity_tool
# Referring to a persistent object is dangerous when finishing a transaction,
# so store only the required information.
self._activity_tool_path = activity_tool.getPhysicalPath()
try:
buffer_dict_lock.acquire()
if self._activity_tool_path not in buffer_dict:
buffer_dict[self._activity_tool_path] = threading.local()
finally:
buffer_dict_lock.release()
def _getBuffer(self): def _getBuffer(self):
buffer = buffer_dict[self._activity_tool_path] buffer = self
# Create attributes only if they are not present. # Create attributes only if they are not present.
if not hasattr(buffer, 'queued_activity'): if not hasattr(buffer, 'queued_activity'):
buffer.queued_activity = [] buffer.queued_activity = []
...@@ -99,9 +69,13 @@ class ActivityBuffer(TM): ...@@ -99,9 +69,13 @@ class ActivityBuffer(TM):
buffer = self._getBuffer() buffer = self._getBuffer()
return buffer.uid_set_dict.setdefault(activity, set()) return buffer.uid_set_dict.setdefault(activity, set())
def _register(self, activity_tool):
self._beginAndHook(activity_tool)
TM._register(self)
# Keeps a list of messages to add and remove # Keeps a list of messages to add and remove
# at end of transaction # at end of transaction
def _begin(self, *ignored): def _beginAndHook(self, activity_tool):
# LOG('ActivityBuffer', 0, '_begin %r' % (self,)) # LOG('ActivityBuffer', 0, '_begin %r' % (self,))
from ActivityTool import activity_list from ActivityTool import activity_list
self.requires_prepare = 1 self.requires_prepare = 1
...@@ -115,7 +89,7 @@ class ActivityBuffer(TM): ...@@ -115,7 +89,7 @@ class ActivityBuffer(TM):
# patching Trasaction. # patching Trasaction.
transaction = get_transaction() transaction = get_transaction()
try: try:
transaction.beforeCommitHook(self.tpc_prepare, transaction) transaction.beforeCommitHook(self.tpc_prepare, transaction, activity_tool=activity_tool)
except AttributeError: except AttributeError:
pass pass
except: except:
...@@ -143,7 +117,9 @@ class ActivityBuffer(TM): ...@@ -143,7 +117,9 @@ class ActivityBuffer(TM):
def _abort(self, *ignored): def _abort(self, *ignored):
self._clearBuffer() self._clearBuffer()
def tpc_prepare(self, transaction, sub=None): def tpc_prepare(self, transaction, sub=None, activity_tool=None):
assert activity_tool is not None
self._activity_tool_path = activity_tool.getPhysicalPath()
# Do nothing if it is a subtransaction # Do nothing if it is a subtransaction
if sub is not None: if sub is not None:
return return
...@@ -156,23 +132,23 @@ class ActivityBuffer(TM): ...@@ -156,23 +132,23 @@ class ActivityBuffer(TM):
# Try to push / delete all messages # Try to push / delete all messages
buffer = self._getBuffer() buffer = self._getBuffer()
for (activity, message) in buffer.flushed_activity: for (activity, message) in buffer.flushed_activity:
activity.prepareDeleteMessage(self._activity_tool, message) activity.prepareDeleteMessage(activity_tool, message)
activity_dict = {} activity_dict = {}
for (activity, message) in buffer.queued_activity: for (activity, message) in buffer.queued_activity:
activity_dict.setdefault(activity, []).append(message) activity_dict.setdefault(activity, []).append(message)
for activity, message_list in activity_dict.iteritems(): for activity, message_list in activity_dict.iteritems():
if hasattr(activity, 'prepareQueueMessageList'): if hasattr(activity, 'prepareQueueMessageList'):
activity.prepareQueueMessageList(self._activity_tool, message_list) activity.prepareQueueMessageList(activity_tool, message_list)
else: else:
for message in message_list: for message in message_list:
activity.prepareQueueMessage(self._activity_tool, message) activity.prepareQueueMessage(activity_tool, message)
except: except:
LOG('ActivityBuffer', ERROR, "exception during tpc_prepare", LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
error=sys.exc_info()) error=sys.exc_info())
raise raise
def deferredQueueMessage(self, activity_tool, activity, message): def deferredQueueMessage(self, activity_tool, activity, message):
self._register() self._register(activity_tool)
# Activity is called to prevent queuing some messages (useful for example # Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times) # to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message): if not activity.isMessageRegistered(self, activity_tool, message):
...@@ -183,7 +159,7 @@ class ActivityBuffer(TM): ...@@ -183,7 +159,7 @@ class ActivityBuffer(TM):
activity.registerMessage(self, activity_tool, message) activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message): def deferredDeleteMessage(self, activity_tool, activity, message):
self._register() self._register(activity_tool)
buffer = self._getBuffer() buffer = self._getBuffer()
buffer.flushed_activity.append((activity, message)) buffer.flushed_activity.append((activity, message))
......
...@@ -81,6 +81,13 @@ ROLE_PROCESSING = 1 ...@@ -81,6 +81,13 @@ ROLE_PROCESSING = 1
activity_dict = {} activity_dict = {}
activity_list = [] activity_list = []
# Here go ActivityBuffer instances
# Structure:
# global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
global_activity_buffer_lock = allocate_lock()
def registerActivity(activity): def registerActivity(activity):
# Must be rewritten to register # Must be rewritten to register
# class and create instance for each activity # class and create instance for each activity
...@@ -680,40 +687,76 @@ class ActivityTool (Folder, UniqueObject): ...@@ -680,40 +687,76 @@ class ActivityTool (Folder, UniqueObject):
return 1 return 1
return 0 return 0
def getActivityBuffer(self, create_if_not_found=True):
"""
Get activtity buffer for this thread for this activity tool.
If no activity buffer is found at lowest level and create_if_not_found
is True, create one.
Intermediate level is unconditionaly created if non existant because
chances are it will be used in the instance life.
Lock is held when checking for intermediate level existance
because:
- intermediate level dict must not be created in 2 threads at the
same time, since one creation would destroy the existing one.
It's released after that step because:
- lower level access is at thread scope, thus by definition there
can be only one access at a time to a key
- GIL protects us when accessing python instances
"""
global global_activity_buffer
global global_activity_buffer_lock
assert getattr(self, 'aq_self', None) is not None
my_instance_key = self.getPhysicalPath()
my_thread_key = get_ident()
global_activity_buffer_lock.acquire()
try:
if my_instance_key not in global_activity_buffer:
global_activity_buffer[my_instance_key] = {}
finally:
global_activity_buffer_lock.release()
thread_activity_buffer = global_activity_buffer[my_instance_key]
if my_thread_key not in thread_activity_buffer:
if create_if_not_found:
buffer = ActivityBuffer(activity_tool=self)
else:
buffer = None
thread_activity_buffer[my_thread_key] = buffer
activity_buffer = thread_activity_buffer[my_thread_key]
return activity_buffer
security.declarePrivate('activateObject') security.declarePrivate('activateObject')
def activateObject(self, object, activity, active_process, **kw): def activateObject(self, object, activity, active_process, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
if getattr(self, '_v_activity_buffer', None) is None: self.getActivityBuffer()
self._v_activity_buffer = ActivityBuffer(activity_tool=self)
return ActiveWrapper(object, activity, active_process, **kw) return ActiveWrapper(object, activity, active_process, **kw)
def deferredQueueMessage(self, activity, message): def deferredQueueMessage(self, activity, message):
self._v_activity_buffer.deferredQueueMessage(self, activity, message) activity_buffer = self.getActivityBuffer()
activity_buffer.deferredQueueMessage(self, activity, message)
def deferredDeleteMessage(self, activity, message): def deferredDeleteMessage(self, activity, message):
if getattr(self, '_v_activity_buffer', None) is None: activity_buffer = self.getActivityBuffer()
self._v_activity_buffer = ActivityBuffer(activity_tool=self) activity_buffer.deferredDeleteMessage(self, activity, message)
self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity): def getRegisteredMessageList(self, activity):
activity_buffer = getattr(self, '_v_activity_buffer', None) activity_buffer = self.getActivityBuffer(create_if_not_found=False)
if activity_buffer is not None: if activity_buffer is not None:
activity_buffer._register() # This is required if flush flush is called outside activate #activity_buffer._register() # This is required if flush flush is called outside activate
return activity.getRegisteredMessageList(self._v_activity_buffer, return activity.getRegisteredMessageList(activity_buffer,
aq_inner(self)) aq_inner(self))
else: else:
return [] return []
def unregisterMessage(self, activity, message): def unregisterMessage(self, activity, message):
self._v_activity_buffer._register() # Required if called by flush, outside activate activity_buffer = self.getActivityBuffer()
return activity.unregisterMessage(self._v_activity_buffer, aq_inner(self), message) #activity_buffer._register()
return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
def flush(self, obj, invoke=0, **kw): def flush(self, obj, invoke=0, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
if getattr(self, '_v_activity_buffer', None) is None: self.getActivityBuffer()
self._v_activity_buffer = ActivityBuffer(activity_tool=self)
if isinstance(obj, tuple): if isinstance(obj, tuple):
object_path = obj object_path = obj
else: else:
...@@ -840,8 +883,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -840,8 +883,7 @@ class ActivityTool (Folder, UniqueObject):
# Some Security Cheking should be made here XXX # Some Security Cheking should be made here XXX
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
if getattr(self, '_v_activity_buffer', None) is None: self.getActivityBuffer()
self._v_activity_buffer = ActivityBuffer(activity_tool=self)
activity_dict[activity].queueMessage(aq_inner(self), activity_dict[activity].queueMessage(aq_inner(self),
Message(path, active_process, activity_kw, method_id, args, kw)) Message(path, active_process, activity_kw, method_id, args, kw))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment