Commit 90a59073 authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: Make ValidationErrorDelay overridable and change getResult to getResultDict

parent 36290703
...@@ -146,21 +146,24 @@ class ActiveProcess(Base): ...@@ -146,21 +146,24 @@ class ActiveProcess(Base):
except AttributeError: except AttributeError:
# BBB: self was created before implementation of __init__ # BBB: self was created before implementation of __init__
return [] return []
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree if type(result_list) is not ConflictFreeLog:
# BBB: result_list is IOBTree or LOBTree
return result_list.values() return result_list.values()
return list(result_list) return list(result_list)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'getResult')
def getResult(self, key, **kw): def getResultDict(self, **kw):
""" """
Returns the result with requested key else None Returns the result Dict
""" """
try: try:
result_list = self.result_list result_list = self.result_list
result = result_list[key] except AttributeError:
except KeyError: return {}
return None if type(result_list) is not ConflictFreeLog:
return result return result_list
else:
return {}
security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult')
def activateResult(self, result): def activateResult(self, result):
......
...@@ -43,7 +43,7 @@ INVALID_ORDER = 2 ...@@ -43,7 +43,7 @@ INVALID_ORDER = 2
# Time global parameters # Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 1 # in seconds VALIDATION_ERROR_DELAY = 15 # in seconds
class Queue(object): class Queue(object):
""" """
...@@ -261,3 +261,6 @@ class Queue(object): ...@@ -261,3 +261,6 @@ class Queue(object):
Values out of this range might work, but are non-standard. Values out of this range might work, but are non-standard.
""" """
return 128 return 128
def getValidationErrorDelay(self):
return VALIDATION_ERROR_DELAY
...@@ -37,7 +37,7 @@ from Products.CMFActivity.ActivityTool import ( ...@@ -37,7 +37,7 @@ from Products.CMFActivity.ActivityTool import (
Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage) Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Products.CMFActivity.ActivityRuntimeEnvironment import (
DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable) DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH from Queue import Queue, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
# TODO: Limit by size in bytes instead of number of rows. # TODO: Limit by size in bytes instead of number of rows.
...@@ -571,6 +571,7 @@ class SQLBase(Queue): ...@@ -571,6 +571,7 @@ class SQLBase(Queue):
make_available_uid_list = [] make_available_uid_list = []
notify_user_list = [] notify_user_list = []
executed_uid_list = deletable_uid_list executed_uid_list = deletable_uid_list
VALIDATION_ERROR_DELAY = self.getValidationErrorDelay()
if uid_to_duplicate_uid_list_dict is not None: if uid_to_duplicate_uid_list_dict is not None:
for m in message_list: for m in message_list:
if m.getExecutionState() == MESSAGE_NOT_EXECUTED: if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
......
...@@ -31,13 +31,13 @@ from functools import total_ordering ...@@ -31,13 +31,13 @@ from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
# Stop validating more messages when this limit is reached # Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000 MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate. # Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000 READ_MESSAGE_LIMIT = 1000
VALIDATION_ERROR_DELAY = 1
_DequeueMessageException = Exception() _DequeueMessageException = Exception()
from SQLDict import SQLDict from SQLDict import SQLDict
...@@ -49,6 +49,9 @@ class SQLJoblib(SQLDict): ...@@ -49,6 +49,9 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job' sql_table = 'message_job'
uid_group = 'portal_activity_job' uid_group = 'portal_activity_job'
def getValidationErrorDelay(self):
return VALIDATION_ERROR_DELAY
def initialize(self, activity_tool, clear): def initialize(self, activity_tool, clear):
""" """
Initialize the message table using MYISAM Engine Initialize the message table using MYISAM Engine
......
...@@ -68,9 +68,10 @@ if ENABLE_JOBLIB: ...@@ -68,9 +68,10 @@ if ENABLE_JOBLIB:
self.callback = callback self.callback = callback
def get(self, timeout=None): def get(self, timeout=None):
if self.active_process.getResult(self.active_process_sig) is None: resultDict = self.active_process.getResultDict()
if not resultDict.has_key(self.active_process_sig):
raise ConflictError raise ConflictError
result = self.active_process.getResult(self.active_process_sig).result result = resultDict[self.active_process_sig].result
if isinstance(result, Exception): if isinstance(result, Exception):
raise result raise result
...@@ -101,7 +102,8 @@ if ENABLE_JOBLIB: ...@@ -101,7 +102,8 @@ if ENABLE_JOBLIB:
# create a signature and convert it to integer # create a signature and convert it to integer
sig = joblib_hash(batch.items[0]) sig = joblib_hash(batch.items[0])
sigint = int(sig, 16) % (10 ** 16) sigint = int(sig, 16) % (10 ** 16)
if not self.active_process.getResult(sigint): resultDict = self.active_process.getResultDict()
if not resultDict.has_key(sigint):
joblib_result = portal_activities.activate(activity='SQLJoblib', joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id, tag="joblib_%s" % active_process_id,
signature=sig, signature=sig,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment