Commit 631dff69 authored by Julien Muchembled's avatar Julien Muchembled

Add support for ZEO-based unit tests with parallel execution of activities

The most simple way to use this feature is to use --activity_node option only,
but it is also possible to:
- run only a ZEO server (--activity_node=0)
- run only ZEO clients
- run only activity nodes, by specifying no test
- specify HOST:PORT to listen/connect for ZEO storage and ZServer

Load/save of catalog is done by the process running the ZEO server.
Load of static files is done by all processes. Save of static files is done by
the process running unit test.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@35374 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 470764c4
......@@ -540,7 +540,7 @@ class ActivityTool (Folder, UniqueObject):
# Filter content (ZMI))
def filtered_meta_types(self, user=None):
# Filters the list of available meta types.
all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
all = Folder.filtered_meta_types(self)
meta_types = []
for meta_type in self.all_meta_types():
if meta_type['name'] in self.allowed_types:
......
......@@ -59,6 +59,7 @@ from Products.ERP5Type.patches import StateChangeInfoPatch
from Products.ERP5Type.patches import transforms
from Products.ERP5Type.patches import OFSPdata
from Products.ERP5Type.patches import make_hidden_input
from Products.ERP5Type.patches import DemoStorage
# BACK: Forward Compatibility with Zope 2.12 or CMF 2.2. Remove when we've
# dropped support for older versions.
from Products.ERP5Type.patches import TransactionAddBeforeCommitHook
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# Copyright (c) 2010 Nexedi SARL and Contributors. All Rights Reserved.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB.DemoStorage import DemoStorage
try:
loadEx = DemoStorage.loadEx
except AttributeError:
pass # XXX Zope 2.12 ?
else:
##
# Fix bug in DemoStorage.loadEx (it uses 'load' instead of 'loadEx')
#
DemoStorage.loadEx = lambda *args: (loadEx(*args) + ('',))[:3]
##
# Implemenent conflict resolution for DemoStorage
#
from ZODB import POSException
from ZODB.ConflictResolution import tryToResolveConflict, ResolvedSerial
# copied from ZODB/DemoStorage.py and patched
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
old = self._index.get(oid, None)
if old is None:
# Hm, nothing here, check the base version:
if self._base:
try:
p, tid = self._base.load(oid, '')
except KeyError:
pass
else:
old = oid, None, None, p, tid
nv=None
if old:
oid, pre, vdata, p, tid = old
if vdata:
if vdata[0] != version:
raise POSException.VersionLockError, oid
nv=vdata[1]
else:
nv=old
if serial != tid:
# <patch>
rdata = tryToResolveConflict(self, oid, tid, serial, data)
if rdata is None:
raise POSException.ConflictError(
oid=oid, serials=(tid, serial), data=data)
data = rdata
# </patch>
r = [oid, old, version and (version, nv) or None, data, self._tid]
self._tindex.append(r)
s=self._tsize
s=s+72+(data and (16+len(data)) or 4)
if version: s=s+32+len(version)
if self._quota is not None and s > self._quota:
raise POSException.StorageError, (
'''<b>Quota Exceeded</b><br>
The maximum quota for this demonstration storage
has been exceeded.<br>Have a nice day.''')
finally: self._lock_release()
# <patch>
if old and serial != tid:
return ResolvedSerial
# </patch>
return self._tid
DemoStorage.store = store
def loadSerial(self, oid, serial):
# XXX should I use self._lock_acquire and self._lock_release ?
pre = self._index.get(oid)
while pre:
oid, pre, vdata, p, tid = pre
if tid == serial:
return p
return self._base.loadSerial(oid, serial)
DemoStorage.loadSerial = loadSerial
def loadBefore(self, oid, tid):
# XXX should I use self._lock_acquire and self._lock_release ?
end_time = None
pre = self._index.get(oid)
while pre:
oid, pre, vdata, p, start_time = pre
if start_time < tid:
return p, start_time, end_time
end_time = start_time
base = self._base.loadBefore(oid, tid)
if base:
p, start_time, base_end_time = base
return p, start_time, base_end_time or end_time
DemoStorage.loadBefore = loadBefore
def history(self, oid, version=None, length=1, filter=None):
assert not version
self._lock_acquire()
try:
r = []
pre = self._index.get(oid)
while length and pre:
oid, pre, vdata, p, tid = pre
assert vdata is None
d = {'tid': tid, 'size': len(p), 'version': ''}
if filter is None or filter(d):
r.append(d)
length -= 1
if length:
r += self._base.history(oid, version, length, filter)
return r
finally:
self._lock_release()
DemoStorage.history = history
......@@ -61,14 +61,15 @@ except ImportError:
import transaction
from Testing import ZopeTestCase
from Testing.ZopeTestCase.PortalTestCase import PortalTestCase, user_name
from Testing.ZopeTestCase import PortalTestCase, user_name
from Products.CMFCore.utils import getToolByName
from Products.DCWorkflow.DCWorkflow import ValidationFailed
from Products.ERP5Type.Base import _aq_reset
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
from zLOG import LOG, DEBUG
import backportUnittest
from Products.ERP5Type.tests.backportUnittest import SetupSiteError
from Products.ERP5Type.tests.utils import DummyMailHost, parseListeningAddress
# Quiet messages when installing products
install_product_quiet = 1
......@@ -153,6 +154,9 @@ try:
except ImportError:
pass
from Products.ERP5Type.tests.ProcessingNodeTestCase import \
ProcessingNodeTestCase
ZopeTestCase.installProduct('TimerService', quiet=install_product_quiet)
# CMF
......@@ -268,7 +272,7 @@ def profile_if_environ(environment_var_name):
# No profiling, return identity decorator
return lambda self, method: method
class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase):
"""TestCase for ERP5 based tests.
This TestCase setups an ERP5Site and installs business templates.
......@@ -566,7 +570,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
def _setUpDummyMailHost(self):
"""Replace Original Mail Host by Dummy Mail Host.
"""
from Products.ERP5Type.tests.utils import DummyMailHost
if 'MailHost' in self.portal.objectIds():
self.portal.manage_delObjects(['MailHost'])
self.portal._setObject('MailHost', DummyMailHost('MailHost'))
......@@ -713,50 +716,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
if rule.getValidationState() != 'validated':
rule.validate()
def tic(self, verbose=0):
"""
Start all messages
"""
portal_activities = getattr(self.getPortal(),'portal_activities',None)
if portal_activities is not None:
if verbose:
ZopeTestCase._print('Executing pending activities ...')
old_message_count = 0
start = time.time()
count = 1000
message_count = len(portal_activities.getMessageList())
while message_count:
if verbose and old_message_count != message_count:
ZopeTestCase._print(' %i' % message_count)
old_message_count = message_count
portal_activities.process_timer(None, None)
message_count = len(portal_activities.getMessageList())
# This prevents an infinite loop.
count -= 1
if count == 0:
# Get the last error message from error_log.
error_message = ''
error_log = self.getPortal().error_log._getLog()
if len(error_log):
last_log = error_log[-1]
error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
last_log['type'],
last_log['value'],
last_log['tb_text'],
)
raise RuntimeError,\
'tic is looping forever. These messages are pending: %r %s' % (
[('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in portal_activities.getMessageList()],
error_message
)
# This give some time between messages
if count % 10 == 0:
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
if verbose:
ZopeTestCase._print(' done (%.3fs)\n' % (time.time() - start))
def createSimpleUser(self, title, reference, function):
"""
Helper function to create a Simple ERP5 User.
......@@ -832,37 +791,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
self.assertEqual(method(), reference_workflow_state)
return workflow_error_message
def startZServer(self):
"""Starts an HTTP ZServer thread."""
from Testing.ZopeTestCase import threadutils, utils
if utils._Z2HOST is None:
randint = random.Random(hash(os.environ['INSTANCE_HOME'])).randint
def zserverRunner():
try:
threadutils.zserverRunner(utils._Z2HOST, utils._Z2PORT)
except socket.error, e:
if e.args[0] != errno.EADDRINUSE:
raise
utils._Z2HOST = None
from ZServer import setNumberOfThreads
setNumberOfThreads(1)
port_list = []
for i in range(3):
utils._Z2HOST = '127.0.0.1'
utils._Z2PORT = randint(55000, 55500)
t = threadutils.QuietThread(target=zserverRunner)
t.setDaemon(1)
t.start()
time.sleep(0.1)
if utils._Z2HOST:
ZopeTestCase._print("Running ZServer on port %i\n" % utils._Z2PORT)
break
port_list.append(str(utils._Z2PORT))
else:
ZopeTestCase._print("Can't find free port to start ZServer"
" (tried ports %s)\n" % ', '.join(port_list))
return utils._Z2HOST, utils._Z2PORT
def _installBusinessTemplateList(self, business_template_list,
light_install=True,
quiet=True):
......@@ -921,7 +849,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
title = self.getTitle()
from Products.ERP5Type.Base import _aq_reset
if portal_name in failed_portal_installation:
raise backportUnittest.SetupSiteError(
raise SetupSiteError(
'Installation of %s already failed, giving up' % portal_name)
try:
if app is None:
......@@ -930,6 +858,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
# make it's REQUEST available during setup
global current_app
current_app = app
app.test_portal_name = portal_name
global setup_done
if not (hasattr(aq_base(app), portal_name) and
......@@ -992,6 +921,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
except ImportError:
pass
self.serverhost, self.serverport = self.startZServer()
self._registerNode(distributing=1, processing=1)
self._updateConversionServerConfiguration()
self._updateConnectionStrings()
......@@ -1073,11 +1003,6 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
if count:
LOG('Products.ERP5Type.tests.ERP5TypeTestCase.beforeClose', DEBUG,
'dropped %d left-over activity messages' % (count,))
# portal_activities.process_timer automatically registers current node
# (localhost:<random_port>). We must unregister it so that Data.fs can
# be reused without reconfiguring portal_activities.
del portal_activities.distributingNode
del portal_activities._nodes
transaction.commit()
except AttributeError:
pass
......@@ -1248,11 +1173,6 @@ class ERP5ReportTestCase(ERP5TypeTestCase):
if diff_list:
self.fail('Lines differs:\n' + '\n'.join(diff_list))
from unittest import _makeLoader, TestSuite
def dummy_makeSuite(testCaseClass, prefix='dummy_test', sortUsing=cmp, suiteClass=TestSuite):
return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase(testCaseClass)
def dummy_setUp(self):
'''
This one is overloaded so that it dos not execute beforeSetUp and afterSetUp
......@@ -1276,6 +1196,34 @@ def dummy_tearDown(self):
'''
self._clear(1)
class ZEOServerTestCase(ERP5TypeTestCase):
"""TestCase class to run a ZEO storage
Main method is 'asyncore_loop' (inherited) since there is nothing to do
except processing I/O.
"""
def setUp(self):
# Start ZEO storage and send address to parent process if any.
from Zope2.custom_zodb import zeo_client, Storage
from ZEO.StorageServer import StorageServer
storage = {'1': Storage}
for host_port in parseListeningAddress(os.environ.get('zeo_server')):
try:
self.zeo_server = StorageServer(host_port, storage)
break
except socket.error, e:
if e[0] != errno.EADDRINUSE:
raise
if zeo_client:
os.write(zeo_client, repr(host_port))
os.close(zeo_client)
ZopeTestCase._print("\nZEO Storage started at %s:%s ... " % host_port)
def tearDown(self):
self.zeo_server.close_server()
@onsetup
def optimize():
'''Significantly reduces portal creation time.'''
......
# This module must be imported before CMFActivity product is installed.
import base64, errno, select, socket, time
from threading import Thread
import Lifetime
import transaction
from BTrees.OIBTree import OIBTree
from Testing import ZopeTestCase
from Products.CMFActivity import ActivityTool as _ActivityTool
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests import backportUnittest
from Products.ERP5Type.tests.utils import createZServer
class ActivityTool(_ActivityTool.ActivityTool):
"""Class redefining CMFActivity.ActivityTool.ActivityTool for unit tests
"""
# When a ZServer can't be started, the node name ends with ':' (no port).
def _isValidNodeName(self, node_name):
return True
# Divert location to register processing and distributing nodes.
# Load balancing is configured at the root instead of the activity tool,
# so that additional can register even if there is no portal set up yet.
# Properties at the root are:
# - 'test_processing_nodes' to list processing nodes
# - 'test_distributing_node' to select the distributing node
def getNodeDict(self):
app = self.getPhysicalRoot()
if getattr(app, 'test_processing_nodes', None) is None:
app.test_processing_nodes = OIBTree()
return app.test_processing_nodes
def getDistributingNode(self):
return self.getPhysicalRoot().test_distributing_node
def manage_setDistributingNode(self, distributingNode, REQUEST=None):
# A property to catch setattr on 'distributingNode' doesn't work
# because self would lose all acquisition wrappers.
previous_node = self.distributingNode
try:
super(ActivityTool, self).manage_setDistributingNode(distributingNode,
REQUEST=REQUEST)
self.getPhysicalRoot().test_distributing_node = self.distributingNode
finally:
self.distributingNode = previous_node
# When there is more than 1 node, prevent the distributing node from
# processing activities.
def tic(self, processing_node=1, force=0):
processing_node_list = self.getProcessingNodeList()
if len(processing_node_list) > 1 and \
self.getCurrentNode() == self.getDistributingNode():
# Sleep between each distribute.
time.sleep(0.3)
transaction.commit()
else:
super(ActivityTool, self).tic(processing_node, force)
_ActivityTool.ActivityTool = ActivityTool
class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
"""Minimal ERP5 TestCase class to process activities
When a processing node starts, the portal may not exist yet, or its name is
unknown, so an additional 'test_portal_name' property at the root is set by
the node running the unit tests to tell other nodes on which portal activities
should be processed.
"""
@staticmethod
def asyncore_loop():
try:
Lifetime.lifetime_loop()
except KeyboardInterrupt:
pass
Lifetime.graceful_shutdown_loop()
def startZServer(self):
"""Start HTTP ZServer in background"""
utils = ZopeTestCase.utils
if utils._Z2HOST is None:
try:
hs = createZServer()
except RuntimeError, e:
ZopeTestCase._print(str(e))
else:
utils._Z2HOST, utils._Z2PORT = hs.server_name, hs.server_port
t = Thread(target=Lifetime.loop)
t.setDaemon(1)
t.start()
return utils._Z2HOST, utils._Z2PORT
def _registerNode(self, distributing, processing):
"""Register node to process and/or distribute activities"""
try:
activity_tool = self.portal.portal_activities
except AttributeError:
activity_tool = ActivityTool().__of__(self.app)
currentNode = activity_tool.getCurrentNode()
if distributing:
activity_tool.manage_setDistributingNode(currentNode)
if processing:
activity_tool.manage_addToProcessingList((currentNode,))
else:
activity_tool.manage_removeFromProcessingList((currentNode,))
def tic(self, verbose=0):
"""Execute pending activities"""
portal_activities = self.portal.portal_activities
if 1:
if verbose:
ZopeTestCase._print('Executing pending activities ...')
old_message_count = 0
start = time.time()
count = 1000
getMessageList = portal_activities.getMessageList
message_count = len(getMessageList(include_processing=1))
while message_count:
if verbose and old_message_count != message_count:
ZopeTestCase._print(' %i' % message_count)
old_message_count = message_count
portal_activities.process_timer(None, None)
if Lifetime._shutdown_phase:
# XXX CMFActivity contains bare excepts
raise KeyboardInterrupt
message_count = len(getMessageList(include_processing=1))
# This prevents an infinite loop.
count -= 1
if count == 0:
# Get the last error message from error_log.
error_message = ''
error_log = self.portal.error_log._getLog()
if len(error_log):
last_log = error_log[-1]
error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
last_log['type'],
last_log['value'],
last_log['tb_text'],
)
raise RuntimeError,\
'tic is looping forever. These messages are pending: %r %s' % (
[('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in portal_activities.getMessageList()],
error_message
)
# This give some time between messages
if count % 10 == 0:
portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
if verbose:
ZopeTestCase._print(' done (%.3fs)\n' % (time.time() - start))
def afterSetUp(self):
"""Initialize a node that will only process activities"""
createZServer() #self.startZServer()
self._registerNode(distributing=0, processing=1)
transaction.commit()
def processing_node(self):
"""Main loop for nodes that process activities"""
try:
while not Lifetime._shutdown_phase:
time.sleep(.3)
transaction.begin()
try:
portal = self.app[self.app.test_portal_name]
except AttributeError:
continue
portal.portal_activities.process_timer(None, None)
except KeyboardInterrupt:
pass
......@@ -107,8 +107,6 @@ class TestCase(unittest.TestCase):
_testMethodDoc = property(lambda self: self.__testMethodDoc)
def run(self, result=None):
import pdb
#pdb.set_trace()
orig_result = result
if result is None:
result = self.defaultTestResult()
......@@ -138,6 +136,8 @@ class TestCase(unittest.TestCase):
result.addSkip(self, str(e))
except SetupSiteError, e:
result.errors.append(None)
except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
raise # Python >= 2.5
except Exception:
result.addError(self, sys.exc_info())
else:
......@@ -151,6 +151,8 @@ class TestCase(unittest.TestCase):
result.addUnexpectedSuccess(self)
except SkipTest, e:
result.addSkip(self, str(e))
except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
raise # Python >= 2.5
except Exception:
result.addError(self, sys.exc_info())
else:
......@@ -158,6 +160,8 @@ class TestCase(unittest.TestCase):
try:
self.tearDown()
except (KeyboardInterrupt, SystemExit): # BACK: Not needed for
raise # Python >= 2.5
except Exception:
result.addError(self, sys.exc_info())
success = False
......@@ -252,7 +256,10 @@ class TextTestRunner(unittest.TextTestRunner):
result = self._makeResult()
startTime = time.time()
# BACK: 2.7 implementation wraps run with result.(start|stop)TestRun
test(result)
try:
test(result)
except KeyboardInterrupt:
pass
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
......
import os
import shutil
import socket
import sys
import glob
import threading
import ZODB
from asyncore import socket_map
from ZODB.DemoStorage import DemoStorage
from ZODB.FileStorage import FileStorage
from Products.ERP5Type.tests.utils import getMySQLArguments
from ZEO.ClientStorage import ClientStorage
from Products.ERP5Type.tests.utils import getMySQLArguments, instance_random
from Products.ERP5Type.tests.runUnitTest import instance_home, static_dir_list
def _print(message):
sys.stderr.write(message + "\n")
zserver_list = os.environ.get('zserver', '').split(',')
os.environ['zserver'] = zserver_list[0]
zeo_client = os.environ.get('zeo_client')
if zeo_client:
zeo_client = zeo_client.rsplit(':', 1)
zeo_client = (len(zeo_client) == 1 and 'localhost' or zeo_client[0],
int(zeo_client[-1]))
try:
activity_node = int(os.environ['activity_node'])
except KeyError:
activity_node = (zeo_client or 'zeo_server' in os.environ) and 1 or None
data_fs_path = os.environ.get('erp5_tests_data_fs_path',
os.path.join(instance_home, 'Data.fs'))
load = int(os.environ.get('erp5_load_data_fs', 0))
save = int(os.environ.get('erp5_save_data_fs', 0))
save_mysql = None
if not zeo_client:
def save_mysql(verbosity=1):
# The output of mysqldump needs to merge many lines at a time
# for performance reasons (merging lines is at most 10 times
# faster, so this produce somewhat not nice to read sql
command = 'mysqldump %s > dump.sql' % getMySQLArguments()
if verbosity:
_print('Dumping MySQL database with %s...' % command)
os.system(command)
_print("Cleaning static files ... ")
for dir in static_dir_list:
for f in glob.glob(os.path.join(instance_home, dir, '*')):
os.remove(f)
if load:
dump_sql = os.path.join(instance_home, 'dump.sql')
if os.path.exists(dump_sql):
_print("Restoring MySQL database ... ")
ret = os.system("mysql %s < %s" % (getMySQLArguments(), dump_sql))
assert not ret
else:
os.environ['erp5_tests_recreate_catalog'] = '1'
if save_mysql:
dump_sql = os.path.join(instance_home, 'dump.sql')
if os.path.exists(dump_sql):
_print("Restoring MySQL database ... ")
ret = os.system("mysql %s < %s" % (getMySQLArguments(), dump_sql))
assert not ret
else:
os.environ['erp5_tests_recreate_catalog'] = '1'
_print("Restoring static files ... ")
for dir in static_dir_list:
full_path = os.path.join(instance_home, dir)
if os.path.exists(full_path + '.bak'):
os.rmdir(full_path)
shutil.copytree(full_path + '.bak', full_path, symlinks=True)
elif save and os.path.exists(data_fs_path):
elif save and not zeo_client and os.path.exists(data_fs_path):
os.remove(data_fs_path)
if save:
Storage = FileStorage(data_fs_path)
elif load:
Storage = DemoStorage(base=FileStorage(data_fs_path))
zeo_server_pid = None
zeo_client_pid_list = []
ZEvent = sys.modules.get('ZServer.PubCore.ZEvent')
def fork():
pid = os.fork()
if pid:
# recreate the event pipe if it already exists
for obj in socket_map.values():
assert obj is ZEvent.the_trigger
obj.close()
ZEvent.the_trigger = ZEvent.simple_trigger()
# make sure parent and child have 2 different RNG
instance_random.seed(instance_random.random())
return pid
while not zeo_client:
if activity_node:
r, zeo_client = os.pipe()
zeo_server_pid = fork()
if zeo_server_pid:
save_mysql = None
os.close(zeo_client)
zeo_client = eval(os.fdopen(r).read())
continue
else:
zeo_client_pid_list = activity_node = None
os.close(r)
elif activity_node is not None:
# run ZEO server but no need to fork
zeo_server_pid = 0
if save:
Storage = FileStorage(data_fs_path)
elif load:
Storage = DemoStorage(base=FileStorage(data_fs_path))
else:
Storage = DemoStorage()
break
else:
Storage = DemoStorage()
for i in xrange(1, activity_node):
pid = fork()
if not pid:
zeo_client_pid_list = None
os.environ['zserver'] = i < len(zserver_list) and zserver_list[i] or ''
break
zeo_client_pid_list.append(pid)
Storage = ClientStorage(zeo_client)
_print("Instance at %r loaded ... " % instance_home)
if zeo_client_pid_list is not None:
_print("Instance at %r loaded ... " % instance_home)
......@@ -6,6 +6,7 @@ import re
import time
import getopt
import unittest
import signal
import shutil
import errno
import random
......@@ -58,52 +59,52 @@ Options:
dependency provider (ie, the name of the BT
containing ZSQLMethods matching the desired
catalog storage).
--run_only=STRING
Run only specified test methods delimited with
--run_only=STRING Run only specified test methods delimited with
commas (e.g. testFoo,testBar). This can be regular
expressions.
-D
Invoke debugger on errors / failures.
-D Invoke debugger on errors / failures.
--update_business_templates
Update all business templates prior to runing
tests. This only has a meaning when doing
upgratability checks, in conjunction with --load.
--update_only can be use to restrict the list of
templates to update.
--update_only=STRING
Specify the list of business template to update if
--update_only=STRING Specify the list of business template to update if
you don't want to update them all. You can give a list
delimited with commas (e.g. erp5_core,erp5_xhtml_style).
This can be regular expressions.
--enable_full_indexing=STRING
By default, unit test do not reindex everything
for performance reasons. Provide list of documents
(delimited with comas) for which we want to force
indexing. This can only be for now 'portal_types'
--conversion_server_hostname=STRING
Hostname used to connect to conversion server (Oood),
this value will stored at default preference.
By default localhost is used.
--conversion_server_port=STRING
Port number used to connect to conversion server
(Oood), the value will be stored at default preference.
By default 8008 is used.
--use_dummy_mail_host
Replace the MailHost by DummyMailHost.
--use_dummy_mail_host Replace the MailHost by DummyMailHost.
This prevent the instance send emails.
By default Original MailHost is used.
--random_activity_priority=[SEED]
Force activities to have a random priority, to make
random failures (due to bad activity dependencies)
almost always reproducible. A random number
generator with the specified seed (or a random one
otherwise) is created for this purpose.
--activity_node=NUMBER Create given number of ZEO clients, to process
activities.
--zeo_server=[[HOST:]PORT] Bind the ZEO server to the given host/port.
--zeo_client=[HOST:]PORT Use specified ZEO server as storage.
--zserver=[HOST:]PORT[,...]
Make ZServer listen on given host:port
If used with --activity_node=, this can be a
comma-separated list of addresses.
When no unit test is specified, only activities are processed.
"""
# This script is usually executed directly, and is also imported using its full
......@@ -260,17 +261,15 @@ tests_home = os.path.join(instance_home, 'tests')
initializeInstanceHome(tests_framework_home, real_instance_home, instance_home)
class FilteredTestSuite(unittest.TestSuite):
"""Marker class to identify TestSuites that we have already filtered"""
pass
class ERP5TypeTestLoader(unittest.TestLoader):
"""Load test cases from the name passed on the command line.
"""
def __init__(self, test_pattern_list=None):
super(ERP5TypeTestLoader, self).__init__()
if test_pattern_list is not None:
self.test_pattern_list = map(re.compile, test_pattern_list)
filter_test_list = None
_testMethodPrefix = 'test'
testMethodPrefix = property(
lambda self: self._testMethodPrefix,
lambda self, value: None)
def loadTestsFromName(self, name, module=None):
"""This method is here for compatibility with old style arguments.
......@@ -281,47 +280,32 @@ class ERP5TypeTestLoader(unittest.TestLoader):
if name.endswith('.py'):
name = name[:-3]
name = name.replace(':', '.')
return unittest.TestLoader.loadTestsFromName(self, name, module)
return super(ERP5TypeTestLoader, self).loadTestsFromName(name, module)
def loadTestsFromModule(self, module):
"""ERP5Type test loader supports a function named 'test_suite'
"""
if hasattr(module, 'test_suite'):
return self.suiteClass(module.test_suite())
return unittest.TestLoader.loadTestsFromModule(self, module)
def _filterTestList(self, test_list):
"""test_list is a list of TestCase or TestSuite instances.
Returns a list of objects that contain only TestCase/TestSuite
matching --run_only"""
filtered = []
# Using FilteredTestSuite as a marker ensures that each Test
# is checked only once.
for item in test_list:
if isinstance(item, unittest.TestCase):
test_method_name = item.id().rsplit('.', 1)[-1]
for valid_test_method_name_re in self.test_pattern_list:
if valid_test_method_name_re.search(test_method_name):
filtered.append(item)
elif isinstance(item, FilteredTestSuite):
# has already been filtered, dont check it again
filtered.append(item)
elif isinstance(item, unittest.TestSuite):
filtered.append(FilteredTestSuite(self._filterTestList(item)))
else:
# should not happen.
raise ValueError, "What what what?"
return filtered
return super(ERP5TypeTestLoader, self).loadTestsFromModule(module)
def suiteClass(self, test_list):
"""Constructs a Test Suite from test lists.
Keep only tests matching commandline parameter --run_only"""
if hasattr(self, 'test_pattern_list'):
test_list = self._filterTestList(test_list)
def getTestCaseNames(self, testCaseClass):
"""Return a sorted sequence of method names found within testCaseClass
return FilteredTestSuite(test_list)
The returned list only contain names matching --run_only
"""
name_list = super(ERP5TypeTestLoader, self).getTestCaseNames(testCaseClass)
if self.filter_test_list:
filtered_name_list = []
for name in name_list:
for test in self.filter_test_list:
if test(name):
filtered_name_list.append(name)
break
return filtered_name_list
return name_list
unittest.TestLoader = ERP5TypeTestLoader
class DebugTestResult:
"""Wrap an unittest.TestResult, invoking pdb on errors / failures
......@@ -356,9 +340,10 @@ class DebugTestResult:
_print = sys.stderr.write
def runUnitTestList(test_list, verbosity=1, debug=0):
if not test_list:
_print("No test to run, exiting immediately.\n")
return
if "zeo_client" in os.environ and "zeo_server" in os.environ:
_print("conflicting options: --zeo_client and --zeo_server")
sys.exit(1)
os.environ.setdefault('INSTANCE_HOME', instance_home)
os.environ.setdefault('SOFTWARE_HOME', software_home)
os.environ.setdefault('COPY_OF_INSTANCE_HOME', instance_home)
......@@ -449,41 +434,6 @@ def runUnitTestList(test_list, verbosity=1, debug=0):
# it is then possible to run the debugger by "import pdb; pdb.set_trace()"
sys.path.insert(0, tests_framework_home)
save = int(os.environ.get('erp5_save_data_fs', 0))
dummy_test = save and (int(os.environ.get('update_business_templates', 0))
or not int(os.environ.get('erp5_load_data_fs', 0)))
TestRunner = backportUnittest.TextTestRunner
if dummy_test:
# Skip all tests in save mode and monkeypatch PortalTestCase.setUp
# to skip beforeSetUp and afterSetUp. Also patch unittest.makeSuite,
# as it's used in test_suite function in test cases.
from Products.ERP5Type.tests.ERP5TypeTestCase import \
dummy_makeSuite, dummy_setUp, dummy_tearDown
from Testing.ZopeTestCase.PortalTestCase import PortalTestCase
unittest.makeSuite = dummy_makeSuite
PortalTestCase.setUp = dummy_setUp
PortalTestCase.tearDown = dummy_tearDown
test_loader = ERP5TypeTestLoader()
test_loader.testMethodPrefix = 'dummy_test'
else:
# Hack the profiler to run only specified test methods, and wrap results when
# running in debug mode.
if debug:
class DebugTextTestRunner(TestRunner):
def _makeResult(self):
result = super(DebugTextTestRunner, self)._makeResult()
return DebugTestResult(result)
TestRunner = DebugTextTestRunner
test_method_list = os.environ.get('run_only', '').split(',')
test_loader = ERP5TypeTestLoader(test_method_list)
suite = test_loader.loadTestsFromNames(test_list)
# change current directory to the test home, to create zLOG.log in this dir.
os.chdir(tests_home)
try:
......@@ -499,28 +449,105 @@ def runUnitTestList(test_list, verbosity=1, debug=0):
# ourselves
layer.ZopeLite.setUp()
_print('done (%.3fs)' % (time.time() - _start))
result = TestRunner(verbosity=verbosity).run(suite)
TestRunner = backportUnittest.TextTestRunner
import Lifetime
from ZEO.ClientStorage import ClientStorage
from Zope2.custom_zodb import \
save_mysql, zeo_server_pid, zeo_client_pid_list, Storage
from Products.ERP5Type.tests.ERP5TypeTestCase import \
ProcessingNodeTestCase, ZEOServerTestCase, dummy_setUp, dummy_tearDown
def shutdown(signum, frame, signum_set=set()):
Lifetime.shutdown(0)
signum_set.add(signum)
if zeo_client_pid_list is None and len(signum_set) > 1:
# in case of ^C, a child should also receive a SIGHUP from the parent,
# so we merge the first 2 different signals in a single exception
signum_set.remove(signal.SIGHUP)
else:
raise KeyboardInterrupt
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGHUP, shutdown)
try:
save = int(os.environ.get('erp5_save_data_fs', 0))
load = int(os.environ.get('erp5_load_data_fs', 0))
dummy = save and (int(os.environ.get('update_business_templates', 0))
or not load)
if zeo_server_pid == 0:
suite = ZEOServerTestCase('asyncore_loop')
elif zeo_client_pid_list is None or not test_list:
suite = ProcessingNodeTestCase('processing_node')
if not (dummy or load):
_print('WARNING: either --save or --load should be used because static'
' files are only reloaded by the node installing business'
' templates.')
else:
if dummy:
# Skip all tests and monkeypatch PortalTestCase to skip
# afterSetUp/beforeTearDown.
ERP5TypeTestLoader._testMethodPrefix = 'dummy_test'
ZopeTestCase.PortalTestCase.setUp = dummy_setUp
ZopeTestCase.PortalTestCase.tearDown = dummy_tearDown
elif debug:
# Hack the profiler to run only specified test methods,
# and wrap results when running in debug mode.
class DebugTextTestRunner(TestRunner):
def _makeResult(self):
result = super(DebugTextTestRunner, self)._makeResult()
return DebugTestResult(result)
TestRunner = DebugTextTestRunner
suite = ERP5TypeTestLoader().loadTestsFromNames(test_list)
if not isinstance(Storage, ClientStorage):
# Remove nodes that were registered during previous execution.
# Set an empty dict (instead of delete the property)
# in order to avoid conflicts on / when several ZEO clients registers.
from BTrees.OIBTree import OIBTree
app = ZopeTestCase.app()
app.test_processing_nodes = OIBTree()
import transaction
transaction.commit()
ZopeTestCase.close(app)
if zeo_client_pid_list is None:
result = suite()
else:
_print('done (%.3fs)' % (time.time() - _start))
result = TestRunner(verbosity=verbosity).run(suite)
finally:
Storage.close()
if zeo_client_pid_list is not None:
# Wait that child processes exit. Stop ZEO storage (if any) after all
# other nodes disconnected.
for pid in zeo_client_pid_list:
os.kill(pid, signal.SIGHUP)
for pid in zeo_client_pid_list:
os.waitpid(pid, 0)
if zeo_server_pid:
os.kill(zeo_server_pid, signal.SIGHUP)
os.waitpid(zeo_server_pid, 0)
if save:
os.chdir(instance_home)
from Products.ERP5Type.tests.utils import getMySQLArguments
# The output of mysqldump needs to merge many lines at a time
# for performance reasons (merging lines is at most 10 times
# faster, so this produce somewhat not nice to read sql
command = 'mysqldump %s > dump.sql' % getMySQLArguments()
if verbosity:
_print('Dumping MySQL database with %s...\n' % command)
os.system(command)
if verbosity:
_print('Dumping static files...\n')
for static_dir in static_dir_list:
try:
shutil.rmtree(static_dir + '.bak')
except OSError, e:
if e.errno != errno.ENOENT:
raise
shutil.copytree(static_dir, static_dir + '.bak', symlinks=True)
if save_mysql:
save_mysql(verbosity)
if suite not in (ProcessingNodeTestCase, ZEOServerTestCase):
# Static files are modified by the node installing business templates,
# i.e. by the node running the unit test. There is no point saving them
# on a ZEO server, or on nodes that only process activities: this has to
# be done manually.
if verbosity:
_print('Dumping static files...\n')
for static_dir in static_dir_list:
try:
shutil.rmtree(static_dir + '.bak')
except OSError, e:
if e.errno != errno.ENOENT:
raise
shutil.copytree(static_dir, static_dir + '.bak', symlinks=True)
elif zeo_client_pid_list is not None:
_print('WARNING: No static files saved. You will have to do it manually.')
return result
......@@ -551,6 +578,10 @@ def main():
"use_dummy_mail_host",
"update_business_templates",
"random_activity_priority=",
"activity_node=",
"zeo_client=",
"zeo_server=",
"zserver=",
])
except getopt.GetoptError, msg:
usage(sys.stderr, msg)
......@@ -605,7 +636,8 @@ def main():
elif opt == "--erp5_catalog_storage":
os.environ["erp5_catalog_storage"] = arg
elif opt == "--run_only":
os.environ["run_only"] = arg
ERP5TypeTestLoader.filter_test_list = [re.compile(x).search
for x in arg.split(',')]
elif opt == "--update_only":
os.environ["update_only"] = arg
os.environ["update_business_templates"] = "1"
......@@ -620,13 +652,16 @@ def main():
elif opt == "--random_activity_priority":
os.environ["random_activity_priority"] = arg or \
str(random.randrange(0, 1<<16))
test_list = args
if not test_list:
_print("No test to run, exiting immediately.\n")
sys.exit(1)
result = runUnitTestList(test_list=test_list,
elif opt == "--activity_node":
os.environ["activity_node"] = arg
elif opt == "--zeo_client":
os.environ["zeo_client"] = arg
elif opt == "--zeo_server":
os.environ["zeo_server"] = arg
elif opt == "--zserver":
os.environ["zserver"] = arg
result = runUnitTestList(test_list=args,
verbosity=verbosity,
debug=debug)
try:
......@@ -636,7 +671,7 @@ def main():
_print("Profiler support is not available from ZopeTestCase in Zope 2.12\n")
else:
profiler.print_stats()
sys.exit(len(result.failures) + len(result.errors))
return result and len(result.failures) + len(result.errors) or 0
if __name__ == '__main__':
# Force stdout to be totally unbuffered.
......@@ -644,5 +679,4 @@ if __name__ == '__main__':
sys.stdout = os.fdopen(1, "wb", 0)
except OSError:
pass
main()
sys.exit(main())
......@@ -28,10 +28,12 @@
"""Utility functions and classes for unit testing
"""
import errno
import os
import logging
import random
import socket
import sys
import transaction
import zLOG
import Products.ERP5Type
......@@ -249,6 +251,58 @@ def getExtraSqlConnectionStringList():
return os.environ.get('extra_sql_connection_string_list',
'test2 test2:test3 test3').split(':')
instance_random = random.Random(hash(os.environ['INSTANCE_HOME']))
def parseListeningAddress(host_port=None, default_host='127.0.0.1'):
"""Parse string specifying the address to bind to
If the specified address is incomplete or missing, several (host, random_port)
will be returned. It must be used as follows (an appropriate error is raised
if all returned values failed):
for host, port in parseListeningAddress(os.environ.get('some_address')):
try:
s.bind((host, port))
break
except socket.error, e:
if e[0] != errno.EADDRINUSE:
raise
"""
if host_port:
host_port = host_port.rsplit(':', 1)
if len(host_port) == 1:
host_port = default_host, host_port[0]
try:
yield host_port[0], int(host_port[1])
raise RuntimeError("Can't bind to %s:%s" % host_port)
except ValueError:
default_host = host_port[1]
port_list = []
for i in xrange(3):
port_list.append(instance_random.randint(55000, 55500))
yield default_host, port_list[-1]
raise RuntimeError("Can't find free port (tried ports %s)\n"
% ', '.join(map(str, port_list)))
def createZServer(log=os.devnull):
from ZServer import logger, zhttp_server, zhttp_handler
lg = logger.file_logger(log)
class new_zhttp_server:
# I can't use __new__ because zhttp_handler is an old-style class :(
def __init__(self):
self.__class__ = zhttp_server
for ip, port in parseListeningAddress(os.environ.get('zserver')):
hs = new_zhttp_server()
try:
hs.__init__(ip, port, resolver=None, logger_object=lg)
hs.install_handler(zhttp_handler(module='Zope2', uri_base=''))
sys.stderr.write("Running ZServer at %s:%s\n" % (ip, port))
return hs
except socket.error, e:
if e[0] != errno.EADDRINUSE:
raise
hs.close()
# decorators
class reindex(object):
"""Decorator to commit transaction and flush activities after the method is
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment