Commit de379f2a authored by Julien Muchembled

CMFActivity refactoring: move code from SQLDict to SQLBase

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@37684 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 623e0230
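The mechanical core of this refactoring is visible at the end of the SQLDict.py hunk below, where the moved method is re-exposed with a plain class-attribute alias (dequeueMessage = SQLBase.dequeueMessage) rather than redefined. The sketch below illustrates one reason such an explicit alias is useful under multiple inheritance; the class names are hypothetical, not the real CMFActivity classes.

# Illustrative sketch only (hypothetical names, not the CMFActivity API).
# With several base classes, an explicit class-attribute alias pins which
# implementation the subclass exposes, independently of the MRO.

class RAMDictLike(object):
    def dequeueMessage(self, queue):
        raise NotImplementedError   # stand-in for a legacy implementation

class SQLBaseLike(object):
    def dequeueMessage(self, queue):
        # Shared implementation, hoisted into the common base class.
        return queue.pop(0) if queue else None

class SQLDictLike(RAMDictLike, SQLBaseLike):
    # Without the alias, the MRO would resolve dequeueMessage to
    # RAMDictLike; the assignment forces reuse of SQLBaseLike's version.
    dequeueMessage = SQLBaseLike.dequeueMessage

assert SQLDictLike().dequeueMessage([1, 2, 3]) == 1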
...@@ -27,14 +27,20 @@
##############################################################################
import sys
import transaction
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import VALIDATION_ERROR_DELAY
# Stop electing more messages for processing if more than this number of
# objects are impacted by elected messages.
MAX_GROUPED_OBJECTS = 100
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
...@@ -129,6 +135,230 @@ class SQLBase:
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
"""
Get and reserve a list of messages.
limit
Maximum number of messages to fetch.
This number is not guaranteed to be reached, because of:
- not enough messages being pending execution
- race condition (other nodes reserving the same messages at the same
time)
This number is guaranteed not to be exceeded.
If None (or not given), no limit applies.
"""
result = not group_method_id and \
activity_tool.SQLDict_selectReservedMessageList(
processing_node=processing_node, count=limit)
if not result:
activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
return result
def makeMessageListAvailable(self, activity_tool, uid_list):
"""
Put messages back in processing_node=0 .
"""
if len(uid_list):
activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
For each reserved message, delete redundant messages when it gets
reserved (definitely lost, but they are expendable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- until the number of impacted objects goes over MAX_GROUPED_OBJECTS
- get one message from the reserved bunch (this message will be
"needed")
- increase the number of impacted objects
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list and a group_method_id
If any error happens in above described process, try to unreserve all
messages already reserved in that process.
If it fails, complain loudly that some messages might still be in an
unclean state.
Returned values:
4-tuple:
- list of messages
- impacted object count
- group_method_id
- uid_to_duplicate_uid_list_dict
"""
def getReservedMessageList(limit, group_method_id=None):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
date=now_date,
processing_node=processing_node,
limit=limit,
group_method_id=group_method_id)
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if len(uid_list):
LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
now_date = self.getNow(activity_tool)
message_list = []
count = 0
group_method_id = None
try:
result = getReservedMessageList(limit=1)
uid_to_duplicate_uid_list_dict = {}
if len(result) > 0:
line = result[0]
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
message_list.append(m)
group_method_id = line.group_method_id
activity_tool.SQLDict_processMessage(uid=[uid])
uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
.extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects.
count += len(m.getObjectList(activity_tool))
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
path_and_method_id_dict = {}
unreserve_uid_list = []
for line in result:
if line.uid == uid:
continue
# All fetched lines have the same group_method_id and
# processing_node.
# Their dates are lower-than or equal-to now_date.
# We read each line once so lines have distinct uids.
# So what remains to be filtered on are path, method_id and
# order_validation_text.
key = (line.path, line.method_id, line.order_validation_text)
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool))
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
to_free_uid_list = [m.uid for m in message_list]
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
if len(to_free_uid_list):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
return [], 0, None, {}
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
final_uid_list = []
for uid in uid_list:
final_uid_list.append(uid)
final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
# Remove group_id parameter from group_method_id
if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0]
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
args = (group_method_id, message_list)
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
message = message_list[0]
args = (message, )
activity_runtime_environment = ActivityRuntimeEnvironment(message)
# Commit right before executing messages.
# As the MySQL transaction does not start at exactly the same time as the
# ZODB transaction but a bit later, messages selected as available might be
# invoked on objects which are not yet visible - or only visible in an old
# version - to the ZODB connector.
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
transaction.commit()
tv = getTransactionalVariable(None)
tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke
try:
method(*args)
except:
LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', PANIC,
'abort failed, thus some objects may be modified accidentally')
raise
# XXX Is it still useful to free messages now that this node is able
# to reselect them ?
to_free_uid_list = [x.uid for x in message_list]
try:
makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed.
if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
endTransaction = transaction.abort
else:
endTransaction = transaction.commit
try:
endTransaction()
except:
LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
if endTransaction == transaction.abort:
LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
else:
try:
transaction.abort()
except:
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise
exc_info = sys.exc_info()
for m in message_list:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
try:
makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
transaction.commit()
return not message_list
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
"""
...
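One detail of the moved getProcessableMessageList above is worth isolating: messages of the same group keep being elected until the cumulative number of impacted objects reaches MAX_GROUPED_OBJECTS, and the remainder is unreserved. The standalone sketch below shows just that loop; object_count_of is a hypothetical stand-in for len(m.getObjectList(activity_tool)).

# Sketch of the grouping cap only; not part of the commit.
# 'object_count_of' is a hypothetical stand-in for
# len(message.getObjectList(activity_tool)).
MAX_GROUPED_OBJECTS = 100

def pick_grouped(first_message, candidates, object_count_of):
    # Elect messages of the same group until the cumulative number of
    # impacted objects reaches MAX_GROUPED_OBJECTS; the rest is returned
    # so the caller can unreserve it.
    elected = [first_message]
    count = object_count_of(first_message)
    to_unreserve = []
    for message in candidates:
        if count < MAX_GROUPED_OBJECTS:
            count += object_count_of(message)
            elected.append(message)
        else:
            to_unreserve.append(message)
    return elected, count, to_unreserve

# With 60 objects per message, a second message is still elected because the
# cap is checked before adding, so the total may overshoot slightly, like the
# loop in the diff above.
elected, count, rest = pick_grouped('m1', ['m2', 'm3'], lambda m: 60)
assert (count, len(elected), len(rest)) == (120, 2, 1)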
...@@ -29,16 +29,10 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
import sys
from types import ClassType
#from time import time
from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter
import transaction
...@@ -48,9 +42,6 @@ from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate.
READ_MESSAGE_LIMIT = 1000
# Stop electing more messages for processing if more than this number of
# objects are impacted by elected messages.
MAX_GROUPED_OBJECTS = 100
MAX_MESSAGE_LIST_SIZE = 100
...@@ -131,33 +122,6 @@ class SQLDict(RAMDict, SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
"""
Get and reserve a list of messages.
limit
Maximum number of messages to fetch.
This number is not guaranteed to be reached, because of:
- not enough messages being pending execution
- race condition (other nodes reserving the same messages at the same
time)
This number is guaranteed not to be exceeded.
If None (or not given), no limit applies.
"""
result = not group_method_id and \
activity_tool.SQLDict_selectReservedMessageList(
processing_node=processing_node, count=limit)
if not result:
activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
return result
def makeMessageListAvailable(self, activity_tool, uid_list):
"""
Put messages back in processing_node=0 .
"""
if len(uid_list):
activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
...@@ -188,202 +152,7 @@ class SQLDict(RAMDict, SQLBase):
raise
return uid_list
dequeueMessage = SQLBase.dequeueMessage
def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
For each reserved message, delete redundant messages when it gets
reserved (definitely lost, but they are expendable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- until the number of impacted objects goes over MAX_GROUPED_OBJECTS
- get one message from the reserved bunch (this message will be
"needed")
- increase the number of impacted objects
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list and a group_method_id
If any error happens in above described process, try to unreserve all
messages already reserved in that process.
If it fails, complain loudly that some messages might still be in an
unclean state.
Returned values:
4-tuple:
- list of messages
- impacted object count
- group_method_id
- uid_to_duplicate_uid_list_dict
"""
def getReservedMessageList(limit, group_method_id=None):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
date=now_date,
processing_node=processing_node,
limit=limit,
group_method_id=group_method_id)
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if len(uid_list):
LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
now_date = self.getNow(activity_tool)
message_list = []
count = 0
group_method_id = None
try:
result = getReservedMessageList(limit=1)
uid_to_duplicate_uid_list_dict = {}
if len(result) > 0:
line = result[0]
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
message_list.append(m)
group_method_id = line.group_method_id
activity_tool.SQLDict_processMessage(uid=[uid])
uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
.extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects.
count += len(m.getObjectList(activity_tool))
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
path_and_method_id_dict = {}
unreserve_uid_list = []
for line in result:
if line.uid == uid:
continue
# All fetched lines have the same group_method_id and
# processing_node.
# Their dates are lower-than or equal-to now_date.
# We read each line once so lines have distinct uids.
# So what remains to be filtered on are path, method_id and
# order_validation_text.
key = (line.path, line.method_id, line.order_validation_text)
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool))
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
to_free_uid_list = [m.uid for m in message_list]
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
if len(to_free_uid_list):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
return [], 0, None, {}
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
final_uid_list = []
for uid in uid_list:
final_uid_list.append(uid)
final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
# Remove group_id parameter from group_method_id
if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0]
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
args = (group_method_id, message_list)
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
message = message_list[0]
args = (message, )
activity_runtime_environment = ActivityRuntimeEnvironment(message)
# Commit right before executing messages.
# As the MySQL transaction does not start at exactly the same time as the
# ZODB transaction but a bit later, messages selected as available might be
# invoked on objects which are not yet visible - or only visible in an old
# version - to the ZODB connector.
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
transaction.commit()
tv = getTransactionalVariable(None)
tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke
try:
method(*args)
except:
LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', PANIC,
'abort failed, thus some objects may be modified accidentally')
raise
# XXX Is it still useful to free messages now that this node is able
# to reselect them ?
to_free_uid_list = [x.uid for x in message_list]
try:
makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed.
if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
endTransaction = transaction.abort
else:
endTransaction = transaction.commit
try:
endTransaction()
except:
LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
if endTransaction == transaction.abort:
LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
else:
try:
transaction.abort()
except:
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise
exc_info = sys.exc_info()
for m in message_list:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
try:
makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
transaction.commit()
return not message_list
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
...