Commit 790292bf authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: remove processing/processing_date columns and improve watchers

The original goal was to improve performance by removing the
`processing_node_processing` index and the queries that modified
these 2 useless columns.
parent 696933a4
......@@ -102,7 +102,6 @@ def sqltest_dict():
_('group_method_id')
_('method_id')
_('path')
_('processing')
_('processing_node')
_('serialization_tag')
_('tag')
......@@ -218,8 +217,7 @@ class SQLBase(Queue):
activity=class_name,
uid=line.uid,
processing_node=line.processing_node,
retry=line.retry,
processing=line.processing)
retry=line.retry)
for line in result]
def countMessageSQL(self, quote, **kw):
......@@ -414,9 +412,8 @@ class SQLBase(Queue):
"""
Put messages back in given processing_node.
"""
db.query(
"UPDATE %s SET processing_node=%s, processing=0 WHERE uid IN (%s)\0"
"COMMIT" % (self.sql_table, state, ','.join(map(str, uid_list))))
db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, state, ','.join(map(str, uid_list))))
def getProcessableMessageLoader(self, db, processing_node):
# do not merge anything
......@@ -433,14 +430,12 @@ class SQLBase(Queue):
reserved (definitely lost, but they are expandable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of messages
- until the total "cost" of the group goes over 1
- get one message from the reserved bunch (this messages will be
"needed")
- update the total cost
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list and a group_method_id
......@@ -503,11 +498,6 @@ class SQLBase(Queue):
uid_list = [line.uid for line in result if line.uid != uid]
if uid_list:
self.unreserveMessageList(db, 0, uid_list)
# Process messages.
db.query("UPDATE %s"
" SET processing=1, processing_date=UTC_TIMESTAMP(6)"
" WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, ','.join(map(str, uid_to_duplicate_uid_list_dict))))
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except:
self._log(WARNING, 'Exception while reserving messages.')
......@@ -752,10 +742,10 @@ class SQLBase(Queue):
activity_tool.unregisterMessage(self, m)
uid_list = []
db = activity_tool.getSQLConnection()
for line in self._getMessageList(db, path=path, processing=0,
for line in self._getMessageList(db, path=path,
**({'method_id': method_id} if method_id else {})):
uid_list.append(line.uid)
if invoke:
if invoke and line.processing_node <= 0:
invoke(Message.load(line.message, uid=line.uid, line=line))
if uid_list:
self.deleteMessageList(db, uid_list)
......@@ -767,8 +757,7 @@ class SQLBase(Queue):
all dates in message(_queue) table
"""
activity_tool.getSQLConnection().query("UPDATE %s SET"
" date = DATE_SUB(date, INTERVAL %s SECOND),"
" processing_date = DATE_SUB(processing_date, INTERVAL %s SECOND)"
% (self.sql_table, delay, delay)
" date = DATE_SUB(date, INTERVAL %s SECOND)"
% (self.sql_table, delay)
+ ('' if processing_node is None else
"WHERE processing_node=%s" % processing_node))
......@@ -192,7 +192,6 @@ class Message(BaseMessage):
call_traceback = None
exc_info = None
is_executed = MESSAGE_NOT_EXECUTED
processing = None
traceback = None
oid = None
is_registered = False
......
#!/bin/sh
set -e
# Small watching script based on Sébastien idea.
# ideas:
# - more control on what would be displayed
......@@ -32,13 +31,26 @@ INTERVAL=$2
exit 1
}
SELECT=""
for t in message message_queue ; do
SELECT=$SELECT"""
SELECT count(*) AS $t, ${text_group:-method_id}, processing, processing_node AS node, min(priority) AS min_pri, max(priority) AS max_pri FROM $t GROUP BY ${text_group:-method_id}, processing, processing_node ORDER BY node;
SELECT count(*) AS $t, processing, processing_node, min(priority) AS min_pri, max(priority) AS max_pri FROM $t GROUP BY processing, processing_node;
SELECT priority as pri, MIN(timediff(NOW(), date)) AS min, AVG(timediff(NOW() , date)) AS avg, MAX(timediff(NOW() , date)) AS max FROM $t GROUP BY priority;
SELECT count(*) AS ${t}_count FROM $t;
"""
node_priority_cols="processing_node AS node, MIN(priority) AS min_pri, MAX(priority) AS max_pri"
for t in message message_queue message_job; do
SELECT=$SELECT"
SELECT count(*) AS $t, ${text_group:-method_id}, $node_priority_cols, MAX(retry) FROM $t
GROUP BY processing_node, ${text_group:-method_id}
ORDER BY processing_node, ${text_group:-method_id};
SELECT priority,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MAX(date)), \"%T\") AS min,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), AVG(date)), \"%T\") AS avg,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MIN(date)), \"%T\") AS max
FROM $t GROUP BY priority ORDER BY priority;"
[ $t = message ] || {
not_processing=$not_processing" UNION ALL "
count=$count,
}
not_processing=$not_processing"
SELECT count(*) AS count, $node_priority_cols, MIN(date) AS min_date, MAX(date) AS max_date
FROM $t WHERE processing_node<=0 GROUP BY processing_node"
count=$count"(SELECT count(*) AS $t FROM $t) as _$t"
done
exec watch -n ${INTERVAL:-5} "${MYSQL:-mysql} $MYSQL_OPT --disable-pager -t -e '$SELECT' "
exec watch -n ${INTERVAL:-5} "${MYSQL:-mysql} $MYSQL_OPT --disable-pager -t -e '
SET autocommit=off; SELECT * FROM $count;$SELECT
SELECT SUM(count) as count, node, MIN(min_pri) AS min_pri, MAX(max_pri) AS max_pri, MIN(min_date) AS min_date, MAX(max_date) AS max_date FROM ($not_processing) as t GROUP BY node;'"
......@@ -50,7 +50,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<th align="left" valign="top">Named Parameters</th>
<th align="left" valign="top">Processing Node</th>
<th align="left" valign="top">Retry</th>
<th align="left" valign="top">Processing</th>
<th align="left" valign="top">Call Traceback</th>
</tr>
<dtml-in expr="getMessageList()">
......@@ -84,11 +83,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</td>
<td align="left" valign="top"><dtml-var processing_node></td>
<td align="left" valign="top"><dtml-var retry></td>
<td align="left" valign="top">
<dtml-if expr="processing is not None">
<dtml-var processing>
</dtml-if>
</td>
<td align="left" valign="top">
<dtml-if expr="call_traceback is not None">
<pre><dtml-var call_traceback></pre>
......
......@@ -15,8 +15,6 @@ CREATE TABLE <dtml-var table> (
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME(6),
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
......@@ -27,7 +25,6 @@ CREATE TABLE <dtml-var table> (
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
......
......@@ -21,7 +21,7 @@ tag_list
serialization_tag_list
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
......@@ -32,7 +32,6 @@ VALUES
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
......
......@@ -15,8 +15,6 @@ CREATE TABLE message_job (
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME(6),
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
......@@ -28,7 +26,6 @@ CREATE TABLE message_job (
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
......
......@@ -22,7 +22,7 @@ signature_list
serialization_tag_list
</params>
INSERT INTO message_job
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, signature, serialization_tag, message)
(uid, path, active_process_uid, date, method_id, processing_node, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
......@@ -33,7 +33,6 @@ VALUES
<dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime(6)"><dtml-else>UTC_TIMESTAMP(6)</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
......
......@@ -1355,7 +1355,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
self.TryUserNotificationOnActivityFailure('SQLQueue')
def TryUserNotificationRaise(self, activity):
def test_93_tryUserNotificationRaise(self):
activity_tool = self.portal.portal_activities
obj = self.portal.organisation_module.newContent(portal_type='Organisation')
self.tic()
original_notifyUser = Message.notifyUser
......@@ -1363,35 +1364,21 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
raise ValueError('This method always fail')
Message.notifyUser = failingMethod
Organisation.failingMethod = failingMethod
getMessageList = self.portal.portal_activities.getMessageList
try:
obj.activate(activity=activity, priority=6).failingMethod()
self.commit()
self.flushAllActivities(silent=1, loop_size=100)
message, = getMessageList(activity=activity, method_id='failingMethod')
self.assertEqual(message.processing, 0)
for activity in ActivityTool.activity_dict:
obj.activate(activity=activity, priority=6).failingMethod()
self.commit()
self.flushAllActivities(silent=1, loop_size=100)
message, = activity_tool.getMessageList(
activity=activity, method_id='failingMethod')
self.assertEqual(message.processing_node, -2)
self.assertTrue(message.retry)
activity_tool.manageDelete(message.uid, activity)
self.commit()
finally:
Message.notifyUser = original_notifyUser
del Organisation.failingMethod
def test_93_userNotificationRaiseWithSQLDict(self):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
self.TryUserNotificationRaise('SQLDict')
def test_94_userNotificationRaiseWithSQLQueue(self):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
self.TryUserNotificationRaise('SQLQueue')
def test_95_userNotificationRaiseWithSQLJoblib(self):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
self.TryUserNotificationRaise('SQLJoblib')
def TryActivityRaiseInCommitDoesNotStallActivityConection(self, activity):
"""
Check that an activity which commit raises (as would a regular conflict
......
return 'ActivityTool_manageDelete?uid=%s&activity=%s' % (context.uid, context.activity)
return 'manageDelete?message_uid_list:int:list=%s&activity=%s' % (context.uid, context.activity)
SELECT count(*) AS message, method_id, processing, processing_node AS node, min(priority) AS min_pri, max(priority) AS max_pri FROM <dtml-var table> GROUP BY method_id, processing, processing_node ORDER BY node
\ No newline at end of file
SELECT count(*) AS `count`, method_id, processing_node AS node, min(priority) AS min_pri, max(priority) AS max_pri FROM <dtml-var table> GROUP BY processing_node, method_id ORDER BY processing_node, method_id
\ No newline at end of file
......@@ -133,6 +133,14 @@
<key> <string>id</string> </key>
<value> <string>ActivityTool_getCurrentActivities</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>max_rows_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
......
# searching
# processing_node column is manage by methods called by getMessageTempObjectList
if kw.get('processing_node', None) == '':
del kw['processing_node']
for k, v in kw.items():
if v:
if k == "str_object_path":
kw["path"] = v
elif k == "uid_activity":
kw["uid"] = v
elif k in ('method_id', 'processing_node', 'retry'):
continue
del kw[k]
message_kw = dict([(k,kw[k]) for k in ['uid_activity','str_object_path','method_id',
'args','retry','processing_node',
'processing'] if not(kw.get(k) in ('',None))])
if message_kw.has_key("str_object_path"):
message_kw["path"] = message_kw.pop("str_object_path")
if message_kw.has_key("uid_activity"):
message_kw["uid"] = message_kw.pop("uid_activity")
message_list = context.getMessageTempObjectList(**kw)
for message in message_list:
message.edit(
str_object_path = '/'.join(message.object_path),
uid_activity = str(message.uid) + ' ('+ message.activity[3:] +')',
arguments = str(message.args),
delete = '[Delete]',
restart = '[Restart]',
)
message_list = context.getMessageTempObjectList(**message_kw)
message_list_to_show = []
while len(message_list) > 0:
message = message_list.pop(0)
message.edit(str_object_path = '/'.join(str(i) for i in message.object_path))
message.edit(uid_activity = str(message.uid) + ' ('+ message.activity[3:] +')')
message.edit(arguments = str(message.args))
message.edit(delete = '[Delete]')
message.edit(restart = '[Restart]')
message_list_to_show.append(message)
return message_list_to_show
return message_list
SELECT priority as pri, MIN(timediff(NOW(), date)) AS min, AVG(timediff(NOW() , date)) AS avg, MAX(timediff(NOW() , date)) AS max FROM <dtml-var table> GROUP BY priority;
\ No newline at end of file
SELECT priority AS pri,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MAX(date)), '%T') AS min,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), AVG(date)), '%T') AS avg,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MIN(date)), '%T') AS max
FROM <dtml-var table> GROUP BY priority
\ No newline at end of file
......@@ -18,6 +18,14 @@
<key> <string>id</string> </key>
<value> <string>ActivityTool_getSQLActivities</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>max_rows_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
......
data = {}
for d, sql in [('SQLDict',context.ActivityTool_getCurrentActivities(table='message')),
('SQLQueue',context.ActivityTool_getCurrentActivities(table='message_queue'))]:
data[d] = {'line_list':[]}
for line in sql:
tmp = {}
for k in ['message','method_id','processing','node','min_pri','max_pri']:
tmp[k] = line[k]
data[d]['line_list'].append(tmp)
for d, sql in [('SQLDict2',context.ActivityTool_getSQLActivities(table='message')),
('SQLQueue2',context.ActivityTool_getSQLActivities(table='message_queue'))]:
data[d] = {'line_list':[]}
for line in sql:
tmp = {'pri':line['pri']}
for k in ['min','avg','max']:
tmp[k] = str(line[k])
data[d]['line_list'].append(tmp)
import json
return json.dumps(data)
return json.dumps({
q + ('2' if i else ''): {
'line_list': [dict(zip(results.names(), row)) for row in results]
}
for i, q in enumerate((context.ActivityTool_getCurrentActivities,
context.ActivityTool_getSQLActivities))
for q, results in (('SQLDict', q(table='message')),
('SQLQueue', q(table='message_queue')))
})
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>uid,activity,**kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ActivityTool_manageDelete</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>uid,activity,**kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ActivityTool_manageRestart</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
return 'ActivityTool_manageRestart?uid=%s&activity=%s' % (context.uid, context.activity)
return 'manageRestart?message_uid_list:int:list=%s&activity=%s' % (context.uid, context.activity)
......@@ -142,10 +142,6 @@
<string>retry</string>
<string>Retry</string>
</tuple>
<tuple>
<string>processing</string>
<string>Processing</string>
</tuple>
</list>
</value>
</item>
......@@ -221,10 +217,6 @@
<string>retry</string>
<string>Retry</string>
</tuple>
<tuple>
<string>processing</string>
<string>Processing</string>
</tuple>
</list>
</value>
</item>
......@@ -301,10 +293,6 @@
<string>retry</string>
<string></string>
</tuple>
<tuple>
<string>processing</string>
<string></string>
</tuple>
</list>
</value>
</item>
......
......@@ -10,7 +10,7 @@
<table>
<tr>
<th>Type</th>
<th>Message</th>
<th>Count</th>
<th>Method Id</th>
<th>Processing Node</th>
<th>Min pri</th>
......@@ -19,7 +19,7 @@
{{#each messageList1}}
<tr>
<td>{{this.messagetype}} </td>
<td>{{this.message}}</td>
<td>{{this.count}}</td>
<td>{{this.method_id}}</td>
<td>{{this.node}}</td>
<td>{{this.min_pri}}</td>
......@@ -29,7 +29,7 @@
{{#each messageList2}}
<tr>
<td>{{this.messagetype}} </td>
<td>{{this.message}}</td>
<td>{{this.count}}</td>
<td>{{this.method_id}}</td>
<td>{{this.node}}</td>
<td>{{this.min_pri}}</td>
......@@ -40,7 +40,7 @@
<table>
<tr>
<th>Type</th>
<th>Pri</th>
<th>Priority</th>
<th>Min</th>
<th>Avg</th>
<th>Max</th>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment