Commit ad3fef8e authored by Stefan Behnel

Print pipeline timings after test runs.

parent 316a3fb1
Cython/Compiler/Pipeline.py
@@ -343,11 +343,30 @@ def insert_into_pipeline(pipeline, transform, before=None, after=None):

 _pipeline_entry_points = {}

+try:
+    from threading import local as _threadlocal
+except ImportError:
+    class _threadlocal(object): pass
+
+threadlocal = _threadlocal()
+
+
+def get_timings():
+    try:
+        return threadlocal.cython_pipeline_timings
+    except AttributeError:
+        return {}
+
+
 def run_pipeline(pipeline, source, printtree=True):
     from .Visitor import PrintTree
     exec_ns = globals().copy() if DebugFlags.debug_verbose_pipeline else None

+    try:
+        timings = threadlocal.cython_pipeline_timings
+    except AttributeError:
+        timings = threadlocal.cython_pipeline_timings = {}
+
     def run(phase, data):
         return phase(data)
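This first hunk gives each compiler thread its own timings store, so parallel runs do not interleave their numbers, and `get_timings()` exposes the current thread's dict without creating it. A minimal standalone sketch of that access pattern (the `record()` helper and the stage name are illustrative, not part of the commit):

    import threading

    threadlocal = threading.local()

    def get_timings():
        # Return this thread's accumulated timings, or an empty
        # dict if nothing was recorded on this thread yet.
        try:
            return threadlocal.cython_pipeline_timings
        except AttributeError:
            return {}

    def record(stage_name, usecs):
        # Lazily create the per-thread dict on first use, then
        # accumulate (total_microseconds, run_count) per stage.
        try:
            timings = threadlocal.cython_pipeline_timings
        except AttributeError:
            timings = threadlocal.cython_pipeline_timings = {}
        old_t, count = timings.get(stage_name, (0, 0))
        timings[stage_name] = (old_t + usecs, count + 1)

    record("Parsing", 1500)   # hypothetical stage timings
    record("Parsing", 2500)
    print(get_timings())      # {'Parsing': (4000, 2)}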
@@ -356,22 +375,32 @@ def run_pipeline(pipeline, source, printtree=True):
     try:
         try:
             for phase in pipeline:
-                if phase is not None:
-                    if not printtree and isinstance(phase, PrintTree):
-                        continue
-                    if DebugFlags.debug_verbose_pipeline:
-                        t = time()
-                        print("Entering pipeline phase %r" % phase)
-                    # create a new wrapper for each step to show the name in profiles
-                    phase_name = getattr(phase, '__name__', type(phase).__name__)
-                    try:
-                        run = _pipeline_entry_points[phase_name]
-                    except KeyError:
-                        exec("def %s(phase, data): return phase(data)" % phase_name, exec_ns)
-                        run = _pipeline_entry_points[phase_name] = exec_ns[phase_name]
-                    data = run(phase, data)
-                    if DebugFlags.debug_verbose_pipeline:
-                        print("    %.3f seconds" % (time() - t))
+                if phase is None:
+                    continue
+                if not printtree and isinstance(phase, PrintTree):
+                    continue
+                phase_name = getattr(phase, '__name__', type(phase).__name__)
+                if DebugFlags.debug_verbose_pipeline:
+                    print("Entering pipeline phase %r" % phase)
+                # create a new wrapper for each step to show the name in profiles
+                try:
+                    run = _pipeline_entry_points[phase_name]
+                except KeyError:
+                    exec("def %s(phase, data): return phase(data)" % phase_name, exec_ns)
+                    run = _pipeline_entry_points[phase_name] = exec_ns[phase_name]
+
+                t = time()
+                data = run(phase, data)
+                t = time() - t
+
+                try:
+                    old_t, count = timings[phase_name]
+                except KeyError:
+                    old_t, count = 0, 0
+                timings[phase_name] = (old_t + int(t * 1000000), count + 1)
+                if DebugFlags.debug_verbose_pipeline:
+                    print("    %.3f seconds" % t)
         except CompileError as err:
             # err is set
             Errors.report_error(err, use_stack=False)
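Around the new timing code, the hunk keeps the existing trick of `exec`-ing one wrapper per phase so that profilers attribute time to the phase's own name rather than a generic `run`. A condensed, runnable sketch of that trick (the `Parsing` callable here is a stand-in, not Cython's actual phase):

    import cProfile

    _entry_points = {}

    def call_named(phase, data):
        # Create (and cache) a wrapper function whose __name__ matches
        # the phase, so profiles attribute the time to that name.
        name = getattr(phase, '__name__', type(phase).__name__)
        try:
            run = _entry_points[name]
        except KeyError:
            ns = {}
            exec("def %s(phase, data): return phase(data)" % name, ns)
            run = _entry_points[name] = ns[name]
        return run(phase, data)

    def Parsing(data):  # hypothetical pipeline phase
        return data + ["parsed"]

    cProfile.run('call_named(Parsing, [])')  # time shows up under "Parsing"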
runtests.py
@@ -647,8 +647,8 @@ class Stats(object):
         self.test_times = defaultdict(float)
         self.top_tests = defaultdict(list)

-    def add_time(self, name, language, metric, t):
-        self.test_counts[metric] += 1
+    def add_time(self, name, language, metric, t, count=1):
+        self.test_counts[metric] += count
         self.test_times[metric] += t
         top = self.top_tests[metric]
         push = heapq.heappushpop if len(top) >= self.top_n else heapq.heappush
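The new `count` parameter lets one `add_time()` call stand for several recorded runs, which the aggregated pipeline timings need. The line below it keeps the standard bounded-heap idiom for tracking the top N entries; a self-contained sketch with made-up timings:

    import heapq

    top_n = 3
    top = []
    for t, name in [(2.5, 'a'), (0.1, 'b'), (1.0, 'c'), (3.0, 'd'), (0.5, 'e')]:
        # Keep only the top_n largest entries: plain push until the heap
        # is full, then push-and-pop so the smallest element falls out.
        push = heapq.heappushpop if len(top) >= top_n else heapq.heappush
        push(top, (t, name))

    print(sorted(top, reverse=True))  # [(3.0, 'd'), (2.5, 'a'), (1.0, 'c')]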
@@ -2396,16 +2396,23 @@ def main():
         # NOTE: create process pool before time stamper thread to avoid forking issues.
         total_time = time.time()
         stats = Stats()
+        merged_pipeline_stats = defaultdict(lambda: (0, 0))
         with time_stamper_thread(interval=keep_alive_interval):
-            for shard_num, shard_stats, return_code, failure_output in pool.imap_unordered(runtests_callback, tasks):
+            for shard_num, shard_stats, pipeline_stats, return_code, failure_output in pool.imap_unordered(runtests_callback, tasks):
                 if return_code != 0:
                     error_shards.append(shard_num)
                     failure_outputs.append(failure_output)
                     sys.stderr.write("FAILED (%s/%s)\n" % (shard_num, options.shard_count))
                 sys.stderr.write("ALL DONE (%s/%s)\n" % (shard_num, options.shard_count))
                 stats.update(shard_stats)
+                for stage_name, (stage_time, stage_count) in pipeline_stats.items():
+                    old_time, old_count = merged_pipeline_stats[stage_name]
+                    merged_pipeline_stats[stage_name] = (old_time + stage_time, old_count + stage_count)

         pool.close()
         pool.join()

         total_time = time.time() - total_time
         sys.stderr.write("Sharded tests run in %d seconds (%.1f minutes)\n" % (round(total_time), total_time / 60.))
         if error_shards:
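Because `merged_pipeline_stats` is a `defaultdict(lambda: (0, 0))`, the merge loop needs no key-existence checks: a stage seen for the first time starts at zero time and zero runs. The same fold, standalone and with invented shard data:

    from collections import defaultdict

    merged = defaultdict(lambda: (0, 0))
    shard_results = [                      # hypothetical per-shard timings
        {'Parsing': (4000, 2), 'AnalyseDeclarations': (900, 1)},
        {'Parsing': (1000, 1)},
    ]
    for pipeline_stats in shard_results:
        for stage_name, (stage_time, stage_count) in pipeline_stats.items():
            old_time, old_count = merged[stage_name]
            merged[stage_name] = (old_time + stage_time, old_count + stage_count)

    print(dict(merged))
    # {'Parsing': (5000, 3), 'AnalyseDeclarations': (900, 1)}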
@@ -2417,14 +2424,29 @@ def main():
             return_code = 0
     else:
         with time_stamper_thread(interval=keep_alive_interval):
-            _, stats, return_code, _ = runtests(options, cmd_args, coverage)
+            _, stats, merged_pipeline_stats, return_code, _ = runtests(options, cmd_args, coverage)

     if coverage:
         if options.shard_count > 1 and options.shard_num == -1:
             coverage.combine()
         coverage.stop()

+    def as_msecs(t, unit=1000000):
+        # pipeline times are stored as integer microseconds
+        return t // unit + float(t % unit) / unit
+
+    pipeline_stats = [
+        (as_msecs(stage_time), as_msecs(stage_time) / stage_count, stage_count, stage_name)
+        for stage_name, (stage_time, stage_count) in merged_pipeline_stats.items()
+    ]
+    pipeline_stats.sort(reverse=True)
+    sys.stderr.write("Most expensive pipeline stages: %s\n" % ", ".join(
+        "%r: %.2f / %d (%.3f / run)" % (stage_name, total_stage_time, stage_count, stage_time)
+        for total_stage_time, stage_time, stage_count, stage_name in pipeline_stats[:10]
+    ))
+
     stats.print_stats(sys.stderr)

     if coverage:
         save_coverage(coverage, options)
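Since the stored values are integer microseconds, `as_msecs(t, unit=1000000)` effectively yields float seconds despite its name: the integer division gives the whole seconds, the remainder the fraction. A worked example of the conversion and the report formatting, with made-up numbers:

    def as_msecs(t, unit=1000000):
        # stored pipeline times are integer microseconds;
        # with unit=1000000 this yields a float in seconds
        return t // unit + float(t % unit) / unit

    merged = {'Parsing': (5250000, 3)}   # hypothetical: 5.25s over 3 runs
    stats = [
        (as_msecs(stage_time), as_msecs(stage_time) / stage_count, stage_count, stage_name)
        for stage_name, (stage_time, stage_count) in merged.items()
    ]
    stats.sort(reverse=True)
    print(", ".join(
        "%r: %.2f / %d (%.3f / run)" % (name, total, count, per_run)
        for total, per_run, count, name in stats
    ))
    # 'Parsing': 5.25 / 3 (1.750 / run)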
@@ -2789,6 +2811,9 @@ def runtests(options, cmd_args, coverage=None):
     if common_utility_dir and options.shard_num < 0 and options.cleanup_workdir:
         shutil.rmtree(common_utility_dir)

+    from Cython.Compiler.Pipeline import get_timings
+    pipeline_stats = get_timings()
+
     if missing_dep_excluder.tests_missing_deps:
         sys.stderr.write("Following tests excluded because of missing dependencies on your system:\n")
         for test in missing_dep_excluder.tests_missing_deps:
@@ -2805,7 +2830,7 @@ def runtests(options, cmd_args, coverage=None):
     else:
         failure_output = "".join(collect_failure_output(result))

-    return options.shard_num, stats, result_code, failure_output
+    return options.shard_num, stats, pipeline_stats, result_code, failure_output


 def collect_failure_output(result):
......
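End to end, each worker calls `get_timings()` in its own process and ships the dict home in the extended 5-tuple that `main()` unpacks above. A minimal sketch of that round trip, where the fake `worker()` stands in for `runtests()` and the timing values are invented:

    from collections import defaultdict
    from multiprocessing import Pool

    def worker(shard_num):
        # Stand-in for runtests(): fake the
        # (shard_num, stats, pipeline_stats, result_code, failure_output) tuple.
        pipeline_stats = {'Parsing': (1000 * shard_num, shard_num)}  # hypothetical
        return shard_num, None, pipeline_stats, 0, ""

    if __name__ == '__main__':
        merged = defaultdict(lambda: (0, 0))
        with Pool(2) as pool:
            for shard_num, _stats, pipeline_stats, return_code, _out in pool.imap_unordered(worker, [1, 2]):
                for name, (t, n) in pipeline_stats.items():
                    old_t, old_n = merged[name]
                    merged[name] = (old_t + t, old_n + n)
        print(dict(merged))   # {'Parsing': (3000, 3)}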