Commit 318efce0 authored by Kirill Smelkov's avatar Kirill Smelkov

py.bench: Move it to -> pygolang

So that it can be available to everyone and in particular B & friends to
be available from introduced importable golang.testing package. The move
itself:

	pygolang@9bf03d9c

While moving the code was restructured / improved a bit and py.bench
interface reworked to mimic `go test -bench` in defaults.
parent 60e8f859
...@@ -21,7 +21,7 @@ all : ...@@ -21,7 +21,7 @@ all :
PYTHON ?= python PYTHON ?= python
PYTEST ?= $(PYTHON) -m pytest PYTEST ?= $(PYTHON) -m pytest
PYBENCH ?= $(PYTHON) t/py.bench PYBENCH ?= $(PYTHON) -m golang.cmd.pybench
VALGRIND?= valgrind VALGRIND?= valgrind
# use the same C compiler as python # use the same C compiler as python
...@@ -198,4 +198,4 @@ bench : bench.t bench.py ...@@ -198,4 +198,4 @@ bench : bench.t bench.py
bench.t : $(BENCHV.C:%=%.trun) bench.t : $(BENCHV.C:%=%.trun)
bench.py: bigfile/_bigfile.so bench.py: bigfile/_bigfile.so
$(PYBENCH) $(PYTEST_IGNORE) $(PYBENCH) --count=3 --forked $(PYTEST_IGNORE)
...@@ -249,7 +249,7 @@ setup( ...@@ -249,7 +249,7 @@ setup(
], ],
extras_require = { extras_require = {
'test': ['pytest'], 'test': ['pytest', 'pygolang >= 0.0.0.dev4'],
}, },
cmdclass = {'build_ext': build_ext, cmdclass = {'build_ext': build_ext,
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# benchmarking via py.test
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
#
# TODO intro
from __future__ import print_function
import pytest
import _pytest.runner
from _pytest.terminal import TerminalReporter as _pytest_TerminalReporter
from time import time
from math import ceil, log10
from py.process import ForkedFunc
import sys
from six.moves import range as xrange
# XXX hack for ForkedFunc not to capture stdout/stderr.
# so that test prints could be seen when run under --capture=no
# otoh, py.test captures stdout/stderr fds so here we don't have to.
import py._process.forkedfunc
class XForkedFunc(ForkedFunc):
def _child(self, nice, on_start, on_exit):
# we are monkeypatching only in child, so no effect on parent =safe
py._process.forkedfunc.get_unbuffered_io = self._fake_get_unbuffered_io
return ForkedFunc._child(self, nice, on_start, on_exit)
@staticmethod
def _fake_get_unbuffered_io(fd, _):
if fd == 1: return sys.stdout
if fd == 2: return sys.stderr
raise RuntimeError("only stdout/stderr expected")
# B is benchmarking timer/request passed to benchmarks as fixture
# similar to https://golang.org/pkg/testing/#B.
class B:
def __init__(self):
self.N = 1 # default when func does not accept `b` arg
self._t_start = None # t of timer started; None if timer is currently stopped
self.reset_timer()
def reset_timer(self):
self._t_total = 0.
def start_timer(self):
if self._t_start is not None:
return
self._t_start = time()
def stop_timer(self):
if self._t_start is None:
return
t = time()
self._t_total += t - self._t_start
self._t_start = None
def total_time(self):
return self._t_total
# plugin to collect & run benchmarks
class BenchPlugin:
# redirect python collector to bench_*.py and bench_*()
def pytest_configure(self, config):
# XXX a bit hacky
ini = config.inicfg
ini['python_files'] = 'bench_*.py'
ini['python_classes'] = 'Bench'
ini['python_functions'] = 'bench_'
config._inicache.clear()
def pytest_addoption(self, parser):
g = parser.getgroup('benchmarking')
g.addoption('--benchruns', action='store', type=int, dest='benchruns', default=3,
help="number of time to run each benchmark")
g.addoption('--dontfork', action='store_true', dest='dontfork', default=False,
help="run all benchmarks in the same process")
# create b for every item to run and make it available to b fixture via item.request
def pytest_runtest_setup(self, item):
b = B()
b._used_by_func = False
item._request._bench_b = b
# make created b be available as `b` func arg
@pytest.fixture(scope="function")
def b(self, request):
"""Provides access to benchmarking timer"""
# NOTE here request is subrequest of item._request in pytest_runtest_setup
b = request._parent_request._bench_b
b._used_by_func = True
return b
# run benchmark several times, and get best timing
# each benchmark run is executed in separate process (if not told otherwise)
def pytest_runtest_call(self, item):
b = item._request._bench_b
def run():
b.N = 0
t = 0.
ttarget = 1.
while t < (ttarget * 0.9):
if b.N == 0:
b.N = 1
else:
n = b.N * (ttarget / t) # exact how to adjust b.N to reach ttarget
order = int(log10(n)) # n = k·10^order, k ∈ [1,10)
k = float(n) / (10**order)
k = ceil(k) # lift up k to nearest int
b.N = int(k * 10**order) # b.N = int([1,10))·10^order
b.reset_timer()
b.start_timer()
item.runtest()
b.stop_timer()
t = b.total_time()
#print((b.N, t))
# break if func does not accept b as arg
if not b._used_by_func:
break
return (b.N, t)
rv = []
for i in xrange(item.config.option.benchruns):
if item.config.option.dontfork:
r = run()
else:
runf = XForkedFunc(run)
result = runf.waitfinish()
if result.exitstatus == XForkedFunc.EXITSTATUS_EXCEPTION:
print(result.err, file=sys.stderr) # XXX vs runf doesn't capture stderr
1/0 # TODO re-raise properly
elif result.exitstatus != 0:
print(result.err, file=sys.stderr) # XXX vs runf doesn't capture stderr
1/0 # TODO handle properly
r = result.retval
rv.append(r)
#print ('RET', rv)
return rv
# set benchmarking time in report, if run ok
def pytest_runtest_makereport(self, item, call):
report = _pytest.runner.pytest_runtest_makereport(item, call)
if call.when == 'call' and not call.excinfo:
# in pytest3 there is no way to mark pytest_runtest_call as 'firstresult'
# let's emulate firstresult logic here
assert len(call.result) == 1
report.bench_resv = call.result[0]
return report
# colors to use when printing !passed
# XXX somewhat dup from _pytest/terminal.py
def report_markup(report):
m = {}
if report.failed:
m = {'red': True, 'bold': True}
elif report.skipeed:
m = {'yellow': True}
return m
# max(seq) or 0 if !seq
def max0(seq):
seq = list(seq) # if generator -> generate
if not seq:
return 0
else:
return max(seq)
# benchname(nodeid) returns name of a benchmark from a function nodeid
def benchname(nodeid):
pyname = nodeid.split("::", 1)[1] # everything after fspath
# replace 'bench_' with 'Benchmark' prefix so that benchmark output matches
# golang format
if pyname.startswith('bench_'):
pyname = pyname[len('bench_'):]
return 'Benchmark' + pyname
# Adjujsted terminal reporting to benchmarking needs
class XTerminalReporter(_pytest_TerminalReporter):
# determine largest item name (to ralign timings)
def pytest_collection_finish(self, session):
_pytest_TerminalReporter.pytest_collection_finish(self, session)
self.benchname_maxlen = max0(len(benchname(_.nodeid)) for _ in session.items)
def pytest_runtest_logstart(self, nodeid, location):
# print `pymod: ...` header for every module
fspath = self.config.rootdir.join(nodeid.split("::")[0])
if fspath == self.currentfspath:
return
first = (self.currentfspath == None)
self.currentfspath = fspath
fspath = self.startdir.bestrelpath(fspath)
self._tw.line()
# vskip in between modules
if not first:
self._tw.line()
self.write("pymod: %s" % fspath)
def pytest_runtest_logreport(self, report):
_ =self.config.hook.pytest_report_teststatus(report=report)
cat, letter, word = _
self.stats.setdefault(cat, []).append(report) # XXX needed for bench?
if not letter and not word:
# passed setup/teardown
return
def printname():
self._tw.line()
self._tw.write('%-*s\t' % (self.benchname_maxlen, benchname(report.nodeid)))
if not report.passed:
printname()
self._tw.write('[%s]' % word, **report_markup(report))
return
# TODO ralign timing
for niter, t in report.bench_resv:
printname()
self._tw.write('%d\t%.3f µs/op' % (niter, t * 1E6 / niter))
# there is no way to override it otherwise - it is hardcoded in
# _pytest.terminal's pytest_configure()
_pytest.terminal.TerminalReporter = XTerminalReporter
def main():
pytest.main(plugins=[BenchPlugin()])
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment