Commit 5645d0da authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: move SQLDict specific code out of SQLBase.getProcessableMessageList

parent 31b4bb58
......@@ -168,6 +168,14 @@ class SQLBase(Queue):
activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
uid=uid_list)
def getProcessableMessageLoader(self, activity_tool, processing_node):
# do not merge anything
def load(line):
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
return m, uid, ()
return load
def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
......@@ -206,24 +214,16 @@ class SQLBase(Queue):
if line_list:
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
now_date = self.getNow(activity_tool)
uid_to_duplicate_uid_list_dict = {}
try:
result = getReservedMessageList(1)
if result:
line = result[0]
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
load = self.getProcessableMessageLoader(activity_tool, processing_node)
m, uid, uid_list = load(result[0])
message_list = [m]
uid_to_duplicate_uid_list_dict[uid] = getDuplicateMessageUidList(line)
group_method_id = line.group_method_id
activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
uid_to_duplicate_uid_list_dict[uid] = uid_list
group_method_id = m.line.group_method_id
if group_method_id != '\0':
# Count the number of objects to prevent too many objects.
cost = m.activity_kw.get('group_method_cost', .01)
......@@ -235,39 +235,24 @@ class SQLBase(Queue):
if limit > 1: # <=> cost * count < 1
cost *= count
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit, group_method_id)
if self.merge_duplicate:
path_and_method_id_dict = {(line.path, line.method_id): uid}
unreserve_uid_list = []
result = iter(getReservedMessageList(limit, group_method_id))
for line in result:
if line.uid == uid:
if line.uid in uid_to_duplicate_uid_list_dict:
continue
m, uid, uid_list = load(line)
if m is None:
uid_to_duplicate_uid_list_dict[uid] += uid_list
continue
# All fetched lines have the same group_method_id and
# processing_node.
# Their dates are lower-than or equal-to now_date.
# We read each line once so lines have distinct uids.
# So what remains to be filtered on are path and method_id.
if self.merge_duplicate:
key = line.path, line.method_id
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict[original_uid].append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict[line.uid] = \
getDuplicateMessageUidList(line)
if cost < 1:
m = self.loadMessage(line.message, uid=line.uid, line=line)
cost += len(m.getObjectList(activity_tool)) * \
m.activity_kw.get('group_method_cost', .01)
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
activity_tool.SQLBase_processMessage(table=self.sql_table,
uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=unreserve_uid_list)
uid_to_duplicate_uid_list_dict[uid] = uid_list
cost += len(m.getObjectList(activity_tool)) * \
m.activity_kw.get('group_method_cost', .01)
message_list.append(m)
if cost >= 1:
# Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=[line.uid for line in result if line.uid != uid])
activity_tool.SQLBase_processMessage(table=self.sql_table,
uid=uid_to_duplicate_uid_list_dict.keys())
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except:
self._log(WARNING, 'Exception while reserving messages.')
......
......@@ -109,32 +109,42 @@ class SQLDict(SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
Return their uids.
"""
try:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=line.path,
method_id=line.method_id,
group_method_id=line.group_method_id,
)
uid_list = [x.uid for x in result]
if uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node, uid=uid_list)
else:
# Release locks
activity_tool.SQLDict_commit()
except:
# Log
LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
# Release lock
activity_tool.SQLDict_rollback()
# And re-raise
raise
return uid_list
def getProcessableMessageLoader(self, activity_tool, processing_node):
path_and_method_id_dict = {}
def load(line):
# getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path and
# method_id.
# XXX: What about tag ?
path = line.path
method_id = line.method_id
key = path, method_id
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = self.loadMessage(line.message, uid=uid, line=line)
try:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
)
uid_list = [x.uid for x in result]
if uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node, uid=uid_list)
else:
activity_tool.SQLDict_commit() # release locks
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
activity_tool.SQLDict_rollback() # release locks
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
path_and_method_id_dict[key] = uid
return m, uid, uid_list
return None, original_uid, [uid]
return load
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
......
......@@ -82,13 +82,6 @@ class SQLQueue(SQLBase):
processing_node_list=None,
serialization_tag_list=serialization_tag_list)
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Reserve unreserved messages matching given line.
Return their uids.
"""
return ()
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
if hasMessage is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment