Commit 3718930b authored by Denis Bilenko's avatar Denis Bilenko

testrunner.py: do not depend on gevent

parent b4d1a6cc
from __future__ import print_function
from gevent import monkey; monkey.patch_all()
from gevent import monkey; monkey.patch_all(subprocess=True)
import sys
import socket
from time import sleep
......
from gevent import monkey; monkey.patch_all()
from gevent import monkey; monkey.patch_all(subprocess=True)
import sys
from gevent.server import DatagramServer
from unittest import TestCase, main
......
#!/usr/bin/env python
from __future__ import print_function
import gevent
gevent.get_hub('select') # this is just to make sure we don't pass any fds to children
from gevent import monkey; monkey.patch_all()
import six
import sys
import os
import glob
import traceback
from time import time
import time
from gevent.pool import Pool
from multiprocessing.pool import ThreadPool
import util
TIMEOUT = 180
NWORKERS = int(os.environ.get('NWORKERS') or 8)
pool = None
def spawn(*args, **kwargs):
g = pool.spawn(*args, **kwargs)
g.link_exception(lambda *args: sys.exit('Internal error in testrunner.py: %s %s' % (g, g.exception)))
return g
def run_many(tests, expected=None, failfast=False):
global NWORKERS, pool
start = time()
global NWORKERS
start = time.time()
total = 0
failed = {}
NWORKERS = min(len(tests), NWORKERS)
pool = Pool(NWORKERS)
pool = ThreadPool(NWORKERS)
util.BUFFER_OUTPUT = NWORKERS > 1
def run_one(cmd, **kwargs):
......@@ -45,26 +35,55 @@ def run_many(tests, expected=None, failfast=False):
# we therefore will retry them sequentially
failed[result.name] = [cmd, kwargs, 'AssertionError' in (result.output or '')]
results = []
def reap():
for r in results[:]:
if not r.ready():
continue
if r.successful():
results.remove(r)
else:
r.get()
sys.exit('Internal error in testrunner.py: %r' % (r, ))
return len(results)
def reap_all():
while reap() > 0:
time.sleep(0.1)
def spawn(args, kwargs):
while True:
if reap() < NWORKERS:
r = pool.apply_async(run_one, (cmd, ), options or {})
results.append(r)
return
else:
time.sleep(0.1)
try:
try:
for cmd, options in tests:
total += 1
spawn(run_one, cmd, **(options or {}))
gevent.wait()
spawn((cmd, ), options or {})
pool.close()
pool.join()
except KeyboardInterrupt:
try:
if pool:
util.log('Waiting for currently running to finish...')
pool.join()
util.log('Waiting for currently running to finish...')
reap_all()
except KeyboardInterrupt:
util.report(total, failed, exit=False, took=time() - start, expected=expected)
pool.terminate()
util.report(total, failed, exit=False, took=time.time() - start, expected=expected)
util.log('(partial results)\n')
raise
except:
traceback.print_exc()
pool.kill() # this needed to kill the processes
pool.terminate()
raise
reap_all()
toretry = [key for (key, (cmd, kwargs, can_retry)) in failed.items() if can_retry]
failed_then_succeeded = []
......@@ -79,9 +98,7 @@ def run_many(tests, expected=None, failfast=False):
util.log('\n%s tests failed during concurrent run but succeeded when ran sequentially:', len(failed_then_succeeded))
util.log('- ' + '\n- '.join(failed_then_succeeded))
util.log('gevent version %s from %s', gevent.__version__, gevent.__file__)
util.report(total, failed, took=time() - start, expected=expected)
assert not pool, pool
util.report(total, failed, took=time.time() - start, expected=expected)
def discover(tests=None, ignore=None):
......
......@@ -5,9 +5,9 @@ import six
import traceback
import unittest
import threading
import subprocess
import time
from datetime import timedelta
from gevent import subprocess, sleep, spawn_later
SLEEP = 0.1
......@@ -82,6 +82,8 @@ def _kill(popen):
def kill(popen):
if popen.poll() is not None:
return
try:
if getattr(popen, 'setpgrp_enabled', None):
killpg(popen.pid)
......@@ -139,10 +141,9 @@ def start(command, **kwargs):
popen.name = name
popen.setpgrp_enabled = preexec_fn is not None
if timeout is not None:
popen._killer = spawn_later(timeout, kill, popen)
popen._killer._start_event.ref = False # XXX add 'ref' property to greenlet
else:
popen._killer = None
t = threading.Timer(timeout, kill, args=(popen, ))
t.setDaemon(True)
t.start()
return popen
......@@ -164,6 +165,9 @@ class RunResult(object):
return self.code
lock = threading.Lock()
def run(command, **kwargs):
buffer_output = kwargs.pop('buffer_output', BUFFER_OUTPUT)
if buffer_output:
......@@ -181,21 +185,20 @@ def run(command, **kwargs):
else:
result = popen.poll()
finally:
if popen._killer is not None:
popen._killer.kill(block=False)
kill(popen)
assert not err
if out:
out = out.strip()
with lock:
if out:
out = ' ' + out.replace('\n', '\n ')
out = out.rstrip()
out += '\n'
log('| %s\n%s', name, out)
if result:
log('! %s [code %s] [took %.1fs]', name, result, took)
else:
log('- %s [took %.1fs]', name, took)
out = out.strip().decode()
if out:
out = ' ' + out.replace('\n', '\n ')
out = out.rstrip()
out += '\n'
log('| %s\n%s', name, out)
if result:
log('! %s [code %s] [took %.1fs]', name, result, took)
else:
log('- %s [took %.1fs]', name, took)
if took >= MIN_RUNTIME:
runtimelog.append((-took, name))
return RunResult(result, out, name)
......@@ -380,12 +383,12 @@ class TestServer(unittest.TestCase):
def before(self):
if self.before_delay is not None:
sleep(self.before_delay)
time.sleep(self.before_delay)
assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), )
def after(self):
if self.after_delay is not None:
sleep(self.after_delay)
time.sleep(self.after_delay)
assert self.popen.poll() is None, '%s died with code %s' % (self.server, self.popen.poll(), )
def _run_all_tests(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment