Commit 0ad45a9c authored by Kirill Smelkov's avatar Kirill Smelkov

Detect if a test leaks processes and terminate them

For every TestCase nxdtest spawns test process to run with stdout/stderr
redirected to pipes that nxdtest reads. Nxdtest, in turn, tees those
pipes to its stdout/stderr until the pipes become EOF. If the test
process, in turn, spawns other processes, those other processes will
inherit opened pipes, and so the pipes won't become EOF untill _all_
spawned test processes (main test process + other processes that it
spawns) exit. Thus, if there will be any process, that the main test
process spawned, but did not terminated upon its own exit, nxdtest will
get stuck waiting for pipes to become EOF which won't happen at all if a
spawned test subprocess persists not to terminate.

I hit this problem for real on a Wendelin.core 2 test - there the main
test processes was segfaulting and so did not instructed other spawned
processes (ZEO, WCFS, ...) to terminate. As the result the whole test
was becoming stuck instead of being promptly reported as failed:

    runTestSuite: Makefile:175: recipe for target 'test.wcfs' failed
    runTestSuite: make: *** [test.wcfs] Segmentation fault
    runTestSuite: wcfs: 2021/08/09 17:32:09 zlink [::1]:52052 - [::1]:23386: recvPkt: EOF
    runTestSuite: E0809 17:32:09.376800   38082 wcfs.go:2574] zwatch zeo://localhost:23386: zlink [::1]:52052 - [::1]:23386: recvPkt: EOF
    runTestSuite: E0809 17:32:09.377431   38082 wcfs.go:2575] zwatcher failed -> switching filesystem to EIO mode (TODO)
    <LONG WAIT>
    runTestSuite: PROCESS TOO LONG OR DEAD, GOING TO BE TERMINATED

-> Fix it.

/reviewed-by @jerome
/reviewed-on nexedi/nxdtest!9
parent b5a74214
Pipeline #16923 passed with stage
in 0 seconds
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors. # Copyright (C) 2018-2021 Nexedi SA and Contributors.
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -57,11 +57,13 @@ from __future__ import print_function, absolute_import ...@@ -57,11 +57,13 @@ from __future__ import print_function, absolute_import
from erp5.util.taskdistribution import TaskDistributor from erp5.util.taskdistribution import TaskDistributor
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from time import time, strftime, gmtime, localtime from time import time, sleep, strftime, gmtime, localtime
import os, sys, argparse, logging, traceback, re, pwd, socket import os, sys, argparse, logging, traceback, re, pwd, socket
from errno import ESRCH, EPERM
import six import six
from golang import b from golang import b, select, default
from golang import context, sync from golang import context, sync
import psutil
# loadNXDTestFile loads .nxdtest file located @path. # loadNXDTestFile loads .nxdtest file located @path.
def loadNXDTestFile(path): # -> TestEnv def loadNXDTestFile(path): # -> TestEnv
...@@ -214,7 +216,11 @@ def main(): ...@@ -214,7 +216,11 @@ def main():
env = env.copy() env = env.copy()
envadj = kw.pop('envadj', {}) envadj = kw.pop('envadj', {})
env.update(envadj) env.update(envadj)
p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, **kw) # run the command in a new session, so that it is easy to find out leaked spawned subprocesses.
# TODO session -> cgroup, because a child process could create another new session.
def newsession():
os.setsid()
p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, preexec_fn=newsession, **kw)
except: except:
stdout, stderr = b'', b(traceback.format_exc()) stdout, stderr = b'', b(traceback.format_exc())
bstderr.write(stderr) bstderr.write(stderr)
...@@ -227,10 +233,40 @@ def main(): ...@@ -227,10 +233,40 @@ def main():
wg = sync.WorkGroup(context.background()) wg = sync.WorkGroup(context.background())
wg.go(tee, p.stdout, bstdout, buf_out) wg.go(tee, p.stdout, bstdout, buf_out)
wg.go(tee, p.stderr, bstderr, buf_err) wg.go(tee, p.stderr, bstderr, buf_err)
# wait for p to exit
def _(ctx):
while 1:
done = p.poll()
if done is not None:
break
# cancel -> kill p
_, _rx = select(
default, # 0
ctx.done().recv, # 1
)
if _ == 1:
p.kill()
break
sleep(0.1)
# p is done - check if it leaked processes and kill them
while 1:
procv = session_proclist(sid=p.pid)
if len(procv) == 0:
break
for proc in procv:
emit('# leaked pid=%d %r %s' % (proc.pid, proc.name(), proc.cmdline()))
proc.terminate()
gone, alive = psutil.wait_procs(procv, timeout=5)
for proc in alive:
p.kill()
wg.go(_)
wg.wait() wg.wait()
stdout = b''.join(buf_out) stdout = b''.join(buf_out)
stderr = b''.join(buf_err) stderr = b''.join(buf_err)
p.wait()
if p.returncode != 0: if p.returncode != 0:
status['error_count'] += 1 status['error_count'] += 1
...@@ -337,6 +373,23 @@ def get1(path, field, default=None): ...@@ -337,6 +373,23 @@ def get1(path, field, default=None):
raise KeyError('%s does not have field %r' % (path, field)) raise KeyError('%s does not have field %r' % (path, field))
# session_proclist returns all processes that belong to specified session.
def session_proclist(sid):
procv = []
for proc in psutil.process_iter(['pid']):
try:
proc_sid = os.getsid(proc.pid)
except OSError as e:
if e.errno in (ESRCH, EPERM):
# proc either finished, or we are not allowed to retrieve its sid
# (see getsid(1) for details)
continue
raise
if proc_sid == sid:
procv.append(proc)
return procv
# LocalTestResult* handle tests runs, when master_url was not provided and tests are run locally. # LocalTestResult* handle tests runs, when master_url was not provided and tests are run locally.
class LocalTestResult: class LocalTestResult:
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2020 Nexedi SA and Contributors. # Copyright (C) 2020-2021 Nexedi SA and Contributors.
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -21,6 +21,10 @@ ...@@ -21,6 +21,10 @@
import sys import sys
import re import re
import time
from os.path import dirname
from golang import chan, select, default, func, defer
from golang import context, sync
import pytest import pytest
...@@ -110,3 +114,41 @@ TestCase('TEST10', ['echo', 'TEST10']) ...@@ -110,3 +114,41 @@ TestCase('TEST10', ['echo', 'TEST10'])
assert "TEST1" in captured.out assert "TEST1" in captured.out
assert "TEST10" in captured.out assert "TEST10" in captured.out
assert "TEST2" not in captured.out assert "TEST2" not in captured.out
# verify that nxdtest detects leaked processes.
@pytest.mark.timeout(timeout=10)
def test_run_procleak(run_nxdtest, capsys):
procleak = "%s/testprog/procleak" % (dirname(__file__),)
# run nxdtest in thread so that timeout handling works
# ( if nxdtest is run on main thread, then non-py wait in WorkGroup.wait, if
# stuck, prevents signals from being handled at python-level )
wg = sync.WorkGroup(context.background())
done = chan()
@func
def _(ctx):
defer(done.close)
run_nxdtest(
"""\
TestCase('TEST_WITH_PROCLEAK', ['%s', 'AAA', 'BBB', 'CCC'])
""" % procleak
)
wg.go(_)
while 1:
_, _rx = select(
default, # 0
done.recv, # 1
)
if _ == 0:
time.sleep(0.1)
continue
wg.wait()
break
captured = capsys.readouterr()
assert "AAA: terminating" in captured.out
assert "BBB: terminating" in captured.out
assert "CCC: terminating" in captured.out
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program procleak helps to verify that nxdtest detects processes leaked during test run."""
from __future__ import print_function, absolute_import
import sys, subprocess, time
from signal import signal, SIGTERM
from setproctitle import setproctitle
def main():
# leak jobs as (grand(grand))children. This verifies that nxdtest
# kills/terminates not only direct children of main spawned process.
if sys.argv[1] == '__leak':
jobs = sys.argv[2:]
if len(jobs) > 1:
leak(jobs[1:])
hang(jobs[0])
return # unreachable
jobs = sys.argv[1:]
print(">>> procleak %r" % (jobs,))
leak(jobs)
# give time to spawned processes to install their signal handlers
time.sleep(1)
# do not wait for spawned processes to terminate
print("<<< procleak")
# leak spawns jobs processes that will hang forever.
def leak(jobs):
proc = subprocess.Popen([__file__, '__leak'] + jobs)
# do not wait for proc to terminate
# hang hands current process forever.
def hang(job):
setproctitle("procleak: %s" % job)
def _(sig, frame):
print('%s: terminating' % job)
raise SystemExit
signal(SIGTERM, _)
while 1:
print('%s: hanging ...' % job)
time.sleep(1)
if __name__ == '__main__':
main()
...@@ -13,9 +13,9 @@ setup( ...@@ -13,9 +13,9 @@ setup(
keywords = 'Nexedi testing infrastructure tool tox', keywords = 'Nexedi testing infrastructure tool tox',
packages = find_packages(), packages = find_packages(),
install_requires = ['erp5.util', 'six', 'pygolang'], install_requires = ['erp5.util', 'six', 'pygolang', 'psutil'],
extras_require = { extras_require = {
'test': ['pytest'], 'test': ['pytest', 'pytest-timeout', 'setproctitle'],
}, },
entry_points= {'console_scripts': ['nxdtest = nxdtest:main']}, entry_points= {'console_scripts': ['nxdtest = nxdtest:main']},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment