Commit 4d309ddd authored by Julien Jerphanion's avatar Julien Jerphanion

Add queues to workers randomly

parent 4c03e3ba
...@@ -106,10 +106,12 @@ cdef cypclass Scheduler: ...@@ -106,10 +106,12 @@ cdef cypclass Scheduler:
atomic[int] num_pending_queues atomic[int] num_pending_queues
sem_t done sem_t done
volatile bint is_done volatile bint is_done
int num_workers
lock Scheduler __new__(alloc, int num_workers=0): lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc() self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN) if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0) sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0) sem_init(&self.done, 0, 0)
self.num_pending_queues.store(0) self.num_pending_queues.store(0)
...@@ -138,10 +140,10 @@ cdef cypclass Scheduler: ...@@ -138,10 +140,10 @@ cdef cypclass Scheduler:
void post_queue(self, lock SequentialMailBox queue): void post_queue(self, lock SequentialMailBox queue):
# Add a queue to the first worker. # Add a queue to the first worker.
main_worker = self.workers[0] receiver = self.workers[rand() % self.num_workers]
with wlocked main_worker: with wlocked receiver:
queue.has_worker = True queue.has_worker = True
main_worker.queues.push_back(queue) receiver.queues.push_back(queue)
# Increment the number of non-completed queues. # Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1) self.num_pending_queues.fetch_add(1)
# Signal that a queue is available. # Signal that a queue is available.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment