Commit 196a98b1 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Do not use hasattr, because hasattr drains exceptions.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8156 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent fe27aada
...@@ -103,7 +103,7 @@ class SQLDict(RAMDict): ...@@ -103,7 +103,7 @@ class SQLDict(RAMDict):
group_method_id_list = group_method_id_list, group_method_id_list = group_method_id_list,
tag_list = tag_list, tag_list = tag_list,
order_validation_text_list = order_validation_text_list) order_validation_text_list = order_validation_text_list)
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
path = '/'.join(m.object_path) path = '/'.join(m.object_path)
...@@ -165,7 +165,7 @@ class SQLDict(RAMDict): ...@@ -165,7 +165,7 @@ class SQLDict(RAMDict):
# string is false, so we must use a non-empty string for this. # string is false, so we must use a non-empty string for this.
return 'none' return 'none'
return sha.new(repr(order_validation_item_list)).hexdigest() return sha.new(repr(order_validation_item_list)).hexdigest()
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node): def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
validation_state = message.validate(self, activity_tool) validation_state = message.validate(self, activity_tool)
if validation_state is not VALID: if validation_state is not VALID:
...@@ -195,159 +195,163 @@ class SQLDict(RAMDict): ...@@ -195,159 +195,163 @@ class SQLDict(RAMDict):
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
return 1 return 1
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessage'): readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
now_date = DateTime() if readMessage is None:
priority = random.choice(priority_weight) return 1
# Try to find a message at given priority level which is scheduled for now
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, now_date = DateTime()
to_date=now_date) priority = random.choice(priority_weight)
if len(result) == 0: # Try to find a message at given priority level which is scheduled for now
# If empty, take any message which is scheduled for now result = readMessage(processing_node=processing_node, priority=priority,
priority = None to_date=now_date)
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date) if len(result) == 0:
if len(result) == 0: # If empty, take any message which is scheduled for now
# If the result is still empty, shift the dates so that SQLDict can dispatch pending active priority = None
# objects quickly. result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1) if len(result) == 0:
elif len(result) > 0: # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
#LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) # objects quickly.
line = result[0] self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
path = line.path elif len(result) > 0:
method_id = line.method_id #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
order_validation_text = line.order_validation_text line = result[0]
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, path = line.path
processing_node = None, to_date = now_date, method_id = line.method_id
order_validation_text = order_validation_text) order_validation_text = line.order_validation_text
uid_list = [x.uid for x in uid_list] uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
uid_list_list = [uid_list] processing_node = None, to_date = now_date,
priority_list = [line.priority] order_validation_text = order_validation_text)
# Make sure message can not be processed anylonger uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0: uid_list_list = [uid_list]
# Set selected messages to processing priority_list = [line.priority]
activity_tool.SQLDict_processMessage(uid = uid_list) # Make sure message can not be processed anylonger
get_transaction().commit() # Release locks before starting a potentially long calculation if len(uid_list) > 0:
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # Set selected messages to processing
m = self.loadMessage(line.message, uid = line.uid) activity_tool.SQLDict_processMessage(uid = uid_list)
message_list = [m] get_transaction().commit() # Release locks before starting a potentially long calculation
# Validate message (make sure object exists, priority OK, etc.) # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): m = self.loadMessage(line.message, uid = line.uid)
group_method_id = m.activity_kw.get('group_method_id') message_list = [m]
if group_method_id is not None: # Validate message (make sure object exists, priority OK, etc.)
# Count the number of objects to prevent too many objects. if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod(): group_method_id = m.activity_kw.get('group_method_id')
try: if group_method_id is not None:
count = len(m.getObjectList(activity_tool)) # Count the number of objects to prevent too many objects.
except: if m.hasExpandMethod():
# Here, simply ignore an exception. The same exception should be handled later. try:
LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info()) count = len(m.getObjectList(activity_tool))
count = 0 except:
else: # Here, simply ignore an exception. The same exception should be handled later.
count = 1 LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
count = 0
group_method = activity_tool.restrictedTraverse(group_method_id)
if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method.
result = activity_tool.SQLDict_readMessage(processing_node = processing_node, priority = priority,
to_date = now_date, group_method_id = group_method_id,
order_validation_text = order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
for line in result:
path = line.path
method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
processing_node = None, to_date = now_date,
order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod():
try:
count += len(m.getObjectList(activity_tool))
except:
# Here, simply ignore an exception. The same exception should be handled later.
LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
pass
else:
count += 1
message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS:
break
# Release locks before starting a potentially long calculation
get_transaction().commit()
# Try to invoke
if group_method_id is not None:
LOG('SQLDict', TRACE,
'invoking a group method %s with %d objects '\
' (%d objects in expanded form)' % (
group_method_id, len(message_list), count))
activity_tool.invokeGroup(group_method_id, message_list)
else: else:
activity_tool.invoke(message_list[0]) count = 1
# Check if messages are executed successfully. group_method = activity_tool.restrictedTraverse(group_method_id)
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other if count < MAX_GROUPED_OBJECTS:
# invalid messages. This means that a group method should not be used if # Retrieve objects which have the same group method.
# it has a side effect. For now, only indexing uses a group method, and this result = readMessage(processing_node = processing_node, priority = priority,
# has no side effect. to_date = now_date, group_method_id = group_method_id,
for m in message_list: order_validation_text = order_validation_text)
if m.is_executed: #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
break for line in result:
path = line.path
method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
processing_node = None, to_date = now_date,
order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod():
try:
count += len(m.getObjectList(activity_tool))
except:
# Here, simply ignore an exception. The same exception should be handled later.
LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
pass
else:
count += 1
message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS:
break
# Release locks before starting a potentially long calculation
get_transaction().commit()
# Try to invoke
if group_method_id is not None:
LOG('SQLDict', TRACE,
'invoking a group method %s with %d objects '\
' (%d objects in expanded form)' % (
group_method_id, len(message_list), count))
activity_tool.invokeGroup(group_method_id, message_list)
else:
activity_tool.invoke(message_list[0])
# Check if messages are executed successfully.
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other
# invalid messages. This means that a group method should not be used if
# it has a side effect. For now, only indexing uses a group method, and this
# has no side effect.
for m in message_list:
if m.is_executed:
break
else:
get_transaction().abort()
for i in xrange(len(message_list)):
m = message_list[i]
uid_list = uid_list_list[i]
priority = priority_list[i]
if m.is_executed:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else: else:
get_transaction().abort() if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
# If this is a conflict error, do not lower the priority but only delay.
for i in xrange(len(message_list)): activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
m = message_list[i] retry = 1)
uid_list = uid_list_list[i] get_transaction().commit() # Release locks before starting a potentially long calculation
priority = priority_list[i] elif priority > MAX_PRIORITY:
if m.is_executed: # This is an error
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it if len(uid_list) > 0:
get_transaction().commit() # If successful, commit activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
if m.active_process: # Assign message back to 'error' state
active_process = activity_tool.unrestrictedTraverse(m.active_process) m.notifyUser(activity_tool) # Notify Error
if not active_process.hasActivity(): get_transaction().commit() # and commit
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else: else:
if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError): # Lower priority
# If this is a conflict error, do not lower the priority but only delay. if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
retry = 1) priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
elif priority > MAX_PRIORITY:
# This is an error return 0
if len(uid_list) > 0: get_transaction().commit() # Release locks before starting a potentially long calculation
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if hasattr(activity_tool,'SQLDict_readMessageList'): hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
if hasMessage is not None:
if object is not None: if object is not None:
my_object_path = '/'.join(object.getPhysicalPath()) my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw) result = hasMessage(path=my_object_path, **kw)
if len(result) > 0: if len(result) > 0:
return result[0].message_count > 0 return result[0].message_count > 0
else: else:
...@@ -369,7 +373,8 @@ class SQLDict(RAMDict): ...@@ -369,7 +373,8 @@ class SQLDict(RAMDict):
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {} method_dict = {}
if hasattr(activity_tool,'SQLDict_readMessageList'): readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
...@@ -391,8 +396,8 @@ class SQLDict(RAMDict): ...@@ -391,8 +396,8 @@ class SQLDict(RAMDict):
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Parse each message in SQL dict # Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id, result = readMessageList(path=path, method_id=method_id,
processing_node=None,include_processing=0) processing_node=None,include_processing=0)
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
...@@ -422,8 +427,10 @@ class SQLDict(RAMDict): ...@@ -422,8 +427,10 @@ class SQLDict(RAMDict):
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw): def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
message_list = [] message_list = []
if hasattr(activity_tool,'SQLDict_readMessageList'): readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None,include_processing=include_processing) if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node=None,
to_processing_date=None,include_processing=include_processing)
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node m.processing_node = line.processing_node
...@@ -435,8 +442,9 @@ class SQLDict(RAMDict): ...@@ -435,8 +442,9 @@ class SQLDict(RAMDict):
def dumpMessageList(self, activity_tool): def dumpMessageList(self, activity_tool):
# Dump all messages in the table. # Dump all messages in the table.
message_list = [] message_list = []
if hasattr(activity_tool, 'SQLDict_dumpMessageList'): dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
result = activity_tool.SQLDict_dumpMessageList() if dumpMessageList is not None:
result = dumpMessageList()
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
message_list.append(m) message_list.append(m)
...@@ -444,7 +452,8 @@ class SQLDict(RAMDict): ...@@ -444,7 +452,8 @@ class SQLDict(RAMDict):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
if hasattr(activity_tool,'SQLDict_readMessageList'): readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
now_date = DateTime() now_date = DateTime()
if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME: if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
# Sticky processing messages should be set back to non processing # Sticky processing messages should be set back to non processing
...@@ -452,9 +461,9 @@ class SQLDict(RAMDict): ...@@ -452,9 +461,9 @@ class SQLDict(RAMDict):
self.max_processing_date = now_date self.max_processing_date = now_date
else: else:
max_processing_date = None max_processing_date = None
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1, result = readMessageList(path=None, method_id=None, processing_node = -1,
to_processing_date = max_processing_date, to_processing_date = max_processing_date,
include_processing=0) # Only assign non assigned messages include_processing=0) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {} path_dict = {}
for line in result: for line in result:
...@@ -534,7 +543,7 @@ class SQLDict(RAMDict): ...@@ -534,7 +543,7 @@ class SQLDict(RAMDict):
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_tag_and_method_id(self, activity_tool, message, value): def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id # Count number of occurances of tag and method_id
if (type(value) != TupleType and type(value) != ListType) or len(value)<2: if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
......
...@@ -74,72 +74,76 @@ class SQLQueue(RAMQueue): ...@@ -74,72 +74,76 @@ class SQLQueue(RAMQueue):
activity_tool.SQLQueue_delMessage(uid = m.uid) activity_tool.SQLQueue_delMessage(uid = m.uid)
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLQueue_readMessageList'): readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
now_date = DateTime() if readMessage is None:
# Next processing date in case of error return 1
next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
priority = random.choice(priority_weight) now_date = DateTime()
# Try to find a message at given priority level # Next processing date in case of error
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority, next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
to_date=now_date) priority = random.choice(priority_weight)
if len(result) == 0: # Try to find a message at given priority level
# If empty, take any message result = readMessage(processing_node=processing_node, priority=priority,
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date) to_date=now_date)
if len(result) > 0: if len(result) == 0:
line = result[0] # If empty, take any message
path = line.path result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
method_id = line.method_id if len(result) > 0:
# Make sure message can not be processed anylonger line = result[0]
activity_tool.SQLQueue_processMessage(uid=line.uid) path = line.path
get_transaction().commit() # Release locks before starting a potentially long calculation method_id = line.method_id
m = self.loadMessage(line.message) # Make sure message can not be processed anylonger
# Make sure object exists activity_tool.SQLQueue_processMessage(uid=line.uid)
validation_state = m.validate(self, activity_tool) get_transaction().commit() # Release locks before starting a potentially long calculation
if validation_state is not VALID: m = self.loadMessage(line.message)
if validation_state in (EXCEPTION, INVALID_PATH): # Make sure object exists
if line.priority > MAX_PRIORITY: validation_state = m.validate(self, activity_tool)
# This is an error if validation_state is not VALID:
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) if validation_state in (EXCEPTION, INVALID_PATH):
# Assign message back to 'error' state if line.priority > MAX_PRIORITY:
#m.notifyUser(activity_tool) # Notify Error # This is an error
get_transaction().commit() # and commit activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
else: # Assign message back to 'error' state
# Lower priority #m.notifyUser(activity_tool) # Notify Error
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) get_transaction().commit() # and commit
get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution # Lower priority
activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Try to invoke # We do not lower priority for INVALID_ORDER errors but we do postpone execution
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
if m.is_executed: # Make sure message could be invoked priority = line.priority)
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it get_transaction().commit() # Release locks before starting a potentially long calculation
get_transaction().commit() # If successful, commit else:
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else: else:
get_transaction().abort() # If not, abort transaction and start a new one # Lower priority
if line.priority > MAX_PRIORITY: activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
# This is an error priority = line.priority + 1)
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) get_transaction().commit() # Release locks before starting a potentially long calculation
# Assign message back to 'error' state return 0
m.notifyUser(activity_tool) # Notify Error get_transaction().commit() # Release locks before starting a potentially long calculation
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if hasattr(activity_tool,'SQLQueue_readMessageList'): hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
if hasMessage is not None:
if object is not None: if object is not None:
my_object_path = '/'.join(object.getPhysicalPath()) my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) result = hasMessage(path=my_object_path, **kw)
if len(result) > 0: if len(result) > 0:
return result[0].message_count > 0 return result[0].message_count > 0
else: else:
...@@ -158,7 +162,8 @@ class SQLQueue(RAMQueue): ...@@ -158,7 +162,8 @@ class SQLQueue(RAMQueue):
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
if hasattr(activity_tool,'SQLQueue_readMessageList'): readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None:
#return # Do nothing here to precent overlocking #return # Do nothing here to precent overlocking
path = '/'.join(object_path) path = '/'.join(object_path)
# Parse each message in registered # Parse each message in registered
...@@ -168,7 +173,7 @@ class SQLQueue(RAMQueue): ...@@ -168,7 +173,7 @@ class SQLQueue(RAMQueue):
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue # Parse each message in SQL queue
#LOG('Flush', 0, str((path, invoke, method_id))) #LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) result = readMessageList(path=path, method_id=method_id,processing_node=None)
#LOG('Flush', 0, str(len(result))) #LOG('Flush', 0, str(len(result)))
method_dict = {} method_dict = {}
for line in result: for line in result:
...@@ -202,8 +207,9 @@ class SQLQueue(RAMQueue): ...@@ -202,8 +207,9 @@ class SQLQueue(RAMQueue):
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
message_list = [] message_list = []
if hasattr(activity_tool,'SQLQueue_readMessageList'): readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node=None)
for line in result: for line in result:
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
m.processing_node = line.processing_node m.processing_node = line.processing_node
...@@ -214,17 +220,19 @@ class SQLQueue(RAMQueue): ...@@ -214,17 +220,19 @@ class SQLQueue(RAMQueue):
def dumpMessageList(self, activity_tool): def dumpMessageList(self, activity_tool):
# Dump all messages in the table. # Dump all messages in the table.
message_list = [] message_list = []
if hasattr(activity_tool, 'SQLQueue_dumpMessageList'): dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
result = activity_tool.SQLQueue_dumpMessageList() if dumpMessageList is not None:
result = dumpMessageList()
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
message_list.append(m) message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
if hasattr(activity_tool,'SQLQueue_readMessageList'): readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
#LOG('distribute count',0,str(len(result)) ) #LOG('distribute count',0,str(len(result)) )
#LOG('distribute count',0,str(map(lambda x:x.uid, result))) #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
#get_transaction().commit() # Release locks before starting a potentially long calculation #get_transaction().commit() # Release locks before starting a potentially long calculation
...@@ -296,7 +304,7 @@ class SQLQueue(RAMQueue): ...@@ -296,7 +304,7 @@ class SQLQueue(RAMQueue):
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_tag(self, activity_tool, message, value): def _validate_after_tag(self, activity_tool, message, value):
# Count number of occurances of tag # Count number of occurances of tag
if type(value) == type(''): if type(value) == type(''):
...@@ -305,7 +313,7 @@ class SQLQueue(RAMQueue): ...@@ -305,7 +313,7 @@ class SQLQueue(RAMQueue):
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_tag_and_method_id(self, activity_tool, message, value): def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id # Count number of occurances of tag and method_id
if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2: if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
...@@ -321,7 +329,7 @@ class SQLQueue(RAMQueue): ...@@ -321,7 +329,7 @@ class SQLQueue(RAMQueue):
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None): def timeShift(self, activity_tool, delay, processing_node = None):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment