Commit 14a91750 authored by Kirill Smelkov

.

parent 3e246b1d
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""nxdtest - test a project under Nexedi testing infrastructure.
XXX more docs
"""
from __future__ import print_function, absolute_import

# XXX split -> nxdtest + NXDTestfile (name=?)

from erp5.util.taskdistribution import TaskDistributor
from subprocess import Popen, PIPE
from time import time, strftime, gmtime
import os, sys, threading, argparse, logging, traceback, re

import six

# loadNXDTestFile loads .nxdtest file located @path.
def loadNXDTestFile(path):  # -> TestEnv
    t = TestEnv()
    g = {'TestCase': t.TestCase,  # TODO + all other public TestEnv methods
         'PyTest':   PyTest}
    with open(path, "r") as f:
        src = f.read()
    six.exec_(src, g)
    return t
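
# An .nxdtest file is plain Python code executed with the names above
# (TestCase, PyTest) injected into its globals. A minimal hypothetical example:
#
#   TestCase('pytest.mypkg', ['python', '-m', 'pytest', 'mypkg'],
#            summaryf=PyTest.summary,
#            envadj={'PYTHONDONTWRITEBYTECODE': 'y'})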

# TestCase defines one test case to run.
class TestCase:
    def __init__(self, name, argv, summaryf=None, **kw):
        self.name     = name
        self.argv     = argv
        self.kw       = kw
        self.summaryf = summaryf

# TestEnv represents a testing environment with a set of TestCases to run.
class TestEnv:
    def __init__(self):
        self.byname = {}  # name -> TestCase
        self.testv  = []  # [] of TestCase

    # TestCase adds a new test case to the environment.
    def TestCase(self, name, argv, **kw):
        assert name not in self.byname
        t = TestCase(name, argv, **kw)
        self.testv.append(t)
        self.byname[name] = t

def main():
    # testnode executes us giving the URL of the master results-collecting instance and other details
    # https://lab.nexedi.com/nexedi/erp5/blob/744f3fde/erp5/util/testnode/UnitTestRunner.py#L137
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--master_url',       help='The URL of the master controlling many suites')
    parser.add_argument('--revision',         help='The revision to test', default='dummy_revision')
    parser.add_argument('--test_suite',       help='The test suite name')
    parser.add_argument('--test_suite_title', help='The test suite title')
    parser.add_argument('--test_node_title',  help='The test node title')
    parser.add_argument('--project_title',    help='The project title')
    parser.add_argument('--verbose', action='store_true', help='increase output verbosity')

    args = parser.parse_args()

    # if verbose -> log to stderr
    logger = None
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger()

    # load list of tests to run
    tenv = loadNXDTestFile('.nxdtest')

    # master_url provided -> run tests under master control
    if args.master_url is not None:
        # connect to master and create 'test result' object with list of tests to run
        tool = TaskDistributor(portal_url=args.master_url, logger=logger)
        test_result = tool.createTestResult(
            revision       = args.revision,
            test_name_list = [t.name for t in tenv.testv],
            node_title     = args.test_node_title,
            test_title     = args.test_suite_title or args.test_suite,
            project_title  = args.project_title)
        if test_result is None:
            # a test run for given name and revision has already been completed
            return
    # master_url not provided -> run tests locally
    else:
        test_result = LocalTestResult(tenv)

    # make sure we get output from subprocesses without delay.
    # go does not buffer stdout/stderr by default, but python does for stdout.
    # tell python not to buffer anything.
    os.environ['PYTHONUNBUFFERED'] = 'y'

    # run the tests
    devnull = open(os.devnull)
    while 1:
        # ask master for next test to run; stop if no more.
        test_result_line = test_result.start()
        if test_result_line is None:
            break

        # run tenv[name]
        t = tenv.byname[test_result_line.name]
        tstart = time()

        # default status dict
        status = {
            'test_count':    1,
            'error_count':   0,
            'failure_count': 0,
            'skip_count':    0,
            # html_test_result
        }

        try:
            # Run t.argv in t.kw['env'] environment.
            # In addition to kw['env'], kw['envadj'] lets users specify only
            # adjustments instead of providing the full env dict.
            # The test command is spawned with unchanged cwd; the instance
            # wrapper takes care to set cwd before running us.
            kw = t.kw.copy()
            env = kw.pop('env', os.environ)
            env = env.copy()
            envadj = kw.pop('envadj', {})
            env.update(envadj)
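            # (for example, a hypothetical TestCase(..., envadj={'GOMAXPROCS': '1'})
            #  runs the test with the inherited environment plus GOMAXPROCS=1)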
            # bufsize=1 means 'line buffered'
            p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=1, **kw)
        except:
            stdout = ''
            stderr = 'run %r\n%s' % (t.argv, traceback.format_exc())
            sys.stderr.write(stderr)
            status['error_count'] += 1
        else:
            # tee >stdout,stderr so that we can also see the output in testnode logs
            # (explicit teeing instead of p.communicate() to be able to see incremental progress)
            buf_out = []
            buf_err = []
            tout = threading.Thread(target=tee, args=(p.stdout, sys.stdout, buf_out))
            terr = threading.Thread(target=tee, args=(p.stderr, sys.stderr, buf_err))
            tout.start()
            terr.start()
            tout.join(); stdout = ''.join(buf_out)
            terr.join(); stderr = ''.join(buf_err)
            p.wait()
            if p.returncode != 0:
                status['error_count'] += 1

            # postprocess output, if we can
            if t.summaryf is not None:
                try:
                    summary = t.summaryf(stdout)
                except:
                    bad = traceback.format_exc()
                    sys.stderr.write(bad)
                    stderr += bad
                    status['error_count'] += 1
                else:
                    status.update(summary)

        tend = time()

        # report the result of the test run back to master
        test_result_line.stop(
            command  = ' '.join(t.argv),
            duration = tend - tstart,
            date     = strftime("%Y/%m/%d %H:%M:%S", gmtime(tend)),
            stdout   = stdout,
            stderr   = stderr,
            **status)

# tee, similar to the tee(1) utility, copies data from fin to fout, appending it to buf.
def tee(fin, fout, buf):
    while 1:
        # NOTE use raw os.read because it does not wait for full data to be available.
        # ( we could use fin.readline(), but there are cases when e.g. progress
        #   is reported via printing consecutive dots on the same line, and
        #   readline() won't work for that.
        #
        #   besides, when a lot of output is available it would be a waste to
        #   read/flush it line-by-line. )
        data = os.read(fin.fileno(), 4096)
        if not data:
            return  # EOF
        fout.write(data)
        fout.flush()
        buf.append(data)

# LocalTestResult* handle test runs when master_url was not provided and tests are run locally.
class LocalTestResult:
    def __init__(self, tenv):
        assert isinstance(tenv, TestEnv)
        self.tenv = tenv
        self.next = 0  # tenv.testv[next] is the next test to execute

    def start(self):  # -> test_result_line
        if self.next >= len(self.tenv.testv):
            return None  # all tests are done
        test_result_line = LocalTestResultLine()
        test_result_line.name = self.tenv.testv[self.next].name
        self.next += 1
        return test_result_line

class LocalTestResultLine:
    def stop(self, **kw):
        def v(name):
            return kw.get(name, '?')
        _ = v('error_count')
        if _ == '?':
            st = '?'
        elif _ == 0:
            st = 'ok'
        else:
            st = 'fail'
        print('%s\t%s\t%.3fs\t# %st %se %sf %ss' % (st, self.name, kw['duration'],
              v('test_count'), v('error_count'), v('failure_count'), v('skip_count')))
        # XXX + dump .json ?
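
        # for example, a hypothetical run with 12 passed and 1 skipped test
        # would print something like:
        #
        #   ok      pytest  0.390s  # 13t 0e 0f 1s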

# support for well-known summary functions

class PyTest:
    @staticmethod
    def summary(out):  # -> status_dict
        # the last line of pytest output is like
        # ================ 1 failed, 1 passed, 12 skipped in 0.39 seconds ================
        textv = out.splitlines()
        tail  = textv[-1]

        def get(name, default=None):
            m = re.search(r'\b([0-9]+) ' + name + r'\b', tail)
            if m is None:
                return default
            return int(m.group(1))

        stat = {}
        def stat_set(stat_key, from_name):
            v = get(from_name)
            if v is None:
                return
            stat[stat_key] = v

        stat_set('skip_count',    'skipped')
        stat_set('error_count',   'failed')
        stat_set('failure_count', 'xfailed')  # XXX ok?
        npass = get('passed', 0)
        stat['test_count'] = (npass + stat.get('failure_count', 0) +
                              stat.get('skip_count', 0) + stat.get('error_count', 0))
        return stat
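
    # e.g. for out ending with
    #   "===== 1 failed, 2 passed, 3 skipped in 0.39 seconds ====="
    # summary returns
    #   {'error_count': 1, 'skip_count': 3, 'test_count': 6}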

def unittest_summary(out):  # -> status_dict
    # XXX -> erp5.util.testsuite.EggTestSuite
    pass

def gotest_summary(out):  # -> status_dict
    # TODO
    pass

if __name__ == '__main__':
    main()
@@ -22,7 +22,7 @@
 neotest must be on $PATH.
 """
-# XXX split -> nxdtest + NXDTestfile (name=?)
+# FIXME -> use https://lab.nexedi.com/kirr/nxdtest
 from erp5.util.taskdistribution import TaskDistributor
 from subprocess import Popen, PIPE
...