Commit 54c6b000 authored by Kirill Smelkov

Propagate cancellation to spawned test jobs

A user might cancel a test result in the ERP5 UI if e.g. some misbehaviour is
detected and a new revision is ready to be tested. This works by
test_result.start() returning None - indicating that there are no more
test_result_lines to exercise. Master also indicates this cancellation
via test_result.isAlive() returning False, but until now we were not
using that information and were always waiting for the already-spawned
test job to run to completion.
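
In other words, the protocol looks like this from nxdtest's side (a
minimal sketch; run_job is a hypothetical placeholder for spawning and
waiting for one test job):

    while 1:
        test_result_line = test_result.start()
        if test_result_line is None:
            break                   # done, or canceled from ERP5 UI
        run_job(test_result_line)   # hypothetical: spawn + wait for one job
    # independently of .start(), master also reports cancellation via
    # test_result.isAlive() -> False, which previously went unused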

This works well in practice if individual tests are not long, but e.g.
for SlapOS.SoftwareReleases.IntegrationTest-* it is not good, because
there an individual test might take _hours_ to execute.

-> Fix it by first setting up a global context into which we propagate
cancellation from test_result.isAlive, and by using that context as the
base for all other activities. This should terminate the spawned test
process if test_result is canceled.
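
The underlying pygolang pattern, as a minimal standalone sketch (the
pattern the fix builds on, not the nxdtest code itself):

    from golang import context, sync, time

    ctx, cancel = context.with_cancel(context.background())

    def job(ctx):
        ctx.done().recv()       # job derived from ctx; unblocks on cancel

    wg = sync.WorkGroup(ctx)    # all work spawned via wg inherits ctx
    wg.go(job)
    time.sleep(0.1)
    cancel()                    # cancellation propagates to job via ctx.done()
    wg.wait()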

The check interval is picked as 5 minutes so as not to overload master.
@jerome says that

    We now have 341 active test nodes, but sometimes we use more; we
    did in the past to stress-test some new machines.

    For the developer, reducing the waiting time from a few hours to 1
    minute or to 5 minutes seems more or less equivalent.

With 350 testnodes, and each nxdtest checking its test_result status via
an isAlive query to master every 5 minutes, this results in ~1 isAlive
request/second to master on average.
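
For completeness, the arithmetic behind that estimate (numbers rounded
as above):

    nodes    = 350                  # ~341 active testnodes, rounded up
    interval = 5 * 60               # isAlive poll period, in seconds
    print(nodes / float(interval))  # ~1.17 isAlive requests/second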

Had to switch from stdlib time to golang.time in order to use
time.after(). Due to that, time() and sleep() are changed to time.now()
and time.sleep() correspondingly.
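
For reference, the relevant golang.time calls behave like this (a small
sketch; pygolang's time constants are seconds-based, so time.second ==
1.0 and time.minute == 60.0):

    from golang import time

    t0 = time.now()                     # float seconds, like stdlib time.time()
    time.after(1*time.second).recv()    # channel fires once the timeout elapses
    time.sleep(0.5)                     # like stdlib time.sleep
    print(time.now() - t0)              # ~1.5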

/helped-by @jerome
parent fa265211
@@ -57,12 +57,12 @@ from __future__ import print_function, absolute_import
 from erp5.util.taskdistribution import TaskDistributor
 from subprocess import Popen, PIPE
-from time import time, sleep, strftime, gmtime, localtime
+from time import strftime, gmtime, localtime
 import os, sys, argparse, logging, traceback, re, pwd, socket
 from errno import ESRCH, EPERM
 import six
 from golang import b, defer, func, select, default
-from golang import errors, context, sync
+from golang import errors, context, sync, time
 import psutil

 # loadNXDTestFile loads .nxdtest file located @path.
@@ -211,6 +211,28 @@ def main():
         bstdout = sys.stdout.buffer
         bstderr = sys.stderr.buffer

+    # setup context that is canceled when/if test_result is canceled on master
+    # we will use this context as the base for all spawned jobs
+    ctx, cancel = context.with_cancel(context.background())
+    cancelWG = sync.WorkGroup(ctx)
+    @func
+    def _(ctx):
+        defer(cancel)
+        while 1:
+            _, _rx = select(
+                ctx.done().recv,                    # 0
+                time.after(5*time.minute).recv,     # 1 NOTE not e.g. one second not to overload master
+            )
+            if _ == 0:
+                break
+            if not test_result.isAlive():
+                emit("# master asks to cancel test run")
+                break
+    cancelWG.go(_)
+    defer(cancelWG.wait)
+    defer(cancel)
+
     # run the tests
     devnull = open(os.devnull)
     while 1:
@@ -225,7 +247,7 @@ def main():
         # run tenv[name]
         t = tenv.byname[test_result_line.name]
-        tstart = time()
+        tstart = time.now()
         emit('\n>>> %s' % t.name)
         emit('$ %s' % t.command_str())
@@ -263,7 +285,7 @@ def main():
         # (explicit teeing instead of p.communicate() to be able to see incremental progress)
         buf_out = []
         buf_err = []
-        wg = sync.WorkGroup(context.background())
+        wg = sync.WorkGroup(ctx)
         wg.go(tee, p.stdout, bstdout, buf_out)
         wg.go(tee, p.stderr, bstderr, buf_err)

         # wait for p to exit
@@ -285,7 +307,7 @@ def main():
                 err = ctx.err()
                 break
-            sleep(0.1)
+            time.sleep(0.1)

         # p should be done - check if it leaked processes and terminate/kill them
         # kill p in the end if it does not stop from just SIGTERM.
@@ -333,7 +355,7 @@ def main():
         status.update(summary)
-        tend = time()
+        tend = time.now()

         # print summary and report result of test run back to master
         tres = {
@@ -465,6 +487,9 @@ class LocalTestResult:
         test_result_line.name = t.name
         return test_result_line

+    def isAlive(self): # -> bool (whether still running)
+        return True    # don't need to handle SIGINT - CTRL+C interrupts whole process
+
 class LocalTestResultLine:
     def stop(self, **kw):
         # XXX + dump .json ?
...