Commit cee3e728 authored by Julien Muchembled

CMFActivity: some optimization and clean-up in the code reserving messages

parent 17dc7e23
@@ -389,7 +389,7 @@ CREATE TABLE %s (
distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
self.unreserveMessageList(db, 0, distributable_uid_set)
self.assignMessageList(db, 0, distributable_uid_set)
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
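The hunk above is the distribution path: messages that have become distributable are handed back to processing_node 0 (the "ready to be reserved" state) in batches, and distribution stops once MAX_VALIDATED_LIMIT has been reached. A minimal sketch of that pattern; iter_message_batches and is_distributable are hypothetical stand-ins, not CMFActivity APIs:

def distribute_sketch(self, db):
    validated_count = 0
    for message_batch in self.iter_message_batches(db):      # hypothetical
        distributable_uid_set = set()
        for message in message_batch:
            if self.is_distributable(message):                # hypothetical
                distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
            # processing_node=0 marks the messages as validated, i.e.
            # ready to be reserved by processing nodes.
            self.assignMessageList(db, 0, distributable_uid_set)
            validated_count += distributable_count
            if validated_count >= MAX_VALIDATED_LIMIT:
                return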
@@ -397,57 +397,40 @@ CREATE TABLE %s (
where_kw['from_date'] = line.date
where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None):
def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None):
"""
Get and reserve a list of messages.
limit
Maximum number of messages to fetch.
This number is not guaranteed to be reached, because of:
- not enough messages being pending execution
- race condition (other nodes reserving the same messages at the same
time)
This number is guaranteed not to be exceeded.
If None (or not given), no limit applies.
This number is not guaranteed to be reached, because of not enough
messages being pending execution.
"""
assert limit
quote = db.string_literal
query = db.query
sql_group = ('' if group_method_id is None else
' AND group_method_id=' + quote(group_method_id))
# Select reserved messages.
# Do not check already-assigned messages when trying to reserve more
# activities, because in such case we will find one reserved activity.
result = Results(query(
"SELECT * FROM %s WHERE processing_node=%s%s LIMIT %s" % (
self.sql_table, processing_node, sql_group, limit), 0))
limit -= len(result)
if limit:
# Get reservable messages.
# During normal operation, sorting by date (as the last criterion) is fairer
# for users and reduces the probability of doing the same work several times
# (think of an object that is modified several times in a short period of
# time).
reservable = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % (
self.sql_table, sqltest_dict['to_date'](date, quote), sql_group,
limit), 0))
if reservable:
# Reserve messages.
query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, processing_node,
','.join(str(x.uid) for x in reservable)))
# DC.ZRDB.Results.Results does not implement concatenation
# Implement an imperfect (but cheap) concatenation. Do not update
# __items__ nor _data_dictionary.
assert result._names == reservable._names, (result._names,
reservable._names)
result._data += reservable._data
return result
def unreserveMessageList(self, db, state, uid_list):
args = (self.sql_table, sqltest_dict['to_date'](date, quote),
' AND group_method_id=' + quote(group_method_id)
if group_method_id else '' , limit)
# Get reservable messages.
# During normal operation, sorting by date (as the last criterion) is fairer
# for users and reduces the probability of doing the same work several times
# (think of an object that is modified several times in a short period of
# time).
if 1:
result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
if result:
# Reserve messages.
uid_list = [x.uid for x in result]
self.assignMessageList(db, processing_node, uid_list)
self._log(TRACE, 'Reserved messages: %r' % uid_list)
return result
return ()
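The rewritten getReservedMessageList always runs a single SELECT ... FOR UPDATE over unassigned rows (processing_node=0) and then flags the locked rows with the node id through assignMessageList, instead of first re-reading rows already assigned to the node. A self-contained sketch of this reserve pattern, assuming a MySQLdb-style DB-API connection (format paramstyle, autocommit off) and a simplified message table (uid, processing_node, priority, date); an illustration, not the CMFActivity implementation:

def reserve_messages(conn, processing_node, limit):
    cur = conn.cursor()
    # Lock up to `limit` unassigned rows; another node running the same
    # query blocks on these rows instead of reserving them a second time.
    cur.execute("SELECT uid FROM message WHERE processing_node=0"
                " ORDER BY priority, date LIMIT %s FOR UPDATE", (limit,))
    uid_list = [uid for uid, in cur.fetchall()]
    if uid_list:
        # Claim the locked rows for this node.
        cur.execute("UPDATE message SET processing_node=%%s"
                    " WHERE uid IN (%s)" % ','.join(['%s'] * len(uid_list)),
                    [processing_node] + uid_list)
    # COMMIT releases the row locks in both the empty and non-empty cases.
    conn.commit()
    return uid_list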
def assignMessageList(self, db, state, uid_list):
"""
Put messages back in the given processing_node.
"""
@@ -490,20 +473,26 @@ CREATE TABLE %s (
- uid_to_duplicate_uid_list_dict
"""
db = activity_tool.getSQLConnection()
def getReservedMessageList(limit, group_method_id=None):
line_list = self.getReservedMessageList(db,
date=now_date,
processing_node=processing_node,
limit=limit,
group_method_id=group_method_id)
if line_list:
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list
now_date = getNow(db)
uid_to_duplicate_uid_list_dict = {}
try:
result = getReservedMessageList(1)
if result:
while 1: # not a loop
# Select messages that were either assigned manually or left
# unprocessed after a shutdown. Most of the time, there's none.
# To minimize the probability of deadlocks, we also COMMIT so that a
# new transaction starts on the first 'FOR UPDATE' query, which is all
# the more important as the current one started with getPriority().
result = db.query("SELECT * FROM %s WHERE processing_node=%s"
" ORDER BY priority, date LIMIT 1\0COMMIT" % (
self.sql_table, processing_node), 0)
already_assigned = result[1]
if already_assigned:
result = Results(result)
else:
result = self.getReservedMessageList(db, now_date, processing_node,
1)
if not result:
break
load = self.getProcessableMessageLoader(db, processing_node)
m, uid, uid_list = load(result[0])
message_list = [m]
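The loop above replaces the old single call path: it first checks, without locking, whether some message already carries this node's id (manual assignment, or leftovers after a shutdown), committing immediately so that the later FOR UPDATE query opens a fresh transaction; only when nothing is found does it fall back to getReservedMessageList. A condensed sketch of that order of operations (pick_first_message is a hypothetical name):

def pick_first_message(self, db, now_date, processing_node):
    # 1. Cheap, lock-free check for rows already assigned to this node.
    result = db.query("SELECT * FROM %s WHERE processing_node=%s"
                      " ORDER BY priority, date LIMIT 1\0COMMIT" % (
                      self.sql_table, processing_node), 0)
    if result[1]:
        return Results(result), True    # (rows, already_assigned)
    # 2. Otherwise reserve a new message with the FOR UPDATE path.
    return self.getReservedMessageList(db, now_date, processing_node, 1), False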
@@ -520,7 +509,17 @@ CREATE TABLE %s (
if limit > 1: # <=> cost * count < 1
cost *= count
# Retrieve objects which have the same group method.
result = iter(getReservedMessageList(limit, group_method_id))
result = iter(already_assigned
and Results(db.query("SELECT * FROM %s"
" WHERE processing_node=%s AND group_method_id=%s"
" ORDER BY priority, date LIMIT %s" % (
self.sql_table, processing_node,
db.string_literal(group_method_id), limit), 0))
# Do not optimize rare case: keep the code simple by not
# adding more results from getReservedMessageList if the
# limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id))
for line in result:
if line.uid in uid_to_duplicate_uid_list_dict:
continue
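The `already_assigned and Results(...) or self.getReservedMessageList(...)` expression is the pre-`x if c else y` conditional idiom of Python 2. It only behaves like a real conditional when the first branch is truthy; that holds here because the follow-up query filters on the same processing_node and group_method_id as the message fetched above, so it should return at least that row. A generic illustration of the idiom and its pitfall:

def pick(flag, a, b):
    # Equivalent to "a if flag else b" ONLY when a is truthy.
    return flag and a or b

pick(True, [1, 2], [3])   # [1, 2]
pick(False, [1, 2], [3])  # [3]
pick(True, [], [3])       # [3]  <- falls through when the first branch is empty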
@@ -536,7 +535,7 @@ CREATE TABLE %s (
# Unreserve extra messages as soon as possible.
uid_list = [line.uid for line in result if line.uid != uid]
if uid_list:
self.unreserveMessageList(db, 0, uid_list)
self.assignMessageList(db, 0, uid_list)
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except:
self._log(WARNING, 'Exception while reserving messages.')
@@ -545,7 +544,7 @@ CREATE TABLE %s (
for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
to_free_uid_list += uid_list
try:
self.unreserveMessageList(db, 0, to_free_uid_list)
self.assignMessageList(db, 0, to_free_uid_list)
except:
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else:
@@ -553,7 +552,7 @@ CREATE TABLE %s (
self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
else:
self._log(TRACE, '(no message was reserved)')
return [], None, uid_to_duplicate_uid_list_dict
return (), None, None
def _abort(self):
try:
@@ -727,13 +726,13 @@ CREATE TABLE %s (
self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
if final_error_uid_list:
try:
self.unreserveMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
except:
self._log(ERROR, 'Failed to set message to error state for %r'
% final_error_uid_list)
if make_available_uid_list:
try:
self.unreserveMessageList(db, 0, make_available_uid_list)
self.assignMessageList(db, 0, make_available_uid_list)
except:
self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
else:
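After the rename, every state change on a batch of uids goes through the same helper, with the second argument giving the target processing_node value. A short usage summary based only on the calls visible in this diff:

# Target states as used above (second argument of assignMessageList):
self.assignMessageList(db, 0, make_available_uid_list)                # back to the pool
self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)  # park as failed
self.assignMessageList(db, processing_node, uid_list)                 # claim for this node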
@@ -142,11 +142,9 @@ class SQLDict(SQLBase):
), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
if reserve_uid_list:
db.query(
"UPDATE message SET processing_node=%s WHERE uid IN (%s)" % (
processing_node, ','.join(map(str, reserve_uid_list)),
))
db.query("COMMIT")
self.assignMessageList(db, processing_node, reserve_uid_list)
else:
db.query("COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
@@ -163,11 +163,9 @@ CREATE TABLE %s (
), 0)[1]
uid_list = [x for x, in result]
if uid_list:
db.query(
"UPDATE message_job SET processing_node=%s WHERE uid IN (%s)" % (
processing_node, ','.join(map(str, uid_list)),
))
db.query("COMMIT")
self.assignMessageList(db, processing_node, uid_list)
else:
db.query("COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")