Commit 90392daf authored by Xavier Thompson

Adapt to compiler changes
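In outline, judging from the diff itself: the cypclass 'nolock' keyword is removed; objects shared between threads are now typed with the 'locked' qualifier and uniquely owned ones with 'iso'; every ownership transfer is spelled out with 'consume'; activate() takes a consumed reference and yields an 'active' handle; explicit pthread mutex lock/unlock pairs are replaced by 'with wlocked' blocks; and 'locked' classes move their construction from __init__ to a __new__ that returns the instance on every path.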

parent d7272944
@@ -15,10 +15,10 @@ from stdlib.dirent cimport DIR, struct_dirent, opendir, readdir, closedir
from posix.unistd cimport readlink
cdef Scheduler scheduler
cdef locked Scheduler scheduler
cdef cypclass Node nolock activable:
cdef cypclass Node activable:
string path
string name
Stat st
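This first hunk sets the tone for the rest of the commit. A minimal sketch of the two declaration changes, with the semantics inferred from their use later in the file:

    cdef locked Scheduler scheduler    # shared by every worker thread and mailbox, hence 'locked'

    cdef cypclass Node activable:      # 'nolock' is gone from the language
        ...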
@@ -26,12 +26,12 @@ cdef cypclass Node nolock activable:
__init__(self, string path, string name, Stat st):
self._active_result_class = NullResult
self._active_queue_class = BatchMailBox(scheduler)
self._active_queue_class = consume BatchMailBox(scheduler)
self.path = path
self.name = name
self.st = st
void build_node(self, const cyplist[dev_t] dev_whitelist, const cyplist[string] ignore_paths):
void build_node(self, locked cyplist[dev_t] dev_whitelist, locked cyplist[string] ignore_paths):
# abstract
pass
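Two adjustments recur throughout the file and first appear here: a freshly constructed mailbox is an owned reference that must be consumed into the _active_queue_class slot, and the previously 'const' list parameters become 'locked' so they can be shared with other active nodes. Condensed:

    self._active_queue_class = consume BatchMailBox(scheduler)    # transfer ownership of the new mailbox

    void build_node(self, locked cyplist[dev_t] dev_whitelist,
                          locked cyplist[string] ignore_paths):   # shareable, lock-protected lists
        pass    # abstract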
@@ -52,27 +52,28 @@ cdef cypclass Node nolock activable:
pass
cdef Node make_node(string path, string name) nogil:
cdef iso Node make_node(string path, string name) nogil:
s = Stat(path)
if s is NULL:
return NULL
elif s.is_symlink():
return SymlinkNode(path, name, s)
return consume SymlinkNode(path, name, consume s)
elif s.is_dir():
return DirNode(path, name, s)
return consume DirNode(path, name, consume s)
elif s.is_regular():
return FileNode(path, name, s)
return consume FileNode(path, name, consume s)
return NULL
cdef cypclass DirNode(Node) nolock:
cyplist[Node] children
cdef cypclass DirNode(Node):
cyplist[active Node] children
__init__(self, string path, string name, Stat st):
Node.__init__(self, path, name, st)
self.children = cyplist[Node]()
self.children = new cyplist[active Node]()
self.children.__init__()
void build_node(self, const cyplist[dev_t] dev_whitelist, const cyplist[string] ignore_paths):
void build_node(self, locked cyplist[dev_t] dev_whitelist, locked cyplist[string] ignore_paths):
cdef DIR *d
cdef struct_dirent *entry
cdef string entry_name
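make_node() now advertises an 'iso' (isolated, uniquely owned) result, so every reference it gives away, including the Stat captured by the constructor, has to be consumed explicitly. A condensed sketch of the pattern, keeping only one branch:

    cdef iso Node make_node(string path, string name) nogil:
        s = Stat(path)           # s is an owned reference here
        if s is NULL:
            return NULL
        # ownership of both the node and its Stat leaves this scope:
        return consume FileNode(path, name, consume s)

The same hunk also switches DirNode.children to a 'new cyplist[active Node]()' followed by an explicit __init__() call, since the list now stores active handles rather than plain Node references.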
@@ -104,17 +105,21 @@ cdef cypclass DirNode(Node) nolock:
entry_node = make_node(entry_path, entry_name)
if entry_node is NULL:
continue
self.children.append(entry_node)
active_entry = activate(consume entry_node)
self.children.append(active_entry)
closedir(d)
self.format_node()
for child in self.children:
activate(child).build_node(NULL, dev_whitelist, ignore_paths)
for active_child in self.children:
active_child.build_node(NULL, dev_whitelist, ignore_paths)
void write_node(self, FILE * stream):
fwrite(self.formatted.data(), 1, self.formatted.size(), stream)
for child in self.children:
while self.children.__len__() > 0:
active_child = self.children[self.children.__len__() -1]
del self.children[self.children.__len__() -1]
child = consume active_child
child.write_node(stream)
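The traversal and the final write now bracket the asynchronous phase explicitly: each child is handed to the scheduler as an 'active' handle, and write_node() reclaims it with 'consume' before calling into it synchronously. Condensed from the two loops above:

    active_entry = activate(consume entry_node)    # hand the child over to the scheduler
    self.children.append(active_entry)
    ...
    while self.children.__len__() > 0:
        active_child = self.children[self.children.__len__() - 1]
        del self.children[self.children.__len__() - 1]
        child = consume active_child    # back to an exclusive, synchronous reference
        child.write_node(stream)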
@@ -122,7 +127,7 @@ cdef enum:
BUFSIZE = 64 * 1024
cdef cypclass FileNode(Node) nolock:
cdef cypclass FileNode(Node):
string md5_data
string sha1_data
string sha256_data
@@ -133,7 +138,7 @@ cdef cypclass FileNode(Node) nolock:
Node.__init__(self, path, name, st)
self.error = False
void build_node(self, const cyplist[dev_t] dev_whitelist, const cyplist[string] ignore_paths):
void build_node(self, locked cyplist[dev_t] dev_whitelist, locked cyplist[string] ignore_paths):
cdef unsigned char buffer[BUFSIZE]
cdef bint eof = False
cdef bint md5_ok
@@ -210,11 +215,11 @@ cdef cypclass FileNode(Node) nolock:
fwrite(self.formatted.data(), 1, self.formatted.size(), stream)
cdef cypclass SymlinkNode(Node) nolock:
cdef cypclass SymlinkNode(Node):
string target
int error
void build_node(self, const cyplist[dev_t] dev_whitelist, const cyplist[string] ignore_paths):
void build_node(self, locked cyplist[dev_t] dev_whitelist, locked cyplist[string] ignore_paths):
size = self.st.st_data.st_size + 1
self.target.resize(size)
real_size = readlink(self.path.c_str(), <char*> self.target.data(), size)
@@ -267,10 +272,14 @@ cdef int start(string path) nogil:
if node is NULL:
return -1
activate(node).build_node(NULL, dev_whitelist, ignore_paths)
active_node = activate(consume node)
active_node.build_node(NULL, consume dev_whitelist, consume ignore_paths)
scheduler.finish()
node = consume active_node
result = fopen('result.json', 'w')
if result is NULL:
return -1
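Taken together, the driver's ownership round trip now reads as below (condensed from this hunk; the first argument of build_node() is the NULL result slot, as before):

    node = make_node(path, name)          # 'iso' reference
    active_node = activate(consume node)  # asynchronous from here on
    active_node.build_node(NULL, consume dev_whitelist, consume ignore_paths)
    scheduler.finish()                    # block until every queue drains
    node = consume active_node            # reclaim the tree for synchronous writing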
......
@@ -13,14 +13,14 @@ cdef extern from "<unistd.h>" nogil:
enum: _SC_NPROCESSORS_ONLN # Seems to not be included in "posix.unistd".
cdef cypclass Scheduler nolock
cdef cypclass Worker nolock
cdef cypclass Scheduler
cdef cypclass Worker
# The 'inline' qualifier on this function is a hack to convince Cython to allow a definition in a .pxd file.
# The C compiler will ignore it because the function pointer is passed to create a thread, which prevents inlining.
cdef inline void * worker_function(void * arg) nogil:
worker = <Worker> arg
sch = worker.scheduler
worker = <locked Worker> arg
sch = <Scheduler> <void*> worker.scheduler
cdef int num_remaining_queues
# Wait until all the workers are ready.
pthread_barrier_wait(&sch.barrier)
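Two casts in this hunk recur in steal_queue() below: the thread argument is recovered as a 'locked Worker', and the 'locked' qualifier is stripped from the scheduler reference by bouncing through '<void*>'. The second reads as a deliberate escape hatch; presumably it is acceptable here because the scheduler fields used through it (barrier, semaphores, atomic counter) carry their own synchronization:

    worker = <locked Worker> arg                  # recover the reference passed to pthread_create()
    sch = <Scheduler> <void*> worker.scheduler    # cast away 'locked' to skip per-access locking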
@@ -37,56 +37,48 @@ cdef inline void * worker_function(void * arg) nogil:
queue.activate()
if queue.is_empty():
# Mark the empty queue as not assigned to any worker.
del queue.worker
queue.has_worker = False
# Decrement the number of non-completed queues.
if sch.num_pending_queues.fetch_sub(1) == 1:
# Signal that there are no more queues.
sem_post(&sch.done)
# Discard the empty queue and continue the main loop.
continue
pthread_mutex_lock(&worker.lock)
# The queue is not empty: reinsert it in this worker's queues.
worker.queues.push_back(queue)
# Signal that the queue is available.
sem_post(&sch.num_free_queues)
pthread_mutex_lock(&worker.lock)
cdef cypclass Worker nolock:
deque[SequentialMailBox] queues
Scheduler scheduler
cdef cypclass Worker:
deque[locked SequentialMailBox] queues
locked Scheduler scheduler
pthread_t thread
pthread_mutex_t lock
Worker __new__(alloc, Scheduler scheduler):
instance = alloc()
locked Worker __new__(alloc, locked Scheduler scheduler):
instance = consume alloc()
instance.scheduler = scheduler
pthread_mutex_init(&instance.lock, NULL)
if not pthread_create(&instance.thread, NULL, worker_function, <void *> instance):
return instance
locked_instance = <locked Worker> consume instance
if not pthread_create(&locked_instance.thread, NULL, worker_function, <void *> locked_instance):
return locked_instance
printf("pthread_create() failed\n")
__dealloc__(self):
pthread_mutex_destroy(&self.lock)
SequentialMailBox get_queue(self):
locked SequentialMailBox get_queue(locked self):
# Get the next queue in the worker's list or steal one.
pthread_mutex_lock(&self.lock)
with wlocked self:
if not self.queues.empty():
queue = self.queues.front()
self.queues.pop_front()
pthread_mutex_unlock(&self.lock)
return queue
pthread_mutex_unlock(&self.lock)
return self.steal_queue()
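The mutex pairs around the queue list give way to 'with wlocked' blocks that scope a write lock to their suite. A sketch of the resulting method, assuming the lock is released on every exit path, including the early return:

    locked SequentialMailBox get_queue(locked self):
        with wlocked self:                  # write-lock self for the duration of the block
            if not self.queues.empty():
                queue = self.queues.front()
                self.queues.pop_front()
                return queue                # lock released here as well
        return self.steal_queue()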
SequentialMailBox steal_queue(self):
locked SequentialMailBox steal_queue(locked self):
# Steal a queue from another worker:
# - inspect each worker in order starting at a random offset
# - skip this worker and any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list
cdef int i, index, num_workers, random_offset
sch = self.scheduler
sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
for i in range(num_workers):
@@ -94,14 +86,12 @@ cdef cypclass Worker nolock:
victim = sch.workers[index]
if victim is self:
continue
pthread_mutex_lock(&victim.lock)
with wlocked victim:
if not victim.queues.empty():
stolen_queue = victim.queues.back()
victim.queues.pop_back()
stolen_queue.worker = self
pthread_mutex_unlock(&victim.lock)
stolen_queue.has_worker = True
return stolen_queue
pthread_mutex_unlock(&victim.lock)
return NULL
int join(self):
@@ -109,15 +99,16 @@ cdef cypclass Worker nolock:
return pthread_join(self.thread, NULL)
cdef cypclass Scheduler nolock:
vector[Worker] workers
cdef cypclass Scheduler:
vector[locked Worker] workers
pthread_barrier_t barrier
sem_t num_free_queues
atomic[int] num_pending_queues
sem_t done
volatile bint is_done
__init__(self, int num_workers=0):
locked Scheduler __new__(alloc, int num_workers=0):
self = <locked Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0)
@@ -126,7 +117,7 @@ cdef cypclass Scheduler nolock:
printf("Could not allocate memory for the thread barrier\n")
# Signal that no work will be done.
sem_post(&self.done)
return
return self
self.is_done = False
self.workers.reserve(num_workers)
for i in range(num_workers):
@@ -134,60 +125,62 @@ cdef cypclass Scheduler nolock:
if worker is NULL:
# Signal that no work will be done.
sem_post(&self.done)
return
return self
self.workers.push_back(worker)
# Wait until all the worker threads are ready.
pthread_barrier_wait(&self.barrier)
return self
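Because a 'locked' instance must be fully constructed before other threads can see it, Scheduler now builds itself in __new__ rather than __init__; note that the early-error branches, which previously ended in a bare return, must now return the instance too. Condensed, with a hypothetical flag standing in for the elided barrier and worker checks:

    locked Scheduler __new__(alloc, int num_workers=0):
        self = <locked Scheduler> consume alloc()
        ...
        if setup_failed:            # hypothetical: barrier allocation or Worker.__new__ failed
            sem_post(&self.done)    # signal that no work will be done
            return self
        ...
        pthread_barrier_wait(&self.barrier)
        return self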
__dealloc__(self):
pthread_barrier_destroy(&self.barrier)
sem_destroy(&self.num_free_queues)
sem_destroy(&self.done)
void post_queue(self, SequentialMailBox queue):
void post_queue(self, locked SequentialMailBox queue):
# Add a queue to the first worker.
main_worker = self.workers[0]
pthread_mutex_lock(&main_worker.lock)
queue.worker = main_worker
with wlocked main_worker:
queue.has_worker = True
main_worker.queues.push_back(queue)
pthread_mutex_unlock(&main_worker.lock)
# Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1)
# Signal that a queue is available.
sem_post(&self.num_free_queues)
void finish(self):
void finish(locked self):
# Wait until there is no more work.
sem_wait(&self.done)
done = &self.done
sem_wait(done)
# Signal the worker threads that there is no more work.
self.is_done = True
# Pretend that there are new queues to wake up the workers.
num_free_queues = &self.num_free_queues
for worker in self.workers:
sem_post(&self.num_free_queues)
sem_post(num_free_queues)
# Clear the workers to break reference cycles.
self.workers.clear()
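Two details keep the shutdown clean under the new rules: the semaphore addresses are hoisted into locals, presumably so the loop does not have to go through the locked self on every access, and the workers vector is cleared so the Scheduler/Worker reference cycle is broken once no more work remains:

    done = &self.done
    sem_wait(done)                   # wait until there is no more work
    self.is_done = True
    num_free_queues = &self.num_free_queues
    for worker in self.workers:
        sem_post(num_free_queues)    # wake each worker so it can observe is_done
    self.workers.clear()             # break the Scheduler <-> Worker cycle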
cdef cypclass SequentialMailBox(ActhonQueueInterface):
deque[ActhonMessageInterface] messages
Scheduler scheduler
Worker worker
locked Scheduler scheduler
bint has_worker
__init__(self, Scheduler scheduler):
__init__(self, locked Scheduler scheduler):
self.scheduler = scheduler
self.worker = NULL
self.has_worker = False
bint is_empty(const self):
return self.messages.empty()
void push(self, ActhonMessageInterface message):
void push(locked& self, ActhonMessageInterface message):
# Add a task to the queue.
self.messages.push_back(message)
if message._sync_method is not NULL:
message._sync_method.insertActivity(message)
# If not worker is already assigned this queue
message._sync_method.insertActivity()
# If no worker is already assigned this queue
# register it with the scheduler.
if self.worker is NULL:
if not self.has_worker:
self.scheduler.post_queue(self)
bint activate(self):
@@ -200,7 +193,7 @@ cdef cypclass SequentialMailBox(ActhonQueueInterface):
one_message_processed = next_message.activate()
if one_message_processed:
if next_message._sync_method is not NULL:
next_message._sync_method.removeActivity(next_message)
next_message._sync_method.removeActivity()
else:
printf("Pushed front message to back :/\n")
self.messages.push_back(next_message)
@@ -218,7 +211,7 @@ cdef cypclass BatchMailBox(SequentialMailBox):
self.messages.push_back(next_message)
return False
if next_message._sync_method is not NULL:
next_message._sync_method.removeActivity(next_message)
next_message._sync_method.removeActivity()
return True
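A small interface change rides along: insertActivity() and removeActivity() on the sync method no longer take the message as an argument. The new call sites, condensed:

    if message._sync_method is not NULL:
        message._sync_method.insertActivity()          # was: insertActivity(message)
    ...
    if next_message._sync_method is not NULL:
        next_message._sync_method.removeActivity()     # was: removeActivity(next_message)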
......
@@ -45,7 +45,7 @@ cdef extern from * nogil:
cdef const char hexdigits[]
cdef cypclass MessageDigest nolock:
cdef cypclass MessageDigest:
EVP_MD_CTX * md_ctx
MessageDigest __new__(alloc, const EVP_MD * algo):
......
@@ -90,7 +90,7 @@ cdef extern from "<unistd.h>" nogil:
# Cypclass to expose minimal stat support.
cdef cypclass Stat nolock:
cdef cypclass Stat:
struct_stat st_data
Stat __new__(alloc, string path):
......