diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 34738e43b3e8d2f4dd3ac9d33ec94d39b2f9fa24..02b17f68dd2cbd9fee3231888ba87068b8e17a50 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -103,7 +103,7 @@ class SQLDict(RAMDict): group_method_id_list = group_method_id_list, tag_list = tag_list, order_validation_text_list = order_validation_text_list) - + def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction path = '/'.join(m.object_path) @@ -165,7 +165,7 @@ class SQLDict(RAMDict): # string is false, so we must use a non-empty string for this. return 'none' return sha.new(repr(order_validation_item_list)).hexdigest() - + def validateMessage(self, activity_tool, message, uid_list, priority, processing_node): validation_state = message.validate(self, activity_tool) if validation_state is not VALID: @@ -195,159 +195,163 @@ class SQLDict(RAMDict): get_transaction().commit() # Release locks before starting a potentially long calculation return 0 return 1 - + # Queue semantic def dequeueMessage(self, activity_tool, processing_node): - if hasattr(activity_tool,'SQLDict_readMessage'): - now_date = DateTime() - priority = random.choice(priority_weight) - # Try to find a message at given priority level which is scheduled for now - result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, - to_date=now_date) - if len(result) == 0: - # If empty, take any message which is scheduled for now - priority = None - result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date) - if len(result) == 0: - # If the result is still empty, shift the dates so that SQLDict can dispatch pending active - # objects quickly. - self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1) - elif len(result) > 0: - #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) - line = result[0] - path = line.path - method_id = line.method_id - order_validation_text = line.order_validation_text - uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, - processing_node = None, to_date = now_date, - order_validation_text = order_validation_text) - uid_list = [x.uid for x in uid_list] - uid_list_list = [uid_list] - priority_list = [line.priority] - # Make sure message can not be processed anylonger - if len(uid_list) > 0: - # Set selected messages to processing - activity_tool.SQLDict_processMessage(uid = uid_list) - get_transaction().commit() # Release locks before starting a potentially long calculation - # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state - m = self.loadMessage(line.message, uid = line.uid) - message_list = [m] - # Validate message (make sure object exists, priority OK, etc.) - if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): - group_method_id = m.activity_kw.get('group_method_id') - if group_method_id is not None: - # Count the number of objects to prevent too many objects. - if m.hasExpandMethod(): - try: - count = len(m.getObjectList(activity_tool)) - except: - # Here, simply ignore an exception. The same exception should be handled later. - LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info()) - count = 0 - else: - count = 1 - - group_method = activity_tool.restrictedTraverse(group_method_id) - - if count < MAX_GROUPED_OBJECTS: - # Retrieve objects which have the same group method. - result = activity_tool.SQLDict_readMessage(processing_node = processing_node, priority = priority, - to_date = now_date, group_method_id = group_method_id, - order_validation_text = order_validation_text) - #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result))) - for line in result: - path = line.path - method_id = line.method_id - uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, - processing_node = None, to_date = now_date, - order_validation_text = order_validation_text) - uid_list = [x.uid for x in uid_list] - if len(uid_list) > 0: - # Set selected messages to processing - activity_tool.SQLDict_processMessage(uid = uid_list) - get_transaction().commit() # Release locks before starting a potentially long calculation - m = self.loadMessage(line.message, uid = line.uid) - if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): - if m.hasExpandMethod(): - try: - count += len(m.getObjectList(activity_tool)) - except: - # Here, simply ignore an exception. The same exception should be handled later. - LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info()) - pass - else: - count += 1 - message_list.append(m) - uid_list_list.append(uid_list) - priority_list.append(line.priority) - if count >= MAX_GROUPED_OBJECTS: - break - - # Release locks before starting a potentially long calculation - get_transaction().commit() - # Try to invoke - if group_method_id is not None: - LOG('SQLDict', TRACE, - 'invoking a group method %s with %d objects '\ - ' (%d objects in expanded form)' % ( - group_method_id, len(message_list), count)) - activity_tool.invokeGroup(group_method_id, message_list) + readMessage = getattr(activity_tool, 'SQLDict_readMessage', None) + if readMessage is None: + return 1 + + now_date = DateTime() + priority = random.choice(priority_weight) + # Try to find a message at given priority level which is scheduled for now + result = readMessage(processing_node=processing_node, priority=priority, + to_date=now_date) + if len(result) == 0: + # If empty, take any message which is scheduled for now + priority = None + result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date) + if len(result) == 0: + # If the result is still empty, shift the dates so that SQLDict can dispatch pending active + # objects quickly. + self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1) + elif len(result) > 0: + #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) + line = result[0] + path = line.path + method_id = line.method_id + order_validation_text = line.order_validation_text + uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, + processing_node = None, to_date = now_date, + order_validation_text = order_validation_text) + uid_list = [x.uid for x in uid_list] + uid_list_list = [uid_list] + priority_list = [line.priority] + # Make sure message can not be processed anylonger + if len(uid_list) > 0: + # Set selected messages to processing + activity_tool.SQLDict_processMessage(uid = uid_list) + get_transaction().commit() # Release locks before starting a potentially long calculation + # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state + m = self.loadMessage(line.message, uid = line.uid) + message_list = [m] + # Validate message (make sure object exists, priority OK, etc.) + if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): + group_method_id = m.activity_kw.get('group_method_id') + if group_method_id is not None: + # Count the number of objects to prevent too many objects. + if m.hasExpandMethod(): + try: + count = len(m.getObjectList(activity_tool)) + except: + # Here, simply ignore an exception. The same exception should be handled later. + LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info()) + count = 0 else: - activity_tool.invoke(message_list[0]) - - # Check if messages are executed successfully. - # When some of them are executed successfully, it may not be acceptable to - # abort the transaction, because these remain pending, only due to other - # invalid messages. This means that a group method should not be used if - # it has a side effect. For now, only indexing uses a group method, and this - # has no side effect. - for m in message_list: - if m.is_executed: - break + count = 1 + + group_method = activity_tool.restrictedTraverse(group_method_id) + + if count < MAX_GROUPED_OBJECTS: + # Retrieve objects which have the same group method. + result = readMessage(processing_node = processing_node, priority = priority, + to_date = now_date, group_method_id = group_method_id, + order_validation_text = order_validation_text) + #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result))) + for line in result: + path = line.path + method_id = line.method_id + uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, + processing_node = None, to_date = now_date, + order_validation_text = order_validation_text) + uid_list = [x.uid for x in uid_list] + if len(uid_list) > 0: + # Set selected messages to processing + activity_tool.SQLDict_processMessage(uid = uid_list) + get_transaction().commit() # Release locks before starting a potentially long calculation + m = self.loadMessage(line.message, uid = line.uid) + if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): + if m.hasExpandMethod(): + try: + count += len(m.getObjectList(activity_tool)) + except: + # Here, simply ignore an exception. The same exception should be handled later. + LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info()) + pass + else: + count += 1 + message_list.append(m) + uid_list_list.append(uid_list) + priority_list.append(line.priority) + if count >= MAX_GROUPED_OBJECTS: + break + + # Release locks before starting a potentially long calculation + get_transaction().commit() + # Try to invoke + if group_method_id is not None: + LOG('SQLDict', TRACE, + 'invoking a group method %s with %d objects '\ + ' (%d objects in expanded form)' % ( + group_method_id, len(message_list), count)) + activity_tool.invokeGroup(group_method_id, message_list) + else: + activity_tool.invoke(message_list[0]) + + # Check if messages are executed successfully. + # When some of them are executed successfully, it may not be acceptable to + # abort the transaction, because these remain pending, only due to other + # invalid messages. This means that a group method should not be used if + # it has a side effect. For now, only indexing uses a group method, and this + # has no side effect. + for m in message_list: + if m.is_executed: + break + else: + get_transaction().abort() + + for i in xrange(len(message_list)): + m = message_list[i] + uid_list = uid_list_list[i] + priority = priority_list[i] + if m.is_executed: + activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it + get_transaction().commit() # If successful, commit + if m.active_process: + active_process = activity_tool.unrestrictedTraverse(m.active_process) + if not active_process.hasActivity(): + # No more activity + m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ??? else: - get_transaction().abort() - - for i in xrange(len(message_list)): - m = message_list[i] - uid_list = uid_list_list[i] - priority = priority_list[i] - if m.is_executed: - activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it - get_transaction().commit() # If successful, commit - if m.active_process: - active_process = activity_tool.unrestrictedTraverse(m.active_process) - if not active_process.hasActivity(): - # No more activity - m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ??? + if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError): + # If this is a conflict error, do not lower the priority but only delay. + activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, + retry = 1) + get_transaction().commit() # Release locks before starting a potentially long calculation + elif priority > MAX_PRIORITY: + # This is an error + if len(uid_list) > 0: + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) + # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit else: - if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError): - # If this is a conflict error, do not lower the priority but only delay. + # Lower priority + if len(uid_list) > 0: activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, - retry = 1) + priority = priority + 1, retry = 1) get_transaction().commit() # Release locks before starting a potentially long calculation - elif priority > MAX_PRIORITY: - # This is an error - if len(uid_list) > 0: - activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) - # Assign message back to 'error' state - m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - if len(uid_list) > 0: - activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, - priority = priority + 1, retry = 1) - get_transaction().commit() # Release locks before starting a potentially long calculation - - return 0 - get_transaction().commit() # Release locks before starting a potentially long calculation + + return 0 + get_transaction().commit() # Release locks before starting a potentially long calculation return 1 def hasActivity(self, activity_tool, object, **kw): - if hasattr(activity_tool,'SQLDict_readMessageList'): + hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None) + if hasMessage is not None: if object is not None: my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw) + result = hasMessage(path=my_object_path, **kw) if len(result) > 0: return result[0].message_count > 0 else: @@ -369,7 +373,8 @@ class SQLDict(RAMDict): path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) method_dict = {} - if hasattr(activity_tool,'SQLDict_readMessageList'): + readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) + if readMessageList is not None: # Parse each message in registered for m in activity_tool.getRegisteredMessageList(self): if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): @@ -391,8 +396,8 @@ class SQLDict(RAMDict): raise ActivityFlushError, ( 'The document %s does not exist' % path) # Parse each message in SQL dict - result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id, - processing_node=None,include_processing=0) + result = readMessageList(path=path, method_id=method_id, + processing_node=None,include_processing=0) for line in result: path = line.path method_id = line.method_id @@ -422,8 +427,10 @@ class SQLDict(RAMDict): def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw): # YO: reading all lines might cause a deadlock message_list = [] - if hasattr(activity_tool,'SQLDict_readMessageList'): - result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None,include_processing=include_processing) + readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) + if readMessageList is not None: + result = readMessageList(path=None, method_id=None, processing_node=None, + to_processing_date=None,include_processing=include_processing) for line in result: m = self.loadMessage(line.message, uid = line.uid) m.processing_node = line.processing_node @@ -435,8 +442,9 @@ class SQLDict(RAMDict): def dumpMessageList(self, activity_tool): # Dump all messages in the table. message_list = [] - if hasattr(activity_tool, 'SQLDict_dumpMessageList'): - result = activity_tool.SQLDict_dumpMessageList() + dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None) + if dumpMessageList is not None: + result = dumpMessageList() for line in result: m = self.loadMessage(line.message, uid = line.uid) message_list.append(m) @@ -444,7 +452,8 @@ class SQLDict(RAMDict): def distribute(self, activity_tool, node_count): processing_node = 1 - if hasattr(activity_tool,'SQLDict_readMessageList'): + readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) + if readMessageList is not None: now_date = DateTime() if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME: # Sticky processing messages should be set back to non processing @@ -452,9 +461,9 @@ class SQLDict(RAMDict): self.max_processing_date = now_date else: max_processing_date = None - result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1, - to_processing_date = max_processing_date, - include_processing=0) # Only assign non assigned messages + result = readMessageList(path=None, method_id=None, processing_node = -1, + to_processing_date = max_processing_date, + include_processing=0) # Only assign non assigned messages get_transaction().commit() # Release locks before starting a potentially long calculation path_dict = {} for line in result: @@ -534,7 +543,7 @@ class SQLDict(RAMDict): if result[0].uid_count > 0: return INVALID_ORDER return VALID - + def _validate_after_tag_and_method_id(self, activity_tool, message, value): # Count number of occurances of tag and method_id if (type(value) != TupleType and type(value) != ListType) or len(value)<2: diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index a965c033f9e910e69b7615874deb086f0450dca9..a959ff9fa013131eb20d77fa23bf170cac8e7972 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -74,72 +74,76 @@ class SQLQueue(RAMQueue): activity_tool.SQLQueue_delMessage(uid = m.uid) def dequeueMessage(self, activity_tool, processing_node): - if hasattr(activity_tool,'SQLQueue_readMessageList'): - now_date = DateTime() - # Next processing date in case of error - next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400 - priority = random.choice(priority_weight) - # Try to find a message at given priority level - result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority, - to_date=now_date) - if len(result) == 0: - # If empty, take any message - result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date) - if len(result) > 0: - line = result[0] - path = line.path - method_id = line.method_id - # Make sure message can not be processed anylonger - activity_tool.SQLQueue_processMessage(uid=line.uid) - get_transaction().commit() # Release locks before starting a potentially long calculation - m = self.loadMessage(line.message) - # Make sure object exists - validation_state = m.validate(self, activity_tool) - if validation_state is not VALID: - if validation_state in (EXCEPTION, INVALID_PATH): - if line.priority > MAX_PRIORITY: - # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) - # Assign message back to 'error' state - #m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation + readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None) + if readMessage is None: + return 1 + + now_date = DateTime() + # Next processing date in case of error + next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400 + priority = random.choice(priority_weight) + # Try to find a message at given priority level + result = readMessage(processing_node=processing_node, priority=priority, + to_date=now_date) + if len(result) == 0: + # If empty, take any message + result = readMessage(processing_node=processing_node, priority=None,to_date=now_date) + if len(result) > 0: + line = result[0] + path = line.path + method_id = line.method_id + # Make sure message can not be processed anylonger + activity_tool.SQLQueue_processMessage(uid=line.uid) + get_transaction().commit() # Release locks before starting a potentially long calculation + m = self.loadMessage(line.message) + # Make sure object exists + validation_state = m.validate(self, activity_tool) + if validation_state is not VALID: + if validation_state in (EXCEPTION, INVALID_PATH): + if line.priority > MAX_PRIORITY: + # This is an error + activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) + # Assign message back to 'error' state + #m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit else: - # We do not lower priority for INVALID_ORDER errors but we do postpone execution - activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, - priority = line.priority) + # Lower priority + activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) get_transaction().commit() # Release locks before starting a potentially long calculation else: - # Try to invoke - activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? - if m.is_executed: # Make sure message could be invoked - activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it - get_transaction().commit() # If successful, commit + # We do not lower priority for INVALID_ORDER errors but we do postpone execution + activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, + priority = line.priority) + get_transaction().commit() # Release locks before starting a potentially long calculation + else: + # Try to invoke + activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? + if m.is_executed: # Make sure message could be invoked + activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it + get_transaction().commit() # If successful, commit + else: + get_transaction().abort() # If not, abort transaction and start a new one + if line.priority > MAX_PRIORITY: + # This is an error + activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) + # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit else: - get_transaction().abort() # If not, abort transaction and start a new one - if line.priority > MAX_PRIORITY: - # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) - # Assign message back to 'error' state - m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, - priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation - return 0 - get_transaction().commit() # Release locks before starting a potentially long calculation + # Lower priority + activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, + priority = line.priority + 1) + get_transaction().commit() # Release locks before starting a potentially long calculation + return 0 + get_transaction().commit() # Release locks before starting a potentially long calculation return 1 def hasActivity(self, activity_tool, object, **kw): - if hasattr(activity_tool,'SQLQueue_readMessageList'): + hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None) + if hasMessage is not None: if object is not None: my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) + result = hasMessage(path=my_object_path, **kw) if len(result) > 0: return result[0].message_count > 0 else: @@ -158,7 +162,8 @@ class SQLQueue(RAMQueue): NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible """ - if hasattr(activity_tool,'SQLQueue_readMessageList'): + readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) + if readMessageList is not None: #return # Do nothing here to precent overlocking path = '/'.join(object_path) # Parse each message in registered @@ -168,7 +173,7 @@ class SQLQueue(RAMQueue): activity_tool.unregisterMessage(self, m) # Parse each message in SQL queue #LOG('Flush', 0, str((path, invoke, method_id))) - result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) + result = readMessageList(path=path, method_id=method_id,processing_node=None) #LOG('Flush', 0, str(len(result))) method_dict = {} for line in result: @@ -202,8 +207,9 @@ class SQLQueue(RAMQueue): def getMessageList(self, activity_tool, processing_node=None,**kw): message_list = [] - if hasattr(activity_tool,'SQLQueue_readMessageList'): - result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) + readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) + if readMessageList is not None: + result = readMessageList(path=None, method_id=None, processing_node=None) for line in result: m = self.loadMessage(line.message) m.processing_node = line.processing_node @@ -214,17 +220,19 @@ class SQLQueue(RAMQueue): def dumpMessageList(self, activity_tool): # Dump all messages in the table. message_list = [] - if hasattr(activity_tool, 'SQLQueue_dumpMessageList'): - result = activity_tool.SQLQueue_dumpMessageList() + dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None) + if dumpMessageList is not None: + result = dumpMessageList() for line in result: m = self.loadMessage(line.message, uid = line.uid) message_list.append(m) return message_list - + def distribute(self, activity_tool, node_count): processing_node = 1 - if hasattr(activity_tool,'SQLQueue_readMessageList'): - result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages + readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) + if readMessageList is not None: + result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages #LOG('distribute count',0,str(len(result)) ) #LOG('distribute count',0,str(map(lambda x:x.uid, result))) #get_transaction().commit() # Release locks before starting a potentially long calculation @@ -296,7 +304,7 @@ class SQLQueue(RAMQueue): if result[0].uid_count > 0: return INVALID_ORDER return VALID - + def _validate_after_tag(self, activity_tool, message, value): # Count number of occurances of tag if type(value) == type(''): @@ -305,7 +313,7 @@ class SQLQueue(RAMQueue): if result[0].uid_count > 0: return INVALID_ORDER return VALID - + def _validate_after_tag_and_method_id(self, activity_tool, message, value): # Count number of occurances of tag and method_id if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2: @@ -321,7 +329,7 @@ class SQLQueue(RAMQueue): if result[0].uid_count > 0: return INVALID_ORDER return VALID - + # Required for tests (time shift) def timeShift(self, activity_tool, delay, processing_node = None): """