Commit 465dcdec authored by Yoshinori Okuji's avatar Yoshinori Okuji

Handle ConflictError explicitly in tic. Make sure that active_threads is...

Handle ConflictError explicitly in tic. Make sure that active_threads is decremented before exiting from tic.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@3546 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent aecf72ee
......@@ -41,6 +41,7 @@ from ActivityBuffer import ActivityBuffer
from AccessControl.SecurityManagement import newSecurityManager
import threading
import sys
from ZODB.POSException import ConflictError
from zLOG import LOG
......@@ -261,28 +262,33 @@ class ActivityTool (Folder, UniqueObject):
active_threads += 1
tic_lock.release()
# Wakeup each queue
for activity in activity_list:
try:
activity.wakeup(self, processing_node)
except:
LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
# Process messages on each queue in round robin
has_awake_activity = 1
while has_awake_activity:
has_awake_activity = 0
try:
# Wakeup each queue
for activity in activity_list:
try:
activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
activity.wakeup(self, processing_node)
except ConflictError:
raise
except:
LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
# decrease the number of active_threads
tic_lock.acquire()
active_threads -= 1
tic_lock.release()
LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)
# Process messages on each queue in round robin
has_awake_activity = 1
while has_awake_activity:
has_awake_activity = 0
for activity in activity_list:
try:
activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
except ConflictError:
raise
except:
LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
finally:
# decrease the number of active_threads
tic_lock.acquire()
active_threads -= 1
tic_lock.release()
def hasActivity(self, *args, **kw):
# Check in each queue if the object has deferred tasks
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment