diff --git a/Cython/Compiler/Nodes.py b/Cython/Compiler/Nodes.py
index 8eaa72f75cdc1a83202a1075490192bd5ede2d5d..67461d5cef2bbcdf3f6fab2cc3b5769dd46d805b 100644
--- a/Cython/Compiler/Nodes.py
+++ b/Cython/Compiler/Nodes.py
@@ -5770,36 +5770,101 @@ class ParallelStatNode(StatNode, ParallelNode):
     def __init__(self, pos, **kwargs):
         super(ParallelStatNode, self).__init__(pos, **kwargs)
+
+        # All assignments in this scope
         self.assignments = kwargs.get('assignments') or {}

-        # Insertion point before the outermost parallel section
-        self.before_parallel_section_point = None
-        # Insertion point after the outermost parallel section
-        self.post_parallel_section_point = None

-    def analyse_expressions(self, env):
-        self.body.analyse_expressions(env)
+        # All seen closure cnames and their temporary cnames
+        self.seen_closure_vars = set()
+
+        # Dict of variables that should be declared (first|last|)private or
+        # reduction { Entry: op }. If op is not None, it's a reduction.
+        self.privates = {}

     def analyse_declarations(self, env):
-        super(ParallelStatNode, self).analyse_declarations(env)
         self.body.analyse_declarations(env)

-    def lookup_assignment(self, entry):
+    def analyse_expressions(self, env):
+        self.body.analyse_expressions(env)
+
+    def analyse_sharing_attributes(self, env):
         """
-        Return an assignment's pos and operator. If the parent has the
-        assignment, return the parent's assignment, otherwise our own.
+        Analyse the privates for this block and set them in self.privates.
+        This should be called in a post-order fashion during the
+        analyse_expressions phase
         """
-        parent_assignment = self.parent and self.parent.lookup_assignment(entry)
-        return parent_assignment or self.assignments.get(entry)
+        for entry, (pos, op) in self.assignments.iteritems():
+            if self.is_private(entry):
+                self.propagate_var_privatization(entry, op)

     def is_private(self, entry):
         """
         True if this scope should declare the variable private, lastprivate
         or reduction.
         """
-        parent_or_our_entry = self.lookup_assignment(entry)
-        our_entry = self.assignments.get(entry)
+        return (self.is_parallel or
+                (self.parent and entry not in self.parent.privates))
+
+    def propagate_var_privatization(self, entry, op):
+        """
+        Propagate the sharing attributes of a variable. If the privatization is
+        determined by a parent scope, don't propagate further.
+
+        If we are a prange, we propagate our sharing attributes outwards to
+        other pranges. If we are a prange in a parallel block and the parallel
+        block does not determine the variable private, we propagate to the
+        parent of the parent. Recursion stops at parallel blocks, as they have
+        no concept of lastprivate or reduction.
+
+        So the following cases propagate:
+
+        sum is a reduction for all loops:
+
+            for i in prange(n):
+                for j in prange(n):
+                    for k in prange(n):
+                        sum += i * j * k
+
+        sum is a reduction for both loops, local_var is private to the
+        parallel with block:
+
+            for i in prange(n):
+                with parallel:
+                    local_var = ...  # private to the parallel
+                    for j in prange(n):
+                        sum += i * j
+
+        This does not propagate to the outermost prange:
+
+            #pragma omp parallel for lastprivate(i)
+            for i in prange(n):
+
+                #pragma omp parallel private(j, sum)
+                with parallel:
+
+                    #pragma omp parallel
+                    with parallel:
+
+                        #pragma omp for lastprivate(j) reduction(+:sum)
+                        for j in prange(n):
+                            sum += i
+
+                # sum and j are well-defined here
+
+            # sum and j are undefined here
+
+        # sum and j are undefined here
+        """
+        self.privates[entry] = op
+        if self.is_prange:
+            if not self.is_parallel and entry not in self.parent.assignments:
+                # Parent is a parallel with block
+                parent = self.parent.parent
+            else:
+                parent = self.parent

-        return self.is_parallel or parent_or_our_entry == our_entry
+            if parent:
+                parent.propagate_var_privatization(entry, op)

     def _allocate_closure_temp(self, code, entry):
         """
@@ -5809,11 +5874,19 @@ class ParallelStatNode(StatNode, ParallelNode):
         if self.parent:
             return self.parent._allocate_closure_temp(code, entry)

+        if entry.cname in self.seen_closure_vars:
+            return entry.cname
+
         cname = code.funcstate.allocate_temp(entry.type, False)
+
+        # Add both the actual cname and the temp cname, as the actual cname
+        # will be replaced with the temp cname on the entry
+        self.seen_closure_vars.add(entry.cname)
+        self.seen_closure_vars.add(cname)
+
         self.modified_entries.append((entry, entry.cname))
         code.putln("%s = %s;" % (cname, entry.cname))
         entry.cname = cname
-        return cname

     def declare_closure_privates(self, code):
         """
@@ -5827,19 +5900,18 @@ class ParallelStatNode(StatNode, ParallelNode):
         after the parallel section. This kind of copying should be done only
         in the outermost parallel section.
         """
-        self.privates = {}
         self.modified_entries = []

         for entry, (pos, op) in self.assignments.iteritems():
-            cname = entry.cname
             if entry.from_closure or entry.in_closure:
-                cname = self._allocate_closure_temp(code, entry)
-
-            if self.is_private(entry):
-                self.privates[cname] = op
+                self._allocate_closure_temp(code, entry)

     def release_closure_privates(self, code):
-        "Release any temps used for variables in scope objects"
+        """
+        Release any temps used for variables in scope objects. As this is the
+        outermost parallel block, we don't need to delete the cnames from
+        self.seen_closure_vars
+        """
         for entry, original_cname in self.modified_entries:
             code.putln("%s = %s;" % (original_cname, entry.cname))
             code.funcstate.release_temp(entry.cname)
@@ -5853,15 +5925,23 @@ class ParallelWithBlockNode(ParallelStatNode):

     nogil_check = None

+    def analyse_expressions(self, env):
+        super(ParallelWithBlockNode, self).analyse_expressions(env)
+        self.analyse_sharing_attributes(env)
+
     def generate_execution_code(self, code):
         self.declare_closure_privates(code)

         code.putln("#ifdef _OPENMP")
         code.put("#pragma omp parallel ")
-        code.putln(' '.join(["private(%s)" % e.cname
-                                 for e in self.assignments
-                                 if self.is_private(e)]))
-        code.putln("#endif")
+
+        if self.privates:
+            code.put(
+                'private(%s)' % ', '.join([e.cname for e in self.privates]))
+
+        code.putln("")
+        code.putln("#endif /* _OPENMP */")
+
         code.begin_block()
         self.body.generate_execution_code(code)
         code.end_block()
@@ -5930,11 +6010,18 @@ class ParallelRangeNode(ParallelStatNode):
             return

         self.target.analyse_target_types(env)

-        self.index_type = self.target.type
-        if self.index_type.is_pyobject:
-            # nogil_check will catch this, for now, assume a valid type
+        if not self.target.type.is_numeric:
+            # Not a valid type, assume one for now anyway
+
+            if not self.target.type.is_pyobject:
+                # nogil_check will catch the is_pyobject case
+                error(self.target.pos,
+                      "Must be of numeric type, not %s" % self.target.type)
+
             self.index_type = PyrexTypes.c_py_ssize_t_type
+        else:
+            self.index_type = self.target.type

         # Setup start, stop and step, allocating temps if needed
         self.names = 'start', 'stop', 'step'
@@ -5957,10 +6044,20 @@ class ParallelRangeNode(ParallelStatNode):
                 self.index_type = PyrexTypes.widest_numeric_type(
                                                 self.index_type, node.type)
-        self.body.analyse_expressions(env)
+        super(ParallelRangeNode, self).analyse_expressions(env)

         if self.else_clause is not None:
             self.else_clause.analyse_expressions(env)

+        # Although not actually an assignment in this scope, it should be
+        # treated as such to ensure it is unpacked if a closure temp, and to
+        # ensure lastprivate behaviour and propagation. If the target index is
+        # not a NameNode, it won't have an entry, and an error was issued by
+        # ParallelRangeTransform
+        if hasattr(self.target, 'entry'):
+            self.assignments[self.target.entry] = self.target.pos, None
+
+        self.analyse_sharing_attributes(env)
+
     def nogil_check(self, env):
         names = 'start', 'stop', 'step', 'target'
         nodes = self.start, self.stop, self.step, self.target
@@ -6009,9 +6106,7 @@ class ParallelRangeNode(ParallelStatNode):
             4) release our temps and write back any private closure variables
         """
-        # Ensure to unpack the target index variable if it's a closure temp
-        self.assignments[self.target.entry] = self.target.pos, None
-        self.declare_closure_privates(code) #self.insertion_point(code))
+        self.declare_closure_privates(code)

         # This can only be a NameNode
         target_index_cname = self.target.entry.cname

@@ -6070,8 +6165,6 @@ class ParallelRangeNode(ParallelStatNode):
         code.end_block()

     def generate_loop(self, code, fmt_dict):
-        target_index_cname = fmt_dict['target']
-
         code.putln("#ifdef _OPENMP")

         if not self.is_parallel:
@@ -6079,23 +6172,18 @@ class ParallelRangeNode(ParallelStatNode):
         else:
             code.put("#pragma omp parallel for")

-        for private, op in self.privates.iteritems():
+        for entry, op in self.privates.iteritems():
             # Don't declare the index variable as a reduction
-            if private != target_index_cname:
-                if op and op in "+*-&^|":
-                    code.put(" reduction(%s:%s)" % (op, private))
-                else:
-                    code.put(" lastprivate(%s)" % private)
+            if op and op in "+*-&^|" and entry != self.target.entry:
+                code.put(" reduction(%s:%s)" % (op, entry.cname))
+            else:
+                code.put(" lastprivate(%s)" % entry.cname)

         if self.schedule:
             code.put(" schedule(%s)" % self.schedule)

-        if self.is_parallel or self.target.entry not in self.parent.assignments:
-            code.putln(" lastprivate(%s)" % target_index_cname)
-        else:
-            code.putln("")
-
-        code.putln("#endif")
+        code.putln("")
+        code.putln("#endif /* _OPENMP */")

         code.put("for (%(i)s = 0; %(i)s < %(nsteps)s; %(i)s++)" % fmt_dict)
         code.begin_block()
@@ -6104,6 +6192,8 @@ class ParallelRangeNode(ParallelStatNode):

         code.end_block()

+
+
 #------------------------------------------------------------------------------------
 #
 #  Runtime support code
diff --git a/Cython/Compiler/ParseTreeTransforms.py b/Cython/Compiler/ParseTreeTransforms.py
index c736c1cb940436b8624f3f0af2f80b3a4533a843..644eb3e1c2ac0c4c2e2e331abe0561a78e27b28e 100644
--- a/Cython/Compiler/ParseTreeTransforms.py
+++ b/Cython/Compiler/ParseTreeTransforms.py
@@ -978,6 +978,10 @@ class ParallelRangeTransform(CythonTransform, SkipDeclarations):
     # Keep track of whether we are in a parallel range section
     in_prange = False

+    # One of 'prange' or 'with parallel'. This is used to disallow closely
+    # nested 'with parallel:' blocks
+    state = None
+
     directive_to_node = {
         u"cython.parallel.parallel": Nodes.ParallelWithBlockNode,
         # u"cython.parallel.threadsavailable": ExprNodes.ParallelThreadsAvailableNode,
@@ -1070,9 +1074,16 @@ class ParallelRangeTransform(CythonTransform, SkipDeclarations):
                 # There was an error, stop here and now
                 return None

+            if self.state == 'parallel with':
+                error(node.manager.pos,
+                      "Closely nested 'with parallel:' blocks are disallowed")
+
+            self.state = 'parallel with'
             self.visit(node.body)
+            self.state = None

             newnode = Nodes.ParallelWithBlockNode(node.pos, body=node.body)
+
         else:
             newnode = node

@@ -1088,6 +1099,7 @@ class ParallelRangeTransform(CythonTransform, SkipDeclarations):
         was_in_prange = self.in_prange
         self.in_prange = isinstance(node.iterator.sequence,
                                     Nodes.ParallelRangeNode)
+        previous_state = self.state

         if self.in_prange:
             # This will replace the entire ForInStatNode, so copy the
@@ -1104,11 +1116,13 @@ class ParallelRangeTransform(CythonTransform, SkipDeclarations):
             error(node.target.pos,
                   "Can only iterate over an iteration variable")

-        self.visit(node.body)
+        self.state = 'prange'
+        self.visit(node.body)
+        self.state = previous_state

         self.in_prange = was_in_prange
-        self.visit(node.else_clause)

+        self.visit(node.else_clause)
         return node

     def ensure_not_in_prange(name):
diff --git a/tests/errors/e_cython_parallel.pyx b/tests/errors/e_cython_parallel.pyx
index a715a2164767bb9dbd2466cdcfaf01bef7045915..beb958066c9a867b4ba1a5f5c05c6202e296d841 100644
--- a/tests/errors/e_cython_parallel.pyx
+++ b/tests/errors/e_cython_parallel.pyx
@@ -30,6 +30,12 @@ with nogil, cython.parallel.parallel:
     for x[1] in prange(10):
         pass

+    for x in prange(10):
+        pass
+
+    with cython.parallel.parallel:
+        pass
+
 _ERRORS = u"""
 e_cython_parallel.pyx:3:8: cython.parallel.parallel is not a module
 e_cython_parallel.pyx:4:0: No such directive: cython.parallel.something
@@ -41,4 +47,6 @@ e_cython_parallel.pyx:18:19: Invalid schedule argument to prange: 'invalid_sched
 e_cython_parallel.pyx:21:5: The parallel section may only be used without the GIL
 e_cython_parallel.pyx:27:10: target may not be a Python object as we don't have the GIL
 e_cython_parallel.pyx:30:9: Can only iterate over an iteration variable
+e_cython_parallel.pyx:33:10: Must be of numeric type, not int *
+e_cython_parallel.pyx:36:24: Closely nested 'with parallel:' blocks are disallowed
 """
diff --git a/tests/run/parallel.pyx b/tests/run/parallel.pyx
index 79dfed6314e579928c89e3860cc1e2f4ad65fcfe..93cde580e4835c36e014e8a27e81c9d4f92c1375 100644
--- a/tests/run/parallel.pyx
+++ b/tests/run/parallel.pyx
@@ -43,30 +43,25 @@ def test_descending_prange():
     return sum


-def test_nested_prange():
+def test_propagation():
     """
-    Reduction propagation is not (yet) supported.
-
-    >>> test_nested_prange()
-    50
+    >>> test_propagation()
+    (9, 9, 9, 9, 450, 450)
     """
-    cdef int i, j
-    cdef int sum = 0
-
-    for i in prange(5, nogil=True):
-        for j in prange(5):
-            sum += i
-
-    # The value of sum is undefined here
+    cdef int i, j, x, y
+    cdef int sum1 = 0, sum2 = 0

-    sum = 0
+    for i in prange(10, nogil=True):
+        for j in prange(10):
+            sum1 += i

-    for i in prange(5, nogil=True):
-        for j in prange(5):
-            sum += i
-        sum += 0
+    with nogil, cython.parallel.parallel:
+        for x in prange(10):
+            with cython.parallel.parallel:
+                for y in prange(10):
+                    sum2 += y

-    return sum
+    return i, j, x, y, sum1, sum2


 def test_parallel():
@@ -225,11 +220,11 @@ def test_parallel_numpy_arrays():
         print i
         return

-    x = numpy.zeros(10, dtype=np.int)
+    x = numpy.zeros(10, dtype=numpy.int)

     for i in prange(x.shape[0], nogil=True):
         x[i] = i - 5

     for i in x:
-        print x
+        print i
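
For reference, a minimal usage sketch (illustrative only, not part of the patch) of the sharing-attribute propagation that the propagate_var_privatization() docstring describes and test_propagation() exercises: an in-place addition inside nested pranges is emitted as a reduction on every loop level, and the loop indices become lastprivate, so both are defined after the loops. The function name nested_reduction is hypothetical.

# Minimal sketch, assuming the semantics documented above: `total` is
# compiled as reduction(+:total) on every prange level, and `i`/`j` are
# lastprivate, so all three are well-defined after the loops.
from cython.parallel import prange

def nested_reduction(int n):
    cdef int i, j, total = 0
    for i in prange(n, nogil=True):
        for j in prange(n):
            total += i * j
    return total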