Commit 0cb50e14 authored by Julien Muchembled's avatar Julien Muchembled

Review setup of activity load balancing in ZEO-based unit tests

Do not initialize '/test_processing_nodes' from ZEO server anymore, because this
would not work for NEO. As a result, conflict resolution on Application is
implemented and cleanup of list of activity nodes is done at shutdown.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@45694 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 40f5786d
# -*- coding: utf-8 -*-
import base64, errno, os, select, socket, sys, time
from threading import Thread
from UserDict import IterableUserDict
import Lifetime
import transaction
from BTrees.OIBTree import OIBTree
from Testing import ZopeTestCase
from ZODB.POSException import ConflictError
from zLOG import LOG, ERROR
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests import backportUnittest
from Products.ERP5Type.tests.utils import createZServer
class DictPersistentWrapper(IterableUserDict, object):
def __metaclass__(name, base, d):
def wrap(attr):
wrapped = getattr(base[0], attr)
def wrapper(self, *args, **kw):
self._persistent_object._p_changed = 1
return wrapped(self, *args, **kw)
wrapper.__name__ = attr
return wrapper
for attr in ('clear', 'setdefault', 'update', '__setitem__', '__delitem__'):
d[attr] = wrap(attr)
return type(name, base, d)
def __init__(self, dict, persistent_object):
self.data = dict
self._persistent_object = persistent_object
def patchActivityTool():
"""Redefine several methods of ActivityTool for unit tests
"""
......@@ -39,8 +59,8 @@ def patchActivityTool():
def getNodeDict(self):
app = self.getPhysicalRoot()
if getattr(app, 'test_processing_nodes', None) is None:
app.test_processing_nodes = OIBTree()
return app.test_processing_nodes
app.test_processing_nodes = {}
return DictPersistentWrapper(app.test_processing_nodes, app)
@patch
def getDistributingNode(self):
......@@ -74,6 +94,25 @@ def patchActivityTool():
self._orig_tic(processing_node, force)
def Application_resolveConflict(self, old_state, saved_state, new_state):
"""Solve conflicts in case several nodes register at the same time
"""
new_state = new_state.copy()
old, saved, new = [set(state.pop('test_processing_nodes', {}).items())
for state in old_state, saved_state, new_state]
if sorted(old_state.items()) != sorted(saved_state.items()):
raise ConflictError
new |= saved - old
new -= old - saved
new_state['test_processing_nodes'] = nodes = dict(new)
if len(nodes) != len(new):
raise ConflictError
return new_state
from OFS.Application import Application
Application._p_resolveConflict = Application_resolveConflict
class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
"""Minimal ERP5 TestCase class to process activities
......@@ -131,7 +170,16 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
if processing:
activity_tool.manage_addToProcessingList((currentNode,))
else:
activity_tool.manage_removeFromProcessingList((currentNode,))
activity_tool.manage_delNode((currentNode,))
@classmethod
def unregisterNode(cls):
if ZopeTestCase.utils._Z2HOST is not None:
self = cls('unregisterNode')
self.app = self._app()
self._registerNode(distributing=0, processing=0)
transaction.commit()
self._close()
def assertNoPendingMessage(self):
"""Get the last error message from error_log"""
......
......@@ -538,18 +538,6 @@ def runUnitTestList(test_list, verbosity=1, debug=0, run_only=None):
if run_only:
ERP5TypeTestLoader.filter_test_list = None
if not isinstance(Storage, ClientStorage):
# Remove nodes that were registered during previous execution.
# Set an empty dict (instead of delete the property)
# in order to avoid conflicts on / when several ZEO clients registers.
from BTrees.OIBTree import OIBTree
app = ZopeTestCase.app()
app.test_processing_nodes = OIBTree()
import transaction
transaction.commit()
ZopeTestCase.close(app)
del app
if zeo_client_pid_list is None:
result = suite()
else:
......@@ -558,6 +546,7 @@ def runUnitTestList(test_list, verbosity=1, debug=0, run_only=None):
_print('done (%.3fs)\n' % (time.time() - _start))
result = TestRunner(verbosity=verbosity).run(suite)
finally:
ProcessingNodeTestCase.unregisterNode()
Storage.close()
if zeo_client_pid_list is not None:
# Wait that child processes exit. Stop ZEO storage (if any) after all
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment