Commit 3759047d authored by Julien Muchembled

searchAndActivate: use a grouping method instead of callMethodOnObjectList

The problem with callMethodOnObjectList is that when an object can't be
processed, all other objects of the same group fail without any chance to
be retried separately.

Grouping is configurable with the usual CMFActivity parameters, passed in a new
'group_kw' parameter to avoid any conflict with catalog parameters (**kw).
'packet_size' and 'activity_count' are still accepted for backward
compatibility.
parent 8d764c23
......@@ -77,7 +77,7 @@ previous_method_id = launchUpgraderAlarm(\'upgrader_check_pre_upgrade\')\n
\n
previous_method_id.extend([\'recursiveImmediateReindexObject\',\n
\'immediateReindexObject\',\n
\'callMethodOnObjectList\'])\n
\'Base_postCheckConsistencyResult\'])\n
\n
previous_method_id = launchUpgraderAlarm(\'upgrader_check_upgrader\',\n
after_method_id=previous_method_id)\n
......
......@@ -58,7 +58,7 @@
portal = context.getPortalObject()\n
portal_alarms = portal.portal_alarms\n
\n
after_method_id = \'callMethodOnObjectList\'\n
after_method_id = \'Base_postCheckConsistencyResult\'\n
def launchSenseAlarm(alarm_id, after_tag=[]):\n
""" Get the alarm and use sense"""\n
upgrader_alarm = getattr(portal_alarms, alarm_id, None)\n
......
......@@ -27,7 +27,6 @@
##############################################################################
import re
import transaction
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5.Tool.TemplateTool import BusinessTemplateUnknownError
from Products.ERP5Type.tests.Sequence import SequenceList
......@@ -312,21 +311,16 @@ class TestUpgrader(ERP5TypeTestCase):
self.portal.portal_templates.getInstalledBusinessTemplateTitleList())
def stepCheckNoActivitiesCreated(self, sequence=None):
transaction.commit()
portal_activities = self.getActivityTool()
message = portal_activities.getMessageList()[0]
message, = portal_activities.getMessageList()
self.assertEqual(message.method_id, "Alarm_runUpgrader")
portal_templates = self.getTemplateTool()
title_list = portal_templates.getInstalledBusinessTemplateTitleList()
self.assertTrue('erp5_web' not in title_list,
"%s found in %s" % ('erp5_web', title_list))
getTitleList = self.getTemplateTool().getInstalledBusinessTemplateTitleList
self.assertNotIn('erp5_web', getTitleList())
portal_activities.manageInvoke(message.object_path, message.method_id)
title_list = portal_templates.getInstalledBusinessTemplateTitleList()
self.assertTrue('erp5_web' in title_list,
"%s not found in %s" % ('erp5_web', title_list))
transaction.commit()
message_list = set([i.method_id for i in portal_activities.getMessageList()])
self.assertTrue('callMethodOnObjectList' not in message_list)
self.assertIn('erp5_web', getTitleList())
self.commit()
self.assertEqual({'immediateReindexObject', 'unindexObject'},
{x.method_id for x in portal_activities.getMessageList()})
def stepCreateBigIncosistentData(self, sequence=None):
for _ in range(101):
......@@ -341,18 +335,13 @@ class TestUpgrader(ERP5TypeTestCase):
title="org_%s" % self.portal.organisation_module.getLastId())
def stepCheckActivitiesCreated(self, sequence=None):
transaction.commit()
portal_activities = self.getActivityTool()
message = portal_activities.getMessageList()[0]
message, = portal_activities.getMessageList()
self.assertEqual(message.method_id, "Alarm_runUpgrader")
portal_activities.manageInvoke(message.object_path, message.method_id)
transaction.commit()
message_list = portal_activities.getMessageList()
method_id_list = set([i.method_id for i in message_list])
self.assertTrue('callMethodOnObjectList' in method_id_list)
for message in message_list:
if message.method_id == 'callMethodOnObjectList':
self.assertEqual(message.args[-1], 'Base_postCheckConsistencyResult')
self.commit()
self.assertIn('Base_postCheckConsistencyResult',
{x.method_id for x in portal_activities.getMessageList()})
def stepUninstallERP5UpgraderTestBT(self, sequence=None):
bt5 = self.portal.portal_templates.getInstalledBusinessTemplate('erp5_web')
......
......@@ -29,6 +29,7 @@
import sys
from copy import deepcopy
from collections import defaultdict
from math import ceil
from Products.CMFCore.CatalogTool import CatalogTool as CMFCoreCatalogTool
from Products.ZSQLCatalog.ZSQLCatalog import ZCatalog
from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
......@@ -972,22 +973,49 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
return result
def _searchAndActivate(self, method_id, method_args=(), method_kw={},
activate_kw={}, min_uid=None, **kw):
activate_kw={}, min_uid=None, group_kw={}, **kw):
"""Search the catalog and run a script by activity on all found objects
This method is configurable (via 'packet_size' & 'activity_count'
parameters) so that it can work efficiently with databases of any size.
In order to not generate too many activities, this method limits the
number of rows to fetch from the catalog, and if the catalog would return
more results, it resumes by calling itself by activity.
'activate_kw' is for common activate parameters between all generated
activities and is usually used for priority and dependencies.
Common usage is to call this method without 'select_method_id'.
In this case, found objects are processed via a CMFActivity grouping,
and this can be configured via 'group_kw', for additional parameters to
pass to CMFActivity (in particular: 'activity' and 'group_method_*').
A generic grouping method is used if none is given.
'group_method_cost' defaults to .034, i.e. 30 objects per packet.
'select_method_id', if provided, will be called with partial catalog
results and returned value will be provided to the callable identified by
'method_id' (which will no longer be invoked in the context of a given
document returned by catalog) as first positional argument.
Use 'packet_size' parameter to limit the size of each group (default: 30).
'activate_kw' may specify an active process to collect results.
'activity_count' parameter is deprecated.
Its value should be hardcoded because CMFActivity can now handle many
activities efficiently and any tweak should benefit everyone.
However, there are still rare cases where one wants to limit the number
of processing nodes, to minimize latency of high-priority activities.
"""
catalog_kw = dict(kw)
packet_size = catalog_kw.pop('packet_size', 30)
limit = packet_size * catalog_kw.pop('activity_count', 100)
catalog_kw = kw.copy()
select_method_id = catalog_kw.pop('select_method_id', None)
if select_method_id:
packet_size = catalog_kw.pop('packet_size', 30)
limit = packet_size * catalog_kw.pop('activity_count', 100)
elif 'packet_size' in catalog_kw: # BBB
assert not group_kw, (kw, group_kw)
packet_size = catalog_kw.pop('packet_size')
group_method_cost = 1. / packet_size
limit = packet_size * catalog_kw.pop('activity_count', 100)
else:
group_method_cost = group_kw.get('group_method_cost', .034) # 30 objects
limit = catalog_kw.pop('activity_count', None) or \
100 * int(ceil(1 / group_method_cost))
if min_uid:
catalog_kw['min_uid'] = SimpleQuery(uid=min_uid,
comparison_operator='>')
......@@ -999,23 +1027,28 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
result_count = len(r)
if result_count:
if result_count == limit:
next_kw = dict(activate_kw, priority=1+activate_kw.get('priority', 1))
next_kw = activate_kw.copy()
next_kw['priority'] = 1 + next_kw.get('priority', 1)
self.activate(activity='SQLQueue', **next_kw) \
._searchAndActivate(method_id,method_args, method_kw,
activate_kw, r[-1].getUid(), **kw)
portal_activities = self.getPortalObject().portal_activities
active_portal_activities = portal_activities.activate(
activity='SQLQueue', **activate_kw)
if select_method_id is None:
r = [x.getPath() for x in r]
r.sort()
activate = active_portal_activities.callMethodOnObjectList
method_args = (method_id, ) + method_args
else:
if select_method_id:
portal_activities = self.getPortalObject().portal_activities
active_portal_activities = portal_activities.activate(
activity='SQLQueue', **activate_kw)
r = getattr(portal_activities, select_method_id)(r)
activate = getattr(active_portal_activities, method_id)
for i in xrange(0, result_count, packet_size):
activate(r[i:i+packet_size], *method_args, **method_kw)
for i in xrange(0, result_count, packet_size):
activate(r[i:i+packet_size], *method_args, **method_kw)
else:
kw = activate_kw.copy()
kw['activity'] = 'SQLQueue'
if group_method_cost < 1:
kw['group_method_cost'] = group_method_cost
kw['group_method_id'] = None
kw.update(group_kw)
for r in r:
getattr(r.activate(**kw), method_id)(*method_args, **method_kw)
security.declarePublic('searchAndActivate')
def searchAndActivate(self, *args, **kw):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment