Commit 27426854 authored by Julien Jerphanion's avatar Julien Jerphanion

Add queue to workers in round robin

parent 88933876
......@@ -104,12 +104,16 @@ cdef cypclass Scheduler:
pthread_barrier_t barrier
sem_t num_free_queues
atomic[int] num_pending_queues
int next_worker
int num_workers
sem_t done
volatile bint is_done
lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
self.next_worker = 0
sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0)
self.num_pending_queues.store(0)
......@@ -137,11 +141,12 @@ cdef cypclass Scheduler:
sem_destroy(&self.done)
void post_queue(self, lock SequentialMailBox queue):
# Add a queue to the first worker.
main_worker = self.workers[0]
with wlocked main_worker:
# Add a queue to workers in round robin
worker = self.workers[<int> self.next_worker]
self.next_worker = (self.next_worker + 1) % self.num_workers
with wlocked worker:
queue.has_worker = True
main_worker.queues.push_back(queue)
worker.queues.push_back(queue)
# Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1)
# Signal that a queue is available.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment