# Initialise `scheduler` in place and start its worker threads.
#
# scheduler:   caller-provided storage; it is zeroed before use.
# num_workers: number of worker threads to create (defaults to the number
#              of online processors reported by sysconf()).
#
# Every worker thread is created here (via worker_init) but blocks on the
# scheduler barrier until scheduler_run() is called.
# On any allocation/initialisation failure a message is printed to stderr
# and the process exits with status -1.
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=sysconf(_SC_NPROCESSORS_ONLN)) nogil:
    memset(scheduler, 0, sizeof(scheduler[0]))
    scheduler.num_workers = num_workers
    scheduler.is_done = False
    # Semaphore value == number of workers currently idle (see worker_run).
    sem_init(&scheduler.num_sleeping_workers, 0, 0)

    scheduler.workers = calloc(scheduler.num_workers, sizeof(worker_t))
    if not scheduler.workers:
        fprintf(stderr, "Could not allocate memory for the workers\n")
        exit(-1)

    # "+ 1": the thread calling scheduler_run() also waits on the barrier.
    if pthread_barrier_init(&scheduler.barrier, NULL, scheduler.num_workers + 1):
        fprintf(stderr, "Could not allocate memory for the barrier\n")
        exit(-1)

    cdef unsigned short i
    for i in range(scheduler.num_workers):
        worker_init(&scheduler.workers[i], scheduler)


# Queue a new coroutine running `task(arg)` on the first worker's deque.
#
# The deque is modified without taking the worker's mutex.
# NOTE(review): presumably this must only be called before scheduler_run(),
# while every worker is still parked on the barrier -- confirm with callers.
cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg=NULL) nogil:
    cdef worker_t *main_worker = scheduler.workers
    main_worker.deque.push_back(coro_new(&main_worker.switcher, task, arg))


# Release the workers and block until all of them are idle at the same time.
#
# Termination is detected by polling the `num_sleeping_workers` semaphore:
# once its value equals the number of workers, `is_done` is set and the
# function returns.
# NOTE(review): this loop polls without sleeping, and `is_done` is a plain
# (non-atomic) field read by the worker threads -- confirm this is intended.
cdef void scheduler_run(scheduler_t *scheduler) nogil:
    # Wait for all workers and the main thread to be ready.
    pthread_barrier_wait(&scheduler.barrier)

    cdef int num_sleeping_workers
    while not scheduler.is_done:
        sem_getvalue(&scheduler.num_sleeping_workers, &num_sleeping_workers)
        if num_sleeping_workers == scheduler.num_workers:
            scheduler.is_done = True


# Join every worker thread and release all resources owned by `scheduler`.
#
# The `scheduler` structure itself is not freed.
cdef void scheduler_destroy(scheduler_t *scheduler) nogil:
    cdef unsigned short i
    for i in range(scheduler.num_workers):
        pthread_join(scheduler.workers[i].thread, NULL)
        worker_destroy(&scheduler.workers[i])

    free(scheduler.workers)
    sem_destroy(&scheduler.num_sleeping_workers)
    pthread_barrier_destroy(&scheduler.barrier)


# Initialise `worker` in place and start its thread.
#
# The worker gets its own coroutine deque and mutex, and a joinable thread
# executing worker_run(worker). Any failure prints a message to stderr and
# exits the process with status -1.
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil:
    memset(worker, 0, sizeof(worker[0]))
    worker.scheduler = scheduler

    worker.deque = new coro_deque_t()
    if not worker.deque:
        fprintf(stderr, "Could not allocate memory for the deque\n")
        exit(-1)

    pthread_mutex_init(&worker.mutex, NULL)

    cdef pthread_attr_t attr
    if pthread_attr_init(&attr):
        fprintf(stderr, "pthread_attr_init()\n")
        exit(-1)
    # Joinable so that scheduler_destroy() can pthread_join() the thread.
    if pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE):
        fprintf(stderr, "pthread_attr_setdetachstate()\n")
        exit(-1)
    if pthread_create(&worker.thread, &attr, worker_run, worker):
        fprintf(stderr, "pthread_create()\n")
        exit(-1)
    if pthread_attr_destroy(&attr):
        fprintf(stderr, "pthread_attr_destroy()\n")
        exit(-1)


# Thread entry point of a worker; `arg` is the worker_t of this thread.
#
# Repeatedly fetches a coroutine (own deque first, then stealing) and resumes
# it. A coroutine whose resume returns MAY_RESUME is pushed back onto this
# worker's deque; any other result frees it. When no coroutine is available
# the worker advertises itself as idle through the scheduler's semaphore,
# naps briefly, withdraws the advertisement and retries.
cdef void *worker_run(void *arg) nogil:
    cdef:
        worker_t *worker = arg
        coro_t *coroutine = NULL

    # Wait for all workers and the main thread to be ready.
    pthread_barrier_wait(&worker.scheduler.barrier)

    while not worker.scheduler.is_done:
        coroutine = worker_coro_get(worker)
        if coroutine:
            if coro_resume(coroutine) == coro_yield_value.MAY_RESUME:
                pthread_mutex_lock(&worker.mutex)
                worker.deque.push_back(coroutine)
                pthread_mutex_unlock(&worker.mutex)
            else:
                coro_free(coroutine)
        else:
            # Count this worker as idle for the duration of the nap so that
            # scheduler_run() can detect that every worker is out of work.
            sem_post(&worker.scheduler.num_sleeping_workers)
            # usleep() takes microseconds: 1000 us == 1 ms (arbitrary).
            # The previous value, 1000000, was a full second and is also a
            # value for which POSIX allows usleep() to fail with EINVAL.
            usleep(1000)
            sem_wait(&worker.scheduler.num_sleeping_workers)

    pthread_exit(NULL)


# Return the next coroutine for `worker` to run, or NULL if none was found.
#
# Takes the front of the worker's own deque under its mutex; if that deque is
# empty, the mutex is released first and a steal from another worker is
# attempted.
cdef coro_t *worker_coro_get(worker_t *worker) nogil:
    cdef coro_t *coroutine = NULL

    pthread_mutex_lock(&worker.mutex)
    if worker.deque.empty():
        # Release our own mutex before locking a victim's in the steal path.
        pthread_mutex_unlock(&worker.mutex)
        coroutine = worker_coro_steal(worker)
    else:
        coroutine = worker.deque.front()
        worker.deque.pop_front()
        pthread_mutex_unlock(&worker.mutex)

    return coroutine


# Try to steal one coroutine from another worker on behalf of `thief`.
#
# Victims are visited once each, starting at a random index and wrapping
# around; the thief itself is skipped. The coroutine is taken from the back
# of the victim's deque (the owner pops from the front) while holding the
# victim's mutex. Returns the stolen coroutine, or NULL if every other deque
# was empty.
# NOTE(review): `rand() % scheduler.num_workers` assumes num_workers > 0.
cdef coro_t *worker_coro_steal(worker_t *thief) nogil:
    cdef:
        scheduler_t *scheduler = thief.scheduler
        worker_t *victim = NULL
        coro_t *stolen_coroutine = NULL
        unsigned short i, current_index
        unsigned short random_offset = rand() % scheduler.num_workers

    for i in range(scheduler.num_workers):
        current_index = (i + random_offset) % scheduler.num_workers
        victim = scheduler.workers + current_index
        if victim.deque == thief.deque:
            # Never steal from ourselves.
            continue

        pthread_mutex_lock(&victim.mutex)
        if not victim.deque.empty():
            stolen_coroutine = victim.deque.back()
            victim.deque.pop_back()
            # Update coroutine's data about its parent worker/thread.
            coro_update(stolen_coroutine, &thief.switcher)
            pthread_mutex_unlock(&victim.mutex)
            return stolen_coroutine
        pthread_mutex_unlock(&victim.mutex)

    return NULL


# Release the mutex and the deque owned by `worker`.
#
# Coroutines still stored in the deque are not freed here.
cdef void worker_destroy(worker_t *worker) nogil:
    pthread_mutex_destroy(&worker.mutex)
    del worker.deque