Commit 7553d08b authored by Boxiang Sun's avatar Boxiang Sun

Split the test code and the wrapper

parent cd02a76e
from libc.stdlib cimport exit, calloc, free, rand
from libc.stdio cimport printf, fprintf, stderr
from libc.string cimport memset
from posix.unistd cimport usleep, sysconf
cdef extern from "<unistd.h>" nogil:
enum: _SC_NPROCESSORS_ONLN # Seems to no be included in "posix.unistd".
from libcpp.deque cimport deque
ctypedef deque[coro_t *] coro_deque_t
cdef extern from "lwan/lwan-coro.h" nogil:
ctypedef struct coro_switcher_t:
pass
ctypedef struct coro_t:
coro_switcher_t *switcher
ctypedef int (*coro_function_t) (coro_t *, void *)
void coro_reset(coro_t *, coro_function_t, void *)
coro_t *coro_new(coro_switcher_t *, coro_function_t, void *)
void coro_free(coro_t *)
int coro_resume(coro_t *)
int coro_resume_value(coro_t *, int)
int coro_yield(coro_t *, int)
void coro_update(coro_t *, coro_switcher_t *)
cdef enum coro_yield_value:
ABORT = -1,
MAY_RESUME = 0,
FINISHED = 1
cdef extern from "<sys/types.h>" nogil:
ctypedef long unsigned int pthread_t
ctypedef union pthread_attr_t:
pass
ctypedef union pthread_mutex_t:
pass
ctypedef union pthread_mutexattr_t:
pass
ctypedef union pthread_barrier_t:
pass
ctypedef union pthread_barrierattr_t:
pass
cdef extern from "<pthread.h>" nogil:
int pthread_create(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *)
void pthread_exit(void *)
int pthread_join(pthread_t, void **)
int pthread_attr_init(pthread_attr_t *)
int pthread_attr_setdetachstate(pthread_attr_t *, int)
int pthread_attr_destroy(pthread_attr_t *)
int pthread_mutex_init(pthread_mutex_t *, const pthread_mutexattr_t *)
int pthread_mutex_destroy(pthread_mutex_t *)
int pthread_mutex_lock(pthread_mutex_t *)
int pthread_mutex_unlock(pthread_mutex_t *)
int pthread_mutex_trylock(pthread_mutex_t *)
int pthread_barrier_init(pthread_barrier_t *, const pthread_barrierattr_t *, unsigned int)
int pthread_barrier_destroy(pthread_barrier_t *)
int pthread_barrier_wait(pthread_barrier_t *)
enum: PTHREAD_CREATE_JOINABLE
cdef extern from "<semaphore.h>" nogil:
ctypedef int sem_t
int sem_destroy(sem_t *)
int sem_getvalue(sem_t *, int *)
int sem_init(sem_t *, int, unsigned int)
int sem_post(sem_t *)
int sem_wait(sem_t *)
cdef struct scheduler_t:
worker_t *workers
unsigned short num_workers
pthread_barrier_t barrier
sem_t num_sleeping_workers
bint is_done
cdef struct worker_t:
scheduler_t *scheduler
coro_switcher_t switcher
coro_deque_t *deque
pthread_t thread
pthread_mutex_t mutex
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=?) nogil
cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg=?) nogil
cdef void scheduler_run(scheduler_t *scheduler) nogil
cdef void scheduler_destroy(scheduler_t *scheduler) nogil
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil
cdef void *worker_run(void *arg) nogil
cdef coro_t *worker_coro_get(worker_t *worker) nogil
cdef coro_t *worker_coro_steal(worker_t *thief) nogil
cdef void worker_destroy(worker_t *worker) nogil
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=sysconf(_SC_NPROCESSORS_ONLN)) nogil:
memset(scheduler, 0, sizeof(scheduler[0]))
scheduler.num_workers = num_workers
scheduler.is_done = False
sem_init(&scheduler.num_sleeping_workers, 0, 0)
scheduler.workers = <worker_t *>calloc(scheduler.num_workers, sizeof(worker_t))
if not scheduler.workers:
fprintf(stderr, "Could not allocate memory for the workers")
exit(-1)
if pthread_barrier_init(&scheduler.barrier, NULL, scheduler.num_workers + 1):
fprintf(stderr, "Could not allocate memory for the barrier")
exit(-1)
cdef unsigned short i
for i in range(scheduler.num_workers):
worker_init(&scheduler.workers[i], scheduler)
cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg=NULL) nogil:
cdef worker_t *main_worker = scheduler.workers
main_worker.deque.push_back(coro_new(&main_worker.switcher, task, arg))
cdef void scheduler_run(scheduler_t *scheduler) nogil:
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&scheduler.barrier)
cdef int num_sleeping_workers
while not scheduler.is_done:
sem_getvalue(&scheduler.num_sleeping_workers, &num_sleeping_workers)
if num_sleeping_workers == scheduler.num_workers:
scheduler.is_done = True
cdef void scheduler_destroy(scheduler_t *scheduler) nogil:
cdef unsigned short i
for i in range(scheduler.num_workers):
pthread_join(scheduler.workers[i].thread, NULL)
worker_destroy(&scheduler.workers[i])
free(scheduler.workers)
sem_destroy(&scheduler.num_sleeping_workers)
pthread_barrier_destroy(&scheduler.barrier)
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil:
memset(worker, 0, sizeof(worker[0]))
worker.scheduler = scheduler
worker.deque = new coro_deque_t()
if not worker.deque:
fprintf(stderr, "Could not allocate memory for the deque\n")
exit(-1)
pthread_mutex_init(&worker.mutex, NULL)
cdef pthread_attr_t attr
if pthread_attr_init(&attr):
fprintf(stderr, "pthread_attr_init()\n")
exit(-1)
if pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE):
fprintf(stderr, "pthread_attr_setdetachstate()\n")
exit(-1)
if pthread_create(&worker.thread, &attr, worker_run, worker):
fprintf(stderr, "pthread_create()\n")
exit(-1)
if pthread_attr_destroy(&attr):
fprintf(stderr, "pthread_attr_destroy()\n")
exit(-1)
cdef void *worker_run(void *arg) nogil:
cdef:
worker_t *worker = <worker_t *>arg
coro_t *coroutine = NULL
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&worker.scheduler.barrier)
while not worker.scheduler.is_done:
coroutine = worker_coro_get(worker)
if coroutine:
if coro_resume(coroutine) == coro_yield_value.MAY_RESUME:
pthread_mutex_lock(&worker.mutex)
worker.deque.push_back(coroutine)
pthread_mutex_unlock(&worker.mutex)
else:
coro_free(coroutine)
else:
sem_post(&worker.scheduler.num_sleeping_workers)
usleep(1000000) # 1 ms (arbitrary)
sem_wait(&worker.scheduler.num_sleeping_workers)
pthread_exit(NULL)
cdef coro_t *worker_coro_get(worker_t *worker) nogil:
cdef coro_t *coroutine = NULL
pthread_mutex_lock(&worker.mutex)
if worker.deque.empty():
pthread_mutex_unlock(&worker.mutex)
coroutine = worker_coro_steal(worker)
else:
coroutine = worker.deque.front()
worker.deque.pop_front()
pthread_mutex_unlock(&worker.mutex)
return coroutine
cdef coro_t *worker_coro_steal(worker_t *thief) nogil:
cdef:
scheduler_t *scheduler = thief.scheduler
worker_t *victim = NULL
coro_t *stolen_coroutine = NULL
unsigned short i, current_index
unsigned short random_offset = rand() % scheduler.num_workers
for i in range(scheduler.num_workers):
current_index = (i + random_offset) % scheduler.num_workers
victim = scheduler.workers + current_index
if victim.deque == thief.deque:
continue
pthread_mutex_lock(&victim.mutex)
if not victim.deque.empty():
stolen_coroutine = victim.deque.back()
victim.deque.pop_back()
coro_update(stolen_coroutine, &thief.switcher) # Update coroutine's data about its parent worker/thread.
pthread_mutex_unlock(&victim.mutex)
return stolen_coroutine
pthread_mutex_unlock(&victim.mutex)
return NULL
cdef void worker_destroy(worker_t *worker) nogil:
pthread_mutex_destroy(&worker.mutex)
del worker.deque
\ No newline at end of file
......@@ -4,9 +4,13 @@ from Cython.Build import cythonize
setup(
ext_modules = cythonize([
Extension("lwan_coro",
Extension("wrapper",
language="c++",
sources=["wrapper.pyx", "lwan/lwan-array.c" ,"lwan/lwan-coro.c"],
),
Extension("lwan_coro",
language="c++",
sources=["lwan_coro.pyx", "lwan/lwan-array.c" ,"lwan/lwan-coro.c"],
extra_compile_args=["-pthread"],
extra_link_args=["-pthread"]
)
......
from libc.stdlib cimport exit, calloc, free, rand
from libc.stdio cimport printf, fprintf, stderr
from libc.string cimport memset
from posix.unistd cimport usleep, sysconf
from lwan_coro cimport scheduler_t, scheduler_init, scheduler_coro_add, scheduler_run, scheduler_destroy, coro_yield_value, coro_yield, coro_t
from libc.stdio cimport printf
cdef extern from "<unistd.h>" nogil:
enum: _SC_NPROCESSORS_ONLN # Seems to no be included in "posix.unistd".
from libcpp.deque cimport deque
ctypedef deque[coro_t *] coro_deque_t
cdef extern from "lwan/lwan-coro.h" nogil:
ctypedef struct coro_switcher_t:
pass
ctypedef struct coro_t:
coro_switcher_t *switcher
ctypedef int (*coro_function_t) (coro_t *, void *)
void coro_reset(coro_t *, coro_function_t, void *)
coro_t *coro_new(coro_switcher_t *, coro_function_t, void *)
void coro_free(coro_t *)
int coro_resume(coro_t *)
int coro_resume_value(coro_t *, int)
int coro_yield(coro_t *, int)
void coro_update(coro_t *, coro_switcher_t *)
cdef enum coro_yield_value:
ABORT = -1,
MAY_RESUME = 0,
FINISHED = 1
cdef extern from "<sys/types.h>" nogil:
ctypedef long unsigned int pthread_t
ctypedef union pthread_attr_t:
pass
ctypedef union pthread_mutex_t:
pass
ctypedef union pthread_mutexattr_t:
pass
ctypedef union pthread_barrier_t:
pass
ctypedef union pthread_barrierattr_t:
pass
cdef extern from "<pthread.h>" nogil:
int pthread_create(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *)
void pthread_exit(void *)
int pthread_join(pthread_t, void **)
int pthread_attr_init(pthread_attr_t *)
int pthread_attr_setdetachstate(pthread_attr_t *, int)
int pthread_attr_destroy(pthread_attr_t *)
int pthread_mutex_init(pthread_mutex_t *, const pthread_mutexattr_t *)
int pthread_mutex_destroy(pthread_mutex_t *)
int pthread_mutex_lock(pthread_mutex_t *)
int pthread_mutex_unlock(pthread_mutex_t *)
int pthread_mutex_trylock(pthread_mutex_t *)
int pthread_barrier_init(pthread_barrier_t *, const pthread_barrierattr_t *, unsigned int)
int pthread_barrier_destroy(pthread_barrier_t *)
int pthread_barrier_wait(pthread_barrier_t *)
enum: PTHREAD_CREATE_JOINABLE
cdef extern from "<semaphore.h>" nogil:
ctypedef int sem_t
int sem_destroy(sem_t *)
int sem_getvalue(sem_t *, int *)
int sem_init(sem_t *, int, unsigned int)
int sem_post(sem_t *)
int sem_wait(sem_t *)
cdef struct scheduler_t:
worker_t *workers
unsigned short num_workers
pthread_barrier_t barrier
sem_t num_sleeping_workers
bint is_done
cdef struct worker_t:
scheduler_t *scheduler
coro_switcher_t switcher
coro_deque_t *deque
pthread_t thread
pthread_mutex_t mutex
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=sysconf(_SC_NPROCESSORS_ONLN)) nogil:
memset(scheduler, 0, sizeof(scheduler[0]))
scheduler.num_workers = num_workers
scheduler.is_done = False
sem_init(&scheduler.num_sleeping_workers, 0, 0)
scheduler.workers = <worker_t *>calloc(scheduler.num_workers, sizeof(worker_t))
if not scheduler.workers:
fprintf(stderr, "Could not allocate memory for the workers")
exit(-1)
if pthread_barrier_init(&scheduler.barrier, NULL, scheduler.num_workers + 1):
fprintf(stderr, "Could not allocate memory for the barrier")
exit(-1)
cdef unsigned short i
for i in range(scheduler.num_workers):
worker_init(&scheduler.workers[i], scheduler)
cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg=NULL) nogil:
cdef worker_t *main_worker = scheduler.workers
main_worker.deque.push_back(coro_new(&main_worker.switcher, task, arg))
cdef void scheduler_run(scheduler_t *scheduler) nogil:
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&scheduler.barrier)
cdef int num_sleeping_workers
while not scheduler.is_done:
sem_getvalue(&scheduler.num_sleeping_workers, &num_sleeping_workers)
if num_sleeping_workers == scheduler.num_workers:
scheduler.is_done = True
cdef void scheduler_destroy(scheduler_t *scheduler) nogil:
cdef unsigned short i
for i in range(scheduler.num_workers):
pthread_join(scheduler.workers[i].thread, NULL)
worker_destroy(&scheduler.workers[i])
free(scheduler.workers)
sem_destroy(&scheduler.num_sleeping_workers)
pthread_barrier_destroy(&scheduler.barrier)
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil:
memset(worker, 0, sizeof(worker[0]))
worker.scheduler = scheduler
worker.deque = new coro_deque_t()
if not worker.deque:
fprintf(stderr, "Could not allocate memory for the deque\n")
exit(-1)
pthread_mutex_init(&worker.mutex, NULL)
cdef pthread_attr_t attr
if pthread_attr_init(&attr):
fprintf(stderr, "pthread_attr_init()\n")
exit(-1)
if pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE):
fprintf(stderr, "pthread_attr_setdetachstate()\n")
exit(-1)
if pthread_create(&worker.thread, &attr, worker_run, worker):
fprintf(stderr, "pthread_create()\n")
exit(-1)
if pthread_attr_destroy(&attr):
fprintf(stderr, "pthread_attr_destroy()\n")
exit(-1)
cdef void *worker_run(void *arg) nogil:
cdef:
worker_t *worker = <worker_t *>arg
coro_t *coroutine = NULL
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&worker.scheduler.barrier)
while not worker.scheduler.is_done:
coroutine = worker_coro_get(worker)
if coroutine:
if coro_resume(coroutine) == coro_yield_value.MAY_RESUME:
pthread_mutex_lock(&worker.mutex)
worker.deque.push_back(coroutine)
pthread_mutex_unlock(&worker.mutex)
else:
coro_free(coroutine)
else:
sem_post(&worker.scheduler.num_sleeping_workers)
usleep(1000000) # 1 ms (arbitrary)
sem_wait(&worker.scheduler.num_sleeping_workers)
pthread_exit(NULL)
cdef coro_t *worker_coro_get(worker_t *worker) nogil:
cdef coro_t *coroutine = NULL
pthread_mutex_lock(&worker.mutex)
if worker.deque.empty():
pthread_mutex_unlock(&worker.mutex)
coroutine = worker_coro_steal(worker)
else:
coroutine = worker.deque.front()
worker.deque.pop_front()
pthread_mutex_unlock(&worker.mutex)
return coroutine
cdef coro_t *worker_coro_steal(worker_t *thief) nogil:
cdef:
scheduler_t *scheduler = thief.scheduler
worker_t *victim = NULL
coro_t *stolen_coroutine = NULL
unsigned short i, current_index
unsigned short random_offset = rand() % scheduler.num_workers
for i in range(scheduler.num_workers):
current_index = (i + random_offset) % scheduler.num_workers
victim = scheduler.workers + current_index
if victim.deque == thief.deque:
continue
pthread_mutex_lock(&victim.mutex)
if not victim.deque.empty():
stolen_coroutine = victim.deque.back()
victim.deque.pop_back()
coro_update(stolen_coroutine, &thief.switcher) # Update coroutine's data about its parent worker/thread.
pthread_mutex_unlock(&victim.mutex)
return stolen_coroutine
pthread_mutex_unlock(&victim.mutex)
return NULL
cdef void worker_destroy(worker_t *worker) nogil:
pthread_mutex_destroy(&worker.mutex)
del worker.deque
cdef class SomeMemory nogil:
cdef int a;
cdef int b;
cdef void foo(self) nogil:
while self.a < 4:
self.a += 1
printf("%d\n", self.a)
def main():
cdef:
......@@ -240,7 +17,7 @@ def main():
with nogil:
scheduler_init(&s)
for i in range(500):
for i in range(5):
scheduler_coro_add(&s, task)
scheduler_run(&s)
......@@ -250,6 +27,6 @@ def main():
cdef int task(coro_t *coroutine, void *arg) nogil:
cdef int a = 5
coro_yield(coroutine, coro_yield_value.MAY_RESUME)
a *= 2
cdef SomeMemory bar = SomeMemory(1, 2)
bar.foo()
coro_yield(coroutine, coro_yield_value.FINISHED)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment