Commit 140a9375 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: more versatile getMessageList() to query activity tables

New SQLBase._getMessageList method has a very simple ZSQLCatalog-like API,
and replaces SQL{Dict,Queue}_readMessageList DTML scripts.

'include_processing' parameter disappears (in favor of 'processing')
and default behaviour is to include messages being processed.
parent c63dcf76
......@@ -28,6 +28,8 @@
import sys
import transaction
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
......@@ -39,11 +41,44 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
from Queue import Queue, VALIDATION_ERROR_DELAY
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
# same sort key as in SQLBase.getMessageList
return message.line.priority, message.line.date, message.uid
_DequeueMessageException = Exception()
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
sqltest_dict = {}
no_quote_type = int, float, long
def _(name, column=None, op="="):
if column is None:
column = name
column_op = "%s %s " % (column, op)
def render(value, render_string):
if value is None: # XXX: see comment in SQLBase._getMessageList
assert op == '='
return column + " IS NULL"
if isinstance(value, no_quote_type):
return column_op + str(value)
if isinstance(value, DateTime):
value = value.toZone('UTC').ISO()
assert isinstance(value, basestring), value
return column_op + render_string(value)
sqltest_dict[name] = render
_('active_process_uid')
_('group_method_id')
_('method_id')
_('path')
_('processing')
_('processing_node')
_('serialization_tag')
_('tag')
_('to_date', column="date", op="<=")
_('uid')
return sqltest_dict
sqltest_dict = sqltest_dict()
class SQLBase(Queue):
"""
Define a set of common methods for SQL-based storage of activities.
......@@ -60,26 +95,33 @@ class SQLBase(Queue):
assert len(result[0]) == 1
return result[0][0]
def getMessageList(self, activity_tool, processing_node=None,
include_processing=0, **kw):
# YO: reading all lines might cause a deadlock
def _getMessageList(self, activity_tool, offset=0, count=1000, src__=0, **kw):
# XXX: Because most columns have NOT NULL constraint, conditions with None
# value should be ignored, instead of trying to render them
# (with comparisons with NULL).
sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection
q = sql_connection.sql_quote__
if offset:
limit = '\nLIMIT %d,%d' % (offset, sys.maxint if count is None else count)
else:
limit = '' if count is None else '\nLIMIT %d' % count
sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
self.sql_table, sql and '\nWHERE ' + sql, limit)
return sql if src__ else Results(sql_connection().query(sql, max_rows=0))
def getMessageList(self, *args, **kw):
result = self._getMessageList(*args, **kw)
if type(result) is str: # src__ == 1
return result,
class_name = self.__class__.__name__
readMessageList = getattr(activity_tool,
class_name + '_readMessageList',
None)
if readMessageList is None:
return []
return [self.loadMessage(line.message,
activity=class_name,
uid=line.uid,
processing_node=line.processing_node,
retry=line.retry,
processing=line.processing)
for line in readMessageList(path=None,
method_id=None,
processing_node=processing_node,
to_date=None,
include_processing=include_processing)]
for line in result]
def _getPriority(self, activity_tool, method, default):
result = method()
......
......@@ -211,8 +211,8 @@ class SQLDict(SQLBase):
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {}
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
readUidList = getattr(activity_tool, 'SQLDict_readUidList', None)
if readUidList is not None:
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if m.object_path == object_path and (method_id is None or method_id == m.method_id):
......@@ -237,8 +237,8 @@ class SQLDict(SQLBase):
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL dict
result = readMessageList(path=path, method_id=method_id,
processing_node=None,include_processing=0, to_date=None)
result = self._getMessageList(activity_tool, processing=0, path=path,
**({'method_id': method_id} if method_id else {}))
for line in result:
path = line.path
line_method_id = line.method_id
......@@ -270,8 +270,7 @@ class SQLDict(SQLBase):
'Could not validate %s on %s' % (m.method_id , path))
if result:
uid_list = activity_tool.SQLDict_readUidList(
path=path, method_id=method_id)
uid_list = readUidList(path=path, method_id=method_id)
if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[x.uid for x in uid_list])
......@@ -289,13 +288,13 @@ class SQLDict(SQLBase):
def distribute(self, activity_tool, node_count):
offset = 0
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None:
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0,
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date, processing=0,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
......@@ -355,7 +354,7 @@ class SQLDict(SQLBase):
uid=deletable_uid_list)
distributable_count = len(distributable_uid_set)
if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
......
......@@ -104,8 +104,8 @@ class SQLQueue(SQLBase):
"""
object_path is a tuple
"""
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None:
delMessage = getattr(activity_tool, 'SQLBase_delMessage', None)
if delMessage is not None:
#return # Do nothing here to precent overlocking
path = '/'.join(object_path)
# Parse each message in registered
......@@ -129,7 +129,8 @@ class SQLQueue(SQLBase):
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue
result = readMessageList(path=path, method_id=method_id, processing_node=None, to_date=None, include_processing=0)
result = self._getMessageList(activity_tool, processing=0, path=path,
**({'method_id': method_id} if method_id else {}))
for line in result:
path = line.path
method_id = line.method_id
......@@ -153,10 +154,8 @@ class SQLQueue(SQLBase):
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
if len(result):
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[line.uid for line in result])
if result:
delMessage(table=self.sql_table, uid=[line.uid for line in result])
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
......@@ -194,13 +193,13 @@ class SQLQueue(SQLBase):
def distribute(self, activity_tool, node_count):
offset = 0
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None:
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0,
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date, processing=0,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
......@@ -236,7 +235,7 @@ class SQLQueue(SQLBase):
distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
......
......@@ -1375,18 +1375,19 @@ class ActivityTool (Folder, UniqueObject):
self.absolute_url(), message))
security.declarePublic('getMessageList')
def getMessageList(self,**kw):
def getMessageList(self, activity=None, **kw):
"""
List messages waiting in queues
"""
# Initialize if needed
if not is_initialized:
self.initialize()
if activity:
return activity_dict[activity].getMessageList(aq_inner(self), **kw)
message_list = []
for activity in activity_dict.itervalues():
try:
message_list += activity.getMessageList(aq_inner(self),**kw)
message_list += activity.getMessageList(aq_inner(self), **kw)
except AttributeError:
LOG('getMessageList, could not get message from Activity:',0,activity)
return message_list
......
......@@ -53,7 +53,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<th align="left" valign="top">Processing</th>
<th align="left" valign="top">Call Traceback</th>
</tr>
<dtml-in expr="getMessageList(include_processing=1)">
<dtml-in expr="getMessageList()">
<dtml-let path="'/'.join(object_path)">
<tr>
<dtml-if show_cancel_and_invoke_links>
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>path
method_id
processing_node
include_processing
to_date
offset:int=0
count:int=1000
</params>
SELECT * FROM
message
WHERE
1 = 1
<dtml-if expr="not(include_processing)"> AND processing = 0 </dtml-if>
<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if expr="path is not None"> AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="method_id is not None"> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if expr="to_date is not None"> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
priority, date, uid
LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>path
method_id
processing_node
to_date
include_processing
offset:int=0
count:int=1000
</params>
SELECT * FROM
message_queue
WHERE
1 = 1
<dtml-if expr="not include_processing"> AND processing = 0 </dtml-if>
<dtml-if expr="processing_node is not None"> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if expr="to_date is not None"> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
priority, date, uid
LIMIT <dtml-sqlvar offset type="int">, <dtml-sqlvar count type="int">
......@@ -2067,23 +2067,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
raise ValueError, 'This method always fail'
Message.notifyUser = failingMethod
Organisation.failingMethod = failingMethod
readMessageList = getattr(self.getPortalObject(), '%s_readMessageList'% (activity, ))
getMessageList = self.getPortalObject().portal_activities.getMessageList
try:
obj.activate(activity=activity, priority=6).failingMethod()
self.commit()
self.flushAllActivities(silent=1, loop_size=100)
with_processing_len = len(readMessageList(path=None,
to_date=None,
method_id='failingMethod',
include_processing=1,
processing_node=None))
without_processing_len = len(readMessageList(path=None,
to_date=None,
method_id='failingMethod',
include_processing=0,
processing_node=None))
self.assertEqual(with_processing_len, 1)
self.assertEqual(without_processing_len, 1)
message, = getMessageList(activity=activity, method_id='failingMethod')
self.assertEqual(message.processing, 0)
finally:
Message.notifyUser = original_notifyUser
delattr(Organisation, 'failingMethod')
......@@ -2628,7 +2618,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
processing_node.
This must happen on first message execution, without any delay.
"""
readMessageList = getattr(self.getPortalObject(), '%s_readMessageList' % (activity, ))
activity_tool = self.getActivityTool()
container = self.getPortal().organisation_module
organisation = container.newContent(portal_type='Organisation')
......@@ -2645,13 +2634,11 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(getattr(container, organisation_id, None), None)
self.assertEqual(len(activity_tool.getMessageList()), 1)
activity_tool.distribute()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1, path=None, method_id=None,
to_date=None)), 0)
self.assertEqual([], activity_tool.getMessageList(activity=activity,
processing_node=-3))
activity_tool.tic()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1, path=None, method_id=None,
to_date=None)), 1)
self.assertEqual(1, len(activity_tool.getMessageList(activity=activity,
processing_node=-3)))
def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0,
run=run_all_test):
......@@ -2683,7 +2670,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
'group_method_id (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
readMessageList = self.getPortalObject().SQLDict_readMessageList
activity_tool = self.getActivityTool()
container = self.getPortalObject().organisation_module
organisation = container.newContent(portal_type='Organisation')
......@@ -2702,15 +2688,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(getattr(container, organisation_id, None), None)
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1, path=None, method_id=None,
to_date=None)), 0)
self.assertEqual([], activity_tool.getMessageList(activity="SQLDict",
processing_node=-3))
activity_tool.tic()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1, path=None, method_id=None,
to_date=None)), 1)
message, = activity_tool.getMessageList()
# The message excuted on "organisation_2" must have succeeded.
self.assertEqual(len(activity_tool.getMessageList()), 1)
self.assertEqual(message.processing_node, -3)
def CheckLocalizerWorks(self, activity):
FROM_STRING = 'Foo'
......@@ -3117,21 +3100,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.commit()
from Products.CMFActivity import ActivityTool
ActivityTool.activity_dict['SQLDict'].getProcessableMessageList(activity_tool, 1)
activity = ActivityTool.activity_dict['SQLDict']
activity.getProcessableMessageList(activity_tool, 1)
self.commit()
ActivityTool.activity_dict['SQLDict'].getProcessableMessageList(activity_tool, 2)
activity.getProcessableMessageList(activity_tool, 2)
self.commit()
ActivityTool.activity_dict['SQLDict'].getProcessableMessageList(activity_tool, 3)
activity.getProcessableMessageList(activity_tool, 3)
self.commit()
result = activity_tool.SQLDict_readMessageList(include_processing=1,
processing_node=None,
path=None,
method_id=None,
to_date=None,
offset=0,
count=1000)
result = activity._getMessageList(activity_tool)
try:
self.assertEqual(len([message
for message in result
......
......@@ -216,8 +216,7 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
old_message_count = 0
start = time.time()
count = 1000
def getMessageList():
return portal_activities.getMessageList(include_processing=1)
getMessageList = portal_activities.getMessageList
message_list = getMessageList()
message_count = len(message_list)
while message_count and not stop_condition(message_list):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment