Commit f02bc341 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: automatically reselect messages in 'processing=1' state

When a node is looking for activities to process, there's no point testing the
'processing' column. This fixes cases leaving activities at 'processing=1'
forever (unless, of course, a cluster is killed and restarted with fewer nodes).
Also remove now useless cleanup at startup.

One known case leaving such activities is when the first commit of
dequeueMessage raises, which can happen, for example, during migration of
portal_types.

It seems the 'processing' column becomes useless for CMFActivity.

Acked-by: Vincent Pelletier

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@32089 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 458b5366
...@@ -163,8 +163,10 @@ class SQLDict(RAMDict, SQLBase): ...@@ -163,8 +163,10 @@ class SQLDict(RAMDict, SQLBase):
This number is guaranted not to be exceeded. This number is guaranted not to be exceeded.
If None (or not given) no limit apply. If None (or not given) no limit apply.
""" """
result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit) result = not group_method_id and \
if len(result) == 0: activity_tool.SQLDict_selectReservedMessageList(
processing_node=processing_node, count=limit)
if not result:
activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id) activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit) result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
return result return result
...@@ -284,6 +286,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -284,6 +286,8 @@ class SQLDict(RAMDict, SQLBase):
path_and_method_id_dict = {} path_and_method_id_dict = {}
unreserve_uid_list = [] unreserve_uid_list = []
for line in result: for line in result:
if line.uid == uid:
continue
# All fetched lines have the same group_method_id and # All fetched lines have the same group_method_id and
# processing_node. # processing_node.
# Their dates are lower-than or equal-to now_date. # Their dates are lower-than or equal-to now_date.
...@@ -468,6 +472,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -468,6 +472,8 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', PANIC, LOG('SQLDict', PANIC,
'abort failed, thus some objects may be modified accidentally') 'abort failed, thus some objects may be modified accidentally')
raise raise
# XXX Is it still useful to free messages now that this node is able
# to reselect them ?
to_free_uid_list = [x[0] for x in message_uid_priority_list] to_free_uid_list = [x[0] for x in message_uid_priority_list]
try: try:
makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict) makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
......
...@@ -332,6 +332,8 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -332,6 +332,8 @@ class SQLQueue(RAMQueue, SQLBase):
# It is possible that the message is executed but the commit # It is possible that the message is executed but the commit
# of the transaction fails # of the transaction fails
value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
# XXX Is it still useful to free message now that this node is able
# to reselect it ?
try: try:
makeMessageListAvailable([value[0]]) makeMessageListAvailable([value[0]])
except: except:
......
...@@ -93,7 +93,6 @@ is_initialized = False ...@@ -93,7 +93,6 @@ is_initialized = False
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
is_running_lock = threading.Lock() is_running_lock = threading.Lock()
first_run = True
currentNode = None currentNode = None
ROLE_IDLE = 0 ROLE_IDLE = 0
ROLE_PROCESSING = 1 ROLE_PROCESSING = 1
...@@ -918,7 +917,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -918,7 +917,7 @@ class ActivityTool (Folder, UniqueObject):
Starts again an activity Starts again an activity
processing_node starts from 1 (there is not node 0) processing_node starts from 1 (there is not node 0)
""" """
global active_threads, first_run global active_threads
# return if the number of threads is too high # return if the number of threads is too high
# else, increase the number of active_threads and continue # else, increase the number of active_threads and continue
...@@ -937,15 +936,6 @@ class ActivityTool (Folder, UniqueObject): ...@@ -937,15 +936,6 @@ class ActivityTool (Folder, UniqueObject):
inner_self = aq_inner(self) inner_self = aq_inner(self)
# If this is the first tic after zope is started, reset the processing
# flag for activities of this node
if first_run:
inner_self.SQLDict_clearProcessingFlag(
processing_node=processing_node)
inner_self.SQLQueue_clearProcessingFlag(
processing_node=processing_node)
first_run = False
try: try:
#Sort activity list by priority #Sort activity list by priority
activity_list = sorted(activity_dict.itervalues(), activity_list = sorted(activity_dict.itervalues(),
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node</params>
UPDATE
message
SET
processing=0,
processing_node=0
WHERE
processing_node=<dtml-sqlvar processing_node type="int">
...@@ -15,7 +15,6 @@ FROM ...@@ -15,7 +15,6 @@ FROM
message message
WHERE WHERE
processing_node = <dtml-sqlvar processing_node type="int"> processing_node = <dtml-sqlvar processing_node type="int">
AND processing = 0
<dtml-if expr="count is not None"> <dtml-if expr="count is not None">
LIMIT <dtml-sqlvar count type="int"> LIMIT <dtml-sqlvar count type="int">
</dtml-if> </dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node</params>
UPDATE
message_queue
SET
processing=0,
processing_node=0
WHERE
processing_node=<dtml-sqlvar processing_node type="int">
...@@ -15,7 +15,6 @@ FROM ...@@ -15,7 +15,6 @@ FROM
message_queue message_queue
WHERE WHERE
processing_node = <dtml-sqlvar processing_node type="int"> processing_node = <dtml-sqlvar processing_node type="int">
AND processing = 0
<dtml-if expr="count is not None"> <dtml-if expr="count is not None">
LIMIT <dtml-sqlvar count type="int"> LIMIT <dtml-sqlvar count type="int">
</dtml-if> </dtml-if>
...@@ -3515,6 +3515,33 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -3515,6 +3515,33 @@ class TestCMFActivity(ERP5TypeTestCase):
activity_tool.SQLDict_delMessage(uid=[message.uid for message in result]) activity_tool.SQLDict_delMessage(uid=[message.uid for message in result])
get_transaction().commit() get_transaction().commit()
def test_116_RaiseInCommitBeforeMessageExecution(self):
"""
Test behaviour of CMFActivity when the commit just before message
execution fails. In particular, CMFActivity should restart the
activities it selected (processing=1) instead of ignoring them forever.
"""
processed = []
activity_tool = self.portal.portal_activities
activity_tool.__class__.doSomething = processed.append
try:
for activity in 'SQLDict', 'SQLQueue':
activity_tool.activate(activity=activity).doSomething(activity)
get_transaction().commit()
activity_tool.distribute()
# Make first commit in dequeueMessage raise
registerFailingTransactionManager()
self.assertRaises(CommitFailed, activity_tool.tic)
# Normally, the request stops here and Zope aborts the transaction
get_transaction().abort()
self.assertEqual(processed, [])
# Activity is already in 'processing=1' state. Check tic reselects it.
activity_tool.tic()
self.assertEqual(processed, [activity])
del processed[:]
finally:
del activity_tool.__class__.doSomething
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment