Extract prototype from experiments

This has been extracted from:
https://lab.nexedi.com/jjerphan/cython_plus_experiments/tree/queue-dispatch/kdtree

The runtime was written by Xavier T. and adapted by Julien J.

The experimental KDTree was written by Julien J.
Co-authored-by: Xavier Thompson <xavier.thompson@nexedi.com>
.idea
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
**/*.c
**/*.cpp
**/kdtree
**/*.h
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
SHELL = /bin/bash
PROJECT = cython+
VENV_PATH=`conda info --base`/envs/${PROJECT}
PIP_EXECUTABLE=${VENV_PATH}/bin/pip
PYTHON_EXECUTABLE=${VENV_PATH}/bin/python
PYTEST_EXECUTABLE=${VENV_PATH}/bin/pytest
# Used when not using the python runtime
INCLUDE_DIRS = -I/usr/include/python3.9
EXE = kdtree
CXX = g++
CPPFLAGS = -O2 -g -Wno-unused-result -Wsign-compare -pthread $(INCLUDE_DIRS) -fopenmp
LDFLAGS += -Wl,--unresolved-symbols=ignore-all
MACROS = -DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION
EXT_SUFFIX := $(shell python -c "import sysconfig; print(sysconfig.get_config_var('EXT_SUFFIX'))")
EXT = $(EXE)$(EXT_SUFFIX)
.DEFAULT_GOAL := all
## help: Display list of commands
.PHONY: help
help: Makefile
@sed -n 's|^##||p' $< | column -t -s ':' | sed -e 's|^| |'
## all: Run the main targets
.PHONY: all
all: setup benchmark
## setup: Setup the conda environment
.PHONY: setup
setup: clean
conda env create --force -f environment.yml
${PIP_EXECUTABLE} install -e . -v
## install: install the project in the env
.PHONY: install
install:
${PIP_EXECUTABLE} install -e . -v
# nopython: Build without the Python runtime
.PHONY: nopython
nopython: $(EXE)
%.cpp: %.pyx
@echo "[Cython Compiling $^ -> $@]"
	${PYTHON_EXECUTABLE} -c "from Cython.Compiler.Main import main; main(command_line=1)" $^ --cplus -3
@rm -f $(subst .cpp,.h,$@)
%: %.cpp
@echo "[C++ Compiling $^ -> $@]"
$(LINK.cpp) $^ $(MACROS) -o $@
## runnopython: Run without Python runtime
.PHONY: runnopython
runnopython: $(EXE)
# Runtime information is currently written to stderr;
# redirecting it to /dev/null is a simple way to mute it.
./$(EXE) 2>/dev/null
## clean: Remove generated files from Cython and C/C++ compilation
.PHONY: clean
clean:
-rm -f *.c *.cpp *.html
-rm -f *.h
-rm -f *.so
-rm -f $(EXE)
-rm -f *.o
-rm -f -r build
-rm -f *.json
.PRECIOUS: %.cpp
## benchmark: Run benchmarks
# Uses taskset to pin the run to 1, 2, 4, 8, 16 and 32 CPUs in turn
.PHONY: benchmark
benchmark:
for i in {0..5}; do \
taskset -c 0-$$((2**i-1)) ${PYTHON_EXECUTABLE} benchmarks/benchmark.py `git rev-parse --short HEAD`_$$((2**i))_threads ;\
done
${PYTHON_EXECUTABLE} benchmarks/report.py `git rev-parse --short HEAD`
## report: Report benchmark results
.PHONY: report
report:
${PYTHON_EXECUTABLE} benchmarks/report.py `git rev-parse --short HEAD`
## test: Launch all the tests
.PHONY: test
test:
${PYTEST_EXECUTABLE} tests
import numpy as np
import kdtree
if __name__ == "__main__":
X = np.load("X.npy")
tree = kdtree.KDTree(X, 256)
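
The script above expects an X.npy array on disk. A minimal way to produce one (the 100000 x 32 shape is a hypothetical choice mirroring the benchmark configuration below; any 2D float64 array works):

import numpy as np

# Hypothetical dataset: 100,000 points in 32 dimensions, saved where the
# script above will find it.
X = np.random.rand(100_000, 32)
np.save("X.npy", X)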
import argparse
import glob
import json
import os
import sys
import subprocess
import time
import kdtree
import numpy as np
import pandas as pd
import seaborn as sns
import threadpoolctl
import yaml
from pprint import pprint
from matplotlib import pyplot as plt
from memory_profiler import memory_usage
from sklearn import set_config
from sklearn.neighbors import KDTree
from sklearn.utils._openmp_helpers import _openmp_effective_n_threads
# Be gentle on the eyes
plt.rcParams["figure.dpi"] = 200
def benchmark(config, results_folder, bench_name):
datasets = config["datasets"]
estimators = config["estimators"]
leaf_sizes = config["leaf_sizes"]
n_neighbors = config.get("n_neighbors", [])
n_trials = config.get("n_trials", 3)
return_distance = config.get("return_distance", False)
    one_GB = 1e9  # throughput is reported in GB/s
benchmarks = pd.DataFrame()
n_threads = _openmp_effective_n_threads()
env_specs_file = f"{results_folder}/{bench_name}.json"
    commit = (
        subprocess.check_output(["git", "rev-parse", "--short", "HEAD"])
        .decode()
        .strip()
    )
env_specs = dict(
threadpool_info=threadpoolctl.threadpool_info(),
commit=commit,
config=config,
n_threads=n_threads,
)
set_config(assume_finite=True)
with open(env_specs_file, "w") as outfile:
json.dump(env_specs, outfile)
for dataset in datasets:
for leaf_size in leaf_sizes:
for trial in range(n_trials):
dataset = {k: int(float(v)) for k, v in dataset.items()}
ns_train, ns_test, n_features = dataset.values()
X_train = np.random.rand(ns_train, n_features)
X_test = np.random.rand(ns_test, n_features)
bytes_processed_data_init = X_train.nbytes
bytes_processed_data_query = X_test.nbytes
t0_ = time.perf_counter()
sk_tree = KDTree(X_train, leaf_size=leaf_size)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="init",
implementation="sklearn",
n_threads=n_threads,
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=np.nan,
time_elapsed=time_elapsed,
                    throughput=bytes_processed_data_init / time_elapsed / one_GB,
)
                benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
t0_ = time.perf_counter()
tree = kdtree.KDTree(X_train, leaf_size=leaf_size)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="init",
implementation="kdtree",
n_threads=n_threads,
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=np.nan,
time_elapsed=time_elapsed,
                    throughput=bytes_processed_data_init / time_elapsed / one_GB,
)
                benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
benchmarks.to_csv(
f"{results_folder}/{bench_name}.csv",
mode="w+",
index=False,
)
for k in n_neighbors:
t0_ = time.perf_counter()
sk_tree.query(X_test, k=k, return_distance=False)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="query",
implementation="sklearn",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=k,
time_elapsed=time_elapsed,
                        throughput=bytes_processed_data_query / time_elapsed / one_GB,
)
                    benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
closests = np.zeros((ns_test, k), dtype=np.int32)
t0_ = time.perf_counter()
tree.query(X_test, closests)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="query",
implementation="kdtree",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=k,
time_elapsed=time_elapsed,
                        throughput=bytes_processed_data_query / time_elapsed / one_GB,
)
                    benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
benchmarks.to_csv(
f"{results_folder}/{bench_name}.csv",
mode="w+",
index=False,
)
    # Overwrite the specs now that all the dynamic libraries have been loaded
env_specs["threadpool_info"] = threadpoolctl.threadpool_info()
with open(env_specs_file, "w") as outfile:
json.dump(env_specs, outfile)
if __name__ == "__main__":
parser = argparse.ArgumentParser("benchmark")
parser.add_argument("bench_name")
args = parser.parse_args()
bench_name = args.bench_name
with open("benchmarks/config.yml", "r") as f:
config = yaml.full_load(f)
results_folder = f"benchmarks/results/{bench_name}"
os.makedirs(results_folder, exist_ok=True)
print(f"Benchmarking {bench_name}")
benchmark(config, results_folder, bench_name)
print(f"Benchmark results wrote in {results_folder}")
estimators:
- name: sklearn
estimator: sklearn.neighbors.KDTree
- name: cython+
estimator: kdtree.KDTree
n_trials: 5
datasets:
- n_samples_train: 1e5
n_samples_test: 1 # not used yet
n_features: 32
- n_samples_train: 1e6
n_samples_test: 1 # not used yet
n_features: 32
leaf_sizes:
- 512
- 1024
- 2048
- 4096
- 8192
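
Note that PyYAML follows YAML 1.1, which only recognizes floats written like '1.0e+5'; a bare '1e5' is loaded as a string. This is why benchmark.py converts every dataset value with int(float(v)). A quick illustration:

import yaml

loaded = yaml.full_load("n_samples_train: 1e5")
value = loaded["n_samples_train"]
print(type(value))        # <class 'str'>: '1e5' is not a YAML 1.1 float
print(int(float(value)))  # 100000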
import os
import argparse
import numpy as np
import glob
import subprocess
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
if __name__ == "__main__":
parser = argparse.ArgumentParser("report")
parser.add_argument("commit")
args = parser.parse_args()
results_folder = os.path.abspath(os.path.join(__file__, os.pardir, "results"))
commit = args.commit
def n_threads(filename):
        # Extract the thread count from a filename like '2742685_1_threads.csv'
basename = os.path.basename(filename)
return int(basename.split("_")[1])
commit_result_folder = f"{results_folder}/{commit}"
csv_bench_results = sorted(glob.glob(f"{commit_result_folder}*/*.csv"),
key=n_threads)
if len(csv_bench_results) == 0:
raise RuntimeError(f"No results for commit {commit}")
os.makedirs(commit_result_folder, exist_ok=True)
df = pd.concat(map(pd.read_csv, csv_bench_results))
df = df.drop(columns=["n_neighbors", "func"])
cols = [
"n_samples_train",
"n_samples_test",
"n_features",
"leaf_size",
]
    # Cast thread counts to strings so they are treated as categories for plotting
df['t'] = df.n_threads.apply(str)
df_grouped = df.groupby(cols)
for i, (vals, df_g) in enumerate(df_grouped):
# 16:9 ratio
fig = plt.figure(figsize=(24, 13.5))
ax = plt.gca()
splot = sns.barplot(
y="t", x="throughput", hue="implementation", data=df_g, ax=ax
)
_ = ax.set_xlabel("Throughput (in GB/s)")
_ = ax.set_ylabel("Number of threads")
_ = ax.tick_params(labelrotation=45)
        # Annotate each bar with its numerical throughput value
for p in splot.patches:
_ = splot.annotate(
f"{p.get_width():.4e}",
(p.get_width(), p.get_y() + p.get_height() / 2),
ha="center",
va="center",
size=10,
xytext=(0, -12),
textcoords="offset points",
)
title = (
f"KDTree.__init__@{commit} - "
f"Euclidean Distance, dtype=np.float64, {df_g.trial.max() + 1} trials\n"
)
title += (
"n_samples_train=%s - n_samples_test=%s - "
"n_features=%s - leaf_size=%s"
% vals
)
_ = fig.suptitle(title, fontsize=16)
plt.savefig(f"{commit_result_folder}/{i}.pdf", bbox_inches="tight")
name: cython+
channels:
- conda-forge
dependencies:
- python=3.9
- compilers
- jupyter
- numpy
- matplotlib
- seaborn
- pandas
- pyaml
- pip
- threadpoolctl
- pytest
- scikit-learn
- memory_profiler
- pip:
# Install cython+ from upstream directly
- -e git+https://lab.nexedi.com/nexedi/cython.git@b30eafec6a7b174afdc4f023b45b21f85104e2fe#egg=Cython
  # The 'kdtree' module itself is installed afterwards (see the Makefile's install target)
cdef extern from "<sys/types.h>" nogil:
ctypedef long unsigned int pthread_t
ctypedef union pthread_attr_t:
pass
ctypedef union pthread_mutex_t:
pass
ctypedef union pthread_mutexattr_t:
pass
ctypedef union pthread_barrier_t:
pass
ctypedef union pthread_barrierattr_t:
pass
ctypedef union pthread_cond_t:
pass
ctypedef union pthread_condattr_t:
pass
cdef extern from "<pthread.h>" nogil:
int pthread_create(pthread_t *, const pthread_attr_t *, void *(*)(void *), void *)
void pthread_exit(void *)
int pthread_join(pthread_t, void **)
int pthread_cancel(pthread_t thread)
int pthread_attr_init(pthread_attr_t *)
int pthread_attr_setdetachstate(pthread_attr_t *, int)
int pthread_attr_destroy(pthread_attr_t *)
int pthread_mutex_init(pthread_mutex_t *, const pthread_mutexattr_t *)
int pthread_mutex_destroy(pthread_mutex_t *)
int pthread_mutex_lock(pthread_mutex_t *)
int pthread_mutex_unlock(pthread_mutex_t *)
int pthread_mutex_trylock(pthread_mutex_t *)
int pthread_barrier_init(pthread_barrier_t *, const pthread_barrierattr_t *, unsigned int)
int pthread_barrier_destroy(pthread_barrier_t *)
int pthread_barrier_wait(pthread_barrier_t *)
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr)
int pthread_cond_destroy(pthread_cond_t *cond)
int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mutex)
int pthread_cond_broadcast(pthread_cond_t *cond)
int pthread_cond_signal(pthread_cond_t *cond)
enum: PTHREAD_CREATE_JOINABLE
# distutils: language = c++
from libcpp.deque cimport deque
from libcpp.vector cimport vector
from libcpp.atomic cimport atomic
from libc.stdio cimport printf
from libc.stdlib cimport rand
from posix.unistd cimport sysconf
from runtime.pthreads cimport *
from runtime.semaphore cimport *
cdef extern from "<unistd.h>" nogil:
    enum: _SC_NPROCESSORS_ONLN # Seems not to be included in "posix.unistd".
cdef cypclass Scheduler
cdef cypclass Worker
# The 'inline' qualifier on this function is a hack to convince Cython to allow a definition in a .pxd file.
# The C compiler will ignore it: the function's address is taken to create a thread, which prevents inlining.
cdef inline void * worker_function(void * arg) nogil:
worker = <lock Worker> arg
sch = <Scheduler> <void*> worker.scheduler
# Wait until all the workers are ready.
pthread_barrier_wait(&sch.barrier)
    while True:
# Wait until a queue becomes available.
sem_wait(&sch.num_free_queues)
# If the scheduler is done there is nothing to do anymore.
if sch.is_done:
return <void*> 0
# Pop or steal a queue.
queue = worker.get_queue()
with wlocked queue:
# Do one task on the queue.
queue.activate()
if queue.is_empty():
# Mark the empty queue as not assigned to any worker.
queue.has_worker = False
# Decrement the number of non-completed queues.
if sch.num_pending_queues.fetch_sub(1) == 1:
# Signal that there are no more queues.
sem_post(&sch.done)
# Discard the empty queue and continue the main loop.
continue
# The queue is not empty: reinsert it in this worker's queues.
worker.queues.push_back(queue)
# Signal that the queue is available.
sem_post(&sch.num_free_queues)
cdef cypclass Worker:
deque[lock SequentialMailBox] queues
lock Scheduler scheduler
pthread_t thread
lock Worker __new__(alloc, lock Scheduler scheduler):
instance = consume alloc()
instance.scheduler = scheduler
locked_instance = <lock Worker> consume instance
if not pthread_create(&locked_instance.thread, NULL, worker_function, <void *> locked_instance):
return locked_instance
printf("pthread_create() failed\n")
lock SequentialMailBox get_queue(lock self):
# Get the next queue in the worker's list or steal one.
with wlocked self:
if not self.queues.empty():
queue = self.queues.front()
self.queues.pop_front()
return queue
return self.steal_queue()
lock SequentialMailBox steal_queue(lock self):
# Steal a queue from another worker:
# - inspect each worker in order starting at a random offset
# - skip any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list
# - continue looping until a queue is found
cdef int i, index, num_workers, random_offset
sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size()
index = rand() % num_workers
while True:
victim = sch.workers[index]
with wlocked victim:
if not victim.queues.empty():
stolen_queue = victim.queues.back()
victim.queues.pop_back()
return stolen_queue
index += 1
if index >= num_workers:
index = 0
int join(self):
# Join the worker thread.
return pthread_join(self.thread, NULL)
cdef cypclass Scheduler:
vector[lock Worker] workers
pthread_barrier_t barrier
sem_t num_free_queues
atomic[int] num_pending_queues
sem_t done
volatile bint is_done
int num_workers
lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0)
self.num_pending_queues.store(0)
if pthread_barrier_init(&self.barrier, NULL, num_workers + 1):
printf("Could not allocate memory for the thread barrier\n")
# Signal that no work will be done.
sem_post(&self.done)
return self
self.is_done = False
self.workers.reserve(num_workers)
for i in range(num_workers):
worker = Worker(self)
if worker is NULL:
# Signal that no work will be done.
sem_post(&self.done)
return self
self.workers.push_back(worker)
# Wait until all the worker threads are ready.
pthread_barrier_wait(&self.barrier)
return self
__dealloc__(self):
pthread_barrier_destroy(&self.barrier)
sem_destroy(&self.num_free_queues)
sem_destroy(&self.done)