Commit e2ff442c authored by Sebastien Robin's avatar Sebastien Robin

Commit work done by Vincent

  Make ActivityTool:Message.getObjectList simple to use: detect internaly 
wether an expand method must be called, catch exception when object on which 
the activity was executed cannot be found.
  Remove broadcast message support.
  Merge indexes on processing_node and processing columns on both message and 
message_queue tables.
  Always use SQL server's time.
  Do not update processing node value when setting the message as being 
processed.
  Commit SQL connection as soon as messages get assigned to reduce lock 
duration.
  Make SQLDict ZSQLMethods support list of uids instead of single value per 
call.
  Make ZSQLMethod handle processing_node differently if it's 0 or None (when 
not passed as parameter, behave as if it's None).
  Do not force all parameters to be passed to SQLQueue_setPriority.
  Factorise SQL code inside <dtml-if> blocks.
  Allow to select ranges of lines in readMessageList with a custom offset.
  When reseting message processing state at first activity execution pass 
after a node start, also reset the processing_node.
  Commit SQL connection as soon as messages are set to processing state, 
mainly to make it visible outside current connection.
  Add a common class for SQL-using activity queues.
  CMFActivity/Activity/SQLDict.py
    Remove unused (and broken) prepareQueueMessage method.
    Replace a tab by spaces.
    Add ZSQLMethod wrappers for new ZSQLMethods.
    Split dequeueMessage into dequeueMessage, getProcessableMessageList, 
finalizeMessage_Execution.
    Return True instead of 0 in case of an important error, in order to 
prevent CMFActivity from doing infinite loops  over dequeueMessage when 
something goes wrong.
  CMFActivity/Activity/Queue.py
    Allow caller to specify the current date and transmit it when recursing. 
Fallback on DateTime (calculate just once) if not specified.
  CMFActivity/Activity/SQLQueue.py
    Precompute parameters in prepareQueueMessage to make it easier to add a 
log when needed. Also reduces the distance with SQLDict's equivalent method.
    Add ZSQLMethod wrappers for new ZSQLMethods.
    Split dequeueMessage into dequeueMessage, getProcessableMessageList, 
finalizeMessage_Execution.
    Return True instead of 0 in case of an important error, in order to 
prevent CMFActivity from doing infinite loops  over dequeueMessage when 
something goes wrong.
  Add scripts to monitor activity distribution.  
  Remove unused ZSQLMethods.
  Add new ZSQLMethods related to the new distribution scheme and SQL server 
