Commit 82e3709a authored by Jason Madden's avatar Jason Madden

Parrellize cythoncpp.py.

On my machine, this cuts the build time by a fourth to a third.

Not ready to merge, submitting for testing on CI (esp Windows).

Cython calls are threaded because they release the GIL.

The most expensive part was merging results, which holds the GIL, so use
multiprocessing for that. Pre-combine the identical results that we
expect to get for two sets of defines to reduce the number of merges.

Change internal data structures to be immutable to make debugging this
easier (and substantially faster when pickling through multiprocessing.)
parent 4fe26b0d
/* Copyright (c) 2011-2012 Denis Bilenko. See LICENSE for details. */
#ifdef Py_PYTHON_H
#define _GEVENTLOOP struct __pyx_vtabstruct_8corecext_loop
static void gevent_handle_error(struct PyGeventLoopObject* loop, PyObject* context) {
PyThreadState *tstate;
PyObject *type, *value, *traceback, *result;
......@@ -19,7 +21,7 @@ static void gevent_handle_error(struct PyGeventLoopObject* loop, PyObject* conte
PyErr_Clear();
result = ((struct __pyx_vtabstruct_6gevent_8corecext_loop *)loop->__pyx_vtab)->handle_error(loop, context, type, value, traceback, 0);
result = ((_GEVENTLOOP *)loop->__pyx_vtab)->handle_error(loop, context, type, value, traceback, 0);
if (result) {
Py_DECREF(result);
......@@ -195,7 +197,7 @@ static void gevent_run_callbacks(struct ev_loop *_loop, void *watcher, int reven
loop = GET_OBJECT(PyGeventLoopObject, watcher, _prepare);
Py_INCREF(loop);
gevent_check_signals(loop);
result = ((struct __pyx_vtabstruct_6gevent_8corecext_loop *)loop->__pyx_vtab)->_run_callbacks(loop);
result = ((_GEVENTLOOP *)loop->__pyx_vtab)->_run_callbacks(loop);
if (result) {
Py_DECREF(result);
}
......
......@@ -9,12 +9,13 @@ import time
from datetime import timedelta
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import util
from util import log
TIMEOUT = 180
NWORKERS = int(os.environ.get('NWORKERS') or 4)
NWORKERS = int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4))
# tests that don't do well when run on busy box
......
#!/usr/bin/env python
# Copyright (C) 2011-2012 Denis Bilenko (http://denisbilenko.com)
# Copyright (C) 2015-2016 gevent contributors
from __future__ import print_function
import sys
import os
import re
......@@ -9,6 +11,18 @@ import pipes
import difflib
from hashlib import md5
from itertools import combinations, product
import subprocess
import multiprocessing
import tempfile
import shutil
import threading
class Thread(threading.Thread):
value = None
def run(self):
self.value = self._target(*self._args)
do_exec = None
if sys.version_info >= (3, 0):
......@@ -30,7 +44,7 @@ if os.getenv('READTHEDOCS'):
os.environ['PATH'] = new_path
# Parameter name in macros must match this regex:
param_name_re = re.compile('^[a-zA-Z_]\w*$')
param_name_re = re.compile(r'^[a-zA-Z_]\w*$')
# First line of a definition of a new macro:
define_re = re.compile(r'^#define\s+([a-zA-Z_]\w*)(\((?:[^,)]+,)*[^,)]+\))?\s+(.*)$')
......@@ -52,6 +66,83 @@ def match_condition(line):
newline_token = ' <cythonpp.py: REPLACE WITH NEWLINE!> '
def _run_cython_on_file(configuration, pyx_filename,
py_banner, banner,
output_filename,
counter, lines,
cache=None):
# XXX: Note that this causes cython to generate
# a "corecext" name instead of "gevent.corecext"
value = ''.join(lines)
sourcehash = md5(value.encode("utf-8")).hexdigest()
comment = format_tag(frozenset(configuration))
if os.path.isabs(output_filename):
raise ValueError("output cannot be absolute")
# We can't change the actual name of the pyx file because
# cython generates function names based in that string.
tempdir = tempfile.mkdtemp()
#unique_pyx_filename = pyx_filename #os.path.join(tempdir, pyx_filename)
#unique_output_filename = output_filename #os.path.join(tempdir, output_filename)
unique_pyx_filename = os.path.join(tempdir, pyx_filename)
unique_output_filename = os.path.join(tempdir, output_filename)
dirname = os.path.dirname(unique_pyx_filename) # output must be in same dir
log("Output filename %s", unique_output_filename)
if dirname:
print("Making dir", dirname)
os.makedirs(dirname)
try:
atomic_write(unique_pyx_filename, py_banner + value)
if WRITE_OUTPUT:
atomic_write(unique_pyx_filename + '.deb', '# %s (%s)\n%s' % (banner, comment, value))
output = run_cython(unique_pyx_filename, sourcehash, unique_output_filename, banner, comment,
cache)
if WRITE_OUTPUT:
atomic_write(unique_output_filename + '.deb', output)
finally:
shutil.rmtree(tempdir, True)
#pass
return attach_tags(output, configuration), configuration, sourcehash
def _run_cython_on_files(pyx_filename, py_banner, banner, output_filename, preprocessed):
counter = 0
threads = []
cache = {}
for configuration, lines in sorted(preprocessed.items()):
counter += 1
threads.append(Thread(target=_run_cython_on_file,
args=(configuration, pyx_filename,
py_banner, banner, output_filename,
counter, lines,
cache)))
threads[-1].start()
#threads[-1].join()
for t in threads:
t.join()
same_results = {}
for t in threads:
sourcehash = t.value[2]
tagged_output = t.value[0]
if sourcehash not in same_results:
same_results[sourcehash] = tagged_output
else:
# Nice, something to combine with tags
other_tagged_output = same_results[sourcehash]
assert len(tagged_output) == len(other_tagged_output)
combined_lines = []
for line_a, line_b in zip(tagged_output, other_tagged_output):
combined_tags = line_a.tags + line_b.tags
combined_lines.append(Str(line_a, simplify_tags(combined_tags)))
same_results[sourcehash] = combined_lines
# ordered_results = []
# for t in threads:
# if t.value[0] not in ordered_results:
# ordered_results.append(same_results[t.value[2]])
return list(same_results.values())
def process_filename(filename, output_filename=None):
"""Process the .ppyx file with preprocessor and compile it with cython.
......@@ -84,33 +175,20 @@ def process_filename(filename, output_filename=None):
preprocessed = expand_to_match(preprocessed.items())
reference_pyx = preprocessed.pop(None)
sources = []
sources = _run_cython_on_files(pyx_filename, py_banner, banner, output_filename,
preprocessed)
counter = 0
for configuration, lines in sorted(preprocessed.items()):
counter += 1
value = ''.join(lines)
sourcehash = md5(value.encode("utf-8")).hexdigest()
comment = format_tag(set(configuration))
atomic_write(pyx_filename, py_banner + value)
if WRITE_OUTPUT:
atomic_write(pyx_filename + '.%s' % counter, '# %s (%s)\n%s' % (banner, comment, value))
output = run_cython(pyx_filename, sourcehash, output_filename, banner, comment)
if WRITE_OUTPUT:
atomic_write(output_filename + '.%s' % counter, output)
sources.append(attach_tags(output, configuration))
sys.stderr.write('Generating %s ' % output_filename)
result = generate_merged(output_filename, sources)
log('Generating %s ', output_filename)
result = generate_merged(sources)
atomic_write(output_filename, result)
sys.stderr.write('%s bytes\n' % len(result))
log('%s bytes\n', len(result))
if filename != pyx_filename:
log('Saving %s', pyx_filename)
atomic_write(pyx_filename, py_banner + ''.join(reference_pyx))
def generate_merged(output_filename, sources):
def generate_merged(sources):
result = []
for line in produce_preprocessor(merge(sources)):
result.append(line.replace(newline_token, '\n'))
......@@ -217,16 +295,70 @@ def merge(sources):
Str('world\n', [set([('defined(world)', True)])]),
Str('everyone\n', [set([('defined(world)', False)])])]
"""
sources = list(sources) # own copy
log("Merging %s", len(sources))
if len(sources) <= 1:
return [Str(str(x), simplify_tags(x.tags)) for x in sources[0]]
return merge([list(_merge(sources[0], sources[1]))] + sources[2:])
#return merge([_merge(sources[0], sources[1])] + sources[2:])
pool = multiprocessing.Pool()
# class SerialPool(object):
# def imap(self, func, iterable):
# for args in iterable:
# yield func(*args)
#pool = SerialPool()
groups = []
while len(sources) >= 2:
one, two = sources.pop(), sources.pop()
groups.append((one, two))
log("Merge groups %s", len(groups))
# len sources == 0 or 1
for merged in pool.imap_unordered(_merge, groups):
log("Completed a merge in %s", os.getpid())
sources.append(merged)
# len sources == 1 or 2
if len(sources) == 2:
one, two = sources.pop(), sources.pop()
sources.append(pool.apply(_merge, (one, two)))
# len sources == 1
# len sources should now be 1
print("Now merging", len(sources))
return merge(sources)
def _merge(*args):
#log("imerging %s", len(args))
if isinstance(args[0], tuple):
a, b = args[0]
else:
a, b = args
#log("Merging %s and %s (%s %s) in %s", id(a), id(b), len(a), len(b), os.getpid())
return list(_imerge(a, b))
def _flatten(tags):
s = set()
for tag in tags:
s.update(tag)
return frozenset(s)
def _merge(a, b):
def _imerge(a, b):
# caching the tags speeds up serialization and future merges
flat_tag_cache = {}
tag_cache = {}
for tag, i1, i2, j1, j2 in difflib.SequenceMatcher(None, a, b).get_opcodes():
if tag == 'equal':
for line_a, line_b in zip(a[i1:i2], b[j1:j2]):
tags = getattr(line_a, 'tags', []) + getattr(line_b, 'tags', [])
# tags is a tuple of frozensets
line_a_tags = getattr(line_a, 'tags', ())
line_b_tags = getattr(line_b, 'tags', ())
key = _flatten(line_a_tags) | _flatten(line_b_tags)
tags = tag_cache.setdefault(key, line_a_tags + line_b_tags)
yield Str(line_a, tags)
else:
for line in a[i1:i2]:
......@@ -298,15 +430,15 @@ def exact_reverse(tags1, tags2):
return
if not tags2:
return
if not isinstance(tags1, list):
if not isinstance(tags1, tuple):
raise TypeError(repr(tags1))
if not isinstance(tags2, list):
if not isinstance(tags2, tuple):
raise TypeError(repr(tags2))
if len(tags1) == 1 and len(tags2) == 1:
tag1 = tags1[0]
tag2 = tags2[0]
assert isinstance(tag1, set), tag1
assert isinstance(tag2, set), tag2
assert isinstance(tag1, frozenset), tag1
assert isinstance(tag2, frozenset), tag2
if len(tag1) == 1 and len(tag2) == 1:
tag1 = list(tag1)[0]
tag2 = list(tag2)[0]
......@@ -326,29 +458,30 @@ def format_cond(cond):
def format_tag(tag):
if not isinstance(tag, set):
if not isinstance(tag, frozenset):
raise TypeError(repr(tag))
return ' && '.join([format_cond(x) for x in sorted(tag)])
def format_tags(tags):
if not isinstance(tags, list):
if not isinstance(tags, tuple):
raise TypeError(repr(tags))
return ' || '.join('(%s)' % format_tag(x) for x in tags)
def attach_tags(text, tags):
tags = frozenset(tags)
result = [x for x in text.split('\n')]
if result and not result[-1]:
del result[-1]
return [Str(x + '\n', set(tags)) for x in result]
return [Str(x + '\n', tags) for x in result]
def is_tags_type(tags):
if not isinstance(tags, list):
if not isinstance(tags, tuple):
return False
for tag in tags:
if not isinstance(tag, set):
if not isinstance(tag, frozenset):
return False
for item in tag:
if isinstance(item, tuple) and len(item) == 2 and isinstance(item[1], bool) and isinstance(item[0], str):
......@@ -367,14 +500,17 @@ class Str(str):
def __new__(cls, string, tags):
if not isinstance(string, str):
raise TypeError('string must be str: %s' % (type(string), ))
if isinstance(tags, set):
tags = [tags]
if isinstance(tags, frozenset):
tags = (tags,)
if not is_tags_type(tags):
raise TypeError('tags must be a list of sets of 2-tuples: %r' % (tags, ))
raise TypeError('tags must be a tuple of frozensets of 2-tuples: %r' % (tags, ))
self = str.__new__(cls, string)
self.tags = tags
return self
def __getnewargs__(self):
return str(self), self.tags
def __repr__(self):
return '%s(%s, %r)' % (self.__class__.__name__, str.__repr__(self), self.tags)
......@@ -406,32 +542,33 @@ def simplify_tags(tags):
... set([('defined(LIBEV_EMBED)', False), ('defined(_WIN32)', True)])])
[]
"""
if not isinstance(tags, list):
if not isinstance(tags, tuple):
raise TypeError
# First, strip any empty sets
tags = list(tags)
for x in tags:
if not x:
tags.remove(x)
return simplify_tags(tags)
return simplify_tags(tuple(tags))
for tag1, tag2 in combinations(tags, 2):
if tag1 == tag2:
tags.remove(tag1)
return simplify_tags(tags)
return simplify_tags(tuple(tags))
for item in tag1:
reverted_item = reverted(item)
if reverted_item in tag2:
tag1_copy = tag1.copy()
tag1_copy.remove(item)
tag2_copy = tag2.copy()
tag2_copy.remove(reverted_item)
inverted_item = inverted(item)
if inverted_item in tag2:
tag1_copy = tag1 - {inverted_item}
tag2_copy = tag2 - {inverted_item}
if tag1_copy == tag2_copy:
tags.remove(tag1)
tags.remove(tag2)
tags.append(tag1_copy)
return simplify_tags(tags)
return tags
return simplify_tags(tuple(tags))
return tuple(tags)
def reverted(item):
def inverted(item):
if not isinstance(item, tuple):
raise TypeError(repr(item))
if len(item) != 2:
......@@ -523,6 +660,8 @@ class Str_sourceline(str):
self.sourceline = sourceline
return self
def __getnewargs__(self):
return str(self), self.sourceline
def atomic_write(filename, data):
tmpname = filename + '.tmp.%s' % os.getpid()
......@@ -537,29 +676,33 @@ def atomic_write(filename, data):
dbg('Wrote %s bytes to %s', len(data), filename)
def run_cython(filename, sourcehash, output_filename, banner, comment, cache={}):
result = cache.get(sourcehash)
command = '%s -o %s %s' % (CYTHON, pipes.quote(output_filename), pipes.quote(filename))
def run_cython(filename, sourcehash, output_filename, banner, comment, cache=None):
log("Cython output to %s", output_filename)
result = cache.get(sourcehash) if cache is not None else None
command = '%s -o %s -I gevent %s' % (CYTHON, pipes.quote(output_filename), pipes.quote(filename))
if result is not None:
log('Reusing %s # %s', command, comment)
return result
system(command, comment)
result = postprocess_cython_output(output_filename, banner)
if cache is not None:
cache[sourcehash] = result
return result
def system(command, comment):
log('Running %s # %s', command, comment)
result = os.system(command)
if result:
try:
subprocess.check_call(command, shell=True)
log('\tDone running %s # %s', command, comment)
except subprocess.CalledProcessError:
# debugging code
log("Path: %s", os.getenv("PATH"))
bin_dir = os.path.dirname(sys.executable)
bin_files = os.listdir(bin_dir)
bin_files.sort()
log("Bin: %s files: %s", bin_dir, ' '.join(bin_files))
raise AssertionError('%r failed with code %s' % (command, result))
raise
def postprocess_cython_output(filename, banner):
......@@ -734,7 +877,7 @@ def log(message, *args):
except Exception:
traceback.print_exc()
else:
sys.stderr.write(string + '\n')
print(string, file=sys.stderr)
def dbg(*args):
......@@ -743,7 +886,7 @@ def dbg(*args):
return log(*args)
if __name__ == '__main__':
def main():
import optparse
parser = optparse.OptionParser()
parser.add_option('--debug', action='store_true')
......@@ -787,3 +930,7 @@ if __name__ == '__main__':
if run:
process_filename(filename, options.output_file)
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment