Commit 44522e02 authored by Xavier Thompson's avatar Xavier Thompson Committed by Julien Jerphanion

Dispatch queues to workers randomly

parent adb02cd7
......@@ -21,7 +21,6 @@ cdef cypclass Worker
cdef inline void * worker_function(void * arg) nogil:
worker = <lock Worker> arg
sch = <Scheduler> <void*> worker.scheduler
cdef int num_remaining_queues
# Wait until all the workers are ready.
pthread_barrier_wait(&sch.barrier)
while 1:
......@@ -66,33 +65,33 @@ cdef cypclass Worker:
lock SequentialMailBox get_queue(lock self):
# Get the next queue in the worker's list or steal one.
with wlocked self:
if not self.queues.empty():
queue = self.queues.front()
self.queues.pop_front()
return queue
if not self.queues.empty():
queue = self.queues.front()
self.queues.pop_front()
return queue
return self.steal_queue()
lock SequentialMailBox steal_queue(lock self):
# Steal a queue from another worker:
# - inspect each worker in order starting at a random offset
# - skip this worker and any worker with an empty queue list
# - skip any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list
# - continue looping until a queue is found
cdef int i, index, num_workers, random_offset
sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
for i in range(num_workers):
index = (i + random_offset) % num_workers
index = rand() % num_workers
while True:
victim = sch.workers[index]
if victim is self:
continue
with wlocked victim:
if not victim.queues.empty():
stolen_queue = victim.queues.back()
victim.queues.pop_back()
stolen_queue.has_worker = True
return stolen_queue
return NULL
index += 1
if index >= num_workers:
index = 0
int join(self):
# Join the worker thread.
......@@ -138,16 +137,20 @@ cdef cypclass Scheduler:
sem_destroy(&self.num_free_queues)
sem_destroy(&self.done)
void post_queue(self, lock SequentialMailBox queue):
# Add a queue to the first worker.
receiver = self.workers[rand() % self.num_workers]
void post_queue(lock self, lock SequentialMailBox queue):
cdef int num_workers, random_offset
sch = <Scheduler> <void*> self
# Add a queue to a random worker.
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
receiver = sch.workers[random_offset]
with wlocked receiver:
queue.has_worker = True
receiver.queues.push_back(queue)
queue.has_worker = True
receiver.queues.push_back(queue)
# Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1)
sch.num_pending_queues.fetch_add(1)
# Signal that a queue is available.
sem_post(&self.num_free_queues)
sem_post(&sch.num_free_queues)
void finish(lock self):
# Wait until there is no more work.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment