Commit 2264ab3a authored by Yoshinori Okuji's avatar Yoshinori Okuji

Broadcasting messages is supported.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2003 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent cacdc675
...@@ -43,7 +43,7 @@ priority_weight = \ ...@@ -43,7 +43,7 @@ priority_weight = \
[3] * 10 + \ [3] * 10 + \
[4] * 5 + \ [4] * 5 + \
[5] * 1 [5] * 1
class ActivityFlushError(Exception): class ActivityFlushError(Exception):
"""Error during active message flush""" """Error during active message flush"""
...@@ -59,6 +59,7 @@ class SQLDict(RAMDict): ...@@ -59,6 +59,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) , activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
broadcast = m.activity_kw.get('broadcast', 0),
message = self.dumpMessage(m), message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime())) date = m.activity_kw.get('at_date', DateTime()))
# Also store uid of activity # Also store uid of activity
...@@ -69,26 +70,26 @@ class SQLDict(RAMDict): ...@@ -69,26 +70,26 @@ class SQLDict(RAMDict):
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id,processing_node=None) uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id,processing_node=None)
uid_list = map(lambda x:x.uid, uid_list) uid_list = map(lambda x:x.uid, uid_list)
if len(uid_list)>0: if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = uid_list) activity_tool.SQLDict_delMessage(uid = uid_list)
# Registration management # Registration management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
setattr(activity_buffer, '_%s_message_list' % class_name, []) setattr(activity_buffer, '_%s_message_list' % class_name, [])
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name) uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
return uid_dict.has_key((tuple(m.object_path), m.method_id)) return uid_dict.has_key((tuple(m.object_path), m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 1 m.is_registered = 1
class_name = self.__class__.__name__ class_name = self.__class__.__name__
uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name) uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
uid_dict[(tuple(m.object_path), m.method_id)] = 1 uid_dict[(tuple(m.object_path), m.method_id)] = 1
getattr(activity_buffer,'_%s_message_list' % class_name).append(m) getattr(activity_buffer,'_%s_message_list' % class_name).append(m)
def unregisterMessage(self, activity_buffer, activity_tool, m): def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0 # This prevents from inserting deleted messages into the queue m.is_registered = 0 # This prevents from inserting deleted messages into the queue
class_name = self.__class__.__name__ class_name = self.__class__.__name__
...@@ -103,7 +104,7 @@ class SQLDict(RAMDict): ...@@ -103,7 +104,7 @@ class SQLDict(RAMDict):
return filter(lambda m: m.is_registered, message_list) return filter(lambda m: m.is_registered, message_list)
else: else:
return () return ()
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessage'): if hasattr(activity_tool,'SQLDict_readMessage'):
...@@ -126,7 +127,7 @@ class SQLDict(RAMDict): ...@@ -126,7 +127,7 @@ class SQLDict(RAMDict):
uid_list = map(lambda x:x.uid, uid_list) uid_list = map(lambda x:x.uid, uid_list)
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
if len(uid_list) > 0: if len(uid_list) > 0:
# Set selected messages to processing # Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
...@@ -134,11 +135,11 @@ class SQLDict(RAMDict): ...@@ -134,11 +135,11 @@ class SQLDict(RAMDict):
# Validate message (make sure object exists, priority OK, etc.) # Validate message (make sure object exists, priority OK, etc.)
validation_state = m.validate(self, activity_tool) validation_state = m.validate(self, activity_tool)
if validation_state is not VALID: if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH): if validation_state in (EXCEPTION, INVALID_PATH):
# There is a serious validation error - we must lower priority # There is a serious validation error - we must lower priority
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
...@@ -231,7 +232,7 @@ class SQLDict(RAMDict): ...@@ -231,7 +232,7 @@ class SQLDict(RAMDict):
else: else:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Parse each message in SQL dict # Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
for line in result: for line in result:
...@@ -239,6 +240,8 @@ class SQLDict(RAMDict): ...@@ -239,6 +240,8 @@ class SQLDict(RAMDict):
method_id = line.method_id method_id = line.method_id
if not method_dict.has_key(method_id): if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
# This is optimisation with the goal to process objects on the same
# node and minimize network traffic with ZEO server
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
...@@ -265,8 +268,8 @@ class SQLDict(RAMDict): ...@@ -265,8 +268,8 @@ class SQLDict(RAMDict):
m.processing_node = line.processing_node m.processing_node = line.processing_node
m.priority = line.priority m.priority = line.priority
message_list.append(m) message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
if hasattr(activity_tool,'SQLDict_readMessageList'): if hasattr(activity_tool,'SQLDict_readMessageList'):
...@@ -276,17 +279,30 @@ class SQLDict(RAMDict): ...@@ -276,17 +279,30 @@ class SQLDict(RAMDict):
max_processing_date = now_date - MAX_PROCESSING_TIME max_processing_date = now_date - MAX_PROCESSING_TIME
self.max_processing_date = now_date self.max_processing_date = now_date
else: else:
max_processing_date = None max_processing_date = None
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1, result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
to_processing_date = max_processing_date) # Only assign non assigned messages to_processing_date = max_processing_date) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {} path_dict = {}
for line in result: for line in result:
path = line.path path = line.path
if not path_dict.has_key(path): broadcast = line.broadcast
if broadcast:
# Broadcast messages must be distributed into all nodes.
uid = line.uid
activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid])
for node in range(2, node_count+1):
activity_tool.SQLDict_writeMessage( path = path,
method_id = line.method_id,
priority = line.priority,
broadcast = 1,
processing_node = node,
message = line.message,
date = line.date)
elif not path_dict.has_key(path):
# Only assign once (it would be different for a queue) # Only assign once (it would be different for a queue)
path_dict[path] = 1 path_dict[path] = 1
activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None) activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages get_transaction().commit() # Release locks immediately to allow processing of messages
processing_node = processing_node + 1 processing_node = processing_node + 1
if processing_node > node_count: if processing_node > node_count:
...@@ -295,18 +311,21 @@ class SQLDict(RAMDict): ...@@ -295,18 +311,21 @@ class SQLDict(RAMDict):
# Validation private methods # Validation private methods
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id # Count number of occurances of method_id
result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None) if type(value) == type(''):
if result[0].uid_count > 0: value = [value]
return INVALID_ORDER for method_id in value:
result = activity_tool.SQLDict_validateMessageList(method_id=method_id, message_uid=None, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID return VALID
def _validate_after_path(self, activity_tool, message, value): def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path # Count number of occurances of path
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value) result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0: if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def _validate_after_message_uid(self, activity_tool, message, value): def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid # Count number of occurances of message_uid
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None) result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
...@@ -314,12 +333,12 @@ class SQLDict(RAMDict): ...@@ -314,12 +333,12 @@ class SQLDict(RAMDict):
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay): def timeShift(self, activity_tool, delay):
""" """
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
""" """
activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY) activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY)
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -10,7 +10,8 @@ class_file: ...@@ -10,7 +10,8 @@ class_file:
<params>path <params>path
processing_node processing_node
method_id method_id
uid=None</params> uid
broadcast</params>
UPDATE message UPDATE message
SET SET
processing_node=<dtml-sqlvar processing_node type="int">, processing_node=<dtml-sqlvar processing_node type="int">,
...@@ -22,4 +23,7 @@ WHERE ...@@ -22,4 +23,7 @@ WHERE
<dtml-else> <dtml-else>
path = <dtml-sqlvar path type="string"> path = <dtml-sqlvar path type="string">
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
</dtml-if> </dtml-if>
\ No newline at end of file <dtml-if broadcast>
AND broadcast = <dtml-sqlvar broadcast type="int">
</dtml-if>
...@@ -17,6 +17,7 @@ CREATE TABLE `message` ( ...@@ -17,6 +17,7 @@ CREATE TABLE `message` (
`processing` INT DEFAULT 0, `processing` INT DEFAULT 0,
`processing_date` datetime, `processing_date` datetime,
`priority` INT DEFAULT 0, `priority` INT DEFAULT 0,
`broadcast` INT DEFAULT 0,
`message` BLOB, `message` BLOB,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY `date` (`date`), KEY `date` (`date`),
......
...@@ -11,13 +11,16 @@ class_file: ...@@ -11,13 +11,16 @@ class_file:
method_id method_id
message message
priority priority
date</params> broadcast
date
processing_node=-1</params>
INSERT INTO message INSERT INTO message
SET SET
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if> <dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if>
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
processing_node = -1, processing_node = <dtml-sqlvar processing_node type="int">,
processing = -1, processing = -1,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
broadcast = <dtml-sqlvar broadcast type="int">,
message = <dtml-sqlvar message type="string"> message = <dtml-sqlvar message type="string">
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment