diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 5ea2e9ab84c1de2e59467a100fb1836b8ca65c55..74672b5fb199d20d45c9c9ff7b59c40446347334 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -140,7 +140,7 @@ class SQLDict(RAMDict): else: return () - def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date): + def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date, retry): validation_state = message.validate(self, activity_tool) if validation_state is not VALID: if validation_state in (EXCEPTION, INVALID_PATH): @@ -156,13 +156,13 @@ class SQLDict(RAMDict): # Lower priority if len(uid_list) > 0: # Add some delay before new processing activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority + 1) + priority = priority + 1, retry = retry + 1) get_transaction().commit() # Release locks before starting a potentially long calculation else: # We do not lower priority for INVALID_ORDER errors but we do postpone execution if len(uid_list) > 0: # Add some delay before new processing activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority) + priority = priority, retry = retry + 1) get_transaction().commit() # Release locks before starting a potentially long calculation return 0 return 1 @@ -171,8 +171,6 @@ class SQLDict(RAMDict): def dequeueMessage(self, activity_tool, processing_node): if hasattr(activity_tool,'SQLDict_readMessage'): now_date = DateTime() - # Next processing date in case of error - next_processing_date = now_date + VALIDATION_ERROR_DELAY priority = random.choice(priority_weight) # Try to find a message at given priority level which is scheduled for now result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, @@ -181,15 +179,26 @@ class SQLDict(RAMDict): # If empty, take any message which is scheduled for now priority = None result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date) - if len(result) > 0: + if len(result) == 0: + # If the result is still empty, shift the dates so that SQLDict can dispatch pending active + # objects quickly. + self.timeShift(activity_tool, VALIDATION_ERROR_DELAY) + elif len(result) > 0: #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) line = result[0] path = line.path method_id = line.method_id + try: + retry = int(line.retry) + except TypeError: + retry = 1 + # Next processing date in case of error + next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = [x.uid for x in uid_list] uid_list_list = [uid_list] priority_list = [line.priority] + retry_list = [retry] # Make sure message can not be processed anylonger if len(uid_list) > 0: # Set selected messages to processing @@ -199,7 +208,7 @@ class SQLDict(RAMDict): m = self.loadMessage(line.message, uid = line.uid) message_list = [m] # Validate message (make sure object exists, priority OK, etc.) - if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date): + if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): group_method_id = m.activity_kw.get('group_method_id') if group_method_id is not None: # Count the number of objects to prevent too many objects. @@ -225,6 +234,12 @@ class SQLDict(RAMDict): for line in result: path = line.path method_id = line.method_id + try: + retry = int(line.retry) + except TypeError: + retry = 1 + # Next processing date in case of error + next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = [x.uid for x in uid_list] if len(uid_list) > 0: @@ -232,7 +247,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_processMessage(uid = uid_list) get_transaction().commit() # Release locks before starting a potentially long calculation m = self.loadMessage(line.message, uid = line.uid) - if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date): + if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): if m.hasExpandMethod(): try: count += len(m.getObjectList(activity_tool)) @@ -247,13 +262,14 @@ class SQLDict(RAMDict): message_list.append(m) uid_list_list.append(uid_list) priority_list.append(line.priority) + retry_list.append(retry) if count >= MAX_GROUPED_OBJECTS: break get_transaction().commit() # Release locks before starting a potentially long calculation # Try to invoke if group_method_id is not None: - #LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count)) + LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count)) #for m in message_list: # LOG('SQLDict', 0, '%r has objects %r' % (m, m.getObjectList(activity_tool))) activity_tool.invokeGroup(group_method_id, message_list) @@ -277,6 +293,9 @@ class SQLDict(RAMDict): m = message_list[i] uid_list = uid_list_list[i] priority = priority_list[i] + retry = retry_list[i] + # Next processing date in case of error + next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry if m.is_executed: activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it get_transaction().commit() # If successful, commit @@ -297,7 +316,7 @@ class SQLDict(RAMDict): # Lower priority if len(uid_list) > 0: activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority + 1) + priority = priority + 1, retry = retry + 1) get_transaction().commit() # Release locks before starting a potentially long calculation return 0