Commit b54a8418 authored by Julien Muchembled's avatar Julien Muchembled

Some cleanup in SQLDict and SQLQueue

Pass list of Message objects instead of a list of (uid, message, priority).
Store the fetched line on the Message object to retrieve the priority.

In the future, if we allow executed activity to access its related Message
object, it could also get the SQL line.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@32875 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d797bbce
No related merge requests found
This diff is collapsed.
......@@ -162,10 +162,7 @@ class SQLQueue(RAMQueue, SQLBase):
unclean state.
Returned values:
list of 3-tuple:
- message uid
- message
- priority
list of messages
"""
def getReservedMessageList(limit):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
......@@ -179,21 +176,18 @@ class SQLQueue(RAMQueue, SQLBase):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
now_date = self.getNow(activity_tool)
message_list = []
def append(line, message):
uid = line.uid
message_list.append((uid, message, line.priority))
try:
result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
for line in result:
m = self.loadMessage(line.message, uid=line.uid)
append(line, m)
m = self.loadMessage(line.message, uid=line.uid, line=line)
message_list.append(m)
if len(message_list):
activity_tool.SQLQueue_processMessage(uid=[x[0] for x in message_list])
activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
return message_list
except:
LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
to_free_uid_list = [x[0] for x in message_list]
to_free_uid_list = [m.uid for m in message_list]
try:
makeMessageListAvailable(to_free_uid_list)
except:
......@@ -205,7 +199,7 @@ class SQLQueue(RAMQueue, SQLBase):
LOG('SQLQueue', TRACE, '(no message was reserved)')
return []
def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
def finalizeMessageExecution(self, activity_tool, message_list):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
deletable_uid_list = []
......@@ -213,10 +207,12 @@ class SQLQueue(RAMQueue, SQLBase):
final_error_uid_list = []
notify_user_list = []
non_executable_message_list = []
for uid, m, priority in message_uid_priority_list:
for m in message_list:
uid = m.uid
if m.getExecutionState() == MESSAGE_EXECUTED:
deletable_uid_list.append(uid)
elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
priority = m.line.priority
# BACK: Only exceptions can be classes in Python 2.6.
# Once we drop support for Python 2.4,
# please, remove the "type(m.exc_type) is type(ConflictError)" check
......@@ -288,11 +284,11 @@ class SQLQueue(RAMQueue, SQLBase):
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
message_uid_priority_list = \
message_list = \
self.getProcessableMessageList(activity_tool, processing_node)
if len(message_uid_priority_list):
if message_list:
processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
processed_message_uid_list = []
processed_count = 0
# Commit right before executing messages.
# As MySQL transaction does not start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
......@@ -301,17 +297,17 @@ class SQLQueue(RAMQueue, SQLBase):
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit()
for value in message_uid_priority_list:
for m in message_list:
processed_count += 1
clearActivityRuntimeEnvironment()
updateActivityRuntimeValue({'processing_node': processing_node,
'activity_kw': value[1].activity_kw,
'priority': value[2],
'uid': value[0]})
processed_message_uid_list.append(value)
'activity_kw': m.activity_kw,
'priority': m.line.priority,
'uid': m.uid})
# Try to invoke
try:
activity_tool.invoke(value[1])
if value[1].getExecutionState() != MESSAGE_NOT_EXECUTED:
activity_tool.invoke(m)
if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
# Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the
......@@ -321,7 +317,8 @@ class SQLQueue(RAMQueue, SQLBase):
# This message failed, revert.
abortTransactionSynchronously()
except:
LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % ((value[0], value[1].object_path, value[1].method_id), ), error=sys.exc_info())
value = m.uid, m.object_path, m.method_id
LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
try:
abortTransactionSynchronously()
except:
......@@ -331,11 +328,11 @@ class SQLQueue(RAMQueue, SQLBase):
# We must make sure that the message is not set as executed.
# It is possible that the message is executed but the commit
# of the transaction fails
value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
# XXX Is it still useful to free message now that this node is able
# to reselect it ?
try:
makeMessageListAvailable([value[0]])
makeMessageListAvailable([m.uid])
except:
LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
else:
......@@ -344,18 +341,18 @@ class SQLQueue(RAMQueue, SQLBase):
LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
break
# Release all unprocessed messages
processed_uid_set = ImmutableSet([x[0] for x in processed_message_uid_list])
to_free_uid_list = [x[0] for x in message_uid_priority_list if x[0] not in processed_uid_set]
if len(to_free_uid_list):
to_free_uid_list = [m.uid for m in message_list[processed_count:]]
if to_free_uid_list:
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
self.finalizeMessageExecution(activity_tool, processed_message_uid_list)
self.finalizeMessageExecution(activity_tool,
message_list[:processed_count])
get_transaction().commit()
return not len(message_uid_priority_list)
return not message_list
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
......@@ -411,7 +408,7 @@ class SQLQueue(RAMQueue, SQLBase):
for line in result:
path = line.path
method_id = line.method_id
m = self.loadMessage(line.message, uid = line.uid)
m = self.loadMessage(line.message, uid=line.uid, line=line)
if invoke:
# First Validate (only if message is marked as new)
if line.processing_node == -1:
......@@ -467,7 +464,7 @@ class SQLQueue(RAMQueue, SQLBase):
if dumpMessageList is not None:
result = dumpMessageList()
for line in result:
m = self.loadMessage(line.message, uid = line.uid)
m = self.loadMessage(line.message, uid=line.uid, line=line)
message_list.append(m)
return message_list
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment