Commit e373ac74 authored by Bryton Lacquement's avatar Bryton Lacquement 🚪

Initial revision

parents
*.pyc
*.pyo
*.cpp
*.so
/build/
/dist/
This diff is collapsed.
.PHONY: all cython_module run
all: cython_module
cython_module:
python3 setup.py build_ext --inplace
run: cython_module
python3 -c "import lwan_coro; lwan_coro.main()"
/*
* lwan - simple web server
* Copyright (c) 2017 Leandro A. F. Pereira <leandro@hardinfo.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#define _GNU_SOURCE
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#include <sys/types.h>
#include "lwan-array.h"
#define INCREMENT 16
int
lwan_array_init(struct lwan_array *a)
{
if (UNLIKELY(!a))
return -EINVAL;
a->base = NULL;
a->elements = 0;
return 0;
}
int
lwan_array_reset(struct lwan_array *a)
{
if (UNLIKELY(!a))
return -EINVAL;
free(a->base);
a->base = NULL;
a->elements = 0;
return 0;
}
#if !defined (HAS_REALLOCARRAY)
#if !defined(HAVE_BUILTIN_MUL_OVERFLOW)
/*
* This is sqrt(SIZE_MAX+1), as s1*s2 <= SIZE_MAX
* if both s1 < MUL_NO_OVERFLOW and s2 < MUL_NO_OVERFLOW
*/
#define MUL_NO_OVERFLOW ((size_t)1 << (sizeof(size_t) * 4))
static inline bool umull_overflow(size_t a, size_t b, size_t *out)
{
if ((a >= MUL_NO_OVERFLOW || b >= MUL_NO_OVERFLOW) && a > 0 && SIZE_MAX / a < b)
return true;
*out = a * b;
return false;
}
#else
#define umull_overflow __builtin_mul_overflow
#endif
void *reallocarray(void *optr, size_t nmemb, size_t size)
{
size_t total_size;
if (UNLIKELY(umull_overflow(nmemb, size, &total_size))) {
errno = ENOMEM;
return NULL;
}
return realloc(optr, total_size);
}
#endif /* HAS_REALLOCAARRAY */
#if !defined(HAVE_BUILTIN_ADD_OVERFLOW)
static inline bool add_overflow(size_t a, size_t b, size_t *out)
{
if (UNLIKELY(a > 0 && b > SIZE_MAX - a))
return true;
*out = a + INCREMENT;
return false;
}
#else
#define add_overflow __builtin_add_overflow
#endif
void *
lwan_array_append(struct lwan_array *a, size_t element_size)
{
if (!(a->elements % INCREMENT)) {
void *new_base;
size_t new_cap;
if (UNLIKELY(add_overflow(a->elements, INCREMENT, &new_cap))) {
errno = EOVERFLOW;
return NULL;
}
new_base = reallocarray(a->base, new_cap, element_size);
if (UNLIKELY(!new_base))
return NULL;
a->base = new_base;
}
return ((unsigned char *)a->base) + a->elements++ * element_size;
}
void
lwan_array_sort(struct lwan_array *a, size_t element_size, int (*cmp)(const void *a, const void *b))
{
if (LIKELY(a->elements))
qsort(a->base, a->elements - 1, element_size, cmp);
}
static void
coro_lwan_array_free(void *data)
{
struct lwan_array *array = data;
lwan_array_reset(array);
free(array);
}
struct lwan_array *
coro_lwan_array_new(struct coro *coro)
{
struct lwan_array *array;
array = coro_malloc_full(coro, sizeof(*array), coro_lwan_array_free);
if (LIKELY(array))
lwan_array_init(array);
return array;
}
/*
* lwan - simple web server
* Copyright (c) 2017 Leandro A. F. Pereira <leandro@hardinfo.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
#include <stdint.h>
#include "lwan-coro.h"
struct lwan_array {
void *base;
size_t elements;
};
int lwan_array_init(struct lwan_array *a);
int lwan_array_reset(struct lwan_array *a);
void *lwan_array_append(struct lwan_array *a, size_t element_size);
void lwan_array_sort(struct lwan_array *a, size_t element_size, int (*cmp)(const void *a, const void *b));
struct lwan_array *coro_lwan_array_new(struct coro *coro);
#define LIKELY_IS(x,y) __builtin_expect((x), (y))
#define LIKELY(x) LIKELY_IS(!!(x), 1)
#define UNLIKELY(x) LIKELY_IS((x), 0)
#define DEFINE_ARRAY_TYPE(array_type_, element_type_) \
struct array_type_ { \
struct lwan_array base; \
}; \
__attribute__((unused)) \
static inline int array_type_ ## _init(struct array_type_ *array) \
{ \
return lwan_array_init((struct lwan_array *)array); \
} \
__attribute__((unused)) \
static inline int array_type_ ## _reset(struct array_type_ *array) \
{ \
return lwan_array_reset((struct lwan_array *)array); \
} \
__attribute__((unused)) \
static inline element_type_ * array_type_ ## _append(struct array_type_ *array) \
{ \
return (element_type_ *)lwan_array_append((struct lwan_array *)array, sizeof(element_type_)); \
} \
__attribute__((unused)) \
static inline void array_type_ ## _sort(struct array_type_ *array, int (*cmp)(const void *a, const void *b)) \
{ \
lwan_array_sort((struct lwan_array *)array, sizeof(element_type_), cmp); \
} \
__attribute__((unused)) \
static inline struct array_type_ *coro_ ## array_type_ ## _new(struct coro *coro) \
{ \
return (struct array_type_ *)coro_lwan_array_new(coro); \
}
#if defined (__cplusplus)
}
#endif
This diff is collapsed.
/*
* lwan - simple web server
* Copyright (c) 2012 Leandro A. F. Pereira <leandro@hardinfo.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
#include <stddef.h>
#if defined(__x86_64__)
#include <stdint.h>
typedef uintptr_t coro_context[10];
#elif defined(__i386__)
#include <stdint.h>
typedef uintptr_t coro_context[7];
#else
#include <ucontext.h>
typedef ucontext_t coro_context;
#endif
#define DEFAULT_BUFFER_SIZE 4096
#define ALWAYS_INLINE inline __attribute__((always_inline))
struct coro;
typedef struct coro coro_t;
typedef int (*coro_function_t) (struct coro *coro, void *data);
typedef struct coro_switcher {
coro_context caller;
coro_context callee;
}coro_switcher_t;
struct coro *coro_new(struct coro_switcher *switcher, coro_function_t function, void *data);
void coro_free(struct coro *coro);
void coro_reset(struct coro *coro, coro_function_t func, void *data);
void coro_update(struct coro *coro, struct coro_switcher *new_switcher);
int coro_resume(struct coro *coro);
int coro_resume_value(struct coro *coro, int value);
int coro_yield(struct coro *coro, int value);
void coro_defer(struct coro *coro, void (*func)(void *data), void *data);
void coro_defer2(struct coro *coro, void (*func)(void *data1, void *data2),
void *data1, void *data2);
void coro_deferred_run(struct coro *coro, size_t generation);
size_t coro_deferred_get_generation(const struct coro *coro);
void *coro_malloc(struct coro *coro, size_t sz)
__attribute__((malloc));
void *coro_malloc_full(struct coro *coro, size_t size, void (*destroy_func)())
__attribute__((malloc));
char *coro_strdup(struct coro *coro, const char *str);
char *coro_strndup(struct coro *coro, const char *str, size_t len);
char *coro_printf(struct coro *coro, const char *fmt, ...);
#define CORO_DEFER(fn) ((void (*)(void *))(fn))
#define CORO_DEFER2(fn) ((void (*)(void *, void *))(fn))
#if defined (__cplusplus)
}
#endif
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
setup(
ext_modules = cythonize([
Extension("lwan_coro",
language="c++",
sources=["wrapper.pyx", "lwan/lwan-array.c" ,"lwan/lwan-coro.c"],
extra_compile_args=["-pthread"],
extra_link_args=["-pthread"]
)
])
)
from libc.stdlib cimport exit, calloc, free, rand
from libc.stdio cimport printf, fprintf, stderr
from libc.string cimport memset
from posix.unistd cimport usleep, sysconf
cdef extern from "<unistd.h>" nogil:
enum: _SC_NPROCESSORS_ONLN # Seems to no be included in "posix.unistd".
from libcpp.deque cimport deque
ctypedef deque[coro_t *] coro_deque_t
from libc.time cimport CLOCKS_PER_SEC, clock_t, clock
cdef extern from "lwan/lwan-coro.h" nogil:
ctypedef struct coro_switcher_t:
pass
ctypedef struct coro_t:
coro_switcher_t *switcher
ctypedef int (*coro_function_t) (coro_t *, void *)
void coro_reset(coro_t *, coro_function_t, void *)
coro_t *coro_new(coro_switcher_t *, coro_function_t, void *)
void coro_free(coro_t *)
int coro_resume(coro_t *)
int coro_resume_value(coro_t *, int)
int coro_yield(coro_t *, int)
void coro_update(coro_t *, coro_switcher_t *)
cdef enum coro_yield_value:
ABORT = -1,
MAY_RESUME = 0,
FINISHED = 1
cdef extern from "<sys/types.h>" nogil:
ctypedef long unsigned int pthread_t
ctypedef union pthread_attr_t:
pass
ctypedef union pthread_mutex_t:
pass
ctypedef union pthread_mutexattr_t:
pass
ctypedef union pthread_barrier_t:
pass
ctypedef union pthread_barrierattr_t:
pass
cdef extern from "<pthread.h>" nogil:
int pthread_create(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *)
void pthread_exit(void *)
int pthread_join(pthread_t, void **)
int pthread_attr_init(pthread_attr_t *)
int pthread_attr_setdetachstate(pthread_attr_t *, int)
int pthread_attr_destroy(pthread_attr_t *)
int pthread_mutex_init(pthread_mutex_t *, const pthread_mutexattr_t *)
int pthread_mutex_destroy(pthread_mutex_t *)
int pthread_mutex_lock(pthread_mutex_t *)
int pthread_mutex_unlock(pthread_mutex_t *)
int pthread_mutex_trylock(pthread_mutex_t *)
int pthread_barrier_init(pthread_barrier_t *, const pthread_barrierattr_t *, unsigned int)
int pthread_barrier_destroy(pthread_barrier_t *)
int pthread_barrier_wait(pthread_barrier_t *)
enum: PTHREAD_CREATE_JOINABLE
cdef extern from "<semaphore.h>" nogil:
ctypedef int sem_t
int sem_destroy(sem_t *)
int sem_getvalue(sem_t *, int *)
int sem_init(sem_t *, int, unsigned int)
int sem_post(sem_t *)
int sem_wait(sem_t *)
cdef struct scheduler_t:
worker_t *workers
unsigned short num_workers
pthread_barrier_t barrier
sem_t num_sleeping_workers
bint is_done
cdef struct worker_t:
scheduler_t *scheduler
coro_switcher_t switcher
coro_deque_t *deque
pthread_t thread
pthread_mutex_t mutex
cdef void scheduler_init(scheduler_t *scheduler, unsigned short num_workers=sysconf(_SC_NPROCESSORS_ONLN)) nogil:
memset(scheduler, 0, sizeof(scheduler[0]))
scheduler.num_workers = num_workers
scheduler.is_done = False
sem_init(&scheduler.num_sleeping_workers, 0, 0)
scheduler.workers = <worker_t *>calloc(scheduler.num_workers, sizeof(worker_t))
if not scheduler.workers:
fprintf(stderr, "Could not allocate memory for the workers")
exit(-1)
if pthread_barrier_init(&scheduler.barrier, NULL, scheduler.num_workers + 1):
fprintf(stderr, "Could not allocate memory for the barrier")
exit(-1)
cdef unsigned short i
for i in range(scheduler.num_workers):
worker_init(&scheduler.workers[i], scheduler)
cdef void scheduler_coro_add(scheduler_t *scheduler, coro_function_t task, void *arg) nogil:
cdef worker_t *main_worker = scheduler.workers
main_worker.deque.push_back(coro_new(&main_worker.switcher, task, arg))
cdef void scheduler_run(scheduler_t *scheduler) nogil:
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&scheduler.barrier)
cdef int num_sleeping_workers
while not scheduler.is_done:
sem_getvalue(&scheduler.num_sleeping_workers, &num_sleeping_workers)
if num_sleeping_workers == scheduler.num_workers:
scheduler.is_done = True
cdef void scheduler_destroy(scheduler_t *scheduler) nogil:
cdef unsigned short i
for i in range(scheduler.num_workers):
pthread_join(scheduler.workers[i].thread, NULL)
worker_destroy(&scheduler.workers[i])
free(scheduler.workers)
sem_destroy(&scheduler.num_sleeping_workers)
pthread_barrier_destroy(&scheduler.barrier)
cdef void worker_init(worker_t *worker, scheduler_t *scheduler) nogil:
memset(worker, 0, sizeof(worker[0]))
worker.scheduler = scheduler
worker.deque = new coro_deque_t()
if not worker.deque:
fprintf(stderr, "Could not allocate memory for the deque\n")
exit(-1)
pthread_mutex_init(&worker.mutex, NULL)
cdef pthread_attr_t attr
if pthread_attr_init(&attr):
fprintf(stderr, "pthread_attr_init()\n")
exit(-1)
if pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE):
fprintf(stderr, "pthread_attr_setdetachstate()\n")
exit(-1)
if pthread_create(&worker.thread, &attr, worker_run, worker):
fprintf(stderr, "pthread_create()\n")
exit(-1)
if pthread_attr_destroy(&attr):
fprintf(stderr, "pthread_attr_destroy()\n")
exit(-1)
cdef void *worker_run(void *arg) nogil:
cdef:
worker_t *worker = <worker_t *>arg
coro_t *coroutine = NULL
# Wait for all workers and the main thread to be ready.
pthread_barrier_wait(&worker.scheduler.barrier)
while not worker.scheduler.is_done:
coroutine = worker_coro_get(worker)
if coroutine:
if coro_resume(coroutine) == coro_yield_value.MAY_RESUME:
pthread_mutex_lock(&worker.mutex)
worker.deque.push_back(coroutine)
pthread_mutex_unlock(&worker.mutex)
else:
coro_free(coroutine)
else:
sem_post(&worker.scheduler.num_sleeping_workers)
usleep(1000000) # 1 ms (arbitrary)
sem_wait(&worker.scheduler.num_sleeping_workers)
pthread_exit(NULL)
cdef coro_t *worker_coro_get(worker_t *worker) nogil:
cdef coro_t *coroutine = NULL
pthread_mutex_lock(&worker.mutex)
if worker.deque.empty():
pthread_mutex_unlock(&worker.mutex)
coroutine = worker_coro_steal(worker)
else:
coroutine = worker.deque.front()
worker.deque.pop_front()
pthread_mutex_unlock(&worker.mutex)
return coroutine
cdef coro_t *worker_coro_steal(worker_t *thief) nogil:
cdef:
scheduler_t *scheduler = thief.scheduler
worker_t *victim = NULL
coro_t *stolen_coroutine = NULL
unsigned short i, current_index
unsigned short random_offset = rand() % scheduler.num_workers
for i in range(scheduler.num_workers):
current_index = (i + random_offset) % scheduler.num_workers
victim = scheduler.workers + current_index
if victim.deque == thief.deque:
continue
pthread_mutex_lock(&victim.mutex)
if not victim.deque.empty():
stolen_coroutine = victim.deque.back()
victim.deque.pop_back()
coro_update(stolen_coroutine, &thief.switcher) # Update coroutine's data about its parent worker/thread.
pthread_mutex_unlock(&victim.mutex)
return stolen_coroutine
pthread_mutex_unlock(&victim.mutex)
return NULL
cdef void worker_destroy(worker_t *worker) nogil:
pthread_mutex_destroy(&worker.mutex)
del worker.deque
def main():
cdef:
scheduler_t s
clock_t begin, end
unsigned int i
with nogil:
begin = clock()
scheduler_init(&s)
for i in range(500):
scheduler_coro_add(&s, task, NULL)
scheduler_run(&s)
scheduler_destroy(&s)
end = clock()
printf("%f\n", <double>(end - begin) / CLOCKS_PER_SEC)
# Example task
cdef int task(coro_t *coroutine, void *arg) nogil:
cdef int a = 5
coro_yield(coroutine, coro_yield_value.MAY_RESUME)
a *= 2
coro_yield(coroutine, coro_yield_value.FINISHED)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment