Commit a217dc86 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Change the scheduling of active objects.

Now VALIDATION_ERROR_DELAY is in seconds but not in days.
timeShift is performed only for objects assigned to a given node.
SQLDict postpones all active objects which have the same order validation
specification at a time.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4261 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent b7365c8c
...@@ -40,9 +40,8 @@ INVALID_PATH = 1 ...@@ -40,9 +40,8 @@ INVALID_PATH = 1
INVALID_ORDER = 2 INVALID_ORDER = 2
# Time global parameters # Time global parameters
SECONDS_IN_DAY = 86400.0 MAX_PROCESSING_TIME = 900 # in seconds
MAX_PROCESSING_TIME = 900 / SECONDS_IN_DAY # in fractions of day VALIDATION_ERROR_DELAY = 30 # in seconds
VALIDATION_ERROR_DELAY = 30 / SECONDS_IN_DAY # in fractions of day
class Queue: class Queue:
""" """
......
...@@ -29,11 +29,12 @@ ...@@ -29,11 +29,12 @@
import random import random
from DateTime import DateTime from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
import sys import sys
import sha
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -71,7 +72,8 @@ class SQLDict(RAMDict): ...@@ -71,7 +72,8 @@ class SQLDict(RAMDict):
message = self.dumpMessage(m), message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime()), date = m.activity_kw.get('at_date', DateTime()),
group_method_id = m.activity_kw.get('group_method_id', ''), group_method_id = m.activity_kw.get('group_method_id', ''),
tag = m.activity_kw.get('tag', '')) tag = m.activity_kw.get('tag', ''),
order_validation_text = self.getOrderValidationText(m))
# Also store uid of activity # Also store uid of activity
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
...@@ -90,6 +92,7 @@ class SQLDict(RAMDict): ...@@ -90,6 +92,7 @@ class SQLDict(RAMDict):
date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list] date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list]
group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list] group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list]
tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list] tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
activity_tool.SQLDict_writeMessageList( path_list = path_list, activity_tool.SQLDict_writeMessageList( path_list = path_list,
method_id_list = method_id_list, method_id_list = method_id_list,
priority_list = priority_list, priority_list = priority_list,
...@@ -97,7 +100,8 @@ class SQLDict(RAMDict): ...@@ -97,7 +100,8 @@ class SQLDict(RAMDict):
message_list = dumped_message_list, message_list = dumped_message_list,
date_list = date_list, date_list = date_list,
group_method_id_list = group_method_id_list, group_method_id_list = group_method_id_list,
tag_list = tag_list) tag_list = tag_list,
order_validation_text_list = order_validation_text_list)
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
...@@ -140,7 +144,20 @@ class SQLDict(RAMDict): ...@@ -140,7 +144,20 @@ class SQLDict(RAMDict):
else: else:
return () return ()
def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date, retry): def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
order_validation_item_list = []
key_list = message.activity_kw.keys()
key_list.sort()
for key in key_list:
method_id = "_validate_%s" % key
if hasattr(self, method_id):
order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0:
return ''
return sha.new(repr(order_validation_item_list)).hexdigest()
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
validation_state = message.validate(self, activity_tool) validation_state = message.validate(self, activity_tool)
if validation_state is not VALID: if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH): if validation_state in (EXCEPTION, INVALID_PATH):
...@@ -155,14 +172,17 @@ class SQLDict(RAMDict): ...@@ -155,14 +172,17 @@ class SQLDict(RAMDict):
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: # Add some delay before new processing if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = retry + 1) priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution # We do not lower priority for INVALID_ORDER errors but we do postpone execution
if len(uid_list) > 0: # Add some delay before new processing order_validation_text = self.getOrderValidationText(message)
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, activity_tool.SQLDict_setPriority(order_validation_text = order_validation_text,
priority = priority, retry = retry + 1) processing_node = processing_node,
delay = VALIDATION_ERROR_DELAY,
retry = 1,
uid = None)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
return 1 return 1
...@@ -182,23 +202,16 @@ class SQLDict(RAMDict): ...@@ -182,23 +202,16 @@ class SQLDict(RAMDict):
if len(result) == 0: if len(result) == 0:
# If the result is still empty, shift the dates so that SQLDict can dispatch pending active # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
# objects quickly. # objects quickly.
self.timeShift(activity_tool, VALIDATION_ERROR_DELAY) self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node)
elif len(result) > 0: elif len(result) > 0:
#LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
try:
retry = int(line.retry)
except TypeError:
retry = 1
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list] uid_list_list = [uid_list]
priority_list = [line.priority] priority_list = [line.priority]
retry_list = [retry]
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
if len(uid_list) > 0: if len(uid_list) > 0:
# Set selected messages to processing # Set selected messages to processing
...@@ -208,7 +221,7 @@ class SQLDict(RAMDict): ...@@ -208,7 +221,7 @@ class SQLDict(RAMDict):
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
message_list = [m] message_list = [m]
# Validate message (make sure object exists, priority OK, etc.) # Validate message (make sure object exists, priority OK, etc.)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
group_method_id = m.activity_kw.get('group_method_id') group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None: if group_method_id is not None:
# Count the number of objects to prevent too many objects. # Count the number of objects to prevent too many objects.
...@@ -234,12 +247,6 @@ class SQLDict(RAMDict): ...@@ -234,12 +247,6 @@ class SQLDict(RAMDict):
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
try:
retry = int(line.retry)
except TypeError:
retry = 1
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0: if len(uid_list) > 0:
...@@ -247,7 +254,7 @@ class SQLDict(RAMDict): ...@@ -247,7 +254,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod(): if m.hasExpandMethod():
try: try:
count += len(m.getObjectList(activity_tool)) count += len(m.getObjectList(activity_tool))
...@@ -262,7 +269,6 @@ class SQLDict(RAMDict): ...@@ -262,7 +269,6 @@ class SQLDict(RAMDict):
message_list.append(m) message_list.append(m)
uid_list_list.append(uid_list) uid_list_list.append(uid_list)
priority_list.append(line.priority) priority_list.append(line.priority)
retry_list.append(retry)
if count >= MAX_GROUPED_OBJECTS: if count >= MAX_GROUPED_OBJECTS:
break break
...@@ -293,9 +299,6 @@ class SQLDict(RAMDict): ...@@ -293,9 +299,6 @@ class SQLDict(RAMDict):
m = message_list[i] m = message_list[i]
uid_list = uid_list_list[i] uid_list = uid_list_list[i]
priority = priority_list[i] priority = priority_list[i]
retry = retry_list[i]
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
if m.is_executed: if m.is_executed:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
...@@ -315,8 +318,8 @@ class SQLDict(RAMDict): ...@@ -315,8 +318,8 @@ class SQLDict(RAMDict):
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = retry + 1) priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
...@@ -519,11 +522,11 @@ class SQLDict(RAMDict): ...@@ -519,11 +522,11 @@ class SQLDict(RAMDict):
return VALID return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay): def timeShift(self, activity_tool, delay, processing_node=None):
""" """
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
""" """
activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY) activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node)
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -30,7 +30,7 @@ import random ...@@ -30,7 +30,7 @@ import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from DateTime import DateTime from DateTime import DateTime
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
try: try:
...@@ -314,11 +314,11 @@ class SQLQueue(RAMQueue): ...@@ -314,11 +314,11 @@ class SQLQueue(RAMQueue):
return VALID return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay): def timeShift(self, activity_tool, delay, processing_node = None):
""" """
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
""" """
activity_tool.SQLQueue_timeShift(delay = delay * SECONDS_IN_DAY) activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node)
registerActivity(SQLQueue) registerActivity(SQLQueue)
...@@ -21,6 +21,7 @@ CREATE TABLE `message` ( ...@@ -21,6 +21,7 @@ CREATE TABLE `message` (
`group_method_id` VARCHAR(255) DEFAULT '', `group_method_id` VARCHAR(255) DEFAULT '',
`tag` VARCHAR(255), `tag` VARCHAR(255),
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`order_validation_text` VARCHAR(255),
`message` BLOB, `message` BLOB,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY `date` (`date`), KEY `date` (`date`),
...@@ -30,5 +31,6 @@ CREATE TABLE `message` ( ...@@ -30,5 +31,6 @@ CREATE TABLE `message` (
KEY `processing` (`processing`), KEY `processing` (`processing`),
KEY `processing_date` (`processing_date`), KEY `processing_date` (`processing_date`),
KEY `priority` (`priority`), KEY `priority` (`priority`),
KEY `tag` (`tag`) KEY `tag` (`tag`),
KEY `order_validation_text` (`order_validation_text`)
) TYPE = InnoDB; ) TYPE = InnoDB;
...@@ -26,10 +26,10 @@ AND ...@@ -26,10 +26,10 @@ AND
message message
WHERE WHERE
processing <> 1 processing <> 1
<dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
GROUP BY GROUP BY
path, method_id, processing_node, processing path, method_id, processing_node, processing
ORDER BY ORDER BY
......
...@@ -16,6 +16,6 @@ SELECT uid FROM ...@@ -16,6 +16,6 @@ SELECT uid FROM
WHERE WHERE
processing <> 1 processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if path> AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
...@@ -10,16 +10,37 @@ class_file: ...@@ -10,16 +10,37 @@ class_file:
<params>uid:list <params>uid:list
priority priority
date date
retry</params> retry
delay
processing_node
order_validation_text</params>
UPDATE UPDATE
message message
SET SET
priority = <dtml-sqlvar priority type="int">, processing = 0
processing = 0, <dtml-if priority>
date = <dtml-sqlvar date type="datetime"> , priority = <dtml-sqlvar priority type="int">
</dtml-if>
<dtml-if delay>
, date = DATE_ADD(NOW(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND)
<dtml-elif date>
, date = <dtml-sqlvar date type="datetime">
</dtml-if>
<dtml-if retry> <dtml-if retry>
, retry = <dtml-sqlvar retry type="int"> , retry = retry + <dtml-sqlvar retry type="int">
</dtml-if> </dtml-if>
WHERE WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> 1 = 1
OR </dtml-if></dtml-in> <dtml-if uid>
AND (
<dtml-in uid>
uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
)
</dtml-if>
<dtml-if processing_node>
AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if>
<dtml-if order_validation_text>
AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
</dtml-if>
\ No newline at end of file
...@@ -7,9 +7,15 @@ cache_time:0 ...@@ -7,9 +7,15 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>delay</params> <params>delay
processing_node</params>
UPDATE UPDATE
message message
SET SET
date = date - <dtml-sqlvar delay type="int">, date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND),
processing_date = processing_date - <dtml-sqlvar delay type="int"> processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND)
WHERE
1 = 1
<dtml-if processing_node>
AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if>
\ No newline at end of file
...@@ -15,7 +15,8 @@ broadcast ...@@ -15,7 +15,8 @@ broadcast
date date
processing_node=-1 processing_node=-1
group_method_id group_method_id
tag</params> tag
order_validation_text</params>
INSERT INTO message INSERT INTO message
SET SET
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
...@@ -27,4 +28,5 @@ SET ...@@ -27,4 +28,5 @@ SET
broadcast = <dtml-sqlvar broadcast type="int">, broadcast = <dtml-sqlvar broadcast type="int">,
group_method_id = <dtml-sqlvar group_method_id type="string">, group_method_id = <dtml-sqlvar group_method_id type="string">,
tag = <dtml-sqlvar tag type="string">, tag = <dtml-sqlvar tag type="string">,
order_validation_text = <dtml-sqlvar order_validation_text type="string">,
message = <dtml-sqlvar message type="string"> message = <dtml-sqlvar message type="string">
...@@ -15,9 +15,10 @@ broadcast_list ...@@ -15,9 +15,10 @@ broadcast_list
date_list date_list
processing_node_list processing_node_list
group_method_id_list group_method_id_list
tag_list</params> tag_list
order_validation_text_list</params>
INSERT INTO message INSERT INTO message
(path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, message) (path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, order_validation_text, message)
VALUES VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if> <dtml-if sequence-start><dtml-else>,</dtml-if>
...@@ -31,6 +32,7 @@ VALUES ...@@ -31,6 +32,7 @@ VALUES
<dtml-sqlvar expr="broadcast_list[loop_item]" type="int">, <dtml-sqlvar expr="broadcast_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">, <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">, <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string"> <dtml-sqlvar expr="message_list[loop_item]" type="string">
) )
</dtml-in> </dtml-in>
...@@ -7,9 +7,15 @@ cache_time:0 ...@@ -7,9 +7,15 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>delay</params> <params>delay
processing_node</params>
UPDATE UPDATE
message_queue message_queue
SET SET
date = date - <dtml-sqlvar delay type="int">, date = date - <dtml-sqlvar delay type="int">,
processing_date = processing_date - <dtml-sqlvar delay type="int"> processing_date = processing_date - <dtml-sqlvar delay type="int">
WHERE
1 = 1
<dtml-if processing_node>
AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if>
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment