diff --git a/product/ERP5/bin/run_test_suite b/product/ERP5/bin/run_test_suite deleted file mode 100755 index 7289ce747887436c286d20fda102ec5ff5695edc..0000000000000000000000000000000000000000 --- a/product/ERP5/bin/run_test_suite +++ /dev/null @@ -1,498 +0,0 @@ -#!/usr/bin/python - -import atexit, errno, imp, os, pprint, random, re, socket, shlex, shutil -import signal, string, subprocess, sys, threading, time, urlparse, xmlrpclib - -SVN_UP_REV=re.compile(r'^(?:At|Updated to) revision (\d+).$') -SVN_CHANGED_REV=re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE) - -def killallIfParentDies(): - os.setsid() - atexit.register(lambda: os.kill(0, 9)) - from ctypes import cdll - libc = cdll.LoadLibrary('libc.so.6') - def PR_SET_PDEATHSIG(sig): - if libc.prctl(1, sig): - raise OSError - PR_SET_PDEATHSIG(signal.SIGINT) - -_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search -_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'")) -def format_command(*args, **kw): - cmdline = [] - for k, v in sorted(kw.items()): - if _format_command_search(v): - v = _format_command_escape(v) - cmdline.append('%s=%s' % (k, v)) - for v in args: - if _format_command_search(v): - v = _format_command_escape(v) - cmdline.append(v) - return ' '.join(cmdline) - -def subprocess_capture(p, quiet=False): - def readerthread(input, output, buffer): - while True: - data = input.readline() - if not data: - break - output(data) - buffer.append(data) - if p.stdout: - stdout = [] - output = quiet and (lambda data: None) or sys.stdout.write - stdout_thread = threading.Thread(target=readerthread, - args=(p.stdout, output, stdout)) - stdout_thread.setDaemon(True) - stdout_thread.start() - if p.stderr: - stderr = [] - stderr_thread = threading.Thread(target=readerthread, - args=(p.stderr, sys.stderr.write, stderr)) - stderr_thread.setDaemon(True) - stderr_thread.start() - if p.stdout: - stdout_thread.join() - if p.stderr: - stderr_thread.join() - p.wait() - return (p.stdout and ''.join(stdout), - p.stderr and ''.join(stderr)) - - -class Persistent(object): - """Very simple persistent data storage for optimization purpose - - This tool should become a standalone daemon communicating only with an ERP5 - instance. But for the moment, it only execute 1 test suite and exists, - and test suite classes may want some information from previous runs. - """ - - def __init__(self, filename): - self._filename = filename - - def __getattr__(self, attr): - if attr == '_db': - try: - db = file(self._filename, 'r+') - except IOError, e: - if e.errno != errno.ENOENT: - raise - db = file(self._filename, 'w+') - else: - try: - self.__dict__.update(eval(db.read())) - except StandardError: - pass - self._db = db - return db - self._db - return super(Persistent, self).__getattribute__(attr) - - def sync(self): - self._db.seek(0) - db = dict(x for x in self.__dict__.iteritems() if x[0][:1] != '_') - pprint.pprint(db, self._db) - self._db.truncate() - - -class SubprocessError(EnvironmentError): - def __init__(self, status_dict): - self.status_dict = status_dict - def __getattr__(self, name): - return self.status_dict[name] - def __str__(self): - return 'Error %i' % self.status_code - - -class Updater(object): - - _git_cache = {} - realtime_output = True - stdin = file(os.devnull) - - def __init__(self, revision=None): - self.revision = revision - self._path_list = [] - - def deletePycFiles(self, path): - """Delete *.pyc files so that deleted/moved files can not be imported""" - for path, dir_list, file_list in os.walk(path): - for file in file_list: - if file[-4:] in ('.pyc', '.pyo'): - # allow several processes clean the same folder at the same time - try: - os.remove(os.path.join(path, file)) - except OSError, e: - if e.errno != errno.ENOENT: - raise - - def spawn(self, *args, **kw): - quiet = kw.pop('quiet', False) - env = kw and dict(os.environ, **kw) or None - command = format_command(*args, **kw) - print '\n$ ' + command - sys.stdout.flush() - p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, env=env) - if self.realtime_output: - stdout, stderr = subprocess_capture(p, quiet) - else: - stdout, stderr = p.communicate() - if not quiet: - sys.stdout.write(stdout) - sys.stderr.write(stderr) - result = dict(status_code=p.returncode, command=command, - stdout=stdout, stderr=stderr) - if p.returncode: - raise SubprocessError(result) - return result - - def _git(self, *args, **kw): - return self.spawn('git', *args, **kw)['stdout'].strip() - - def _git_find_rev(self, ref): - try: - return self._git_cache[ref] - except KeyError: - if os.path.exists('.git/svn'): - r = self._git('svn', 'find-rev', ref) - assert r - self._git_cache[ref[0] != 'r' and 'r%u' % int(r) or r] = ref - else: - r = self._git('rev-list', '--topo-order', '--count', ref), ref - self._git_cache[ref] = r - return r - - def getRevision(self, *path_list): - if not path_list: - path_list = self._path_list - if os.path.isdir('.git'): - h = self._git('log', '-1', '--format=%H', '--', *path_list) - return self._git_find_rev(h) - if os.path.isdir('.svn'): - stdout = self.spawn('svn', 'info', *path_list)['stdout'] - return str(max(map(int, SVN_CHANGED_REV.findall(stdout)))) - raise NotImplementedError - - def checkout(self, *path_list): - if not path_list: - path_list = '.', - revision = self.revision - if os.path.isdir('.git'): - # edit .git/info/sparse-checkout if you want sparse checkout - if revision: - if type(revision) is str: - h = self._git_find_rev('r' + revision) - else: - h = revision[1] - if h != self._git('rev-parse', 'HEAD'): - self.deletePycFiles('.') - self._git('reset', '--merge', h) - else: - self.deletePycFiles('.') - if os.path.exists('.git/svn'): - self._git('svn', 'rebase') - else: - self._git('pull', '--ff-only') - self.revision = self._git_find_rev(self._git('rev-parse', 'HEAD')) - elif os.path.isdir('.svn'): - # following code allows sparse checkout - def svn_mkdirs(path): - path = os.path.dirname(path) - if path and not os.path.isdir(path): - svn_mkdirs(path) - self.spawn(*(args + ['--depth=empty', path])) - for path in path_list: - args = ['svn', 'up', '--force', '--non-interactive'] - if revision: - args.append('-r%s' % revision) - svn_mkdirs(path) - args += '--set-depth=infinity', path - self.deletePycFiles(path) - try: - status_dict = self.spawn(*args) - except SubprocessError, e: - if 'cleanup' not in e.stderr: - raise - self.spawn('svn', 'cleanup', path) - status_dict = self.spawn(*args) - if not revision: - self.revision = revision = SVN_UP_REV.findall( - status_dict['stdout'].splitlines()[-1])[0] - else: - raise NotImplementedError - self._path_list += path_list - - -class TestSuite(Updater): - - mysql_db_count = 1 - allow_restart = False - - def __init__(self, max_instance_count, **kw): - self.__dict__.update(kw) - self._path_list = ['tests'] - pool = threading.Semaphore(max_instance_count) - self.acquire = pool.acquire - self.release = pool.release - self._instance = threading.local() - self._pool = max_instance_count == 1 and [None] or \ - range(1, max_instance_count + 1) - self._ready = set() - self.running = {} - if max_instance_count != 1: - self.realtime_output = False - elif os.isatty(1): - self.realtime_output = True - self.persistent = Persistent('run_test_suite-%s.tmp' - % self.__class__.__name__) - - instance = property(lambda self: self._instance.id) - - def start(self, test, on_stop=None): - assert test not in self.running - self.running[test] = instance = self._pool.pop(0) - def run(): - self._instance.id = instance - if instance not in self._ready: - self._ready.add(instance) - self.setup() - status_dict = self.run(test) - if on_stop is not None: - on_stop(status_dict) - self._pool.append(self.running.pop(test)) - self.release() - thread = threading.Thread(target=run) - thread.setDaemon(True) - thread.start() - - def update(self): - self.checkout() # by default, update everything - - def setup(self): - pass - - def run(self, test): - raise NotImplementedError - - def getTestList(self): - raise NotImplementedError - - -class ERP5TypeTestSuite(TestSuite): - - RUN_RE = re.compile( - r'Ran (?P<all_tests>\d+) tests? in (?P<seconds>\d+\.\d+)s', - re.DOTALL) - - STATUS_RE = re.compile(r""" - (OK|FAILED)\s+\( - (failures=(?P<failures>\d+),?\s*)? - (errors=(?P<errors>\d+),?\s*)? - (skipped=(?P<skips>\d+),?\s*)? - (expected\s+failures=(?P<expected_failures>\d+),?\s*)? - (unexpected\s+successes=(?P<unexpected_successes>\d+),?\s*)? - \) - """, re.DOTALL | re.VERBOSE) - - def setup(self): - instance_home = self.instance and 'unit_test.%u' % self.instance \ - or 'unit_test' - tests = os.path.join(instance_home, 'tests') - if os.path.exists(tests): - shutil.rmtree(instance_home + '.previous', True) - shutil.move(tests, instance_home + '.previous') - - def run(self, test): - return self.runUnitTest(test) - - def runUnitTest(self, *args, **kw): - if self.instance: - args = ('--instance_home=unit_test.%u' % self.instance,) + args - mysql_db_list = [string.Template(x).substitute(I=self.instance or '1') - for x in self.mysql_db_list] - if len(mysql_db_list) > 1: - kw['extra_sql_connection_string_list'] = ','.join(mysql_db_list[1:]) - try: - runUnitTest = os.environ.get('RUN_UNIT_TEST', - 'Products/ERP5Type/tests/runUnitTest.py') - args = tuple(shlex.split(runUnitTest)) \ - + ('--verbose', '--erp5_sql_connection_string=' + mysql_db_list[0]) \ - + args - status_dict = self.spawn(*args, **kw) - except SubprocessError, e: - status_dict = e.status_dict - test_log = status_dict['stderr'] - search = self.RUN_RE.search(test_log) - if search: - groupdict = search.groupdict() - status_dict.update(duration=float(groupdict['seconds']), - test_count=int(groupdict['all_tests'])) - search = self.STATUS_RE.search(test_log) - if search: - groupdict = search.groupdict() - status_dict.update( - error_count=int(groupdict['errors'] or 0), - failure_count=int(groupdict['failures'] or 0) - +int(groupdict['unexpected_successes'] or 0), - skip_count=int(groupdict['skips'] or 0) - +int(groupdict['expected_failures'] or 0)) - return status_dict - - -#class LoadSaveExample(ERP5TypeTestSuite): -# def getTestList(self): -# return [test_path.split(os.sep)[-1][:-3] -# for test_path in glob.glob('tests/test*.py')] -# -# def setup(self): -# TestSuite.setup(self) -# return self.runUnitTest(self, '--save', 'testFoo') -# -# def run(self, test): -# return self.runUnitTest(self, '--load', test) - - -sys.modules['test_suite'] = module = imp.new_module('test_suite') -for var in SubprocessError, TestSuite, ERP5TypeTestSuite: - setattr(module, var.__name__, var) - - -class DummyTaskDistributionTool(object): - - def __init__(self): - self.lock = threading.Lock() - - def createTestResult(self, name, revision, test_name_list, allow_restart): - self.test_name_list = list(test_name_list) - return None, revision - - def startUnitTest(self, test_result_path, exclude_list=()): - self.lock.acquire() - try: - for i, test in enumerate(self.test_name_list): - if test not in exclude_list: - del self.test_name_list[i] - return None, test - finally: - self.lock.release() - - def stopUnitTest(self, test_path, status_dict): - pass - - -def safeRpcCall(function, *args): - retry = 64 - while True: - try: - return function(*args) - except (socket.error, xmlrpclib.ProtocolError), e: - print >>sys.stderr, e - pprint.pprint(args, file(function._Method__name, 'w')) - time.sleep(retry) - retry += retry >> 1 - -def getOptionParser(): - from optparse import OptionParser - parser = OptionParser( - usage="%prog [options] [SUITE_CLASS@]<SUITE_NAME>[=<MAX_INSTANCES>]") - _ = parser.add_option - _("--master", help="URL of ERP5 instance, used as master node") - _("--mysql_db_list", help="comma-separated list of connection strings") - return parser - -def main(): - os.environ['LC_ALL'] = 'C' - - parser = getOptionParser() - options, args = parser.parse_args() - try: - name, = args - if '=' in name: - name, max_instance_count = name.rsplit('=', 1) - max_instance_count = int(max_instance_count) - else: - max_instance_count = 1 - if '@' in name: - suite_class_name, name = name.split('@', 1) - else: - suite_class_name = name - except ValueError: - parser.error("invalid arguments") - db_list = options.mysql_db_list - if db_list: - db_list = db_list.split(',') - multi = max_instance_count != 1 - try: - for db in db_list: - if db == string.Template(db).substitute(I=1) and multi: - raise KeyError - except KeyError: - parser.error("invalid value for --mysql_db_list") - else: - db_list = (max_instance_count == 1 and 'test test' or 'test$I test'), - - def makeSuite(revision=None): - updater = Updater(revision) - if portal_url: - updater.checkout('tests') - for k in sys.modules.keys(): - if k == 'tests' or k.startswith('tests.'): - del sys.modules[k] - module_name, class_name = ('tests.' + suite_class_name).rsplit('.', 1) - try: - suite_class = getattr(__import__(module_name, None, None, [class_name]), - class_name) - except (AttributeError, ImportError): - parser.error("unknown test suite") - if len(db_list) < suite_class.mysql_db_count: - parser.error("%r suite needs %u DB (only %u given)" % - (name, suite_class.mysql_db_count, len(db_list))) - suite = suite_class(revision=updater.revision, - max_instance_count=max_instance_count, - mysql_db_list=db_list[:suite_class.mysql_db_count]) - if portal_url: - suite.update() - return suite - - portal_url = options.master - if portal_url: - if portal_url[-1] != '/': - portal_url += '/' - portal = xmlrpclib.ServerProxy(portal_url, allow_none=1) - master = portal.portal_task_distribution - assert master.getProtocolRevision() == 1 - else: - master = DummyTaskDistributionTool() - - suite = makeSuite() - revision = suite.getRevision() - test_result = safeRpcCall(master.createTestResult, - name, revision, suite.getTestList(), suite.allow_restart) - if test_result: - test_result_path, test_revision = test_result - if portal_url: # for buildbot - url_parts = list(urlparse.urlparse(portal_url + test_result_path)) - url_parts[1] = url_parts[1].split('@')[-1] - print 'ERP5_TEST_URL %s OK' % urlparse.urlunparse(url_parts) - while suite.acquire(): - test = safeRpcCall(master.startUnitTest, test_result_path, - suite.running.keys()) - if test: - if revision != test_revision: - suite = makeSuite(test_revision) - revision = test_revision - suite.acquire() - suite.start(test[1], lambda status_dict, __test_path=test[0]: - safeRpcCall(master.stopUnitTest, __test_path, status_dict)) - elif not suite.running: - break - # We are finishing the suite. Let's disable idle nodes. - - -if __name__ == '__main__': - sys.path[0] = '' - if not os.isatty(0): - killallIfParentDies() - sys.exit(main())