Commit e519bcc6 authored by Julien Jerphanion's avatar Julien Jerphanion

[WIP] Introduce counter for synchronisation

parent a132039e
...@@ -5,7 +5,7 @@ import numpy as np ...@@ -5,7 +5,7 @@ import numpy as np
np.import_array() np.import_array()
from runtime.runtime cimport BatchMailBox, NullResult, Scheduler from runtime.runtime cimport BatchMailBox, NullResult, Scheduler, WaitResult
from libc.stdio cimport printf from libc.stdio cimport printf
from libc.stdlib cimport malloc, free from libc.stdlib cimport malloc, free
...@@ -68,10 +68,12 @@ cdef cypclass Node activable: ...@@ -68,10 +68,12 @@ cdef cypclass Node activable:
D_t * point D_t * point
I_t n_dims I_t n_dims
active Counter counter
active Node left active Node left
active Node right active Node right
__init__(self): __init__(self, active Counter counter):
self.counter = counter
self._active_result_class = NullResult self._active_result_class = NullResult
self._active_queue_class = consume BatchMailBox(scheduler) self._active_queue_class = consume BatchMailBox(scheduler)
self.left = NULL self.left = NULL
...@@ -94,13 +96,15 @@ cdef cypclass Node activable: ...@@ -94,13 +96,15 @@ cdef cypclass Node activable:
self.n_dims = n_dims self.n_dims = n_dims
if (depth < 0) or (end - start <= 1): if (depth < 0) or (end - start <= 1):
printf("build_node: Adding %d\n", end - start)
self.counter.add(NULL, end - start)
return return
partition_node_indices(data_ptr, indices_ptr, start, mid, end, dim, n_dims) partition_node_indices(data_ptr, indices_ptr, start, mid, end, dim, n_dims)
self.point = data_ptr + mid self.point = data_ptr + mid
self.left = consume Node() self.left = consume Node(self.counter)
self.right = consume Node() self.right = consume Node(self.counter)
self.left.build_node(NULL, self.left.build_node(NULL,
data_ptr, indices_ptr, data_ptr, indices_ptr,
...@@ -111,6 +115,23 @@ cdef cypclass Node activable: ...@@ -111,6 +115,23 @@ cdef cypclass Node activable:
depth - 1, n_dims, next_dim, depth - 1, n_dims, next_dim,
mid, end) mid, end)
cdef cypclass Counter activable:
I_t n
__init__(self):
self.n = 0
self._active_result_class = WaitResult.construct
self._active_queue_class = consume BatchMailBox(scheduler)
void add(self, I_t value):
printf("Counter: Adding %d\n", value)
self.n += value
printf("Counter: Current val: %d\n", self.n)
I_t value(self):
printf("Counter: Getting %d\n", self.n)
return self.n
cdef cypclass KDTree: cdef cypclass KDTree:
"""A KDTree based on asynchronous and parallel computations. """A KDTree based on asynchronous and parallel computations.
...@@ -129,6 +150,7 @@ cdef cypclass KDTree: ...@@ -129,6 +150,7 @@ cdef cypclass KDTree:
I_t depth # max_depth of the tree (to be unified with leaf_size) I_t depth # max_depth of the tree (to be unified with leaf_size)
active Node root active Node root
active Counter initialised_vec_counter
D_t *data_ptr D_t *data_ptr
I_t *indices_ptr I_t *indices_ptr
...@@ -159,10 +181,12 @@ cdef cypclass KDTree: ...@@ -159,10 +181,12 @@ cdef cypclass KDTree:
# with scheduler: # with scheduler:
# self.root = ... # self.root = ...
# ``` # ```
cdef I_t initialised
global scheduler global scheduler
scheduler = Scheduler() scheduler = Scheduler()
self.root = consume Node() self.initialised_vec_counter = active Counter()
self.root = consume Node(self.initialised_vec_counter)
if self.root is NULL: if self.root is NULL:
printf("Error consuming node\n") printf("Error consuming node\n")
...@@ -175,6 +199,14 @@ cdef cypclass KDTree: ...@@ -175,6 +199,14 @@ cdef cypclass KDTree:
self.indices_ptr, self.indices_ptr,
depth, n_dims=d, dim=0, start=0, end=n) depth, n_dims=d, dim=0, start=0, end=n)
initialised = self.initialised_vec_counter.value(NULL).getIntResult()
printf("Updated: %d / %d\n", initialised, self.n)
while(initialised < self.n):
initialised = self.initialised_vec_counter.value(NULL).getIntResult()
printf("Updated: %d / %d\n", initialised, self.n)
printf("!Updated: %d / %d\n", initialised, self.n)
scheduler.finish() scheduler.finish()
......
...@@ -217,3 +217,47 @@ cdef cypclass BatchMailBox(SequentialMailBox): ...@@ -217,3 +217,47 @@ cdef cypclass BatchMailBox(SequentialMailBox):
cdef inline ActhonResultInterface NullResult() nogil: cdef inline ActhonResultInterface NullResult() nogil:
return NULL return NULL
# Taken from:
# https://lab.nexedi.com/nexedi/cython/blob/3.0a6-cypclass/tests/run/cypclass_acthon.pyx#L66
cdef cypclass WaitResult(ActhonResultInterface):
union result_t:
int int_val
void* ptr
result_t result
sem_t semaphore
__init__(self):
self.result.ptr = NULL
sem_init(&self.semaphore, 0, 0)
__dealloc__(self):
sem_destroy(&self.semaphore)
@staticmethod
ActhonResultInterface construct():
return WaitResult()
void pushVoidStarResult(self, void* result):
self.result.ptr = result
sem_post(&self.semaphore)
void pushIntResult(self, int result):
self.result.int_val = result
sem_post(&self.semaphore)
result_t _getRawResult(const self):
# We must ensure a result exists, but we can let others access it immediately
# The cast here is a way of const-casting (we're modifying the semaphore in a const method)
sem_wait(<sem_t*> &self.semaphore)
sem_post(<sem_t*> &self.semaphore)
return self.result
void* getVoidStarResult(const self):
res = self._getRawResult()
return res.ptr
int getIntResult(const self):
res = self._getRawResult()
return res.int_val
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment