Commit a7d796a5 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: new 'merge_parent=<path>' SQLDict parameter

If specified, <path> must be the path of an ancestor and SQLDict will merge all
similar messages under this path and process only the root message.

This will be used in ERP5 simulation to drop 'expand' activities that are
children of other 'expand' activities.

Because it can be used with grouped messages, it may be interesting to used it
for recursiveImmediateReindexObject.
parent 275799fe
......@@ -132,16 +132,51 @@ class SQLDict(SQLBase):
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = self.loadMessage(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent')
try:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
)
uid_list = [x.uid for x in result]
if uid_list:
if merge_parent:
path_list = []
while merge_parent != path:
path = path.rsplit('/', 1)[0]
assert path
original_uid = path_and_method_id_dict.get((path, method_id))
if original_uid is not None:
return None, original_uid, [uid]
path_list.append(path)
uid_list = []
if path_list:
result = activity_tool.SQLDict_selectParentMessage(
path=path_list,
method_id=method_id,
group_method_id=line.group_method_id,
processing_node=processing_node)
if result: # found a parent
# mark child as duplicate
uid_list.append(uid)
# switch to parent
line = result[0]
key = line.path, method_id
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
# return unreserved similar children
result = activity_tool.SQLDict_selectChildMessageList(
path=line.path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = [x.uid for x in result]
uid_list += reserve_uid_list
if not line.processing_node:
# reserve found parent
reserve_uid_list.append(uid)
else:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node, uid=uid_list)
processing_node=processing_node, uid=reserve_uid_list)
else:
activity_tool.SQLDict_commit() # release locks
except:
......@@ -152,6 +187,8 @@ class SQLDict(SQLBase):
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
path_and_method_id_dict[key] = uid
return m, uid, uid_list
# We know that original_uid != uid because caller skips lines we returned
# earlier.
return None, original_uid, [uid]
return load
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
</params>
SELECT uid FROM
message
WHERE
processing_node = 0
AND path LIKE <dtml-sqlvar type="string" expr="path + '/%'">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
processing_node
</params>
SELECT * FROM
message
WHERE
processing_node IN (0, <dtml-sqlvar processing_node type="int">)
AND <dtml-sqltest path type="string" multiple>
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
ORDER BY path
LIMIT 1
FOR UPDATE
......@@ -35,6 +35,7 @@ from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import DummyMailHost
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.ERP5Type.Base import Base
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
......@@ -3586,6 +3587,70 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.__class__.invokeGroup = ActivityTool_invokeGroup
self.assertEqual(invoked, [1])
def test_mergeParent(self):
category_tool = self.portal.portal_categories
# Test data: c0
# / \
# c1 c2
# / \ |
# c3 c4 c5
c = [category_tool.newContent()]
for i in xrange(5):
c.append(c[i//2].newContent())
transaction.commit()
self.tic()
def activate(i, priority=1, **kw):
kw.setdefault('merge_parent', c[0].getPath())
c[i].activate(priority=priority, **kw).doSomething()
def check(*expected):
transaction.commit()
self.tic()
self.assertEquals(tuple(invoked), expected)
del invoked[:]
invoked = []
def doSomething(self):
invoked.append(c.index(self))
Base.doSomething = doSomething
try:
for t in (0, 1), (0, 4, 2), (1, 0, 5), (3, 2, 0):
for p, i in enumerate(t):
activate(i, p)
check(0)
activate(1, 0); activate(5, 1); check(1, 5)
activate(3, 0); activate(1, 1); check(1)
activate(2, 0); activate(1, 1); activate(4, 2); check(2, 1)
activate(4, 0); activate(5, 1); activate(3, 2); check(4, 5, 3)
activate(3, 0, merge_parent=c[1].getPath()); activate(0, 1); check(3, 0)
# Following test shows that a child can be merged with a parent even if
# 'merge_parent' is not specified. This can't be avoided without loading
# all found duplicates, which would be bad for performance.
activate(0, 0); activate(4, 1, merge_parent=None); check(0)
finally:
del Base.doSomething
def activate(i, priority=1, **kw):
c[i].activate(group_method_id='portal_categories/invokeGroup',
merge_parent=c[(i-1)//2 or i].getPath(),
priority=priority, **kw).doSomething()
def invokeGroup(self, message_list):
invoked.append([c.index(m[0]) for m in message_list])
category_tool.__class__.invokeGroup = invokeGroup
try:
activate(5, 0); activate(1, 1); check([5, 1])
activate(4, 0); activate(1, 1); activate(2, 0); check([1, 2])
activate(1, 0); activate(5, 0); activate(3, 1); check([1, 5])
for p, i in enumerate((5, 3, 2, 1, 4)):
activate(i, p, group_id=str(2 != i != 5))
check([2], [1])
for cost in 0.3, 0.1:
activate(2, 0, group_method_cost=cost)
activate(3, 1); activate(4, 2); activate(1, 3)
check([2, 1])
finally:
del category_tool.__class__.invokeGroup
category_tool._delObject(c[0].getId())
transaction.commit()
self.tic()
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment