Commit e744fd32 authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: some optimization and clean-up in the code reserving messages

parent 18cd0bf9
...@@ -389,7 +389,7 @@ CREATE TABLE %s ( ...@@ -389,7 +389,7 @@ CREATE TABLE %s (
distributable_uid_set.add(message.uid) distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set) distributable_count = len(distributable_uid_set)
if distributable_count: if distributable_count:
self.unreserveMessageList(db, 0, distributable_uid_set) self.assignMessageList(db, 0, distributable_uid_set)
validated_count += distributable_count validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT: if validated_count >= MAX_VALIDATED_LIMIT:
return return
...@@ -397,57 +397,40 @@ CREATE TABLE %s ( ...@@ -397,57 +397,40 @@ CREATE TABLE %s (
where_kw['from_date'] = line.date where_kw['from_date'] = line.date
where_kw['above_uid'] = line.uid where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, def getReservedMessageList(self, db, date, processing_node, limit,
limit=None, group_method_id=None): group_method_id=None):
""" """
Get and reserve a list of messages. Get and reserve a list of messages.
limit limit
Maximum number of messages to fetch. Maximum number of messages to fetch.
This number is not garanted to be reached, because of: This number is not garanted to be reached, because of not enough
- not enough messages being pending execution messages being pending execution.
- race condition (other nodes reserving the same messages at the same
time)
This number is guaranted not to be exceeded.
If None (or not given) no limit apply.
""" """
assert limit assert limit
quote = db.string_literal quote = db.string_literal
query = db.query query = db.query
sql_group = ('' if group_method_id is None else args = (self.sql_table, sqltest_dict['to_date'](date, quote),
' AND group_method_id=' + quote(group_method_id)) ' AND group_method_id=' + quote(group_method_id)
if group_method_id else '' , limit)
# Select reserved messages.
# Do not check already-assigned messages when trying to reserve more
# activities, because in such case we will find one reserved activity.
result = Results(query(
"SELECT * FROM %s WHERE processing_node=%s%s LIMIT %s" % (
self.sql_table, processing_node, sql_group, limit), 0))
limit -= len(result)
if limit:
# Get reservable messages. # Get reservable messages.
# During normal operation, sorting by date (as last criteria) is fairer # During normal operation, sorting by date (as last criteria) is fairer
# for users and reduce the probability to do the same work several times # for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of # (think of an object that is modified several times in a short period of
# time). # time).
reservable = Results(query( if 1:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s" "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % ( " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
self.sql_table, sqltest_dict['to_date'](date, quote), sql_group, if result:
limit), 0))
if reservable:
# Reserve messages. # Reserve messages.
query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % ( uid_list = [x.uid for x in result]
self.sql_table, processing_node, self.assignMessageList(db, processing_node, uid_list)
','.join(str(x.uid) for x in reservable))) self._log(TRACE, 'Reserved messages: %r' % uid_list)
# DC.ZRDB.Results.Results does not implement concatenation
# Implement an imperfect (but cheap) concatenation. Do not update
# __items__ nor _data_dictionary.
assert result._names == reservable._names, (result._names,
reservable._names)
result._data += reservable._data
return result return result
return ()
def unreserveMessageList(self, db, state, uid_list): def assignMessageList(self, db, state, uid_list):
""" """
Put messages back in given processing_node. Put messages back in given processing_node.
""" """
...@@ -490,20 +473,26 @@ CREATE TABLE %s ( ...@@ -490,20 +473,26 @@ CREATE TABLE %s (
- uid_to_duplicate_uid_list_dict - uid_to_duplicate_uid_list_dict
""" """
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
def getReservedMessageList(limit, group_method_id=None):
line_list = self.getReservedMessageList(db,
date=now_date,
processing_node=processing_node,
limit=limit,
group_method_id=group_method_id)
if line_list:
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list
now_date = getNow(db) now_date = getNow(db)
uid_to_duplicate_uid_list_dict = {} uid_to_duplicate_uid_list_dict = {}
try: try:
result = getReservedMessageList(1) while 1: # not a loop
if result: # Select messages that were either assigned manually or left
# unprocessed after a shutdown. Most of the time, there's none.
# To minimize the probability of deadlocks, we also COMMIT so that a
# new transaction starts on the first 'FOR UPDATE' query, which is all
# the more important as the current on started with getPriority().
result = db.query("SELECT * FROM %s WHERE processing_node=%s"
" ORDER BY priority, date LIMIT 1\0COMMIT" % (
self.sql_table, processing_node), 0)
already_assigned = result[1]
if already_assigned:
result = Results(result)
else:
result = self.getReservedMessageList(db, now_date, processing_node,
1)
if not result:
break
load = self.getProcessableMessageLoader(db, processing_node) load = self.getProcessableMessageLoader(db, processing_node)
m, uid, uid_list = load(result[0]) m, uid, uid_list = load(result[0])
message_list = [m] message_list = [m]
...@@ -520,7 +509,17 @@ CREATE TABLE %s ( ...@@ -520,7 +509,17 @@ CREATE TABLE %s (
if limit > 1: # <=> cost * count < 1 if limit > 1: # <=> cost * count < 1
cost *= count cost *= count
# Retrieve objects which have the same group method. # Retrieve objects which have the same group method.
result = iter(getReservedMessageList(limit, group_method_id)) result = iter(already_assigned
and Results(db.query("SELECT * FROM %s"
" WHERE processing_node=%s AND group_method_id=%s"
" ORDER BY priority, date LIMIT %s" % (
self.sql_table, processing_node,
db.string_literal(group_method_id), limit), 0))
# Do not optimize rare case: keep the code simple by not
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id))
for line in result: for line in result:
if line.uid in uid_to_duplicate_uid_list_dict: if line.uid in uid_to_duplicate_uid_list_dict:
continue continue
...@@ -536,7 +535,7 @@ CREATE TABLE %s ( ...@@ -536,7 +535,7 @@ CREATE TABLE %s (
# Unreserve extra messages as soon as possible. # Unreserve extra messages as soon as possible.
uid_list = [line.uid for line in result if line.uid != uid] uid_list = [line.uid for line in result if line.uid != uid]
if uid_list: if uid_list:
self.unreserveMessageList(db, 0, uid_list) self.assignMessageList(db, 0, uid_list)
return message_list, group_method_id, uid_to_duplicate_uid_list_dict return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except: except:
self._log(WARNING, 'Exception while reserving messages.') self._log(WARNING, 'Exception while reserving messages.')
...@@ -545,7 +544,7 @@ CREATE TABLE %s ( ...@@ -545,7 +544,7 @@ CREATE TABLE %s (
for uid_list in uid_to_duplicate_uid_list_dict.itervalues(): for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
to_free_uid_list += uid_list to_free_uid_list += uid_list
try: try:
self.unreserveMessageList(db, 0, to_free_uid_list) self.assignMessageList(db, 0, to_free_uid_list)
except: except:
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list) self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else: else:
...@@ -553,7 +552,7 @@ CREATE TABLE %s ( ...@@ -553,7 +552,7 @@ CREATE TABLE %s (
self._log(TRACE, 'Freed messages %r' % to_free_uid_list) self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
else: else:
self._log(TRACE, '(no message was reserved)') self._log(TRACE, '(no message was reserved)')
return [], None, uid_to_duplicate_uid_list_dict return (), None, None
def _abort(self): def _abort(self):
try: try:
...@@ -727,13 +726,13 @@ CREATE TABLE %s ( ...@@ -727,13 +726,13 @@ CREATE TABLE %s (
self._log(ERROR, 'Failed to delay %r' % delay_uid_list) self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
if final_error_uid_list: if final_error_uid_list:
try: try:
self.unreserveMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list) self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
except: except:
self._log(ERROR, 'Failed to set message to error state for %r' self._log(ERROR, 'Failed to set message to error state for %r'
% final_error_uid_list) % final_error_uid_list)
if make_available_uid_list: if make_available_uid_list:
try: try:
self.unreserveMessageList(db, 0, make_available_uid_list) self.assignMessageList(db, 0, make_available_uid_list)
except: except:
self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list) self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
else: else:
......
...@@ -142,11 +142,9 @@ class SQLDict(SQLBase): ...@@ -142,11 +142,9 @@ class SQLDict(SQLBase):
), 0)[1] ), 0)[1]
reserve_uid_list = uid_list = [x for x, in result] reserve_uid_list = uid_list = [x for x, in result]
if reserve_uid_list: if reserve_uid_list:
db.query( self.assignMessageList(db, processing_node, reserve_uid_list)
"UPDATE message SET processing_node=%s WHERE uid IN (%s)" % ( else:
processing_node, ','.join(map(str, reserve_uid_list)), db.query("COMMIT") # XXX: useful ?
))
db.query("COMMIT")
except: except:
self._log(WARNING, 'Failed to reserve duplicates') self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK") db.query("ROLLBACK")
......
...@@ -163,11 +163,9 @@ CREATE TABLE %s ( ...@@ -163,11 +163,9 @@ CREATE TABLE %s (
), 0)[1] ), 0)[1]
uid_list = [x for x, in result] uid_list = [x for x, in result]
if uid_list: if uid_list:
db.query( self.assignMessageList(db, processing_node, uid_list)
"UPDATE message_job SET processing_node=%s WHERE uid IN (%s)" % ( else:
processing_node, ','.join(map(str, uid_list)), db.query("COMMIT") # XXX: useful ?
))
db.query("COMMIT")
except: except:
self._log(WARNING, 'Failed to reserve duplicates') self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK") db.query("ROLLBACK")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment