Commit ae0ba2bd authored by test

Update the runtime. It fixes a bug (according to the Cython+ developers).

parent bb6410d3
# Cython+ Runtime
A minimal yet efficient runtime for Cython+.
This code was written by Xavier Thompson <xavier.thompson@nexedi.com>
## Using the runtime
In your Cython+ projects, you can add this repository as a [git submodule](https://git-scm.com/book/en/v2/Git-Tools-Submodules)
at the root of your project:
```sh
git submodule add https://lab.nexedi.com/cython-plus/runtime.git
git commit -m "Register the runtime as a git submodule"
```
If you were previously vendoring the runtime, you first need to remove the vendored directory from the git index:
```sh
git rm --cached -r runtime/
git submodule add https://lab.nexedi.com/cython-plus/runtime.git
git commit -m "Unvendor the runtime and instead register it as a git submodule"
```
This way, your project is pinned to a specific version of the runtime, which you can update when needed.
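For example, a collaborator can clone the project together with the pinned runtime, and you can later move the pin forward; the project URL below is a placeholder:
```sh
# Clone a project and its submodules in one step.
git clone --recurse-submodules https://example.com/my-project.git

# Update the runtime submodule to the latest upstream commit
# and record the new pinned version in your project.
git submodule update --remote runtime
git commit -am "Update the runtime submodule"
```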
@@ -21,7 +21,6 @@ cdef cypclass Worker
cdef inline void * worker_function(void * arg) nogil:
worker = <lock Worker> arg
sch = <Scheduler> <void*> worker.scheduler
cdef int num_remaining_queues
# Wait until all the workers are ready.
pthread_barrier_wait(&sch.barrier)
while 1:
@@ -45,6 +44,7 @@ cdef inline void * worker_function(void * arg) nogil:
# Discard the empty queue and continue the main loop.
continue
# The queue is not empty: reinsert it in this worker's queues.
with wlocked worker:
worker.queues.push_back(queue)
# Signal that the queue is available.
sem_post(&sch.num_free_queues)
@@ -75,24 +75,23 @@ cdef cypclass Worker:
lock SequentialMailBox steal_queue(lock self):
# Steal a queue from another worker:
# - inspect each worker in order starting at a random offset
# - skip this worker and any worker with an empty queue list
# - skip any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list
# - continue looping until a queue is found
cdef int i, index, num_workers, random_offset
sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
for i in range(num_workers):
index = (i + random_offset) % num_workers
index = rand() % num_workers
while True:
victim = sch.workers[index]
if victim is self:
continue
with wlocked victim:
if not victim.queues.empty():
stolen_queue = victim.queues.back()
victim.queues.pop_back()
stolen_queue.has_worker = True
return stolen_queue
return NULL
index += 1
if index >= num_workers:
index = 0
int join(self):
# Join the worker thread.
@@ -106,10 +105,12 @@ cdef cypclass Scheduler:
atomic[int] num_pending_queues
sem_t done
volatile bint is_done
int num_workers
lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0)
self.num_pending_queues.store(0)
@@ -136,16 +137,20 @@ cdef cypclass Scheduler:
sem_destroy(&self.num_free_queues)
sem_destroy(&self.done)
void post_queue(self, lock SequentialMailBox queue):
# Add a queue to the first worker.
main_worker = self.workers[0]
with wlocked main_worker:
void post_queue(lock self, lock SequentialMailBox queue):
cdef int num_workers, random_offset
sch = <Scheduler> <void*> self
# Add a queue to a random worker.
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
receiver = sch.workers[random_offset]
with wlocked receiver:
queue.has_worker = True
main_worker.queues.push_back(queue)
receiver.queues.push_back(queue)
# Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1)
sch.num_pending_queues.fetch_add(1)
# Signal that a queue is available.
sem_post(&self.num_free_queues)
sem_post(&sch.num_free_queues)
void finish(lock self):
# Wait until there is no more work.
@@ -217,3 +222,47 @@ cdef cypclass BatchMailBox(SequentialMailBox):
cdef inline ActhonResultInterface NullResult() nogil:
return NULL
# Taken from:
# https://lab.nexedi.com/nexedi/cython/blob/3.0a6-cypclass/tests/run/cypclass_acthon.pyx#L66
cdef cypclass WaitResult(ActhonResultInterface):
union result_t:
int int_val
void* ptr
result_t result
sem_t semaphore
__init__(self):
self.result.ptr = NULL
sem_init(&self.semaphore, 0, 0)
__dealloc__(self):
sem_destroy(&self.semaphore)
@staticmethod
ActhonResultInterface construct():
return WaitResult()
void pushVoidStarResult(self, void* result):
self.result.ptr = result
sem_post(&self.semaphore)
void pushIntResult(self, int result):
self.result.int_val = result
sem_post(&self.semaphore)
result_t _getRawResult(const self):
# We must ensure a result exists, but we can let others access it immediately
# The cast here is a way of const-casting (we're modifying the semaphore in a const method)
sem_wait(<sem_t*> &self.semaphore)
sem_post(<sem_t*> &self.semaphore)
return self.result
void* getVoidStarResult(const self):
res = self._getRawResult()
return res.ptr
int getIntResult(const self):
res = self._getRawResult()
return res.int_val
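As a rough usage sketch (not part of this commit, with hypothetical helper names `set_answer` and `wait_answer`), a `WaitResult` lets one thread block until another has pushed a result:
```
# Hypothetical sketch, mirroring the declarations above.
cdef inline void set_answer(ActhonResultInterface result) nogil:
    result.pushIntResult(42)   # store the int, then post the semaphore

cdef inline int wait_answer(ActhonResultInterface result) nogil:
    # getIntResult() blocks in _getRawResult() until a result has been
    # pushed, then re-posts the semaphore so other readers don't block.
    return result.getIntResult()
```
A result object would typically be obtained via `WaitResult.construct()`, the static factory shown above.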