Commit b070a73c authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add support for multiple after methods and broadcast.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2293 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent c384cc52
...@@ -59,6 +59,7 @@ class SQLQueue(RAMQueue): ...@@ -59,6 +59,7 @@ class SQLQueue(RAMQueue):
activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
broadcast = m.activity_kw.get('broadcast', 0),
message = self.dumpMessage(m), message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime())) date = m.activity_kw.get('at_date', DateTime()))
...@@ -66,7 +67,7 @@ class SQLQueue(RAMQueue): ...@@ -66,7 +67,7 @@ class SQLQueue(RAMQueue):
# Erase all messages in a single transaction # Erase all messages in a single transaction
LOG("prepareDeleteMessage", 0, str(m.__dict__)) LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLQueue_delMessage(uid = m.uid) activity_tool.SQLQueue_delMessage(uid = m.uid)
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLQueue_readMessageList'): if hasattr(activity_tool,'SQLQueue_readMessageList'):
now_date = DateTime() now_date = DateTime()
...@@ -90,7 +91,7 @@ class SQLQueue(RAMQueue): ...@@ -90,7 +91,7 @@ class SQLQueue(RAMQueue):
# Make sure object exists # Make sure object exists
validation_state = m.validate(self, activity_tool) validation_state = m.validate(self, activity_tool)
if validation_state is not VALID: if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH): if validation_state in (EXCEPTION, INVALID_PATH):
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
...@@ -212,32 +213,49 @@ class SQLQueue(RAMQueue): ...@@ -212,32 +213,49 @@ class SQLQueue(RAMQueue):
#LOG('distribute count',0,str(len(result)) ) #LOG('distribute count',0,str(len(result)) )
#LOG('distribute count',0,str(map(lambda x:x.uid, result))) #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
#get_transaction().commit() # Release locks before starting a potentially long calculation #get_transaction().commit() # Release locks before starting a potentially long calculation
uid_list = map(lambda x:x.uid, result)[0:100] result = list(result)[0:100]
for uid in uid_list: for line in result:
#LOG("distribute", 0, "assign %s" % uid) broadcast = line.broadcast
activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) uid = line.uid
#get_transaction().commit() # Release locks immediately to allow processing of messages if broadcast:
processing_node = processing_node + 1 # Broadcast messages must be distributed into all nodes.
if processing_node > node_count: activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
processing_node = 1 # Round robin for node in range(2, node_count+1):
activity_tool.SQLQueue_writeMessage( path = line.path,
method_id = line.method_id,
priority = line.priority,
broadcast = 1,
processing_node = node,
message = line.message,
date = line.date)
else:
#LOG("distribute", 0, "assign %s" % uid)
activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
#get_transaction().commit() # Release locks immediately to allow processing of messages
processing_node = processing_node + 1
if processing_node > node_count:
processing_node = 1 # Round robin
# Validation private methods # Validation private methods
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id # Count number of occurances of method_id
LOG('SQLQueue._validate_after_method_id, message',0,message) #get_transaction().commit()
LOG('SQLQueue._validate_after_method_id, value',0,value) if type(value) == type(''):
value = [value]
result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None) result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
LOG('SQLQueue._validate_after_method_id, method_id',0,value)
LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_path(self, activity_tool, message, value): def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path # Count number of occurances of path
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value) result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_message_uid(self, activity_tool, message, value): def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid # Count number of occurances of message_uid
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None) result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
...@@ -245,8 +263,8 @@ class SQLQueue(RAMQueue): ...@@ -245,8 +263,8 @@ class SQLQueue(RAMQueue):
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay): def timeShift(self, activity_tool, delay):
""" """
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
......
...@@ -10,6 +10,7 @@ class_file: ...@@ -10,6 +10,7 @@ class_file:
<params>path <params>path
processing_node processing_node
method_id method_id
broadcast
uid:int=0</params> uid:int=0</params>
UPDATE message_queue UPDATE message_queue
SET SET
...@@ -19,3 +20,7 @@ WHERE ...@@ -19,3 +20,7 @@ WHERE
<dtml-if path> path = <dtml-sqlvar path type="string"> <dtml-if path> path = <dtml-sqlvar path type="string">
<dtml-else> uid = <dtml-sqlvar uid type="int"> </dtml-if> <dtml-else> uid = <dtml-sqlvar uid type="int"> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if broadcast>
AND broadcast = <dtml-sqlvar broadcast type="int">
</dtml-if>
...@@ -17,6 +17,7 @@ CREATE TABLE `message_queue` ( ...@@ -17,6 +17,7 @@ CREATE TABLE `message_queue` (
`processing` INT DEFAULT 0, `processing` INT DEFAULT 0,
`processing_date` datetime, `processing_date` datetime,
`priority` INT DEFAULT 0, `priority` INT DEFAULT 0,
`broadcast` INT DEFAULT 0,
`message` BLOB, `message` BLOB,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY `date` (`date`), KEY `date` (`date`),
......
...@@ -17,6 +17,12 @@ FROM ...@@ -17,6 +17,12 @@ FROM
message_queue message_queue
WHERE WHERE
processing_node >= -1 processing_node >= -1
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> <dtml-if method_id>
AND (
<dtml-in method_id>
method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
)
</dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if> <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
...@@ -11,13 +11,16 @@ class_file: ...@@ -11,13 +11,16 @@ class_file:
method_id method_id
message message
priority priority
broadcast
processing_node=-1
date</params> date</params>
INSERT INTO message_queue INSERT INTO message_queue
SET SET
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if> <dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if>
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
processing_node = -1, processing_node = <dtml-sqlvar processing_node type="int">,
broadcast = <dtml-sqlvar broadcast type="int">,
processing = -1, processing = -1,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
message = <dtml-sqlvar message type="string"> message = <dtml-sqlvar message type="string">
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment