Commit e26ad644 authored by Denis Bilenko's avatar Denis Bilenko

Merge pull request #401 from surfly/testrunner_no_gevent

Testrunner: do not depend on gevent #38
parents b2806057 13057d83
...@@ -16,6 +16,8 @@ def get_absolute_pythonpath(): ...@@ -16,6 +16,8 @@ def get_absolute_pythonpath():
def TESTRUNNER(tests=None): def TESTRUNNER(tests=None):
if not os.path.exists(directory):
return
preferred_version = open(os.path.join(directory, 'version')).read().strip() preferred_version = open(os.path.join(directory, 'version')).read().strip()
if preferred_version != version: if preferred_version != version:
util.log('WARNING: The tests in %s/ are from version %s and your Python is %s', directory, preferred_version, version) util.log('WARNING: The tests in %s/ are from version %s and your Python is %s', directory, preferred_version, version)
......
from __future__ import print_function from __future__ import print_function
from gevent import monkey; monkey.patch_all() from gevent import monkey; monkey.patch_all(subprocess=True)
import sys import sys
import socket import socket
from time import sleep from time import sleep
......
from gevent import monkey; monkey.patch_all() from gevent import monkey; monkey.patch_all(subprocess=True)
import sys import sys
from gevent.server import DatagramServer from gevent.server import DatagramServer
from unittest import TestCase, main from unittest import TestCase, main
......
#!/usr/bin/env python #!/usr/bin/env python
from __future__ import print_function from __future__ import print_function
import gevent
gevent.get_hub('select') # this is just to make sure we don't pass any fds to children
from gevent import monkey; monkey.patch_all()
import six import six
import sys import sys
import os import os
import glob import glob
import traceback import traceback
from time import time import time
from gevent.pool import Pool from multiprocessing.pool import ThreadPool
import util import util
TIMEOUT = 180 TIMEOUT = 180
NWORKERS = int(os.environ.get('NWORKERS') or 8) NWORKERS = int(os.environ.get('NWORKERS') or 8)
pool = None
def spawn(*args, **kwargs):
g = pool.spawn(*args, **kwargs)
g.link_exception(lambda *args: sys.exit('Internal error in testrunner.py: %s %s' % (g, g.exception)))
return g
def run_many(tests, expected=None, failfast=False): def run_many(tests, expected=None, failfast=False):
global NWORKERS, pool global NWORKERS
start = time() start = time.time()
total = 0 total = 0
failed = {} failed = {}
NWORKERS = min(len(tests), NWORKERS) NWORKERS = min(len(tests), NWORKERS)
pool = Pool(NWORKERS) pool = ThreadPool(NWORKERS)
util.BUFFER_OUTPUT = NWORKERS > 1 util.BUFFER_OUTPUT = NWORKERS > 1
def run_one(cmd, **kwargs): def run_one(cmd, **kwargs):
...@@ -45,32 +35,61 @@ def run_many(tests, expected=None, failfast=False): ...@@ -45,32 +35,61 @@ def run_many(tests, expected=None, failfast=False):
# we therefore will retry them sequentially # we therefore will retry them sequentially
failed[result.name] = [cmd, kwargs, 'AssertionError' in (result.output or '')] failed[result.name] = [cmd, kwargs, 'AssertionError' in (result.output or '')]
results = []
def reap():
for r in results[:]:
if not r.ready():
continue
if r.successful():
results.remove(r)
else:
r.get()
sys.exit('Internal error in testrunner.py: %r' % (r, ))
return len(results)
def reap_all():
while reap() > 0:
time.sleep(0.1)
def spawn(args, kwargs):
while True:
if reap() < NWORKERS:
r = pool.apply_async(run_one, (cmd, ), options or {})
results.append(r)
return
else:
time.sleep(0.1)
try: try:
try: try:
for cmd, options in tests: for cmd, options in tests:
total += 1 total += 1
spawn(run_one, cmd, **(options or {})) spawn((cmd, ), options or {})
gevent.wait() pool.close()
pool.join()
except KeyboardInterrupt: except KeyboardInterrupt:
try: try:
if pool:
util.log('Waiting for currently running to finish...') util.log('Waiting for currently running to finish...')
pool.join() reap_all()
except KeyboardInterrupt: except KeyboardInterrupt:
util.report(total, failed, exit=False, took=time() - start, expected=expected) pool.terminate()
util.report(total, failed, exit=False, took=time.time() - start, expected=expected)
util.log('(partial results)\n') util.log('(partial results)\n')
raise raise
except: except:
traceback.print_exc() traceback.print_exc()
pool.kill() # this needed to kill the processes pool.terminate()
raise raise
reap_all()
toretry = [key for (key, (cmd, kwargs, can_retry)) in failed.items() if can_retry] toretry = [key for (key, (cmd, kwargs, can_retry)) in failed.items() if can_retry]
failed_then_succeeded = [] failed_then_succeeded = []
if NWORKERS > 1 and toretry: if NWORKERS > 1 and toretry:
util.log('\nWill retry %s failed tests sequentially:\n- %s\n', len(toretry), '\n- '.join(toretry)) util.log('\nWill retry %s failed tests sequentially:\n- %s\n', len(toretry), '\n- '.join(toretry))
for name, (cmd, kwargs, _ignore) in failed.items(): for name, (cmd, kwargs, _ignore) in list(failed.items()):
if not util.run(cmd, buffer_output=False, **kwargs): if not util.run(cmd, buffer_output=False, **kwargs):
failed.pop(name) failed.pop(name)
failed_then_succeeded.append(name) failed_then_succeeded.append(name)
...@@ -79,9 +98,7 @@ def run_many(tests, expected=None, failfast=False): ...@@ -79,9 +98,7 @@ def run_many(tests, expected=None, failfast=False):
util.log('\n%s tests failed during concurrent run but succeeded when ran sequentially:', len(failed_then_succeeded)) util.log('\n%s tests failed during concurrent run but succeeded when ran sequentially:', len(failed_then_succeeded))
util.log('- ' + '\n- '.join(failed_then_succeeded)) util.log('- ' + '\n- '.join(failed_then_succeeded))
util.log('gevent version %s from %s', gevent.__version__, gevent.__file__) util.report(total, failed, took=time.time() - start, expected=expected)
util.report(total, failed, took=time() - start, expected=expected)
assert not pool, pool
def discover(tests=None, ignore=None): def discover(tests=None, ignore=None):
......
...@@ -5,9 +5,9 @@ import six ...@@ -5,9 +5,9 @@ import six
import traceback import traceback
import unittest import unittest
import threading import threading
import subprocess
import time import time
from datetime import timedelta from datetime import timedelta
from gevent import subprocess, sleep, spawn_later
SLEEP = 0.1 SLEEP = 0.1
...@@ -82,6 +82,8 @@ def _kill(popen): ...@@ -82,6 +82,8 @@ def _kill(popen):
def kill(popen): def kill(popen):
if popen.poll() is not None:
return
try: try:
if getattr(popen, 'setpgrp_enabled', None): if getattr(popen, 'setpgrp_enabled', None):
killpg(popen.pid) killpg(popen.pid)
...@@ -139,10 +141,9 @@ def start(command, **kwargs): ...@@ -139,10 +141,9 @@ def start(command, **kwargs):
popen.name = name popen.name = name
popen.setpgrp_enabled = preexec_fn is not None popen.setpgrp_enabled = preexec_fn is not None
if timeout is not None: if timeout is not None:
popen._killer = spawn_later(timeout, kill, popen) t = threading.Timer(timeout, kill, args=(popen, ))
popen._killer._start_event.ref = False # XXX add 'ref' property to greenlet t.setDaemon(True)
else: t.start()
popen._killer = None
return popen return popen
...@@ -164,6 +165,9 @@ class RunResult(object): ...@@ -164,6 +165,9 @@ class RunResult(object):
return self.code return self.code
lock = threading.Lock()
def run(command, **kwargs): def run(command, **kwargs):
buffer_output = kwargs.pop('buffer_output', BUFFER_OUTPUT) buffer_output = kwargs.pop('buffer_output', BUFFER_OUTPUT)
if buffer_output: if buffer_output:
...@@ -181,12 +185,11 @@ def run(command, **kwargs): ...@@ -181,12 +185,11 @@ def run(command, **kwargs):
else: else:
result = popen.poll() result = popen.poll()
finally: finally:
if popen._killer is not None:
popen._killer.kill(block=False)
kill(popen) kill(popen)
assert not err assert not err
with lock:
if out: if out:
out = out.strip() out = out.strip().decode()
if out: if out:
out = ' ' + out.replace('\n', '\n ') out = ' ' + out.replace('\n', '\n ')
out = out.rstrip() out = out.rstrip()
...@@ -380,12 +383,12 @@ class TestServer(unittest.TestCase): ...@@ -380,12 +383,12 @@ class TestServer(unittest.TestCase):
def before(self): def before(self):
if self.before_delay is not None: if self.before_delay is not None:
sleep(self.before_delay) time.sleep(self.before_delay)
assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), ) assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), )
def after(self): def after(self):
if self.after_delay is not None: if self.after_delay is not None:
sleep(self.after_delay) time.sleep(self.after_delay)
assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), ) assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), )
def _run_all_tests(self): def _run_all_tests(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment