Commit a9baf7b0 authored by Alexandre Boeglin's avatar Alexandre Boeglin

Prevents TimerService from starting multiple threads in parallel (thus

creating database conflicts).
Added missing "global" statement in distribute.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@5672 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7efca437
......@@ -61,7 +61,8 @@ except ImportError:
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = 0
tic_lock = threading.Lock() # A RAM based lock
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
first_run = 1
# Activity Registration
......@@ -416,34 +417,44 @@ class ActivityTool (Folder, UniqueObject):
This method is called by TimerService in the interval given
in zope.conf. The Default is every 5 seconds.
"""
# get owner of portal_catalog, so normally we should be able to
# have the permission to invoke all activities
user = self.portal_catalog.getOwner()
newSecurityManager(self.REQUEST, user)
currentNode = self.getCurrentNode()
# only distribute when we are the distributingNode or if it's empty
if (self.distributingNode == self.getCurrentNode()):
self.distribute(len(self._nodes))
#LOG('CMFActivity:', INFO, 'self.distribute(node_count=%s)' %len(self._nodes))
elif not self.distributingNode:
self.distribute(1)
#LOG('CMFActivity:', INFO, 'distributingNodes empty! Calling distribute(1)')
# call tic for the current processing_node
# the processing_node numbers are the indices of the elements in the node tuple +1
# because processing_node starts form 1
if currentNode in self._nodes:
self.tic(list(self._nodes).index(currentNode)+1)
#LOG('CMFActivity:', INFO, 'self.tic(processing_node=%s)' %str(list(self._nodes).index(currentNode)+1))
elif len(self._nodes) == 0:
self.tic(1)
#LOG('CMFActivity:', INFO, 'Node List is empty! Calling tic(1)')
# Prevent TimerService from starting multiple threads in parallel
acquired = timerservice_lock.acquire(0)
if not acquired:
return
try:
# get owner of portal_catalog, so normally we should be able to
# have the permission to invoke all activities
user = self.portal_catalog.getOwner()
newSecurityManager(self.REQUEST, user)
currentNode = self.getCurrentNode()
# only distribute when we are the distributingNode or if it's empty
if (self.distributingNode == self.getCurrentNode()):
self.distribute(len(self._nodes))
#LOG('CMFActivity:', INFO, 'self.distribute(node_count=%s)' %len(self._nodes))
elif not self.distributingNode:
self.distribute(1)
#LOG('CMFActivity:', INFO, 'distributingNodes empty! Calling distribute(1)')
# call tic for the current processing_node
# the processing_node numbers are the indices of the elements in the node tuple +1
# because processing_node starts form 1
if currentNode in self._nodes:
self.tic(list(self._nodes).index(currentNode)+1)
#LOG('CMFActivity:', INFO, 'self.tic(processing_node=%s)' %str(list(self._nodes).index(currentNode)+1))
elif len(self._nodes) == 0:
self.tic(1)
#LOG('CMFActivity:', INFO, 'Node List is empty! Calling tic(1)')
except:
timerservice_lock.release()
raise
else:
timerservice_lock.release()
security.declarePublic('distribute')
def distribute(self, node_count=1):
......@@ -451,6 +462,7 @@ class ActivityTool (Folder, UniqueObject):
Distribute load
"""
# Initialize if needed
global is_initialized
if not is_initialized: self.initialize()
# Call distribute on each queue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment