##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

from collections import defaultdict
from itertools import product
import operator
import sys
import transaction
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
from Queue import Queue, VALIDATION_ERROR_DELAY
from Products.CMFActivity.Errors import ActivityFlushError
from Products.ERP5Type import Timeout
from Products.ERP5Type.Timeout import TimeoutReachedError, Deadline

# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000

# Special 'processing_node' values, in addition to -1 (not validated yet),
# 0 (validated, not assigned) and strictly positive values (assigned to the
# node with that id):
INVOKE_ERROR_STATE = -2
# Activities in this state are also ignored when checking dependencies of
# other activities.
DEPENDENCY_IGNORED_ERROR_STATE = -10

# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are far big enough without using the MSb. Assuming a busy activity
# table having one million activities, the probability of triggering a
# conflict when inserting one activity with a 64 bits uid is 0.5e-13. With
# 63 bits it increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting an activity batch of 100 activities among one million existing
# activities has a probability of failing of 1e-11. While it should be low
# enough, retries can help lower that. Try 10 times, which should be short
# enough while yielding one order of magnitude collision probability
# improvement.
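# A rough sanity check of the figures quoted above (an illustrative sketch,
# not normative; all numbers are approximate):
#   P(one insert collides)  ~ existing_rows / 2**bits
#                           ~ 1e6 / 2**64 ~ 0.5e-13  (64 bits)
#                           ~ 1e6 / 2**63 ~ 1e-13    (63 bits)
#   P(100-insert batch collides) ~ 100 * 1e6 / 2**63 ~ 1e-11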
UID_ALLOCATION_TRY_COUNT = 10

def sort_message_key(message):
  # same sort key as in SQLBase.getMessageList
  return message.line.priority, message.line.date, message.uid

_DequeueMessageException = Exception()

_ITEMGETTER0 = operator.itemgetter(0)
_IDENTITY = lambda x: x

def render_datetime(x):
  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]

# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  sqltest_dict = {}
  no_quote_type = int, float, long
  def _(name, column=None, op="="):
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      if isinstance(value, no_quote_type):
        return column_op + str(value)
      if isinstance(value, DateTime):
        value = render_datetime(value)
      if isinstance(value, basestring):
        return column_op + render_string(value)
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      # Here, 'value' is assumed to be a (possibly empty) sequence: the loop
      # body returns on the first element, and an empty sequence renders a
      # never-matching condition.
      for x in value:
        return "%s IN (%s)" % (column, ', '.join(map(
          str if isinstance(x, no_quote_type) else
          render_datetime if isinstance(x, DateTime) else
          render_string, value)))
      return "0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('retry')
  _('to_date', column="date", op="<=")
  _('uid')
  def renderAbovePriorityDateUid(value, render_string):
    # Strictly dependent on _getMessageList's sort order: given a
    # well-ordered list of values, the rendered condition will match the
    # immediate next row in that sort order.
    priority, date, uid = value
    assert isinstance(priority, no_quote_type)
    assert isinstance(uid, no_quote_type)
    return (
      '(priority>%(priority)s OR (priority=%(priority)s AND '
      '(date>%(date)s OR (date=%(date)s AND uid>%(uid)s))'
      '))' % {
        'priority': priority,
        # render_datetime raises if "date" lacks the date API, so no need to
        # check its type
        'date': render_string(render_datetime(date)),
        'uid': uid,
      }
    )
  sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
  return sqltest_dict
sqltest_dict = sqltest_dict()

def _validate_after_path_and_method_id(value, render_string):
  path, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    ' AND ' + sqltest_dict['path'](path, render_string)
  )

def _validate_after_tag_and_method_id(value, render_string):
  tag, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    ' AND ' + sqltest_dict['tag'](tag, render_string)
  )
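
# Illustrative renderings (hypothetical values; 'q' stands for a string
# quoting callable such as MySQLdb's string_literal):
#   sqltest_dict['uid'](123, q)            -> "uid = 123"
#   sqltest_dict['tag']('foo', q)          -> "tag = 'foo'"
#   sqltest_dict['tag'](('a', 'b'), q)     -> "tag IN ('a', 'b')"
#   sqltest_dict['to_date'](DateTime(), q) -> "date <= '2007-...'"
#   sqltest_dict['tag']((), q)             -> "0" (matches no row)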

# Definition of activity dependencies
# key: dependency name (as passed to ActiveObject.activate() & friends)
# value:
# - tuple of column names. If there is more than one, they must be in the
#   same order as the dependency value items expected by the next item
# - callable rendering given values into an SQL condition
#   (value, render_string) -> str
_DEPENDENCY_TESTER_DICT = {
  'after_method_id': (
    ('method_id', ),
    sqltest_dict['method_id'],
  ),
  'after_path': (
    ('path', ),
    sqltest_dict['path'],
  ),
  'after_message_uid': (
    ('uid', ),
    sqltest_dict['uid'],
  ),
  'after_path_and_method_id': (
    ('path', 'method_id'),
    _validate_after_path_and_method_id,
  ),
  'after_tag': (
    ('tag', ),
    sqltest_dict['tag'],
  ),
  'after_tag_and_method_id': (
    ('tag', 'method_id'),
    _validate_after_tag_and_method_id,
  ),
  'serialization_tag': (
    ('serialization_tag', ),
    lambda value, render_string: (
      'processing_node > -1 AND ' +
      sqltest_dict['serialization_tag'](value, render_string)
    ),
  ),
}

def getNow(db):
  """
    Return the UTC date from the point of view of the SQL server.
    Note that this value is not cached, and is not transactional on the
    MySQL side.
  """
  return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
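
# Illustrative renderings (hypothetical values): single-column testers reuse
# sqltest_dict entries directly, while 2-column testers AND two of them:
#   after_tag 'foo'                      -> "tag = 'foo'"
#   after_path_and_method_id ('/a', 'b') -> "method_id = 'b' AND path = '/a'"
#   serialization_tag 'baz'              -> "processing_node > -1 AND
#                                            serialization_tag = 'baz'"
# (serialization_tag only blocks on activities that already passed
# validation)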
" The following added columns could not be initialized: %s" % (self.sql_table, ", ".join(column_list))) _insert_template = ("INSERT INTO %s (uid," " path, active_process_uid, date, method_id, processing_node," " priority, node, group_method_id, tag, serialization_tag," " message) VALUES\n(%s)") _insert_separator = "),\n(" def _hasDependency(self, message): get = message.activity_kw.get return any( get(x) is not None for x in _DEPENDENCY_TESTER_DICT ) def prepareQueueMessageList(self, activity_tool, message_list): db = activity_tool.getSQLConnection() quote = db.string_literal def insert(reset_uid): values = self._insert_separator.join(values_list) del values_list[:] for _ in xrange(UID_ALLOCATION_TRY_COUNT): if reset_uid: reset_uid = False # Overflow will result into IntegrityError. db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE)) try: db.query(self._insert_template % (self.sql_table, values)) except MySQLdb.IntegrityError, (code, _): if code != DUP_ENTRY: raise reset_uid = True else: break else: raise RuntimeError("Maximum retry for prepareQueueMessageList reached") i = 0 reset_uid = True values_list = [] max_payload = self._insert_max_payload sep_len = len(self._insert_separator) hasDependency = self._hasDependency for m in message_list: if m.is_registered: active_process_uid = m.active_process_uid date = m.activity_kw.get('at_date') row = ','.join(( '@uid+%s' % i, quote('/'.join(m.object_path)), 'NULL' if active_process_uid is None else str(active_process_uid), "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)), quote(m.method_id), '-1' if hasDependency(m) else '0', str(m.activity_kw.get('priority', 1)), str(m.activity_kw.get('node', 0)), quote(m.getGroupId()), quote(m.activity_kw.get('tag', '')), quote(m.activity_kw.get('serialization_tag', '')), quote(Message.dump(m)))) i += 1 n = sep_len + len(row) max_payload -= n if max_payload < 0: if values_list: insert(reset_uid) reset_uid = False max_payload = self._insert_max_payload - n else: raise ValueError("max_allowed_packet too small to insert message") values_list.append(row) if values_list: insert(reset_uid) def _getMessageList(self, db, count=1000, src__=0, **kw): # XXX: Because most columns have NOT NULL constraint, conditions with None # value should be ignored, instead of trying to render them # (with comparisons with NULL). 

  def _getMessageList(self, db, count=1000, src__=0, **kw):
    # XXX: Because most columns have a NOT NULL constraint, conditions with
    #      None values should be ignored, instead of trying to render them
    #      (with comparisons against NULL).
    q = db.string_literal
    sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
    sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table,
      sql and '\nWHERE ' + sql,
      '' if count is None else '\nLIMIT %d' % count,
    )
    return sql if src__ else Results(db.query(sql, max_rows=0))

  def getMessageList(self, activity_tool, *args, **kw):
    result = self._getMessageList(activity_tool.getSQLConnection(),
                                  *args, **kw)
    if type(result) is str: # src__ == 1
      return result,
    class_name = self.__class__.__name__
    return [Message.load(line.message,
                         activity=class_name,
                         uid=line.uid,
                         processing_node=line.processing_node,
                         retry=line.retry)
            for line in result]

  def countMessageSQL(self, quote, **kw):
    return "SELECT count(*) FROM %s WHERE processing_node > %d AND %s" % (
      self.sql_table, DEPENDENCY_IGNORED_ERROR_STATE, " AND ".join(
        sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v
        ) or "1")

  def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
    where = [sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v]
    if only_valid:
      where.append('processing_node > %d' % INVOKE_ERROR_STATE)
    if only_invalid:
      where.append('processing_node <= %d' % INVOKE_ERROR_STATE)
    return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
      self.sql_table, " AND ".join(where) or "1")

  def getPriority(self, activity_tool, processing_node, node_set=None):
    if node_set is None:
      q = ("SELECT 3*priority, date FROM %s"
        " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1" % self.sql_table)
    else:
      subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
        " WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1)" % self.sql_table).format
      node = 'node=%s' % processing_node
      # "ALL" on all but one, to incur deduplication cost only once.
      # "UNION ALL" between the two naturally distinct sets.
      q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT 1" % (
          subquery(-1, node),
          subquery('', 'node=0'),
          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
          ' UNION ALL ' + subquery(-1, 'node IN (%s)'
            % ','.join(map(str, node_set))) if node_set else '',
        ))
    result = activity_tool.getSQLConnection().query(q, 0)[1]
    if result:
      return result[0]
    return Queue.getPriority(self, activity_tool, processing_node, node_set)

  def _retryOnLockError(self, method, args=(), kw={}):
    while True:
      try:
        return method(*args, **kw)
      except ConflictError:
        # Note that this code assumes that a database adapter translates
        # a lock error into a conflict error.
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')

  def _log(self, severity, summary):
    LOG(self.__class__.__name__, severity, summary,
        error=severity > INFO)
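
  # Node preference, illustrated (an informal sketch of the node-targeted
  # queries used in getPriority above and in getReservedMessageList below):
  # priorities are stretched (3*priority) so that a +/-1 adjustment reorders
  # rows targeting specific nodes without crossing priority levels:
  #   node = this node     -> effective_priority = 3*priority - 1 (preferred)
  #   node = 0 (anywhere)  -> effective_priority = 3*priority
  #   node = another node  -> effective_priority = 3*priority + 1
  # e.g. a priority-1 activity targeting another node (4) still beats a
  # priority-2 activity targeting this node (5).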

  def _getExecutableMessageSet(self, activity_tool, db, message_list):
    """
    Return, from given message list, the set of messages which have all
    their dependencies satisfied.
    """
    # Principle of operation:
    # For each dependency type used in the given message list, find all
    # messages matching any of the dependency values used in the given
    # message list. This provides the SQL database with structurally simple
    # queries that it should be able to optimise easily.
    # Further refinements:
    # - Any blocked message is ignored in further dependency type lookups
    #   (we already know it is blocked, no point in checking further).
    # - Test the most popular dependency types first, with the expectation
    #   that these will find most of the blockers, reducing the set of
    #   activities left to test and (with the refinement above) reducing the
    #   probability of having to run further queries (if there are other
    #   dependency types to test).
    dependency_tester_dict = _DEPENDENCY_TESTER_DICT
    # dependency_name (str): something like 'serialization_tag', etc.
    # dependency_value (any): dependency_name-dependent structure and
    # meaning.
    # dependency_dict: defines the dependencies to check, and which messages
    # are blocked by each found blocker.
    #   [dependency_name][dependency_value] -> message set
    dependency_dict = defaultdict(lambda: defaultdict(set))
    # message_dependency_dict: defines which message has which dependencies,
    # to efficiently remove a message from dependency_dict once it is found
    # to be blocked.
    #   [message][dependency_name] -> dependency_value_list
    message_dependency_dict = defaultdict(dict)
    for message in message_list:
      for (
        dependency_name,
        dependency_value,
      ) in message.activity_kw.iteritems():
        try:
          column_list, _ = dependency_tester_dict[dependency_name]
        except KeyError:
          continue
        # There are 2 types of dependencies:
        # - monovalued (most), which accept a single value or a vector of
        #   values.
        # - 2-valued (after_path_and_method_id and after_tag_and_method_id),
        #   which accept a 2-vector, each item being a single value or a
        #   vector of values.
        if len(column_list) == 1:
          dependency_value_list = [
            x
            for x in (
              dependency_value
              if isinstance(dependency_value, (tuple, list)) else
              (dependency_value, )
            )
            if x is not None
          ]
        else:
          dependency_value_list = list(product(*(
            (
              x
              if isinstance(x, (tuple, list)) else
              (x, )
            )
            for x in dependency_value
            if x is not None
          )))
        if not dependency_value_list:
          continue
        message_dependency_dict[message][dependency_name] = \
          dependency_value_list
        dependency_value_dict = dependency_dict[dependency_name]
        for dependency_value in dependency_value_list:
          dependency_value_dict[dependency_value].add(message)
    # Messages are presumed valid until a blocker is found.
    result = set(message_list)
    # Messages for which a blocker was found; removal of these messages from
    # further dependency processing is delayed to the next iteration, to
    # avoid doing such work if there is no further iteration.
    new_blocked_message_set = set()
    quote = db.string_literal
    table_name_list = activity_tool.getSQLQueueTableNameSet()
    for (
      dependency_name,
      dependency_value_dict,
    ) in sorted(
      dependency_dict.iteritems(),
      # Test first the condition with the most values.
      # XXX: after_path=('foo', 'bar') counts as 2 points for after_path
      # despite being a single activity. Is there a fairer (while cheap)
      # way ?
      key=lambda dependency_dict_item: sum(
        len(message_set)
        for message_set in dependency_dict_item[1].itervalues()
      ),
      reverse=True,
    ):
      # The previous iteration found blocked messages.
      # Find which activities they are, and remove their values from
      # dependency_dict so these activities are not tested in further
      # queries (we already know they are blocked).
      while new_blocked_message_set:
        blocked_message = new_blocked_message_set.pop()
        for (
          message_dependency_name,
          message_dependency_value_list,
        ) in message_dependency_dict[blocked_message].iteritems():
          message_dependency_value_dict = dependency_dict[
            message_dependency_name]
          if not message_dependency_value_dict:
            # This dependency was already dropped or evaluated, nothing to
            # clean up here.
            continue
          for message_dependency_value in message_dependency_value_list:
            message_set = message_dependency_value_dict[
              message_dependency_value]
            message_set.remove(blocked_message)
            if not message_set:
              # No more messages wait for this value of this dependency,
              # drop the entry.
              del message_dependency_value_dict[message_dependency_value]
          # Note: no point in editing dependency_dict if
          # message_dependency_value_dict is empty, the outer loop is
          # working on a copy.
      if not dependency_value_dict:
        # No more non-blocked messages for this dependency, skip it.
        continue
      column_list, to_sql = dependency_tester_dict[dependency_name]
      if len(column_list) == 1:
        row2key = _ITEMGETTER0
        dependency_sql = to_sql(dependency_value_dict.keys(), quote)
      else:
        row2key = _IDENTITY
        # XXX: generated SQL could be simpler: for example, a dependency
        # input as
        #   ('foo', ('bar', 'baz'))
        # will become
        #   (... = 'foo' AND ... = 'bar') OR (... = 'foo' AND ... = 'baz')
        # This is the correct condition, but it could be expressed with
        # shorter SQL. I'm not sure this makes much of a difference for the
        # query planner though, it would likely increase the complexity here
        # a lot, and anyway these multi-column dependencies should rather be
        # replaced with tags (as is often possible and produces better
        # overall activity behaviour).
        dependency_sql = ' OR '.join(
          '(' + to_sql(dependency_value, quote) + ')'
          for dependency_value in dependency_value_dict
        )
      base_sql_prefix = '(SELECT DISTINCT %s FROM ' % (
        ','.join(column_list),
      )
      base_sql_suffix = ' WHERE processing_node > %i AND (%s))' % (
        DEPENDENCY_IGNORED_ERROR_STATE,
        dependency_sql,
      )
      for row in db.query(
        ' UNION '.join(
          base_sql_prefix + table_name + base_sql_suffix
          for table_name in table_name_list
        ),
        max_rows=0,
      )[1]:
        # Each row is a value which blocks some activities.
        dependent_message_set = dependency_value_dict[row2key(row)]
        # Queue blocked messages for processing at the beginning of the next
        # outermost iteration...
        new_blocked_message_set.update(dependent_message_set)
        # ...but update result immediately, in case there is no next
        # outermost iteration.
        result.difference_update(dependent_message_set)
      dependency_value_dict.clear()
    return result
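
  # Illustrative walk-through (hypothetical): given two pending messages
  #   m1 with after_tag='t1'
  #   m2 with after_tag=('t1', 't2')
  # the structures built above are
  #   dependency_dict = {'after_tag': {'t1': {m1, m2}, 't2': {m2}}}
  #   message_dependency_dict = {m1: {'after_tag': ['t1']},
  #                              m2: {'after_tag': ['t1', 't2']}}
  # A single "tag IN ('t1', 't2')" query is then run, and a returned row
  # ('t1',) blocks both m1 and m2 at once.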

  def distribute(self, activity_tool, node_count):
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    where_kw = {
      'processing_node': -1,
      'to_date': now_date,
      'count': READ_MESSAGE_LIMIT,
    }
    validated_count = 0
    while 1:
      result = self._getMessageList(db, **where_kw)
      if not result:
        return
      transaction.commit()
      message_list = [Message.load(line.message, uid=line.uid, line=line)
                      for line in result]
      message_set = self._getExecutableMessageSet(activity_tool, db,
                                                  message_list)
      transaction.commit()
      if message_set:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_set:
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            distributable_uid_set.add(message.uid)
          else:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
        for message_list in serialization_tag_dict.itervalues():
          # Sort the list of messages to validate the message with the
          # highest score.
          message_list.sort(key=sort_message_key)
          distributable_uid_set.add(message_list[0].uid)
          group_method_id = message_list[0].line.group_method_id
          if group_method_id == '\0':
            continue
          for message in message_list[1:]:
            if group_method_id == message.line.group_method_id:
              distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          self.assignMessageList(db, 0, distributable_uid_set)
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      line = result[-1]
      where_kw['above_priority_date_uid'] = (line.priority, line.date,
                                             line.uid)
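
  # Illustrative pagination (hypothetical values): if the last row of a
  # batch has (priority=2, date=D, uid=42), the next iteration adds
  #   above_priority_date_uid=(2, D, 42)
  # which renders as
  #   (priority>2 OR (priority=2 AND (date>'D' OR (date='D' AND uid>42))))
  # resuming the scan right after the last examined row instead of
  # re-reading the whole table with an OFFSET.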

  def getReservedMessageList(self, db, date, processing_node, limit,
                             group_method_id=None, node_set=None):
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, because there may not
        be enough messages pending execution.
    """
    assert limit
    quote = db.string_literal
    query = db.query
    args = (self.sql_table, sqltest_dict['to_date'](date, quote),
            ' AND group_method_id=' + quote(group_method_id)
            if group_method_id else '', limit)
    # Get reservable messages.
    # During normal operation, sorting by date (as the last criterion) is
    # fairer for users and reduces the probability of doing the same work
    # several times (think of an object that is modified several times in a
    # short period of time).
    if node_set is None:
      result = Results(query(
        "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
    else:
      # We'd like to write
      #   ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
      # but this makes indices inefficient.
      subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
        " WHERE {} AND processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
      node = 'node=%s' % processing_node
      result = Results(query(
        # "ALL" on all but one, to incur deduplication cost only once.
        # "UNION ALL" between the two naturally distinct sets.
        "SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT %s" % (
          subquery(-1, node),
          subquery('', 'node=0'),
          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
          ' UNION ALL ' + subquery(-1, 'node IN (%s)'
            % ','.join(map(str, node_set))) if node_set else '',
          limit), 0))
    if result:
      # Reserve messages.
      uid_list = [x.uid for x in result]
      self.assignMessageList(db, processing_node, uid_list)
      self._log(TRACE, 'Reserved messages: %r' % uid_list)
      return result
    return ()

  def assignMessageList(self, db, state, uid_list):
    """
      Put messages in the given 'processing_node' state.
    """
    # The '\0' separator makes the database adapter execute a second
    # statement, committing the update immediately.
    db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
      self.sql_table, state, ','.join(map(str, uid_list))))

  def getProcessableMessageLoader(self, db, processing_node):
    # do not merge anything
    def load(line):
      uid = line.uid
      m = Message.load(line.message, uid=uid, line=line)
      return m, uid, ()
    return load

  def getProcessableMessageList(self, activity_tool, processing_node,
                                node_family_id_list):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expendable since they are
        redundant).

      - reserve a message
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this message will be
            "needed")
          - update the total cost
        - unreserve "unneeded" messages
      - return the still-reserved message list and a group_method_id

      If any error happens in the process described above, try to unreserve
      all messages already reserved in that process.
      If that fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    uid_to_duplicate_uid_list_dict = {}
    try:
      while 1: # not a loop
        # Select messages that were either assigned manually or left
        # unprocessed after a shutdown. Most of the time, there's none.
        # To minimize the probability of deadlocks, we also COMMIT so that a
        # new transaction starts on the first 'FOR UPDATE' query, which is
        # all the more important as the current one started with
        # getPriority().
        result = db.query("SELECT * FROM %s WHERE processing_node=%s"
          " ORDER BY priority, date LIMIT 1\0COMMIT" % (
            self.sql_table, processing_node), 0)
        already_assigned = result[1]
        if already_assigned:
          result = Results(result)
        else:
          result = self.getReservedMessageList(db, now_date, processing_node,
            1, node_set=node_family_id_list)
          if not result:
            break
          # So reserved documents are properly released even if load raises.
          for line in result:
            uid_to_duplicate_uid_list_dict[line.uid] = []
        load = self.getProcessableMessageLoader(db, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        if group_method_id[0] != '\0':
          # Count the number of objects to prevent too many objects.
          cost = m.getGroupMethodCost()
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # This is a heuristic (messages with the same group_method_id
          # are likely to have the same group_method_cost).
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
            # Retrieve objects which have the same group method.
            result = iter(already_assigned
              and Results(db.query("SELECT * FROM %s"
                " WHERE processing_node=%s AND group_method_id=%s"
                " ORDER BY priority, date LIMIT %s" % (
                  self.sql_table, processing_node,
                  db.string_literal(group_method_id), limit), 0))
              # Do not optimize the rare case: keep the code simple by not
              # adding more results from getReservedMessageList if the
              # limit is not reached.
              or self.getReservedMessageList(db, now_date, processing_node,
                limit, group_method_id, node_family_id_list))
            for line in result:
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.getGroupMethodCost()
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                uid_list = [line.uid for line in result if line.uid != uid]
                if uid_list:
                  self.assignMessageList(db, 0, uid_list)
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
        try:
          self.assignMessageList(db, 0, to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return (), None, None
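
  # Illustrative cost arithmetic for the grouping above (hypothetical
  # numbers): with a group method cost of 0.2 and a first message touching
  # one object,
  #   limit = int(1 / 0.2 + 1 - 1) = 5
  # so up to 4 more same-group messages are fetched, each adding
  # object_count * cost to the running total; messages stop being appended
  # once the accumulated cost reaches 1 (roughly 5 objects per invokeGroup
  # call), and the remaining reserved rows are released.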

  def _abort(self):
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against
      # abort.
      self._log(PANIC,
        'abort failed, thus some objects may be modified accidentally')
      raise

  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node,
                     node_family_id_list):
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node,
        node_family_id_list)
    if message_list:
      # Remove the group_id parameter from group_method_id.
      group_method_id = group_method_id.split('\0')[0]
      if group_method_id != "":
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(
          None,
          priority=min(x.line.priority for x in message_list),
        )
      else:
        method = activity_tool.invoke
        message, = message_list
        args = message_list
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As the MySQL transaction does not start exactly at the same time as
      # the ZODB transaction but a bit later, available messages might be
      # invoked on objects which are not yet available - or only available
      # in an older revision - to the ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      # Try to invoke
      try:
        # Refer to Timeout.activity_timeout instead of
        #   from Products.ERP5Type.Timeout import activity_timeout
        # so that the value can be overridden in the Timeout namespace in
        # unit tests.
        offset = Timeout.activity_timeout
        with activity_runtime_environment, Deadline(offset):
          method(*args)
        # Abort if at least 1 message failed. On the next tic, only those
        # that succeeded will be selected because their at_date won't have
        # been increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages'
            ' (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            with activity_runtime_environment:
              cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING,
              'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list

  def deleteMessageList(self, db, uid_list):
    db.query("DELETE FROM %s WHERE uid IN (%s)" % (
      self.sql_table, ','.join(map(str, uid_list))))

  def reactivateMessageList(self, db, uid_list, delay, retry):
    db.query("UPDATE %s SET"
      " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
      "%s WHERE uid IN (%s)" % (
        self.sql_table, delay,
        ", priority = priority + 1, retry = retry + 1" if retry else "",
        ",".join(map(str, uid_list))))

  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
      - Failures due to ConflictErrors cause messages to be postponed, but
        their retry count is *not* increased.
      - Failures of messages already above their maximum retry count cause
        them to be put in a permanent-error state.
      - In all other cases, the retry count is increased and the message is
        delayed.
    """
    db = activity_tool.getSQLConnection()
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow the original message strictly, or
        # should they just be made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(
            uid, ())
        if (m.exc_type and # m.exc_type may be None
            (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
             m.exc_type is SkippedMessage)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          if (max_retry is not None and retry >= max_retry) or \
              m.exc_type == TimeoutReachedError:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make the delay quadratic in the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
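            # Illustrative delays from this formula, in units of
            # VALIDATION_ERROR_DELAY:
            #   retry 0 -> 2, 1 -> 4, 2 -> 10, 3 -> 20, 4 -> 34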
          try:
            # Immediately update, because values differ for every message.
            self.reactivateMessageList(db, (uid,), delay, True)
          except:
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally
        # flushed (without invoking them) when an object is deleted, but
        # this is only an optimisation. There is no efficient and reliable
        # way to do this, because a concurrent and very long transaction may
        # be about to activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    if deletable_uid_list:
      try:
        self._retryOnLockError(self.deleteMessageList,
                               (db, deletable_uid_list))
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only
        # delay.
        self.reactivateMessageList(db, delay_uid_list,
                                   VALIDATION_ERROR_DELAY, False)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
      except:
        self._log(ERROR, 'Failed to set messages to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        self.assignMessageList(db, 0, make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during the notification phase of'
        ' finalizeMessageExecution')

  def flush(self, activity_tool, object_path, invoke=0, method_id=None,
            only_safe=False, **kw):
    """
      object_path is a tuple
    """
    db = activity_tool.getSQLConnection()
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      def invoke(message):
        try:
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        if (line and line.processing_node != -1 or
            self._getExecutableMessageSet(activity_tool, db, [message])):
          # Try to invoke the message - what happens if invoke calls
          # flushActivity ??
          with ActivityRuntimeEnvironment(message):
            activity_tool.invoke(message)
            if message.getExecutionState() != MESSAGE_EXECUTED:
              raise ActivityFlushError('Could not invoke %s on %s'
                                       % (message.method_id, path))
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
          method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    uid_list = []
    for line in self._getMessageList(db, path=path,
        **({'method_id': method_id} if method_id else {})):
      if only_safe and line.processing_node > -2:
        continue
      uid_list.append(line.uid)
      if invoke and line.processing_node <= 0:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      self.deleteMessageList(db, uid_list)
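
  # Illustrative call (hypothetical path and method_id): synchronously flush
  # and invoke the pending activities of one document:
  #   queue.flush(activity_tool, ('', 'erp5', 'foo'), invoke=1,
  #               method_id='immediateReindexObject')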

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate a time shift, we simply subtract the delay from all dates
      in the message(_queue) table.
    """
    activity_tool.getSQLConnection().query("UPDATE %s SET"
      " date = DATE_SUB(date, INTERVAL %s SECOND)"
      % (self.sql_table, delay)
      + ('' if processing_node is None else
         " WHERE processing_node=%s" % processing_node))
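
  # Illustrative use from a test (hypothetical): pretend one hour has
  # already passed, so delayed or reactivated messages become processable at
  # the next tic:
  #   queue.timeShift(activity_tool, 3600)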