time grabbing.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17759 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 90018457
...@@ -117,7 +117,6 @@ class Queue: ...@@ -117,7 +117,6 @@ class Queue:
self.is_alive = {} self.is_alive = {}
self.is_awake = {} self.is_awake = {}
self.is_initialized = 0 self.is_initialized = 0
self.max_processing_date = DateTime()
def initialize(self, activity_tool): def initialize(self, activity_tool):
# This is the only moment when # This is the only moment when
...@@ -206,7 +205,7 @@ class Queue: ...@@ -206,7 +205,7 @@ class Queue:
return message_list return message_list
def getExecutableMessageList(self, activity_tool, message, message_dict, def getExecutableMessageList(self, activity_tool, message, message_dict,
validation_text_dict): validation_text_dict, now_date=None):
"""Get messages which have no dependent message, and store them in the dictionary. """Get messages which have no dependent message, and store them in the dictionary.
If the passed message itself is executable, simply store only that message. If the passed message itself is executable, simply store only that message.
...@@ -233,7 +232,8 @@ class Queue: ...@@ -233,7 +232,8 @@ class Queue:
if message_list: if message_list:
# The result is not empty, so this message is not executable. # The result is not empty, so this message is not executable.
validation_text_dict[message.order_validation_text] = 0 validation_text_dict[message.order_validation_text] = 0
now_date = DateTime() if now_date is None:
now_date = DateTime()
for activity, m in message_list: for activity, m in message_list:
# Note that the messages may contain ones which are already assigned or not # Note that the messages may contain ones which are already assigned or not
# executable yet. # executable yet.
...@@ -242,7 +242,7 @@ class Queue: ...@@ -242,7 +242,7 @@ class Queue:
message_dict[message.uid] = None message_dict[message.uid] = None
try: try:
self.getExecutableMessageList(activity_tool, m, message_dict, self.getExecutableMessageList(activity_tool, m, message_dict,
validation_text_dict) validation_text_dict, now_date=now_date)
finally: finally:
del message_dict[message.uid] del message_dict[message.uid]
else: else:
......
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
class SQLBase:
"""
Define a set of common methods for SQL-based storage of activities.
"""
def getNow(self, context):
"""
Return the current value for SQL server's NOW().
Note that this value is not cached, and is not transactionnal on MySQL
side.
"""
result = context.SQLBase_getNow()
assert len(result) == 1
assert len(result[0]) == 1
return result[0][0]
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
# #
############################################################################## ##############################################################################
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously abortTransactionSynchronously
...@@ -36,47 +35,32 @@ from Products.CMFActivity.Errors import ActivityFlushError ...@@ -36,47 +35,32 @@ from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
import sys import sys
from types import ClassType from types import ClassType
#from time import time
from SQLBase import SQLBase
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
except ImportError: except ImportError:
pass pass
from zLOG import LOG, TRACE, WARNING, ERROR, INFO from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
MAX_PRIORITY = 5 MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
# Stop electing more messages for processing if more than this many objects
# are impacted by elected messages.
MAX_GROUPED_OBJECTS = 500 MAX_GROUPED_OBJECTS = 500
priority_weight = \ class SQLDict(RAMDict, SQLBase):
[1] * 64 + \
[2] * 20 + \
[3] * 10 + \
[4] * 5 + \
[5] * 1
LAST_PROCESSING_NODE = 1
class SQLDict(RAMDict):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
because use of OOBTree. because use of OOBTree.
""" """
# Transaction commit methods # Transaction commit methods
def prepareQueueMessage(self, activity_tool, m):
if m.is_registered:
activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
method_id = m.method_id,
priority = m.activity_kw.get('priority', 1),
broadcast = m.activity_kw.get('broadcast', 0),
message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime()),
group_method_id = '\0'.join([m.activity_kw.get('group_method_id', ''),
m.activity_kw.get('group_id', '')]),
tag = m.activity_kw.get('tag', ''),
order_validation_text = self.getOrderValidationText(m))
# Also store uid of activity
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [] registered_message_list = []
for message in message_list: for message in message_list:
...@@ -87,21 +71,18 @@ class SQLDict(RAMDict): ...@@ -87,21 +71,18 @@ class SQLDict(RAMDict):
path_list = ['/'.join(message.object_path) for message in registered_message_list] path_list = ['/'.join(message.object_path) for message in registered_message_list]
method_id_list = [message.method_id for message in registered_message_list] method_id_list = [message.method_id for message in registered_message_list]
priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list] priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
dumped_message_list = [self.dumpMessage(message) for message in registered_message_list] dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
datetime = DateTime() date_list = [message.activity_kw.get('at_date', None) for message in registered_message_list]
date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list]
group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')]) group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
for message in registered_message_list] for message in registered_message_list]
tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list] tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list] order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity',
id_count=len(registered_message_list), store=0) id_count=len(registered_message_list), store=0)
activity_tool.SQLDict_writeMessageList( uid_list = uid_list, activity_tool.SQLDict_writeMessageList( uid_list = uid_list,
path_list = path_list, path_list = path_list,
method_id_list = method_id_list, method_id_list = method_id_list,
priority_list = priority_list, priority_list = priority_list,
broadcast_list = broadcast_list,
message_list = dumped_message_list, message_list = dumped_message_list,
date_list = date_list, date_list = date_list,
group_method_id_list = group_method_id_list, group_method_id_list = group_method_id_list,
...@@ -164,195 +145,293 @@ class SQLDict(RAMDict): ...@@ -164,195 +145,293 @@ class SQLDict(RAMDict):
return 0 return 0
return 1 return 1
# Queue semantic def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, **kw):
def dequeueMessage(self, activity_tool, processing_node): """
readMessage = getattr(activity_tool, 'SQLDict_readMessage', None) Get and reserve a list of messages.
if readMessage is None: limit
return 1 Maximum number of messages to fetch.
This number is not garanted to be reached, because of:
now_date = DateTime() - not enough messages being pending execution
result = readMessage(processing_node=processing_node, to_date=now_date) - race condition (other nodes reserving the same messages at the same
if len(result) > 0: time)
line = result[0] This number is guaranted not to be exceeded.
path = line.path If None (or not given) no limit apply.
method_id = line.method_id """
group_method_id = line.group_method_id result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, limit=limit)
order_validation_text = line.order_validation_text if len(result) == 0:
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id, activity_tool.SQLDict_reserveMessageList(limit=limit, processing_node=processing_node, to_date=date, **kw)
processing_node=None, to_date=now_date, result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, limit=limit)
order_validation_text=order_validation_text, return result
group_method_id=group_method_id)
uid_list = [x.uid for x in uid_list] def makeMessageListAvailable(self, activity_tool, uid_list):
uid_list_list = [uid_list] """
priority_list = [line.priority] Put messages back in processing_node=0 .
# Make sure message can not be processed anylonger """
if len(uid_list) > 0: if len(uid_list):
# Set selected messages to processing activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
activity_tool.SQLDict_processMessage(uid=uid_list,
processing_node=processing_node) def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line):
get_transaction().commit() # Release locks before starting a potentially long calculation """
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state Delete all messages matching given one except itself.
Operator Value
# At this point, messages are marked as processed. So catch any kind of exception to make sure != uid
# that they are unmarked on error. <= date
try: = path, method_id, group_method_id, order_validation_text,
processing_node, tag
"""
activity_tool.SQLDict_deleteDuplicatedMessageList(
processing_node=processing_node, uid=line.uid,
to_date=line.date, path=line.path, method_id=line.method_id,
group_method_id=line.group_method_id,
order_validation_text=line.order_validation_text,
tag=line.tag)
def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
For each reserved message, delete redundant messages when it gets
reserved (definitely lost, but they are expandable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- get one message from the reserved bunch (this messages will be
"needed")
- increase the number of impacted object
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list and a group_method_id
If any error happens in above described process, try to unreserve all
messages already reserved in that process.
If it fails, complain loudly that some messages might still be in an
unclean state.
Returned values:
3-tuple:
- list of 3-tuple:
- message uid
- message
- priority
- impacted object count
- group_method_id
"""
def getReservedMessageList(**kw):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
date=now_date,
processing_node=processing_node,
**kw)
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def deleteDuplicatedLineList(line):
self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date,
processing_node=processing_node, line=line)
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
now_date = self.getNow(activity_tool)
message_list = []
def append(line, message):
uid = line.uid
message_list.append((uid, message, line.priority))
count = 0
group_method_id = None
try:
result = getReservedMessageList(limit=1)
if len(result) > 0:
line = result[0]
m = self.loadMessage(line.message, uid=line.uid) m = self.loadMessage(line.message, uid=line.uid)
message_list = [m] append(line, m)
# Validate message (make sure object exists, priority OK, etc.) group_method_id = line.group_method_id
if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): # Delete all messages matching current one - except current one.
return 0 deleteDuplicatedLineList(line)
activity_tool.SQLDict_processMessage(uid=[line.uid])
if group_method_id not in (None, '', '\0'): if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects. # Count the number of objects to prevent too many objects.
if m.hasExpandMethod(): count += len(m.getObjectList(activity_tool))
count = len(m.getObjectList(activity_tool))
else:
count = 1
if count < MAX_GROUPED_OBJECTS: if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method. # Retrieve objects which have the same group method.
result = readMessage(processing_node=processing_node, result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
to_date=now_date, group_method_id=group_method_id,
order_validation_text=order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d, group_method_id %s' % (len(result), group_method_id))
path_and_method_id_dict = {} path_and_method_id_dict = {}
unreserve_uid_list = []
for line in result: for line in result:
path = line.path # All fetched lines have the same group_method_id and
method_id = line.method_id # processing_node.
# Their dates are lower-than or equal-to now_date.
# Prevent using the same pair of a path and a method id. # We read each line once so lines have distinct uids.
key = (path, method_id) # So what remains to be filtered on are path, method_id,
# order_validation_text, tag
key = (line.path, line.method_id, line.order_validation_text, line.tag)
if key in path_and_method_id_dict: if key in path_and_method_id_dict:
LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid))
continue continue
path_and_method_id_dict[key] = 1 path_and_method_id_dict[key] = line.uid
deleteDuplicatedLineList(line)
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id, if count < MAX_GROUPED_OBJECTS:
processing_node=None, m = self.loadMessage(line.message, uid=line.uid)
to_date=now_date, group_method_id=group_method_id, count += len(m.getObjectList(activity_tool))
order_validation_text=order_validation_text) append(line, m)
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid=uid_list,
processing_node=processing_node)
get_transaction().commit() # Release locks before starting a potentially long calculation
# Save this newly marked uids as soon as possible.
uid_list_list.append(uid_list)
m = self.loadMessage(line.message, uid=line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod():
count += len(m.getObjectList(activity_tool))
else:
count += 1
message_list.append(m)
priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS:
break
else: else:
# If the uids were not valid, remove them from the list, as validateMessage unreserve_uid_list.append(line.uid)
# unmarked them. activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
uid_list_list.pop() # Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
# Release locks before starting a potentially long calculation return message_list, count, group_method_id
get_transaction().commit() except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
# Remove group_id parameter from group_method_id if len(message_list):
if group_method_id is not None: to_free_uid_list = [x[0] for x in message_list]
group_method_id = group_method_id.split('\0')[0] try:
# Try to invoke makeMessageListAvailable(to_free_uid_list)
if group_method_id not in (None, ""): except:
LOG('SQLDict', INFO, LOG('SQLDict', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
'invoking a group method %s with %d objects '\
' (%d objects in expanded form)' % (
group_method_id, len(message_list), count))
activity_tool.invokeGroup(group_method_id, message_list)
else: else:
activity_tool.invoke(message_list[0]) if len(to_free_uid_list):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
# Check if messages are executed successfully. else:
# When some of them are executed successfully, it may not be acceptable to LOG('SQLDict', TRACE, '(no message was reserved)')
# abort the transaction, because these remain pending, only due to other return [], 0, None
# invalid messages. This means that a group method should not be used if
# it has a side effect. For now, only indexing uses a group method, and this def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
# has no side effect. def makeMessageListAvailable(uid_list):
for m in message_list: self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
if m.is_executed: deletable_uid_list = []
get_transaction().commit() delay_uid_list = []
break final_error_uid_list = []
message_with_active_process_list = []
for uid, m, priority in message_uid_priority_list:
if m.is_executed:
deletable_uid_list.append(uid)
if m.active_process:
message_with_active_process_list.append(m)
else:
if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
final_error_uid_list.append(uid)
else: else:
abortTransactionSynchronously() try:
# Immediately update, because values different for every message
activity_tool.SQLDict_setPriority(
uid=[uid],
delay=VALIDATION_ERROR_DELAY,
priority=priority + 1)
except:
LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
try:
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed message %r' % (uid, ))
if len(deletable_uid_list):
try:
activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
if len(delay_uid_list):
try:
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY)
except: except:
LOG('SQLDict', INFO, LOG('SQLDict', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
'an exception happened during processing %r' % (uid_list_list,), try:
error=sys.exc_info()) makeMessageListAvailable(delay_uid_list)
# If an exception occurs, abort the transaction to minimize the impact, except:
LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (delay_uid_list, ))
if len(final_error_uid_list):
try:
activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLDict', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
for m in message_with_active_process_list:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
message_uid_priority_list, count, group_method_id = \
self.getProcessableMessageList(activity_tool, processing_node)
if len(message_uid_priority_list):
# Remove group_id parameter from group_method_id
if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0]
message_list = [x[1] for x in message_uid_priority_list]
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
args = (group_method_id, message_list)
else:
method = activity_tool.invoke
args = (message_list[0], )
try:
# Commit right before executing messages.
# As MySQL transaction do no start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
# on objects which are not available - or available in an old
# version - to ZODB connector.
# So all connectors must be commited now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit()
# Try to invoke
method(*args)
except:
LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
to_free_uid_list = [x[0] for x in message_uid_priority_list]
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
try: try:
abortTransactionSynchronously() abortTransactionSynchronously()
except: except:
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', WARNING, LOG('SQLDict', PANIC,
'abort failed, thus some objects may be modified accidentally') 'abort failed, thus some objects may be modified accidentally')
pass return True # Stop processing messages for this tic call for this queue.
# Only abort if nothing succeeded.
# An exception happens at somewhere else but invoke or invokeGroup, so messages # This means that when processing multiple messages, failed ones must not cause
# themselves should not be delayed. # bad things to happen if transaction is commited.
try: if len([x for x in message_uid_priority_list if x[1].is_executed]) == 0:
for uid_list in uid_list_list: endTransaction = abortTransactionSynchronously
if len(uid_list): else:
# This only sets processing to zero. endTransaction = get_transaction().commit
activity_tool.SQLDict_setPriority(uid=uid_list)
get_transaction().commit()
except:
LOG('SQLDict', ERROR,
'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
error=sys.exc_info())
raise
return 0
try: try:
for i in xrange(len(message_list)): endTransaction()
m = message_list[i]
uid_list = uid_list_list[i]
priority = priority_list[i]
if m.is_executed:
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid=uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
get_transaction().commit() # Release locks before starting a potentially long calculation
elif priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid=uid_list,
processing_node=INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
priority=priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
except: except:
LOG('SQLDict', ERROR, LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(x[0], x[1].object_path, x[1].method_id) for x in message_uid_priority_list], ), error=sys.exc_info())
'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages', failed_message_uid_list = [x[0] for x in message_uid_priority_list]
error=sys.exc_info()) try:
raise makeMessageListAvailable(failed_message_uid_list)
except:
return 0 LOG('SQQueue', PANIC, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
get_transaction().commit() # Release locks before starting a potentially long calculation else:
return 1 LOG('SQQueue', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
if endTransaction == abortTransactionSynchronously:
LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
else:
try:
abortTransactionSynchronously()
except:
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
return True # Stop processing messages for this tic call for this queue.
self.finalizeMessageExecution(activity_tool, message_uid_priority_list)
get_transaction().commit()
return not len(message_uid_priority_list)
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None) hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
...@@ -470,76 +549,34 @@ class SQLDict(RAMDict): ...@@ -470,76 +549,34 @@ class SQLDict(RAMDict):
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
offset = 0
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
global LAST_PROCESSING_NODE now_date = self.getNow(activity_tool)
now_date = DateTime()
result = readMessageList(path=None, method_id=None, processing_node=-1, result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0) to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
get_transaction().commit() validated_count = 0
#TIME_begin = time()
validation_text_dict = {'none': 1} while len(result) and validated_count < MAX_VALIDATED_LIMIT:
message_dict = {} get_transaction().commit()
for line in result:
message = self.loadMessage(line.message, uid = line.uid, validation_text_dict = {'none': 1}
order_validation_text = line.order_validation_text) message_dict = {}
self.getExecutableMessageList(activity_tool, message, message_dict, for line in result:
validation_text_dict) message = self.loadMessage(line.message, uid = line.uid,
# XXX probably this below can be optimized by assigning multiple messages at a time. order_validation_text = line.order_validation_text)
path_dict = {} self.getExecutableMessageList(activity_tool, message, message_dict,
assignMessage = activity_tool.SQLDict_assignMessage validation_text_dict, now_date=now_date)
processing_node = LAST_PROCESSING_NODE distributable_count = len(message_dict)
id_tool = activity_tool.getPortalObject().portal_ids if distributable_count:
activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
for message in message_dict.itervalues(): validated_count += distributable_count
path = '/'.join(message.object_path) if validated_count < MAX_VALIDATED_LIMIT:
broadcast = message.activity_kw.get('broadcast', 0) offset += READ_MESSAGE_LIMIT
if broadcast: result = readMessageList(path=None, method_id=None, processing_node=-1,
# Broadcast messages must be distributed into all nodes. to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
uid = message.uid #TIME_end = time()
assignMessage(processing_node=1, uid=[uid]) #LOG('SQLDict.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset - READ_MESSAGE_LIMIT + len(result), validated_count))
if node_count > 1:
uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
id_count=node_count - 1,
store=0)
path_list = [path] * (node_count - 1)
method_id_list = [message.method_id] * (node_count - 1)
priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
processing_node_list = range(2, node_count + 1)
broadcast_list = [1] * (node_count - 1)
message_list = [self.dumpMessage(message)] * (node_count - 1)
date_list = [message.activity_kw.get('at_date', now_date)] * (node_count - 1)
group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''),
message.activity_kw.get('group_id', '')])] * (node_count - 1)
tag_list = [message.activity_kw.get('tag', '')] * (node_count - 1)
order_validation_text_list = [message.order_validation_text] * (node_count - 1)
activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
path_list=path_list,
method_id_list=method_id_list,
priority_list=priority_list,
broadcast_list=broadcast_list,
processing_node_list=processing_node_list,
message_list=message_list,
date_list=date_list,
group_method_id_list=group_method_id_list,
tag_list=tag_list,
order_validation_text_list=order_validation_text_list)
get_transaction().commit()
else:
# Select a processing node. If the same path appears again, dispatch the message to
# the same node, so that object caching is more efficient. Otherwise, apply a round
# robin scheduling.
node = path_dict.get(path)
if node is None:
node = processing_node
path_dict[path] = node
processing_node += 1
if processing_node > node_count:
processing_node = 1
assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages
LAST_PROCESSING_NODE = processing_node
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from DateTime import DateTime
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously abortTransactionSynchronously
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
...@@ -37,26 +36,25 @@ from ZODB.POSException import ConflictError ...@@ -37,26 +36,25 @@ from ZODB.POSException import ConflictError
from types import ClassType from types import ClassType
import sys import sys
from time import time from time import time
from sets import ImmutableSet
from SQLBase import SQLBase
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
except ImportError: except ImportError:
pass pass
from zLOG import LOG, WARNING, ERROR from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
MAX_PRIORITY = 5 MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached
priority_weight = \ MAX_VALIDATED_LIMIT = 1000
[1] * 64 + \ # Read this many messages to validate.
[2] * 20 + \ READ_MESSAGE_LIMIT = 1000
[3] * 10 + \ # Process this many messages in each dequeueMessage call.
[4] * 5 + \ MESSAGE_BUNDLE_SIZE = 10
[5] * 1
class SQLQueue(RAMQueue, SQLBase):
LAST_PROCESSING_NODE = 1
class SQLQueue(RAMQueue):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
...@@ -66,121 +64,240 @@ class SQLQueue(RAMQueue): ...@@ -66,121 +64,240 @@ class SQLQueue(RAMQueue):
if m.is_registered: if m.is_registered:
id_tool = activity_tool.getPortalObject().portal_ids id_tool = activity_tool.getPortalObject().portal_ids
uid = id_tool.generateNewLengthId(id_group='portal_activity_queue', store=0) uid = id_tool.generateNewLengthId(id_group='portal_activity_queue', store=0)
activity_tool.SQLQueue_writeMessage(uid = uid, path = '/'.join(m.object_path)
path = '/'.join(m.object_path) , method_id = m.method_id
method_id = m.method_id, priority = m.activity_kw.get('priority', 1)
priority = m.activity_kw.get('priority', 1), date = m.activity_kw.get('at_date', None)
broadcast = m.activity_kw.get('broadcast', 0), if date is None:
message = self.dumpMessage(m), date = self.getNow(activity_tool)
date = m.activity_kw.get('at_date', DateTime()), tag = m.activity_kw.get('tag', '')
tag = m.activity_kw.get('tag', '')) activity_tool.SQLQueue_writeMessage(uid=uid,
path=path,
method_id=method_id,
priority=priority,
message=self.dumpMessage(m),
date=date,
tag=tag)
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__)) #LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLQueue_delMessage(uid = [m.uid]) activity_tool.SQLQueue_delMessage(uid = [m.uid])
def dequeueMessage(self, activity_tool, processing_node): def getReservedMessageList(self, activity_tool, date, processing_node, limit=None):
readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None) """
if readMessage is None: Get and reserve a list of messages.
return 1 limit
Maximum number of messages to fetch.
# XXX: arbitrary maximum delay. This number is not garanted to be reached, because of:
# Stop processing new messages if processing duration exceeds 10 seconds. - not enough messages being pending execution
activity_stop_time = time() + 10 - race condition (other nodes reserving the same messages at the same
now_date = DateTime() time)
# Next processing date in case of error This number is guaranted not to be exceeded.
next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400 If None (or not given) no limit apply.
message_list = readMessage(processing_node=processing_node, to_date=now_date) """
for line in message_list: result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, limit=limit)
if time() > activity_stop_time: if len(result) == 0:
break activity_tool.SQLQueue_reserveMessageList(limit=limit, processing_node=processing_node, to_date=date)
path = line.path result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, limit=limit)
method_id = line.method_id return result
# Make sure message can not be processed anylonger
activity_tool.SQLQueue_processMessage(uid=line.uid)
get_transaction().commit() # Release locks before starting a potentially long calculation
# At this point, the message is marked as processed.
try:
m = self.loadMessage(line.message)
# Make sure object exists
validation_state = m.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID:
if line.priority > MAX_PRIORITY:
# This is an error.
# Assign message back to 'error' state.
activity_tool.SQLQueue_assignMessage(uid=line.uid,
processing_node=VALIDATE_ERROR_STATE)
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority=line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
continue
# Try to invoke def makeMessageListAvailable(self, activity_tool, uid_list):
activity_tool.invoke(m) # Try to invoke the message """
if m.is_executed: # Make sure message could be invoked Put messages back in processing_node=0 .
get_transaction().commit() # If successful, commit """
except: if len(uid_list):
# If an exception occurs, abort the transaction to minimize the impact, activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
LOG('SQLQueue', WARNING, 'Could not evaluate %s on %s' % (m.method_id, path),
error=sys.exc_info()) def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
For each reserved message, delete redundant messages when it gets
reserved (definitely lost, but they are expandable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- get one message from the reserved bunch (this messages will be
"needed")
- increase the number of impacted object
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list
If any error happens in above described process, try to unreserve all
messages already reserved in that process.
If it fails, complain loudly that some messages might still be in an
unclean state.
Returned values:
list of 3-tuple:
- message uid
- message
- priority
"""
def getReservedMessageList(limit):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
date=now_date,
processing_node=processing_node,
limit=limit)
if len(line_list):
LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
now_date = self.getNow(activity_tool)
message_list = []
def append(line, message):
uid = line.uid
message_list.append((uid, message, line.priority))
try:
result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
for line in result:
m = self.loadMessage(line.message, uid=line.uid)
append(line, m)
if len(message_list):
activity_tool.SQLQueue_processMessage(uid=[x[0] for x in message_list])
return message_list
except:
LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
to_free_uid_list = [x[0] for x in message_list]
try: try:
abortTransactionSynchronously() makeMessageListAvailable(to_free_uid_list)
except: except:
# Unfortunately, database adapters may raise an exception against abort. LOG('SQLQueue', PANIC, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally') else:
pass if len(to_free_uid_list):
LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLQueue', TRACE, '(no message was reserved)')
return []
# An exception happens at somewhere else but invoke, so messages def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
# themselves should not be delayed. def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
deletable_uid_list = []
delay_uid_list = []
final_error_uid_list = []
message_with_active_process_list = []
for uid, m, priority in message_uid_priority_list:
if m.is_executed:
deletable_uid_list.append(uid)
if m.active_process:
message_with_active_process_list.append(m)
else:
if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
final_error_uid_list.append(uid)
else:
try:
# Immediately update, because values different for every message
activity_tool.SQLQueue_setPriority(
uid=[uid],
delay=VALIDATION_ERROR_DELAY,
priority=priority + 1)
except:
LOG('SQLQueue', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
try:
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLQueue', PANIC, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
if len(deletable_uid_list):
try:
activity_tool.SQLQueue_delMessage(uid=deletable_uid_list)
except:
LOG('SQLQueue', PANIC, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
if len(delay_uid_list):
try:
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLQueue_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY)
except:
LOG('SQLQueue', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
try:
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLQueue', PANIC, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed messages %r' % (delay_uid_list, ))
if len(final_error_uid_list):
try:
activity_tool.SQLQueue_assignMessage(uid=final_error_uid_list,
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLQueue', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
for m in message_with_active_process_list:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
message_uid_priority_list = \
self.getProcessableMessageList(activity_tool, processing_node)
if len(message_uid_priority_list):
processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
processed_message_uid_list = []
# Commit right before executing messages.
# As MySQL transaction do no start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
# on objects which are not available - or available in an old
# version - to ZODB connector.
# So all connectors must be commited now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit()
for value in message_uid_priority_list:
processed_message_uid_list.append(value)
# Try to invoke
try: try:
activity_tool.SQLQueue_setPriority(uid=line.uid, date=line.date, activity_tool.invoke(value[1])
priority=line.priority) # Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the
# same "try" block.
get_transaction().commit() get_transaction().commit()
except: except:
LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception', LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
error=sys.exc_info()) try:
raise makeMessageListAvailable([value[0]])
continue except:
LOG('SQQueue', PANIC, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
try: else:
if m.is_executed: LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
activity_tool.SQLQueue_delMessage(uid=[line.uid]) # Delete it
else:
try: try:
# If not, abort transaction and start a new one
abortTransactionSynchronously() abortTransactionSynchronously()
except: except:
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally') LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
pass return True # Stop processing messages for this tic call for this queue.
if time() > processing_stop_time:
if type(m.exc_type) is ClassType \ LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
and issubclass(m.exc_type, ConflictError): break
activity_tool.SQLQueue_setPriority(uid=line.uid, # Release all unprocessed messages
date=next_processing_date, processed_uid_set = ImmutableSet([x[0] for x in processed_message_uid_list])
priority=line.priority) to_free_uid_list = [x[0] for x in message_uid_priority_list if x[0] not in processed_uid_set]
elif line.priority > MAX_PRIORITY: if len(to_free_uid_list):
# This is an error try:
activity_tool.SQLQueue_assignMessage(uid=line.uid, makeMessageListAvailable(to_free_uid_list)
processing_node=INVOKE_ERROR_STATE) except:
# Assign message back to 'error' state LOG('SQQueue', PANIC, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
m.notifyUser(activity_tool) # Notify Error else:
else: LOG('SQQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
# Lower priority self.finalizeMessageExecution(activity_tool, processed_message_uid_list)
activity_tool.SQLQueue_setPriority(uid=line.uid, date=next_processing_date, get_transaction().commit()
priority=line.priority + 1) return not len(message_uid_priority_list)
get_transaction().commit()
except:
LOG('SQLQueue', ERROR,
'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info())
raise
get_transaction().commit() # Release locks before starting a potentially long calculation
return len(message_list) == 0
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None) hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
...@@ -310,69 +427,36 @@ class SQLQueue(RAMQueue): ...@@ -310,69 +427,36 @@ class SQLQueue(RAMQueue):
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
offset = 0
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
global LAST_PROCESSING_NODE now_date = self.getNow(activity_tool)
now_date = DateTime() result = readMessageList(path=None, method_id=None, processing_node=-1,
result = readMessageList(path=None, method_id=None, to_date=now_date, include_processing=0,
processing_node=-1, to_date=now_date) offset=offset, limit=READ_MESSAGE_LIMIT)
get_transaction().commit() validated_count = 0
#TIME_begin = time()
while len(result) and validated_count < MAX_VALIDATED_LIMIT:
get_transaction().commit()
validation_text_dict = {'none': 1} validation_text_dict = {'none': 1}
message_dict = {} message_dict = {}
for line in result: for line in result:
message = self.loadMessage(line.message, uid=line.uid) message = self.loadMessage(line.message, uid = line.uid)
message.order_validation_text = self.getOrderValidationText(message) message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict, self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict) validation_text_dict, now_date=now_date)
distributable_count = len(message_dict)
# XXX probably this below can be optimized by assigning multiple messages at a time. if distributable_count:
path_dict = {} activity_tool.SQLQueue_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
assignMessage = activity_tool.SQLQueue_assignMessage validated_count += distributable_count
processing_node = LAST_PROCESSING_NODE if validated_count < MAX_VALIDATED_LIMIT:
id_tool = activity_tool.getPortalObject().portal_ids offset += READ_MESSAGE_LIMIT
for message in message_dict.itervalues(): result = readMessageList(path=None, method_id=None, processing_node=-1,
path = '/'.join(message.object_path) to_date=now_date, include_processing=0,
broadcast = message.activity_kw.get('broadcast', 0) offset=offset, limit=READ_MESSAGE_LIMIT)
if broadcast: #TIME_end = time()
# Broadcast messages must be distributed into all nodes. #LOG('SQLQueue.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset + len(result), validated_count))
assignMessage(processing_node=1, uid=message.uid)
if node_count > 1:
uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
id_count=node_count - 1,
store=0)
priority = message.activity_kw.get('priority', 1)
dumped_message = self.dumpMessage(message)
date = message.activity_kw.get('at_date', now_date)
tag = message.activity_kw.get('tag', '')
for node in xrange(2, node_count+1):
activity_tool.SQLQueue_writeMessage(uid=uid_list.pop(),
path=path,
method_id=message.method_id,
priority=priority,
broadcast=1,
processing_node=node,
message=dumped_message,
date=date,
tag=tag)
get_transaction().commit()
else:
# Select a processing node. If the same path appears again, dispatch the message to
# the same node, so that object caching is more efficient. Otherwise, apply a round
# robin scheduling.
node = path_dict.get(path)
round_robin_scheduling = message.activity_kw.get('round_robin_scheduling', 0)
if node is None:
node = processing_node
if not round_robin_scheduling:
path_dict[path] = node
processing_node += 1
if processing_node > node_count:
processing_node = 1
assignMessage(processing_node=node, uid=message.uid, broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages
LAST_PROCESSING_NODE = processing_node
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None): def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
......
...@@ -130,14 +130,16 @@ class Message: ...@@ -130,14 +130,16 @@ class Message:
def getObjectList(self, activity_tool): def getObjectList(self, activity_tool):
"""return the list of object that can be expanded from this message.""" """return the list of object that can be expanded from this message."""
object_list = []
try: try:
expand_method_id = self.activity_kw['expand_method_id'] object_list.append(self.getObject(activity_tool))
obj = self.getObject(activity_tool)
# FIXME: how to pass parameters?
object_list = getattr(obj, expand_method_id)()
except KeyError: except KeyError:
object_list = [self.getObject(activity_tool)] pass
else:
if self.hasExpandMethod():
expand_method_id = self.activity_kw['expand_method_id']
# FIXME: how to pass parameters?
object_list = getattr(object_list[0], expand_method_id)()
return object_list return object_list
def hasExpandMethod(self): def hasExpandMethod(self):
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
SELECT NOW()
...@@ -11,7 +11,7 @@ class_file: ...@@ -11,7 +11,7 @@ class_file:
processing_node processing_node
method_id method_id
uid uid
broadcast</params> </params>
UPDATE message UPDATE message
SET SET
processing_node=<dtml-sqlvar processing_node type="int">, processing_node=<dtml-sqlvar processing_node type="int">,
...@@ -26,6 +26,5 @@ WHERE ...@@ -26,6 +26,5 @@ WHERE
AND path = <dtml-sqlvar path type="string"> AND path = <dtml-sqlvar path type="string">
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
</dtml-if> </dtml-if>
<dtml-if broadcast> <dtml-var sql_delimiter>
AND broadcast = <dtml-sqlvar broadcast type="int"> COMMIT
</dtml-if>
...@@ -11,7 +11,7 @@ class_file: ...@@ -11,7 +11,7 @@ class_file:
UPDATE UPDATE
message message
SET SET
processing="0" processing=0,
processing_node=0
WHERE WHERE
processing="1" processing_node=<dtml-sqlvar processing_node type="int">
AND processing_node="<dtml-sqlvar processing_node type="int">"
...@@ -17,7 +17,6 @@ CREATE TABLE `message` ( ...@@ -17,7 +17,6 @@ CREATE TABLE `message` (
`processing` TINYINT NOT NULL DEFAULT 0, `processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME, `processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`broadcast` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '', `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
...@@ -26,8 +25,7 @@ CREATE TABLE `message` ( ...@@ -26,8 +25,7 @@ CREATE TABLE `message` (
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY (`path`), KEY (`path`),
KEY (`method_id`), KEY (`method_id`),
KEY (`processing_node`), KEY `processing_node_processing` (`processing_node`, `processing`),
KEY (`processing`),
KEY (`priority`), KEY (`priority`),
KEY (`tag`), KEY (`tag`),
KEY (`order_validation_text`) KEY (`order_validation_text`)
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
to_date
path
method_id
group_method_id
order_validation_text
tag
</params>
DELETE FROM
message
WHERE
processing_node IN (0, <dtml-sqlvar processing_node type="int">)
AND uid != <dtml-sqlvar uid type="int">
AND date <= <dtml-sqlvar to_date type="datetime">
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
AND tag IN ('', <dtml-sqlvar tag type="string">)
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid_list</params>
UPDATE
message
SET
processing_node=0,
processing=0
WHERE
uid IN (
<dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
)
<dtml-var sql_delimiter>
COMMIT
...@@ -7,14 +7,14 @@ cache_time:0 ...@@ -7,14 +7,14 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid <params>uid</params>
processing_node</params>
UPDATE message UPDATE message
SET SET
processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">, processing_date = NOW(),
processing = 1, processing = 1
processing_node = <dtml-sqlvar processing_node type="int">
WHERE WHERE
uid IN ( uid IN (
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in> <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
) )
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
priority
to_date
to_processing_date
group_method_id
order_validation_text</params>
SELECT * FROM
message
WHERE
processing = 0
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
<dtml-if group_method_id>AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
<dtml-if order_validation_text>AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> </dtml-if>
ORDER BY
priority, date, uid
<dtml-if group_method_id>
LIMIT 100
<dtml-else>
LIMIT 1
</dtml-if>
<dtml-comment> <dtml-comment>
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:1000 max_rows:0
max_cache:0 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
...@@ -12,7 +12,10 @@ method_id ...@@ -12,7 +12,10 @@ method_id
processing_node processing_node
priority priority
include_processing include_processing
to_date</params> to_date
offset:int=0
count:int=1000
</params>
SELECT * FROM SELECT * FROM
message message
WHERE WHERE
...@@ -20,10 +23,11 @@ WHERE ...@@ -20,10 +23,11 @@ WHERE
<dtml-if expr="not(include_processing)"> <dtml-if expr="not(include_processing)">
AND processing = 0 AND processing = 0
</dtml-if> </dtml-if>
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY ORDER BY
priority, date, uid priority, date, uid
LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">
...@@ -17,7 +17,7 @@ SELECT uid FROM ...@@ -17,7 +17,7 @@ SELECT uid FROM
message message
WHERE WHERE
processing = 0 processing = 0
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if path> AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if path> AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
to_date
limit
group_method_id
order_validation_text</params>
UPDATE
message
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime">
<dtml-if group_method_id> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
<dtml-if order_validation_text> AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> </dtml-if>
ORDER BY
priority, date, uid
<dtml-if limit>
LIMIT <dtml-sqlvar limit type="int">
</dtml-if>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
limit</params>
SELECT
*
FROM
message
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
AND processing = 0
<dtml-if limit>
LIMIT <dtml-sqlvar limit type="int">
</dtml-if>
...@@ -36,7 +36,7 @@ WHERE ...@@ -36,7 +36,7 @@ WHERE
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in> <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if processing_node> <dtml-if expr="_.getattr(_, 'processing_node', None) is not None">
AND processing_node = <dtml-sqlvar processing_node type="int"> AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if> </dtml-if>
<dtml-if order_validation_text> <dtml-if order_validation_text>
......
...@@ -20,6 +20,6 @@ SET ...@@ -20,6 +20,6 @@ SET
</dtml-if> </dtml-if>
WHERE WHERE
1 = 1 1 = 1
<dtml-if processing_node> <dtml-if expr="processing_node is not None">
AND processing_node = <dtml-sqlvar processing_node type="int"> AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if> </dtml-if>
...@@ -12,7 +12,6 @@ path ...@@ -12,7 +12,6 @@ path
method_id method_id
message message
priority priority
broadcast
date date
processing_node=-1 processing_node=-1
group_method_id group_method_id
...@@ -22,12 +21,11 @@ INSERT INTO message ...@@ -22,12 +21,11 @@ INSERT INTO message
SET SET
uid = <dtml-sqlvar uid type="int">, uid = <dtml-sqlvar uid type="int">,
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> date = <dtml-if date><dtml-sqlvar date type="datetime"><dtml-else>NOW()</dtml-if>,
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
processing_node = <dtml-sqlvar processing_node type="int">, processing_node = <dtml-sqlvar processing_node type="int">,
processing = 0, processing = 0,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
broadcast = <dtml-sqlvar broadcast type="int">,
group_method_id = <dtml-sqlvar group_method_id type="string">, group_method_id = <dtml-sqlvar group_method_id type="string">,
tag = <dtml-sqlvar tag type="string">, tag = <dtml-sqlvar tag type="string">,
order_validation_text = <dtml-sqlvar order_validation_text type="string">, order_validation_text = <dtml-sqlvar order_validation_text type="string">,
......
...@@ -12,26 +12,24 @@ path_list ...@@ -12,26 +12,24 @@ path_list
method_id_list method_id_list
message_list message_list
priority_list priority_list
broadcast_list
date_list date_list
processing_node_list processing_node_list
group_method_id_list group_method_id_list
tag_list tag_list
order_validation_text_list</params> order_validation_text_list</params>
INSERT INTO message INSERT INTO message
(uid, path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, order_validation_text, message) (uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, order_validation_text, message)
VALUES VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if> <dtml-if sequence-start><dtml-else>,</dtml-if>
( (
<dtml-sqlvar expr="uid_list[loop_item]" type="int">, <dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">, <dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-if date_list><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else><dtml-sqlvar "_.DateTime()" type="datetime"></dtml-if>, <dtml-if date_list><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>NOW()</dtml-if><dtml-else>NOW()</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">, <dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-if processing_node_list><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>, <dtml-if processing_node_list><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
0, 0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">, <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="broadcast_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">, <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">, <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">, <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,
......
...@@ -10,7 +10,6 @@ class_file: ...@@ -10,7 +10,6 @@ class_file:
<params>path <params>path
processing_node processing_node
method_id method_id
broadcast
uid</params> uid</params>
UPDATE message_queue UPDATE message_queue
SET SET
...@@ -18,9 +17,11 @@ SET ...@@ -18,9 +17,11 @@ SET
processing=0 processing=0
WHERE WHERE
<dtml-if path> path = <dtml-sqlvar path type="string"> <dtml-if path> path = <dtml-sqlvar path type="string">
<dtml-else> uid = <dtml-sqlvar uid type="int"> </dtml-if> <dtml-else>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> uid IN (
<dtml-if broadcast> <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
AND broadcast = <dtml-sqlvar broadcast type="int"> )
</dtml-if> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-var sql_delimiter>
COMMIT
...@@ -11,7 +11,7 @@ class_file: ...@@ -11,7 +11,7 @@ class_file:
UPDATE UPDATE
message_queue message_queue
SET SET
processing="0" processing=0,
processing_node=0
WHERE WHERE
processing="1" processing_node=<dtml-sqlvar processing_node type="int">
AND processing_node="<dtml-sqlvar processing_node type="int">"
...@@ -17,15 +17,13 @@ CREATE TABLE `message_queue` ( ...@@ -17,15 +17,13 @@ CREATE TABLE `message_queue` (
`processing` INT DEFAULT 0, `processing` INT DEFAULT 0,
`processing_date` datetime, `processing_date` datetime,
`priority` INT DEFAULT 0, `priority` INT DEFAULT 0,
`broadcast` INT DEFAULT 0,
`tag` VARCHAR(255), `tag` VARCHAR(255),
`message` LONGBLOB, `message` LONGBLOB,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY `date` (`date`), KEY `date` (`date`),
KEY `path` (`path`), KEY `path` (`path`),
KEY `method_id` (`method_id`), KEY `method_id` (`method_id`),
KEY `processing_node` (`processing_node`), KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing` (`processing`),
KEY `processing_date` (`processing_date`), KEY `processing_date` (`processing_date`),
KEY `priority` (`priority`), KEY `priority` (`priority`),
KEY `tag` (`tag`) KEY `tag` (`tag`)
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid_list</params>
UPDATE
message_queue
SET
processing_node=0,
processing=0
WHERE
uid IN (
<dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
)
<dtml-var sql_delimiter>
COMMIT
...@@ -11,7 +11,11 @@ class_file: ...@@ -11,7 +11,11 @@ class_file:
UPDATE UPDATE
message_queue message_queue
SET SET
processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">, processing_date = NOW(),
processing=1 processing=1
WHERE WHERE
uid = <dtml-sqlvar uid type="int"> uid IN (
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
priority
to_date</params>
SELECT * FROM
message_queue
WHERE
processing = 0
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
priority, date, uid
<dtml-comment> <dtml-comment>
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:1000 max_rows:0
max_cache:0 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
...@@ -11,15 +11,19 @@ class_file: ...@@ -11,15 +11,19 @@ class_file:
method_id method_id
processing_node processing_node
priority priority
to_date</params> to_date
offset:int=0
count:int=1000
</params>
SELECT * FROM SELECT * FROM
message_queue message_queue
WHERE WHERE
processing = 0 processing = 0
<dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY ORDER BY
priority, date, uid priority, date, uid
LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">
...@@ -15,6 +15,6 @@ SELECT uid FROM ...@@ -15,6 +15,6 @@ SELECT uid FROM
message_queue message_queue
WHERE WHERE
processing = 0 processing = 0
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if> <dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
<dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
to_date
limit
</params>
UPDATE
message_queue
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime">
ORDER BY
priority, date, uid
<dtml-if limit>
LIMIT <dtml-sqlvar limit type="int">
</dtml-if>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
limit</params>
SELECT
*
FROM
message_queue
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
AND processing = 0
<dtml-if limit>
LIMIT <dtml-sqlvar limit type="int">
</dtml-if>
...@@ -9,12 +9,21 @@ class_file: ...@@ -9,12 +9,21 @@ class_file:
</dtml-comment> </dtml-comment>
<params>uid <params>uid
priority priority
delay
date</params> date</params>
UPDATE UPDATE
message_queue message_queue
SET SET
priority = <dtml-sqlvar priority type="int">, processing = 0
processing = 0, <dtml-if priority>
date = <dtml-sqlvar date type="datetime"> , priority = <dtml-sqlvar priority type="int">
</dtml-if>
<dtml-if delay>
, date = DATE_ADD(NOW(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
<dtml-elif date>
, date = <dtml-sqlvar date type="datetime">
</dtml-if>
WHERE WHERE
uid = <dtml-sqlvar uid type="int"> uid IN (
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
...@@ -16,6 +16,6 @@ SET ...@@ -16,6 +16,6 @@ SET
processing_date = processing_date - <dtml-sqlvar delay type="int"> processing_date = processing_date - <dtml-sqlvar delay type="int">
WHERE WHERE
1 = 1 1 = 1
<dtml-if processing_node> <dtml-if expr="processing_node is not None">
AND processing_node = <dtml-sqlvar processing_node type="int"> AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if> </dtml-if>
\ No newline at end of file
...@@ -12,7 +12,6 @@ path ...@@ -12,7 +12,6 @@ path
method_id method_id
message message
priority priority
broadcast
processing_node processing_node
date date
tag</params> tag</params>
...@@ -20,12 +19,11 @@ INSERT INTO message_queue ...@@ -20,12 +19,11 @@ INSERT INTO message_queue
SET SET
uid = <dtml-sqlvar uid type="int">, uid = <dtml-sqlvar uid type="int">,
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> date = <dtml-if date><dtml-sqlvar date type="datetime"><dtml-else>NOW()</dtml-if>,
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
<dtml-if processing_node> <dtml-if processing_node>
processing_node = <dtml-sqlvar processing_node type="int">, processing_node = <dtml-sqlvar processing_node type="int">,
</dtml-if> </dtml-if>
broadcast = <dtml-sqlvar broadcast type="int">,
processing = 0, processing = 0,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
tag = <dtml-sqlvar tag type="string">, tag = <dtml-sqlvar tag type="string">,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment