Commit 287ab6ec authored by Rafael Monnerat's avatar Rafael Monnerat

Use erp5.util API for runTestSuite

parent 3039ec7a
import cloudooo
import re, imp, sys, threading, os, shlex, subprocess, glob, random
import errno
from pprint import pprint
import traceback
# The content of this file might be partially moved to an egg
# in order to allows parallel tests without the code of ERP5
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
def format_command(*args, **kw):
cmdline = []
for k, v in sorted(kw.items()):
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append('%s=%s' % (k, v))
for v in args:
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append(v)
return ' '.join(cmdline)
def subprocess_capture(p, quiet=False):
def readerthread(input, output, buffer):
while True:
data = input.readline()
if not data:
break
output(data)
buffer.append(data)
if p.stdout:
stdout = []
output = quiet and (lambda data: None) or sys.stdout.write
stdout_thread = threading.Thread(target=readerthread,
args=(p.stdout, output, stdout))
stdout_thread.setDaemon(True)
stdout_thread.start()
if p.stderr:
stderr = []
stderr_thread = threading.Thread(target=readerthread,
args=(p.stderr, sys.stderr.write, stderr))
stderr_thread.setDaemon(True)
stderr_thread.start()
if p.stdout:
stdout_thread.join()
if p.stderr:
stderr_thread.join()
p.wait()
return (p.stdout and ''.join(stdout),
p.stderr and ''.join(stderr))
class Persistent(object):
"""Very simple persistent data storage for optimization purpose
This tool should become a standalone daemon communicating only with an ERP5
instance. But for the moment, it only execute 1 test suite and exists,
and test suite classes may want some information from previous runs.
"""
def __init__(self, filename):
self._filename = filename
def __getattr__(self, attr):
if attr == '_db':
try:
db = file(self._filename, 'r+')
except IOError, e:
if e.errno != errno.ENOENT:
raise
db = file(self._filename, 'w+')
else:
try:
self.__dict__.update(eval(db.read()))
except StandardError:
pass
self._db = db
return db
self._db
return super(Persistent, self).__getattribute__(attr)
def sync(self):
self._db.seek(0)
db = dict(x for x in self.__dict__.iteritems() if x[0][:1] != '_')
pprint.pprint(db, self._db)
self._db.truncate()
class TestSuite(object):
"""
"""
allow_restart = False
realtime_output = True
stdin = file(os.devnull)
def __init__(self, max_instance_count, **kw):
self.__dict__.update(kw)
self._path_list = ['tests']
pool = threading.Semaphore(max_instance_count)
self.acquire = pool.acquire
self.release = pool.release
self._instance = threading.local()
self._pool = max_instance_count == 1 and [None] or \
range(1, max_instance_count + 1)
self._ready = set()
self.running = {}
if max_instance_count != 1:
self.realtime_output = False
elif os.isatty(1):
self.realtime_output = True
self.persistent = Persistent('run_test_suite-%s.tmp'
% self.__class__.__name__)
instance = property(lambda self: self._instance.id)
def start(self, test, on_stop=None):
assert test not in self.running
self.running[test] = instance = self._pool.pop(0)
def run():
self._instance.id = instance
if instance not in self._ready:
self._ready.add(instance)
self.setup()
status_dict = self.run(test)
if on_stop is not None:
on_stop(status_dict)
self._pool.append(self.running.pop(test))
self.release()
thread = threading.Thread(target=run)
thread.setDaemon(True)
thread.start()
def update(self):
self.checkout() # by default, update everything
def setup(self):
pass
def run(self, test):
raise NotImplementedError
def getTestList(self):
raise NotImplementedError
def spawn(self, *args, **kw):
quiet = kw.pop('quiet', False)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '\n$ ' + command
sys.stdout.flush()
try:
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
except Exception:
# Catch any exception here, to warn user instead of beeing silent,
# by generating fake error result
result = dict(status_code=-1,
command=command,
stderr=traceback.format_exc(),
stdout='')
raise SubprocessError(result)
if self.realtime_output:
stdout, stderr = subprocess_capture(p, quiet)
else:
stdout, stderr = p.communicate()
if not quiet:
sys.stdout.write(stdout)
sys.stderr.write(stderr)
result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr)
if p.returncode:
raise SubprocessError(result)
return result
class CloudoooTestSuite(TestSuite):
RUN_RE = re.compile(
r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s',
re.DOTALL)
STATUS_RE = re.compile(r"""
(OK|FAILED)\s+\(
(failures=(?P<failures>\d+),?\s*)?
(errors=(?P<errors>\d+),?\s*)?
(skipped=(?P<skips>\d+),?\s*)?
(expected\s+failures=(?P<expected_failures>\d+),?\s*)?
(unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)?
\)
""", re.DOTALL | re.VERBOSE)
FTEST_PASS_FAIL_RE = re.compile(
'.*Functional Tests (?P<total>\d+) Tests, (?P<failures>\d+) Failures')
def run(self, test):
return self.runUnitTest(test)
def runUnitTest(self, *args, **kw):
try:
runUnitTest = os.environ.get('RUN_UNIT_TEST',
'runUnitTest')
args = tuple(shlex.split(runUnitTest)) + args
status_dict = self.spawn(*args, **kw)
except SubprocessError, e:
status_dict = e.status_dict
test_log = status_dict['stderr']
search = self.RUN_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(duration=float(groupdict['seconds']),
test_count=int(groupdict['all_tests']))
search = self.STATUS_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(error_count=int(groupdict['errors'] or 0),
failure_count=int(groupdict['failures'] or 0),
skip_count=int(groupdict['skips'] or 0)
+int(groupdict['expected_failures'] or 0)
+int(groupdict['unexpected_successes'] or 0))
return status_dict
def getTestList(self):
test_list = []
for test_path in glob.glob('/%s/handler/*/tests/test*.py' %
"/".join(cloudooo.__file__.split('/')[:-1])):
test_case = test_path.split(os.sep)[-1][:-3] # remove .py
# testOooMonitorRequest is making testsuite stall.
if test_case not in ['testOooHighLoad', 'testOooMonitorRequest'] and \
not test_case.startswith("testFfmpeg"):
test_list.append(test_case)
return test_list
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
self.status_dict = status_dict
def __getattr__(self, name):
return self.status_dict[name]
def __str__(self):
return 'Error %i' % self.status_code
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
# Rafael Monnerat <rafael@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import cloudooo
from CloudoooTestSuite import CloudoooTestSuite
import argparse
import sys
import glob
import os
import shlex
from erp5.util.testsuite import TestSuite as BaseTestSuite
from erp5.util.testsuite import SubprocessError
from erp5.util import taskdistribution
import glob, sys, time, argparse, xmlrpclib, pprint, socket
import threading
def _parsingErrorHandler(data, _):
print >> sys.stderr, 'Error parsing data:', repr(data)
taskdistribution.patchRPCParser(_parsingErrorHandler)
class DummyTaskDistributionTool(object):
class TestSuite(BaseTestSuite):
def __init__(self):
self.lock = threading.Lock()
def run(self, test):
return self.runUnitTest(test)
def createTestResult(self, name, revision, test_name_list, allow_restart,
*args):
self.test_name_list = list(test_name_list)
return None, revision
def updateTestResult(self, name, revision, test_name_list):
self.test_name_list = list(test_name_list)
return None, revision
def startUnitTest(self, test_result_path, exclude_list=()):
with self.lock:
for i, test in enumerate(self.test_name_list):
if test not in exclude_list:
del self.test_name_list[i]
return None, test
def runUnitTest(self, *args, **kw):
try:
runUnitTest = os.environ.get('RUN_UNIT_TEST',
'runUnitTest')
args = tuple(shlex.split(runUnitTest)) + args
status_dict = self.spawn(*args, **kw)
except SubprocessError, e:
status_dict = e.status_dict
test_log = status_dict['stderr']
search = self.RUN_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(duration=float(groupdict['seconds']),
test_count=int(groupdict['all_tests']))
search = self.STATUS_RE.search(test_log)
if search:
groupdict = search.groupdict()
status_dict.update(
error_count=int(groupdict['errors'] or 0),
failure_count=int(groupdict['failures'] or 0)
+int(groupdict['unexpected_successes'] or 0),
skip_count=int(groupdict['skips'] or 0)
+int(groupdict['expected_failures'] or 0))
return status_dict
def stopUnitTest(self, test_path, status_dict):
pass
def getTestList(self):
test_list = []
for test_path in glob.glob('/%s/handler/*/tests/test*.py' %
"/".join(cloudooo.__file__.split('/')[:-1])):
test_case = test_path.split(os.sep)[-1][:-3] # remove .py
# testOooMonitorRequest is making testsuite stall.
if test_case not in ['testOooHighLoad', 'testOooMonitorRequest'] and \
not test_case.startswith("testFfmpeg"):
test_list.append(test_case)
return test_list
def safeRpcCall(function, *args):
retry = 64
xmlrpc_arg_list = []
for argument in args:
if isinstance(argument, dict):
argument = dict([(x, isinstance(y,str) and xmlrpclib.Binary(y) or y) \
for (x,y) in argument.iteritems()])
xmlrpc_arg_list.append(argument)
while True:
try:
return function(*xmlrpc_arg_list)
except (socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def run():
parser = argparse.ArgumentParser(description='Run a test suite.')
parser.add_argument('--test_suite', help='The test suite name')
......@@ -65,35 +100,22 @@ def run():
args = parser.parse_args()
if args.master_url is not None:
master_url = args.master_url
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
else:
master = DummyTaskDistributionTool()
master = taskdistribution.TaskDistributionTool(args.master_url)
test_suite_title = args.test_suite_title or args.test_suite
revision = args.revision
suite = CloudoooTestSuite(1, test_suite=args.test_suite,
node_quantity=args.node_quantity,
revision=revision)
suite = TestSuite(1, test_suite=args.test_suite,
node_quantity=args.node_quantity,
revision=revision)
test_result = safeRpcCall(master.createTestResult,
args.test_suite, revision, suite.getTestList(),
suite.allow_restart, test_suite_title, args.test_node_title,
test_result = master.createTestResult(revision, suite.getTestList(),
args.test_node_title, suite.allow_restart, test_suite_title,
args.project_title)
if test_result:
test_result_path, test_revision = test_result
if test_result is not None:
assert revision == test_result.revision, (revision, test_result.revision)
while suite.acquire():
test = safeRpcCall(master.startUnitTest, test_result_path,
suite.running.keys())
if test:
suite.start(test[1], lambda status_dict, __test_path=test[0]:
safeRpcCall(master.stopUnitTest, __test_path, status_dict))
test = test_result.start(suite.running.keys())
if test is not None:
suite.start(test.name, lambda status_dict, __test=test:
__test.stop(**status_dict))
elif not suite.running:
break
......@@ -17,7 +17,8 @@ install_requires = [
'psutil>=0.2.0',
'lxml',
'python-magic',
'argparse'
'argparse',
'erp5.util'
]
setup(name='cloudooo',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment