# lwan_coro.pyx
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=sysconf(_SC_NPROCESSORS_ONLN)) nogil:
  # Initialise `scheduler` and spawn `num_workers` worker threads.
  #
  # The worker threads are started by worker_init() but wait on
  # `scheduler.barrier` until scheduler_run() joins it, so coroutines can be
  # queued with scheduler_coro_add() in between. Every failure is fatal: a
  # message is written to stderr and the process exits with status -1.
  #
  # NOTE(review): sysconf() returns a long that may be -1 on error; its
  # conversion to unsigned short is not checked here.
  memset(scheduler, 0, sizeof(scheduler[0]))

  # At least one worker is required: scheduler_coro_add() queues onto the
  # first worker and worker_coro_steal() computes `rand() % num_workers`.
  if num_workers == 0:
    fprintf(stderr, "The scheduler needs at least one worker\n")
    exit(-1)

  scheduler.num_workers = num_workers
  scheduler.is_done = False

  # Number of currently idle workers; scheduler_run() stops once it equals
  # num_workers.
  if sem_init(&scheduler.num_sleeping_workers, 0, 0):
    fprintf(stderr, "Could not initialize the semaphore\n")
    exit(-1)

  scheduler.workers = <worker_t *>calloc(scheduler.num_workers, sizeof(worker_t))
  if not scheduler.workers:
    fprintf(stderr, "Could not allocate memory for the workers\n")
    exit(-1)

  # One participant per worker thread plus the thread calling scheduler_run().
  if pthread_barrier_init(&scheduler.barrier, NULL, scheduler.num_workers + 1):
    fprintf(stderr, "Could not allocate memory for the barrier\n")
    exit(-1)

  cdef unsigned short i
  for i in range(scheduler.num_workers):
    worker_init(&scheduler.workers[i], scheduler)

cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg=NULL) nogil:
  # Create a coroutine running `task` with `arg` and queue it on the first
  # worker's deque, bound to that worker's switcher. Other workers can then
  # pick it up through worker_coro_steal().
  #
  # The push is done under the worker's mutex, like every other deque access,
  # so it does not race with worker threads already consuming the deque.
  cdef worker_t *main_worker = scheduler.workers
  cdef coro_t *coroutine = coro_new(&main_worker.switcher, task, arg)

  pthread_mutex_lock(&main_worker.mutex)
  main_worker.deque.push_back(coroutine)
  pthread_mutex_unlock(&main_worker.mutex)

cdef void scheduler_run(scheduler_t *scheduler) nogil:
  # Release the worker threads, then wait until every worker is idle.
  #
  # The calling thread is the extra barrier participant (num_workers + 1).
  # After the barrier it polls the idle-worker semaphore and sets `is_done`
  # once the idle count equals num_workers. worker_run() loops stop when they
  # observe that flag.
  #
  # NOTE(review): the poll loop never sleeps, so this thread spins at full
  # speed until completion. `is_done` is also a plain field shared across
  # threads without atomics or locking. Confirm both are intended.
  cdef int idle_count = 0

  pthread_barrier_wait(&scheduler.barrier)

  while True:
    if scheduler.is_done:
      break
    sem_getvalue(&scheduler.num_sleeping_workers, &idle_count)
    if idle_count == scheduler.num_workers:
      scheduler.is_done = True

cdef void scheduler_destroy(scheduler_t *scheduler) nogil:
  # Join every worker thread and release its resources, then free the worker
  # array, the idle-worker semaphore and the start-up barrier.
  #
  # pthread_join() blocks until each worker leaves its run loop, so this is
  # presumably called after scheduler_run() has returned. TODO confirm.
  cdef worker_t *worker
  cdef unsigned short index = 0

  while index < scheduler.num_workers:
    worker = &scheduler.workers[index]
    pthread_join(worker.thread, NULL)
    worker_destroy(worker)
    index += 1

  free(scheduler.workers)
  sem_destroy(&scheduler.num_sleeping_workers)
  pthread_barrier_destroy(&scheduler.barrier)
 
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil:
  # Initialise `worker` for `scheduler` and start its thread in worker_run().
  #
  # The worker gets an empty coroutine deque guarded by `worker.mutex`. The
  # thread is created joinable so scheduler_destroy() can pthread_join() it.
  # Every failure is fatal: a message is written to stderr and the process
  # exits with status -1.
  memset(worker, 0, sizeof(worker[0]))

  worker.scheduler = scheduler

  # NOTE(review): C++ `new` normally throws on failure rather than returning
  # NULL, so this check is presumably only defensive.
  worker.deque = new coro_deque_t()
  if not worker.deque:
    fprintf(stderr, "Could not allocate memory for the deque\n")
    exit(-1)

  # The mutex must be valid before the thread starts, because other workers
  # lock it when they try to steal from this deque.
  if pthread_mutex_init(&worker.mutex, NULL):
    fprintf(stderr, "pthread_mutex_init()\n")
    exit(-1)

  cdef pthread_attr_t attr
  if pthread_attr_init(&attr):
    fprintf(stderr, "pthread_attr_init()\n")
    exit(-1)

  if pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE):
    fprintf(stderr, "pthread_attr_setdetachstate()\n")
    exit(-1)

  if pthread_create(&worker.thread, &attr, worker_run, worker):
    fprintf(stderr, "pthread_create()\n")
    exit(-1)

  if pthread_attr_destroy(&attr):
    fprintf(stderr, "pthread_attr_destroy()\n")
    exit(-1)

cdef void *worker_run(void *arg) nogil:
  # Thread entry point for a worker created by worker_init(); `arg` is the
  # worker_t*.
  #
  # After the start-up barrier, the worker repeatedly takes a coroutine (own
  # deque first, then by stealing) and resumes it. If the coroutine reports
  # MAY_RESUME it goes back on this worker's deque; otherwise it is freed.
  # When no work is found, the worker counts itself in `num_sleeping_workers`
  # for the duration of a short sleep. scheduler_run() uses that count to
  # detect completion. The loop ends once `is_done` is set.
  cdef:
    worker_t *worker = <worker_t *>arg
    coro_t *coroutine = NULL

  # Wait for all workers and the main thread to be ready.
  pthread_barrier_wait(&worker.scheduler.barrier)

  while not worker.scheduler.is_done:
    coroutine = worker_coro_get(worker)
    if coroutine:
      if coro_resume(coroutine) == coro_yield_value.MAY_RESUME:
        pthread_mutex_lock(&worker.mutex)
        worker.deque.push_back(coroutine)
        pthread_mutex_unlock(&worker.mutex)
      else:
        coro_free(coroutine)
    else:
      sem_post(&worker.scheduler.num_sleeping_workers)
      # usleep() takes microseconds: 1000 us == 1 ms (arbitrary idle delay).
      usleep(1000)
      sem_wait(&worker.scheduler.num_sleeping_workers)

  pthread_exit(NULL)

cdef coro_t *worker_coro_get(worker_t *worker) nogil:
  # Return the next coroutine for `worker` to run, or NULL if none is found.
  #
  # The front of the worker's own deque is popped under its mutex. Only when
  # that deque is empty does it fall back to worker_coro_steal(), which is
  # called after the worker's own mutex has been released.
  cdef coro_t *coroutine = NULL
  cdef bint local_empty

  pthread_mutex_lock(&worker.mutex)
  local_empty = worker.deque.empty()
  if not local_empty:
    coroutine = worker.deque.front()
    worker.deque.pop_front()
  pthread_mutex_unlock(&worker.mutex)

  if local_empty:
    return worker_coro_steal(worker)
  return coroutine

cdef coro_t *worker_coro_steal(worker_t *thief) nogil:
  # Try to take one coroutine from another worker; return NULL if none found.
  #
  # Each worker is visited once, starting at a random index and wrapping
  # around. The thief itself is skipped, identified by its deque. The first
  # non-empty victim gives up the coroutine at the back of its deque, the end
  # opposite to the one its owner pops in worker_coro_get(). The coroutine is
  # passed to coro_update() together with the thief's switcher so it records
  # its new parent worker.
  #
  # NOTE(review): POSIX does not require rand() to be thread-safe, and this
  # runs concurrently on every worker. Consider rand_r() with a per-worker seed.
  cdef:
    scheduler_t *scheduler = thief.scheduler
    unsigned short total = scheduler.num_workers
    unsigned short start = rand() % scheduler.num_workers
    unsigned short step = 0
    worker_t *victim = NULL
    coro_t *loot = NULL

  while step < total:
    victim = &scheduler.workers[(step + start) % total]
    step += 1
    if victim.deque != thief.deque:
      pthread_mutex_lock(&victim.mutex)
      if victim.deque.empty():
        pthread_mutex_unlock(&victim.mutex)
      else:
        loot = victim.deque.back()
        victim.deque.pop_back()
        coro_update(loot, &thief.switcher)
        pthread_mutex_unlock(&victim.mutex)
        return loot

  return NULL

cdef void worker_destroy(worker_t *worker) nogil:
  # Release what worker_init() created for `worker`: its coroutine deque and
  # the mutex guarding it. scheduler_destroy() joins the worker's thread
  # before calling this.
  #
  # NOTE(review): deleting the deque does not free any coro_t pointers still
  # stored in it.
  del worker.deque
  pthread_mutex_destroy(&worker.mutex)