......@@ -21,7 +21,6 @@ cdef cypclass Worker
cdef inline void * worker_function(void * arg) nogil:
worker = <lock Worker> arg
sch = <Scheduler> <void*> worker.scheduler
cdef int num_remaining_queues
# Wait until all the workers are ready.
while 1:
......@@ -66,33 +65,32 @@ cdef cypclass Worker:
lock SequentialMailBox get_queue(lock self):
# Get the next queue in the worker's list or steal one.
with wlocked self:
if not self.queues.empty():
queue = self.queues.front()
return queue
if not self.queues.empty():
queue = self.queues.front()
return queue
return self.steal_queue()
lock SequentialMailBox steal_queue(lock self):
# Steal a queue from another worker:
# - inspect each worker in order starting at a random offset
# - skip this worker and any worker with an empty queue list
# - skip any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list
# - continue looping until a queue is found
cdef int i, index, num_workers, random_offset
sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
for i in range(num_workers):
index = (i + random_offset) % num_workers
index = rand() % num_workers
while True:
victim = sch.workers[index]
if victim is self:
with wlocked victim:
if not victim.queues.empty():
stolen_queue = victim.queues.back()
stolen_queue.has_worker = True
return stolen_queue
return NULL
index += 1
if index >= num_workers:
index = 0
int join(self):
# Join the worker thread.
......@@ -106,10 +104,12 @@ cdef cypclass Scheduler:
atomic[int] num_pending_queues
sem_t done
volatile bint is_done
int num_workers
lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0)
......@@ -136,16 +136,20 @@ cdef cypclass Scheduler:
void post_queue(self, lock SequentialMailBox queue):
# Add a queue to the first worker.
main_worker = self.workers[0]
with wlocked main_worker:
queue.has_worker = True
void post_queue(lock self, lock SequentialMailBox queue):
cdef int num_workers, random_offset
sch = <Scheduler> <void*> self
# Add a queue to a random worker.
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
receiver = sch.workers[random_offset]
with wlocked receiver:
queue.has_worker = True
# Increment the number of non-completed queues.
# Signal that a queue is available.
void finish(lock self):
# Wait until there is no more work.
