Commit 40e2c4ab authored by Jérome Perrin's avatar Jérome Perrin

Unittest and Python3 support

These are the necessary changes to run `SlapOS.Eggs.UnitTest-*` and `SlapOS.SoftwareReleases.IntegrationTest-*` using nxdtest

See merge request nexedi/nxdtest!6

/reviewed-by @kirr
parents d829e9ca a129b560
......@@ -60,12 +60,14 @@ from subprocess import Popen, PIPE
from time import time, strftime, gmtime, localtime
import os, sys, threading, argparse, logging, traceback, re, pwd, socket
import six
from golang import b
# loadNXDTestFile loads .nxdtest file located @path.
def loadNXDTestFile(path): # -> TestEnv
t = TestEnv()
g = {'TestCase': t.TestCase, # TODO + all other public TestEnv methods
'PyTest': PyTest}
'PyTest': PyTest,
'UnitTest': UnitTest,}
with open(path, "r") as f:
src = f.read()
six.exec_(src, g)
......@@ -99,6 +101,12 @@ class TestEnv:
self.testv.append(t)
self.byname[name] = t
def emit(*message):
"""Emit a message on stdout and flush output.
"""
print(*message)
sys.stdout.flush()
def main():
# testnode executes us giving URL to master results collecting instance and other details
# https://lab.nexedi.com/nexedi/erp5/blob/744f3fde/erp5/util/testnode/UnitTestRunner.py#L137
......@@ -134,7 +142,7 @@ def main():
# --list
if args.list:
for t in tenv.testv:
print(t.name)
emit(t.name)
return
# master_url provided -> run tests under master control
......@@ -164,6 +172,13 @@ def main():
# log information about local node
system_info()
if sys.version_info < (3,):
bstdout = sys.stdout
bstderr = sys.stderr
else:
bstdout = sys.stdout.buffer
bstderr = sys.stderr.buffer
# run the tests
devnull = open(os.devnull)
while 1:
......@@ -176,8 +191,8 @@ def main():
t = tenv.byname[test_result_line.name]
tstart = time()
print('\n>>> %s' % t.name)
print('$ %s' % t.command_str())
emit('\n>>> %s' % t.name)
emit('$ %s' % t.command_str())
# default status dict
status = {
......@@ -193,29 +208,28 @@ def main():
# In addition to kw['env'], kw['envadj'] allows users to define
# only adjustments instead of providing full env dict.
# Test command is spawned with unchanged cwd. Instance wrapper cares to set cwd before running us.
# bufsize=1 means 'line buffered'
kw = t.kw.copy()
env = kw.pop('env', os.environ)
env = env.copy()
envadj = kw.pop('envadj', {})
env.update(envadj)
p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=1, **kw)
p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, **kw)
except:
stdout, stderr = '', traceback.format_exc()
sys.stderr.write(stderr)
stdout, stderr = b'', b(traceback.format_exc())
bstderr.write(stderr)
status['error_count'] += 1
else:
# tee >stdout,stderr so we can also see in testnode logs
# (explicit teeing instead of p.communicate() to be able to see incremental progress)
buf_out = []
buf_err = []
tout = threading.Thread(target=tee, args=(p.stdout, sys.stdout, buf_out))
terr = threading.Thread(target=tee, args=(p.stderr, sys.stderr, buf_err))
tout = threading.Thread(target=tee, args=(p.stdout, bstdout, buf_out))
terr = threading.Thread(target=tee, args=(p.stderr, bstderr, buf_err))
tout.start()
terr.start()
tout.join(); stdout = ''.join(buf_out)
terr.join(); stderr = ''.join(buf_err)
tout.join(); stdout = b''.join(buf_out)
terr.join(); stderr = b''.join(buf_err)
p.wait()
if p.returncode != 0:
......@@ -224,10 +238,10 @@ def main():
# postprocess output, if we can
if t.summaryf is not None:
try:
summary = t.summaryf(stdout)
summary = t.summaryf(stdout, stderr)
except:
bad = traceback.format_exc()
sys.stderr.write(bad)
bad = b(traceback.format_exc())
bstderr.write(bad)
stderr += bad
status['error_count'] += 1
......@@ -248,7 +262,7 @@ def main():
}
tres.update(status)
print(_test_result_summary(t.name, tres))
emit(_test_result_summary(t.name, tres))
test_result_line.stop(**tres)
# tee, similar to tee(1) utility, copies data from fin to fout appending them to buf.
......@@ -296,11 +310,11 @@ def _test_result_summary(name, kw):
# system_info prints information about local computer.
def system_info():
print('date:\t%s' % (strftime("%a, %d %b %Y %H:%M:%S %Z", localtime())))
emit('date:\t%s' % (strftime("%a, %d %b %Y %H:%M:%S %Z", localtime())))
whoami = pwd.getpwuid(os.getuid()).pw_name
print('xnode:\t%s@%s' % (whoami, socket.getfqdn()))
print('uname:\t%s' % ' '.join(os.uname()))
print('cpu:\t%s' % get1('/proc/cpuinfo', 'model name'))
emit('xnode:\t%s@%s' % (whoami, socket.getfqdn()))
emit('uname:\t%s' % ' '.join(os.uname()))
emit('cpu:\t%s' % get1('/proc/cpuinfo', 'model name'))
# get1 returns first entry from file @path prefixed with ^<field>\s*:
def get1(path, field, default=None):
......@@ -350,7 +364,7 @@ class LocalTestResultLine:
# support for well-known summary functions
class PyTest:
@staticmethod
def summary(out): # -> status_dict
def summary(out, err): # -> status_dict
# end of output is like
# ================ 1 failed, 1 passed, 12 skipped in 0.39 seconds ================
# ...
......@@ -363,7 +377,7 @@ class PyTest:
return {}
def get(name, default=None):
m = re.search(r'\b([0-9]+) '+name+r'\b', pytail)
m = re.search(br'\b([0-9]+) ' + name.encode() + br'\b', pytail)
if m is None:
return default
return int(m.group(1))
......@@ -384,5 +398,46 @@ class PyTest:
return stat
class UnitTest:
@staticmethod
def summary(out, err): # -> status_dict
run_re = re.compile(
br'.*Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s',
re.DOTALL)
status_re = re.compile(br"""
.*(OK|FAILED)\s+\(
(failures=(?P<failures>\d+),?\s*)?
(errors=(?P<errors>\d+),?\s*)?
(skipped=(?P<skips>\d+),?\s*)?
(expected\s+failures=(?P<expected_failures>\d+),?\s*)?
(unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)?
\)
""", re.DOTALL | re.VERBOSE)
status_dict = {
}
run = run_re.search(err)
if run:
groupdict = run.groupdict()
status_dict.update(
duration=float(groupdict['seconds']),
test_count=int(groupdict['all_tests']),
error_count=0,
failure_count=0,
skip_count=0,
)
status = status_re.search(err)
if status:
groupdict = status.groupdict()
status_dict.update(
error_count=int(groupdict.get('errors') or 0),
failure_count=int(groupdict.get('failures') or 0)
+ int(groupdict.get('unexpected_successes') or 0),
skip_count=int(groupdict.get('skips') or 0)
+ int(groupdict.get('expected_failures') or 0))
return status_dict
if __name__ == '__main__':
main()
......@@ -21,12 +21,13 @@
from nxdtest import _test_result_summary, PyTest
import pytest
from golang import b
# [] of (name, textout, summaryok)
# [] of (name, out, err, summaryok)
testv = []
def case1(name, textout, summaryok): testv.append((name, textout, summaryok))
def case1(name, out, err, summaryok): testv.append((name, out, err, summaryok))
case1('ok+xfail', """\
case1('ok+xfail', b("""\
============================= test session starts ==============================
platform linux2 -- Python 2.7.18, pytest-4.6.11, py-1.9.0, pluggy-0.13.1
rootdir: /srv/slapgrid/slappart9/srv/testnode/dfq/soft/46d349541123ed5fc6ceea58fd013a51/parts/zodbtools-dev
......@@ -39,10 +40,11 @@ zodbtools/test/test_tidrange.py ............................. [ 81%]
zodbtools/test/test_zodb.py ........ [100%]
=============== 41 passed, 2 xfailed, 1 warnings in 4.62 seconds ===============
""",
"""),
b(''),
'?\ttestname\t1.000s\t# 43t ?e ?f ?s')
case1('ok+fail', """\
case1('ok+fail', b("""\
============================= test session starts ==============================
platform linux2 -- Python 2.7.18, pytest-4.6.11, py-1.9.0, pluggy-0.13.1
rootdir: /srv/slapgrid/slappart16/srv/testnode/dfj/soft/8b9988ce0aa31334c6bd56b40e4bba65/parts/pygolang-dev
......@@ -111,10 +113,11 @@ E Use -v to get the full diff
golang/time_test.py:106: AssertionError
=============== 1 failed, 98 passed, 13 skipped in 26.85 seconds ===============
""",
"""),
b(''),
'?\ttestname\t1.000s\t# 112t ?e 1f 13s')
case1('ok+tailtext', """\
case1('ok+tailtext', b("""\
date: Sun, 08 Nov 2020 12:26:24 MSK
xnode: kirr@deco.navytux.spb.ru
uname: Linux deco 5.9.0-1-amd64 #1 SMP Debian 5.9.1-1 (2020-10-17) x86_64
......@@ -129,13 +132,14 @@ wcfs: 2020/11/08 12:26:38 /tmp/testdb_fs.Z9IvT0/1.fs: watcher: stat /tmp/testdb_
# unmount/stop wcfs pid39653 @ /tmp/wcfs/40cc7154ed758d6a867205e79e320c1d3b56458d
wcfs: 2020/11/08 12:26:38 /tmp/testdb_fs.B3rbby/1.fs: watcher: stat /tmp/testdb_fs.B3rbby/1.fs: use of closed file
# unmount/stop wcfs pid39595 @ /tmp/wcfs/d0b5d036a2cce47fe73003cf2d9f0b22c7043817
""",
"""),
b(''),
'?\ttestname\t1.000s\t# 55t ?e ?f ?s')
@pytest.mark.parametrize("name,textout,summaryok", testv)
def test_pytest_summary(name,textout, summaryok):
@pytest.mark.parametrize("name,out,err,summaryok", testv)
def test_pytest_summary(name, out, err, summaryok):
kw = {'duration': 1.0}
kw.update(PyTest.summary(textout))
kw.update(PyTest.summary(out, err))
summary = _test_result_summary('testname', kw)
assert summary == summaryok
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# verify general functionality
import sys
import re
import pytest
from nxdtest import main
@pytest.fixture
def run_nxdtest(tmpdir):
"""Fixture which returns a function which invokes nxdtest in a temporary
directory, with the provided .nxdtest file content and with arguments
passed as `argv`.
"""
def _run_nxdtest(nxdtest_file_content, argv=("nxdtest",)):
with tmpdir.as_cwd():
with open(".nxdtest", "w") as f:
f.write(nxdtest_file_content)
sys_argv = sys.argv
sys.argv = argv
try:
main()
finally:
sys.argv = sys_argv
return _run_nxdtest
def test_main(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TESTNAME', ['echo', 'TEST OUPUT'])
"""
)
captured = capsys.readouterr()
output_lines = captured.out.splitlines()
assert ">>> TESTNAME" in output_lines
assert "$ echo TEST OUPUT" in output_lines
assert "TEST OUPUT" in output_lines
assert re.match("ok\tTESTNAME\t.*s\t# 1t 0e 0f 0s", output_lines[-1])
def test_error_invoking_command(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TESTNAME', ['not exist command'])
"""
)
captured = capsys.readouterr()
assert "No such file or directory" in captured.err
def test_error_invoking_summary(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TESTNAME', ['echo'], summaryf="error")
"""
)
captured = capsys.readouterr()
assert "TypeError" in captured.err
def test_run_argument(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TEST1', ['echo', 'TEST1'])
TestCase('TEST2', ['echo', 'TEST2'])
""",
argv=["nxdtest", "--run", "TEST1"],
)
captured = capsys.readouterr()
assert "TEST1" in captured.out
assert "TEST2" not in captured.out
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# verify unittest-related functionality
from nxdtest import _test_result_summary, UnitTest
import pytest
from golang import b
# [] of (name, out, err, summaryok)
testv = []
def case1(name, out, err, summaryok): testv.append((name, out, err, summaryok))
case1('ok', b(''), b("""\
test_1 (test.Test) ... ok
test_2 (test.Test) ... ok
test_3 (test.Test) ... ok
----------------------------------------------------------------------
Ran 3 tests in 1.761s
OK
"""),
'ok\ttestname\t1.761s\t# 3t 0e 0f 0s')
case1('ok+xfail', b(''), b("""\
test_1 (test.Test) ... ok
test_2 (test.Test) ... ok
test_3 (test.Test) ... expected failure
----------------------------------------------------------------------
Ran 3 tests in 1.098s
OK (expected failures=1)
"""),
'ok\ttestname\t1.098s\t# 3t 0e 0f 1s')
case1('fail', b(''), b("""\
test_1 (test.Test) ... ok
test_2 (test.Test) ... ok
test_3 (test.Test) ... FAIL
======================================================================
FAIL: test_3 (test.Test)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/srv/slapgrid/slappart4/srv/project/nxdtest/tmp/test.py", line 14, in test_3
self.assertEqual(1, 2)
AssertionError: 1 != 2
----------------------------------------------------------------------
Ran 3 tests in 2.198s
FAILED (failures=1)
"""),
'fail\ttestname\t2.198s\t# 3t 0e 1f 0s')
case1('error', b(''), b("""\
test_1 (test.Test) ... ok
test_2 (test.Test) ... ok
test_3 (test.Test) ... ERROR
======================================================================
ERROR: test_3 (test.Test)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/srv/slapgrid/slappart4/srv/project/nxdtest/tmp/test.py", line 14, in test_3
boom
NameError: name 'boom' is not defined
----------------------------------------------------------------------
Ran 3 tests in 1.684s
FAILED (errors=1)
"""),
'error\ttestname\t1.684s\t# 3t 1e 0f 0s')
case1('error-no-test', b(''), b("""\
usage: python -m unittest discover [-h] [-v] [-q] [--locals] [-f] [-c] [-b]
[-k TESTNAMEPATTERNS] [-s START]
[-p PATTERN] [-t TOP]
python -m unittest discover: error: unrecognized arguments: --argument-error
"""),
'?\ttestname\t1.000s\t# ?t ?e ?f ?s')
case1('error-no-output', b(''), b(''), '?\ttestname\t1.000s\t# ?t ?e ?f ?s')
case1('failed+unexpected_success', b(''), b("""\
test_1 (test.Test) ... ok
test_2 (test.Test) ... ok
test_3 (test.Test) ... unexpected success
----------------------------------------------------------------------
Ran 3 tests in 1.039s
FAILED (unexpected successes=1)
"""),
'fail\ttestname\t1.039s\t# 3t 0e 1f 0s')
case1('mixed-output', b(''), b("""\
----------------------------------------------------------------------
Ran 1 tests in 1.111s
FAILED (failures=1)
----------------------------------------------------------------------
Ran 3 tests in 2.222s
FAILED (failures=3)
"""),
'fail\ttestname\t2.222s\t# 3t 0e 3f 0s')
@pytest.mark.parametrize("name,out,err,summaryok", testv)
def test_unittest_summary(name, out, err, summaryok):
kw = {'duration': 1.0}
kw.update(UnitTest.summary(out, err))
summary = _test_result_summary('testname', kw)
assert summary == summaryok
......@@ -13,7 +13,7 @@ setup(
keywords = 'Nexedi testing infrastructure tool tox',
packages = find_packages(),
install_requires = ['erp5.util', 'six'],
install_requires = ['erp5.util', 'six', 'pygolang'],
extras_require = {
'test': ['pytest'],
},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment