Commit 982bfa4d authored by Victor Stinner, committed by GitHub

bpo-36670: Multiple regrtest bugfixes (GH-16511)

* Windows: Fix counter name in WindowsLoadTracker. Counter names are
  localized: use the registry to get the counter name. Original
  change written by Lorenz Mende.
* Regrtest.main() now ensures that the Windows load tracker is also
  killed if an exception is raised.
* TestWorkerProcess now ensures that worker processes are no longer
  running before exiting: also kill worker processes when an
  exception is raised.
* Enhance regrtest messages and warnings: include test name,
  duration, add a worker identifier, etc.
* Rename MultiprocessThread to TestWorkerProcess and MultiprocessRunner
  to MultiprocessTestRunner.
* Use print_warning() to display warnings.
Co-Authored-By: Lorenz Mende <Lorenz.mende@gmail.com>
parent 8462a493
...@@ -508,10 +508,6 @@ class Regrtest: ...@@ -508,10 +508,6 @@ class Regrtest:
self.run_tests_sequential() self.run_tests_sequential()
def finalize(self): def finalize(self):
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
if self.next_single_filename: if self.next_single_filename:
if self.next_single_test: if self.next_single_test:
with open(self.next_single_filename, 'w') as fp: with open(self.next_single_filename, 'w') as fp:
...@@ -680,11 +676,16 @@ class Regrtest: ...@@ -680,11 +676,16 @@ class Regrtest:
# typeperf.exe for x64, x86 or ARM # typeperf.exe for x64, x86 or ARM
print(f'Failed to create WindowsLoadTracker: {error}') print(f'Failed to create WindowsLoadTracker: {error}')
self.run_tests() try:
self.display_result() self.run_tests()
self.display_result()
if self.ns.verbose2 and self.bad:
self.rerun_failed_tests() if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
self.finalize() self.finalize()
......
...@@ -15,7 +15,7 @@ from test.libregrtest.runtest import ( ...@@ -15,7 +15,7 @@ from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result, TestResult, is_failed, TIMEOUT) format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration from test.libregrtest.utils import format_duration, print_warning
# Display the running tests if nothing happened last N seconds # Display the running tests if nothing happened last N seconds
...@@ -103,9 +103,10 @@ class ExitThread(Exception): ...@@ -103,9 +103,10 @@ class ExitThread(Exception):
pass pass
class MultiprocessThread(threading.Thread): class TestWorkerProcess(threading.Thread):
def __init__(self, pending, output, ns, timeout): def __init__(self, worker_id, pending, output, ns, timeout):
super().__init__() super().__init__()
self.worker_id = worker_id
self.pending = pending self.pending = pending
self.output = output self.output = output
self.ns = ns self.ns = ns
...@@ -114,12 +115,16 @@ class MultiprocessThread(threading.Thread): ...@@ -114,12 +115,16 @@ class MultiprocessThread(threading.Thread):
self.start_time = None self.start_time = None
self._popen = None self._popen = None
self._killed = False self._killed = False
self._stopped = False
def __repr__(self): def __repr__(self):
info = ['MultiprocessThread'] info = [f'TestWorkerProcess #{self.worker_id}']
test = self.current_test_name
if self.is_alive(): if self.is_alive():
info.append('alive') dt = time.monotonic() - self.start_time
info.append("running for %s" % format_duration(dt))
else:
info.append('stopped')
test = self.current_test_name
if test: if test:
info.append(f'test={test}') info.append(f'test={test}')
popen = self._popen popen = self._popen
...@@ -128,53 +133,24 @@ class MultiprocessThread(threading.Thread): ...@@ -128,53 +133,24 @@ class MultiprocessThread(threading.Thread):
return '<%s>' % ' '.join(info) return '<%s>' % ' '.join(info)
def _kill(self): def _kill(self):
dt = time.monotonic() - self.start_time if self._killed:
return
self._killed = True
popen = self._popen popen = self._popen
pid = popen.pid if popen is None:
print("Kill worker process %s running for %.1f sec" % (pid, dt), return
file=sys.stderr, flush=True)
print(f"Kill {self}", file=sys.stderr, flush=True)
try: try:
popen.kill() popen.kill()
return True
except OSError as exc: except OSError as exc:
print("WARNING: Failed to kill worker process %s: %r" % (pid, exc), print_warning(f"Failed to kill {self}: {exc!r}")
file=sys.stderr, flush=True)
return False
def _close_wait(self):
popen = self._popen
# stdout and stderr must be closed to ensure that communicate()
# does not hang
popen.stdout.close()
popen.stderr.close()
try:
popen.wait(JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print("WARNING: Failed to wait for worker process %s "
"completion (timeout=%.1f sec): %r"
% (popen.pid, JOIN_TIMEOUT, exc),
file=sys.stderr, flush=True)
def kill(self):
"""
Kill the current process (if any).
This method can be called by the thread running the process,
or by another thread.
"""
self._killed = True
if self._popen is None:
return
if not self._kill():
return
self._close_wait() def stop(self):
# Method called from a different thread to stop this thread
self._stopped = True
self._kill()
def mp_result_error(self, test_name, error_type, stdout='', stderr='', def mp_result_error(self, test_name, error_type, stdout='', stderr='',
err_msg=None): err_msg=None):
...@@ -190,59 +166,69 @@ class MultiprocessThread(threading.Thread): ...@@ -190,59 +166,69 @@ class MultiprocessThread(threading.Thread):
try: try:
stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT) stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc: except (subprocess.TimeoutExpired, OSError) as exc:
print("WARNING: Failed to read worker process %s output " print_warning(f"Failed to read {self} output "
"(timeout=%.1f sec): %r" f"(timeout={format_duration(JOIN_TIMEOUT)}): "
% (popen.pid, JOIN_TIMEOUT, exc), f"{exc!r}")
file=sys.stderr, flush=True)
self._close_wait()
return self.mp_result_error(test_name, TIMEOUT, stdout, stderr) return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)
def _runtest(self, test_name): def _run_process(self, test_name):
try: self.start_time = time.monotonic()
self.start_time = time.monotonic()
self.current_test_name = test_name
self.current_test_name = test_name
try:
self._killed = False
self._popen = run_test_in_subprocess(test_name, self.ns) self._popen = run_test_in_subprocess(test_name, self.ns)
popen = self._popen popen = self._popen
except:
self.current_test_name = None
raise
try:
if self._stopped:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self._kill()
raise ExitThread
try: try:
try: stdout, stderr = popen.communicate(timeout=self.timeout)
if self._killed: except subprocess.TimeoutExpired:
# If kill() has been called before self._popen is set, if self._stopped:
# self._popen is still running. Call again kill() # kill() has been called: communicate() fails
# to ensure that the process is killed. # on reading closed stdout/stderr
self.kill() raise ExitThread
raise ExitThread
return self._timedout(test_name)
try: except OSError:
stdout, stderr = popen.communicate(timeout=self.timeout) if self._stopped:
except subprocess.TimeoutExpired: # kill() has been called: communicate() fails
if self._killed: # on reading closed stdout/stderr
# kill() has been called: communicate() fails raise ExitThread
# on reading closed stdout/stderr raise
raise ExitThread
return self._timedout(test_name)
except OSError:
if self._killed:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
raise
except:
self.kill()
raise
finally:
self._close_wait()
retcode = popen.returncode retcode = popen.returncode
stdout = stdout.strip()
stderr = stderr.rstrip()
return (retcode, stdout, stderr)
except:
self._kill()
raise
finally: finally:
self.current_test_name = None self._wait_completed()
self._popen = None self._popen = None
self.current_test_name = None
def _runtest(self, test_name):
result = self._run_process(test_name)
stdout = stdout.strip() if isinstance(result, MultiprocessResult):
stderr = stderr.rstrip() # _timedout() case
return result
retcode, stdout, stderr = result
err_msg = None err_msg = None
if retcode != 0: if retcode != 0:
...@@ -266,7 +252,7 @@ class MultiprocessThread(threading.Thread): ...@@ -266,7 +252,7 @@ class MultiprocessThread(threading.Thread):
return MultiprocessResult(result, stdout, stderr, err_msg) return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self): def run(self):
while not self._killed: while not self._stopped:
try: try:
try: try:
test_name = next(self.pending) test_name = next(self.pending)
...@@ -284,6 +270,33 @@ class MultiprocessThread(threading.Thread): ...@@ -284,6 +270,33 @@ class MultiprocessThread(threading.Thread):
self.output.put((True, traceback.format_exc())) self.output.put((True, traceback.format_exc()))
break break
def _wait_completed(self):
    """Close the worker process pipes, then wait for the process to exit.

    The wait is bounded by JOIN_TIMEOUT. A timeout or an OSError while
    waiting is reported with print_warning() instead of being raised.
    """
    process = self._popen

    # Both pipes have to be closed first: otherwise communicate()
    # could hang.
    for stream in (process.stdout, process.stderr):
        stream.close()

    try:
        process.wait(JOIN_TIMEOUT)
    except (subprocess.TimeoutExpired, OSError) as error:
        message = (f"Failed to wait for {self} completion "
                   f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                   f"{error!r}")
        print_warning(message)
def wait_stopped(self, start_time):
    """Join this thread, reporting progress until it stops.

    start_time is the reference timestamp (on the time.monotonic()
    clock) used to compute how long the wait has lasted so far.

    The thread is joined in slices of one second. After each slice in
    which it is still alive, a progress message is printed. Once the
    elapsed time exceeds JOIN_TIMEOUT, a warning is emitted and the
    wait is abandoned.
    """
    # Join in one-second slices so that a message is written every second
    self.join(1.0)
    while self.is_alive():
        elapsed = time.monotonic() - start_time
        print(f"Waiting for {self} thread for {format_duration(elapsed)}", flush=True)
        if elapsed > JOIN_TIMEOUT:
            print_warning(f"Failed to join {self} in {format_duration(elapsed)}")
            return
        self.join(1.0)
def get_running(workers): def get_running(workers):
running = [] running = []
...@@ -298,7 +311,7 @@ def get_running(workers): ...@@ -298,7 +311,7 @@ def get_running(workers):
return running return running
class MultiprocessRunner: class MultiprocessTestRunner:
def __init__(self, regrtest): def __init__(self, regrtest):
self.regrtest = regrtest self.regrtest = regrtest
self.ns = regrtest.ns self.ns = regrtest.ns
...@@ -311,30 +324,20 @@ class MultiprocessRunner: ...@@ -311,30 +324,20 @@ class MultiprocessRunner:
self.workers = None self.workers = None
def start_workers(self): def start_workers(self):
self.workers = [MultiprocessThread(self.pending, self.output, self.workers = [TestWorkerProcess(index, self.pending, self.output,
self.ns, self.worker_timeout) self.ns, self.worker_timeout)
for _ in range(self.ns.use_mp)] for index in range(1, self.ns.use_mp + 1)]
print("Run tests in parallel using %s child processes" print("Run tests in parallel using %s child processes"
% len(self.workers)) % len(self.workers))
for worker in self.workers: for worker in self.workers:
worker.start() worker.start()
def wait_workers(self): def stop_workers(self):
start_time = time.monotonic() start_time = time.monotonic()
for worker in self.workers: for worker in self.workers:
worker.kill() worker.stop()
for worker in self.workers: for worker in self.workers:
while True: worker.wait_stopped(start_time)
worker.join(1.0)
if not worker.is_alive():
break
dt = time.monotonic() - start_time
print("Wait for regrtest worker %r for %.1f sec" % (worker, dt),
flush=True)
if dt > JOIN_TIMEOUT:
print("Warning -- failed to join a regrtest worker %s"
% worker, flush=True)
break
def _get_result(self): def _get_result(self):
if not any(worker.is_alive() for worker in self.workers): if not any(worker.is_alive() for worker in self.workers):
...@@ -418,10 +421,11 @@ class MultiprocessRunner: ...@@ -418,10 +421,11 @@ class MultiprocessRunner:
if self.ns.timeout is not None: if self.ns.timeout is not None:
faulthandler.cancel_dump_traceback_later() faulthandler.cancel_dump_traceback_later()
# a test failed (and --failfast is set) or all tests completed # Always ensure that all worker processes are no longer
self.pending.stop() # running when we exit this function
self.wait_workers() self.pending.stop()
self.stop_workers()
def run_tests_multiprocess(regrtest): def run_tests_multiprocess(regrtest):
MultiprocessRunner(regrtest).run_tests() MultiprocessTestRunner(regrtest).run_tests()
...@@ -3,16 +3,22 @@ import msvcrt ...@@ -3,16 +3,22 @@ import msvcrt
import os import os
import subprocess import subprocess
import uuid import uuid
import winreg
from test import support from test import support
from test.libregrtest.utils import print_warning
# Max size of asynchronous reads # Max size of asynchronous reads
BUFSIZE = 8192 BUFSIZE = 8192
# Exponential damping factor (see below) # Exponential damping factor (see below)
LOAD_FACTOR_1 = 0.9200444146293232478931553241 LOAD_FACTOR_1 = 0.9200444146293232478931553241
# Seconds per measurement # Seconds per measurement
SAMPLING_INTERVAL = 5 SAMPLING_INTERVAL = 5
COUNTER_NAME = r'\System\Processor Queue Length' # Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
# of typeperf are registered
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
r"\Perflib\CurrentLanguage")
class WindowsLoadTracker(): class WindowsLoadTracker():
...@@ -25,7 +31,8 @@ class WindowsLoadTracker(): ...@@ -25,7 +31,8 @@ class WindowsLoadTracker():
def __init__(self): def __init__(self):
self.load = 0.0 self.load = 0.0
self.p = None self.counter_name = ''
self.popen = None
self.start() self.start()
def start(self): def start(self):
...@@ -55,31 +62,46 @@ class WindowsLoadTracker(): ...@@ -55,31 +62,46 @@ class WindowsLoadTracker():
overlap.GetOverlappedResult(True) overlap.GetOverlappedResult(True)
# Spawn off the load monitor # Spawn off the load monitor
command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)] counter_name = self._get_counter_name()
self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD) command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
# Close our copy of the write end of the pipe # Close our copy of the write end of the pipe
os.close(command_stdout) os.close(command_stdout)
def _get_counter_name(self):
    """Return the localized typeperf counter path, in double quotes.

    Counter names are read from the 'Counter' value of the
    COUNTER_REGISTRY_KEY subkey of HKEY_LOCAL_MACHINE. The returned
    string has the form "\\<system>\\<processor queue length>",
    including the surrounding double quotes.
    """
    # The registry holds the localized counter names
    with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
        flat_names, _value_type = winreg.QueryValueEx(perfkey, 'Counter')

    # The value is a flat [key1, value1, key2, value2, ...] list:
    # pair up even and odd positions to build {key1: value1, ...}
    names_by_key = dict(zip(flat_names[0::2], flat_names[1::2]))

    # Key '2' is the System counter, key '44' is Processor Queue Length
    system_name = names_by_key['2']
    queue_length_name = names_by_key['44']
    return f'"\\{system_name}\\{queue_length_name}"'
def close(self): def close(self):
if self.p is None: if self.popen is None:
return return
self.p.kill() self.popen.kill()
self.p.wait() self.popen.wait()
self.p = None self.popen = None
def __del__(self): def __del__(self):
self.close() self.close()
def read_output(self): def read_output(self):
import _winapi
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
bytes_read, res = overlapped.GetOverlappedResult(False) bytes_read, res = overlapped.GetOverlappedResult(False)
if res != 0: if res != 0:
return return
return overlapped.getbuffer().decode() output = overlapped.getbuffer()
return output.decode('oem', 'replace')
def getloadavg(self): def getloadavg(self):
typeperf_output = self.read_output() typeperf_output = self.read_output()
...@@ -89,14 +111,29 @@ class WindowsLoadTracker(): ...@@ -89,14 +111,29 @@ class WindowsLoadTracker():
# Process the backlog of load values # Process the backlog of load values
for line in typeperf_output.splitlines(): for line in typeperf_output.splitlines():
# Ignore the initial header:
# "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
if '\\\\' in line:
continue
# Ignore blank lines
if not line.strip():
continue
# typeperf outputs in a CSV format like this: # typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000" # "07/19/2018 01:32:26.605","3.000000"
toks = line.split(',') # (date, process queue length)
# Ignore blank lines and the initial header try:
if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2: tokens = line.split(',')
if len(tokens) != 2:
raise ValueError
value = tokens[1].replace('"', '')
load = float(value)
except ValueError:
print_warning("Failed to parse typeperf output: %a" % line)
continue continue
load = float(toks[1].replace('"', ''))
# We use an exponentially weighted moving average, imitating the # We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems. # load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment