Commit 5c3e77d3 authored by Mark Florisson's avatar Mark Florisson

Support chunksize keyword argument to prange() + update docs

parent c17bdb28
......@@ -6681,6 +6681,7 @@ class ParallelStatNode(StatNode, ParallelNode):
error_label_used = False
num_threads = None
chunksize = None
parallel_exc = (
Naming.parallel_exc_type,
......@@ -6725,11 +6726,17 @@ class ParallelStatNode(StatNode, ParallelNode):
self.num_threads = None
if self.kwargs:
for idx, dictitem in enumerate(self.kwargs.key_value_pairs[:]):
# Try to find num_threads and chunksize keyword arguments
pairs = []
for dictitem in self.kwargs.key_value_pairs:
if dictitem.key.value == 'num_threads':
self.num_threads = dictitem.value
del self.kwargs.key_value_pairs[idx]
break
elif self.is_prange and dictitem.key.value == 'chunksize':
self.chunksize = dictitem.value
else:
pairs.append(dictitem)
self.kwargs.key_value_pairs = pairs
try:
self.kwargs = self.kwargs.compile_time_value(env)
......@@ -6748,6 +6755,10 @@ class ParallelStatNode(StatNode, ParallelNode):
def analyse_expressions(self, env):
    """
    Analyse the optional num_threads/chunksize keyword expressions,
    then the loop body, and finally work out how each variable used in
    the block is shared between the threads.
    """
    for keyword_expr in (self.num_threads, self.chunksize):
        if keyword_expr:
            keyword_expr.analyse_expressions(env)
    self.body.analyse_expressions(env)
    self.analyse_sharing_attributes(env)
......@@ -6906,21 +6917,25 @@ class ParallelStatNode(StatNode, ParallelNode):
code.putln("%s = %s;" % (entry.cname,
entry.type.cast_code(invalid_value)))
def evaluate_before_block(self, code, expr):
    """
    Generate the evaluation code for *expr* at the beginning of the
    parallel control block and return its C result expression.
    """
    block_code = self.begin_of_parallel_control_block_point
    # Temporarily make the code point its own funcstate owner:
    # allocate_temp could otherwise emit a comment in the middle of our
    # pragma when DebugFlags.debug_temp_code_comments is in effect.
    saved_owner = block_code.funcstate.owner
    block_code.funcstate.owner = block_code
    expr.generate_evaluation_code(block_code)
    block_code.funcstate.owner = saved_owner
    return expr.result()
def put_num_threads(self, code):
    """
    Write self.num_threads if set as the num_threads OpenMP directive
    """
    # NOTE(review): this span interleaved the removed pre-commit body
    # with the added call to evaluate_before_block(); consolidated to
    # the post-commit version, which evaluates the expression before
    # the parallel block so no statements land inside the pragma.
    if self.num_threads is not None:
        code.put(" num_threads(%s)" % self.evaluate_before_block(code,
                                                                 self.num_threads))
def declare_closure_privates(self, code):
......@@ -7340,7 +7355,8 @@ class ParallelRangeNode(ParallelStatNode):
else_clause Node or None the else clause of this loop
"""
child_attrs = ['body', 'target', 'else_clause', 'args']
child_attrs = ['body', 'target', 'else_clause', 'args', 'num_threads',
'chunksize']
body = target = else_clause = args = None
......@@ -7350,9 +7366,8 @@ class ParallelRangeNode(ParallelStatNode):
nogil = None
schedule = None
num_threads = None
valid_keyword_arguments = ['schedule', 'nogil', 'num_threads']
valid_keyword_arguments = ['schedule', 'nogil', 'num_threads', 'chunksize']
def __init__(self, pos, **kwds):
super(ParallelRangeNode, self).__init__(pos, **kwds)
......@@ -7440,6 +7455,21 @@ class ParallelRangeNode(ParallelStatNode):
super(ParallelRangeNode, self).analyse_expressions(env)
if self.chunksize:
if not self.schedule:
error(self.chunksize.pos,
"Must provide schedule with chunksize")
elif self.schedule == 'runtime':
error(self.chunksize.pos,
"Chunksize not valid for the schedule runtime")
elif (self.chunksize.type.is_int and
self.chunksize.is_literal and
self.chunksize.compile_time_value(env) <= 0):
error(self.chunksize.pos, "Chunksize must not be negative")
self.chunksize = self.chunksize.coerce_to(
PyrexTypes.c_int_type, env).coerce_to_temp(env)
if self.nogil:
env.nogil = was_nogil
......@@ -7615,7 +7645,13 @@ class ParallelRangeNode(ParallelStatNode):
code.put(" %s(%s)" % (private, entry.cname))
if self.schedule:
code.put(" schedule(%s)" % self.schedule)
if self.chunksize:
chunksize = ", %s" % self.evaluate_before_block(code,
self.chunksize)
else:
chunksize = ""
code.put(" schedule(%s%s)" % (self.schedule, chunksize))
self.put_num_threads(reduction_codepoint)
......
......@@ -18,7 +18,7 @@ It currently supports OpenMP, but later on more backends might be supported.
__ nogil_
.. function:: prange([start,] stop[, step], nogil=False, schedule=None)
.. function:: prange([start,] stop[, step][, nogil=False][, schedule=None[, chunksize=None]][, num_threads=None])
This function can be used for parallel loops. OpenMP automatically
starts a thread pool and distributes the work according to the schedule
......@@ -44,21 +44,20 @@ __ nogil_
+=================+======================================================+
|static | The iteration space is divided into chunks that are |
| | approximately equal in size, and at most one chunk |
| | is distributed to each thread. |
| | is distributed to each thread, if ``chunksize`` is |
| | not given. If ``chunksize`` is specified, iterations |
| | are distributed cyclically in a static manner with a |
| | blocksize of ``chunksize``. |
+-----------------+------------------------------------------------------+
|dynamic | The iterations are distributed to threads in the team|
| | as the threads request them, with a chunk size of 1. |
| | as the threads request them, with a default chunk |
| | size of 1. |
+-----------------+------------------------------------------------------+
|guided | The iterations are distributed to threads in the team|
| | as the threads request them. The size of each chunk |
| | is proportional to the number of unassigned |
| | iterations divided by the number of threads in the |
| | team, decreasing to 1. |
+-----------------+------------------------------------------------------+
|auto | The decision regarding scheduling is delegated to the|
| | compiler and/or runtime system. The programmer gives |
| | the implementation the freedom to choose any possible|
| | mapping of iterations to threads in the team. |
| | team, decreasing to 1 (or ``chunksize`` if given). |
+-----------------+------------------------------------------------------+
|runtime | The schedule and chunk size are taken from the |
| | runtime-scheduling-variable, which can be set through|
......@@ -66,9 +65,25 @@ __ nogil_
| | ``OMP_SCHEDULE`` environment variable. |
+-----------------+------------------------------------------------------+
.. |auto | The decision regarding scheduling is delegated to the|
.. | | compiler and/or runtime system. The programmer gives |
.. | | the implementation the freedom to choose any possible|
.. | | mapping of iterations to threads in the team. |
.. +-----------------+------------------------------------------------------+
The default schedule is implementation defined. For more information consult
the OpenMP specification [#]_.
The ``num_threads`` argument indicates how many threads the team should consist of. If not given,
OpenMP will decide how many threads to use. Typically this is the number of cores available on
the machine. However, this may be controlled through the ``omp_set_num_threads()`` function, or
through the ``OMP_NUM_THREADS`` environment variable.
The ``chunksize`` argument indicates the chunksize to be used for dividing the iterations among threads.
This is only valid for ``static``, ``dynamic`` and ``guided`` scheduling, and is optional. Different chunksizes
may give substantially different performance results, depending on the schedule, the load balance it provides,
the scheduling overhead and the amount of false sharing (if any).
Example with a reduction::
from cython.parallel import prange
......@@ -91,7 +106,7 @@ __ nogil_
for i in prange(x.shape[0]):
x[i] = alpha * x[i]
.. function:: parallel
.. function:: parallel(num_threads=None)
This directive can be used as part of a ``with`` statement to execute code
sequences in parallel. This is currently useful to setup thread-local
......
......@@ -130,6 +130,21 @@ cdef int[:] dst, src = object()
for i in prange(10, nogil=True):
dst = src
for i in prange(10, nogil=True, chunksize=20):
pass
for i in prange(10, nogil=True, schedule='static', chunksize=-1):
pass
for i in prange(10, nogil=True, schedule='runtime', chunksize=10):
pass
cdef int chunksize():
return 10
for i in prange(10, nogil=True, schedule='static', chunksize=chunksize()):
pass
_ERRORS = u"""
e_cython_parallel.pyx:3:8: cython.parallel.parallel is not a module
e_cython_parallel.pyx:4:0: No such directive: cython.parallel.something
......@@ -161,4 +176,8 @@ e_cython_parallel.pyx:119:17: Cannot read reduction variable in loop body
e_cython_parallel.pyx:121:20: stop argument must be numeric
e_cython_parallel.pyx:121:19: prange() can only be used without the GIL
e_cython_parallel.pyx:131:8: Memoryview slices can only be shared in parallel sections
e_cython_parallel.pyx:133:42: Must provide schedule with chunksize
e_cython_parallel.pyx:136:62: Chunksize must not be negative
e_cython_parallel.pyx:139:62: Chunksize not valid for the schedule runtime
e_cython_parallel.pyx:145:70: Calling gil-requiring function not allowed without gil
"""
......@@ -732,3 +732,32 @@ def test_num_threads_compile():
with nogil, cython.parallel.parallel(num_threads=2):
for i in prange(10):
pass
cdef int chunksize() nogil:
    # Non-literal chunksize source: forces the chunksize keyword argument
    # to be evaluated at run time, before the parallel block is entered.
    return 3
def test_chunksize():
    """
    >>> test_chunksize()
    45
    45
    45
    """
    cdef int i, sum
    # Each loop reduces sum over range(10); every schedule must yield 45.
    sum = 0
    # 'static' with an explicit chunksize: iterations handed out cyclically.
    for i in prange(10, nogil=True, num_threads=2, schedule='static', chunksize=chunksize()):
        sum += i
    print sum
    sum = 0
    # 'dynamic': threads request chunks of chunksize() iterations at a time.
    for i in prange(10, nogil=True, num_threads=6, schedule='dynamic', chunksize=chunksize()):
        sum += i
    print sum
    sum = 0
    # 'guided' inside an explicit parallel section; chunksize() is the
    # lower bound on the shrinking chunk sizes.
    with nogil, cython.parallel.parallel():
        for i in prange(10, schedule='guided', chunksize=chunksize()):
            sum += i
    print sum
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment