Commit e954de07 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity refactoring: style changes + rewrite some code coming from SQLDict

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@37685 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent de379f2a
......@@ -135,7 +135,8 @@ class SQLBase:
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
def getReservedMessageList(self, activity_tool, date, processing_node,
limit=None, group_method_id=None):
"""
Get and reserve a list of messages.
limit
......@@ -199,16 +200,14 @@ class SQLBase:
limit=limit,
group_method_id=group_method_id)
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if len(uid_list):
LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
now_date = self.getNow(activity_tool)
message_list = []
......@@ -231,7 +230,8 @@ class SQLBase:
count += len(m.getObjectList(activity_tool))
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
group_method_id=group_method_id)
path_and_method_id_dict = {}
unreserve_uid_list = []
for line in result:
......@@ -246,33 +246,39 @@ class SQLBase:
key = (line.path, line.method_id, line.order_validation_text)
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
.append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
.extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool))
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
activity_tool.SQLDict_processMessage(
uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=unreserve_uid_list)
return (message_list, count, group_method_id,
uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
self._log(WARNING, 'Exception while reserving messages.')
if len(message_list):
to_free_uid_list = [m.uid for m in message_list]
try:
makeMessageListAvailable(to_free_uid_list)
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=to_free_uid_list)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else:
if len(to_free_uid_list):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
self._log(TRACE, '(no message was reserved)')
return [], 0, None, {}
# Queue semantic
......@@ -282,7 +288,8 @@ class SQLBase:
for uid in uid_list:
final_uid_list.append(uid)
final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=final_uid_list)
message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
......@@ -312,23 +319,27 @@ class SQLBase:
try:
method(*args)
except:
LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
self._log(WARNING,
'Exception raised when invoking messages (uid, path, method_id) %r'
% [(m.uid, m.object_path, m.method_id) for m in message_list])
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', PANIC,
# Unfortunately, database adapters may raise an exception against
# abort.
self._log(PANIC,
'abort failed, thus some objects may be modified accidentally')
raise
# XXX Is it still useful to free messages now that this node is able
# to reselect them ?
to_free_uid_list = [x.uid for x in message_list]
try:
makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
makeMessageListAvailable(to_free_uid_list,
uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
# Abort if something failed.
if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
endTransaction = transaction.abort
......@@ -337,25 +348,32 @@ class SQLBase:
try:
endTransaction()
except:
LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
self._log(WARNING,
'Failed to end transaction for messages (uid, path, method_id) %r'
% [(m.uid, m.object_path, m.method_id) for m in message_list])
if endTransaction == transaction.abort:
LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
self._log(PANIC, 'Failed to abort executed messages.'
' Some objects may be modified accidentally.')
else:
try:
transaction.abort()
except:
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
self._log(PANIC, 'Failed to abort executed messages which also'
' failed to commit. Some objects may be modified accidentally.')
raise
exc_info = sys.exc_info()
for m in message_list:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
try:
makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
makeMessageListAvailable([x.uid for x in message_list],
uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
self._log(ERROR, 'Failed to free remaining messages: %r'
% (message_list, ))
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
self._log(TRACE, 'Freed messages %r' % (message_list, ))
self.finalizeMessageExecution(activity_tool, message_list,
uid_to_duplicate_uid_list_dict)
transaction.commit()
return not message_list
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment