Commit ed81d102 authored by Julien Jerphanion

Merge branch 'synchro' into 'main'

Add querying for KNN

See merge request jjerphan/cython_plus_experiments!1
parents f68236da 146eee39
SHELL = /bin/bash
PROJECT = cython+
VENV_PATH=`conda info --base`/envs/${PROJECT}
PIP_EXECUTABLE=${VENV_PATH}/bin/pip
PYTHON_EXECUTABLE=${VENV_PATH}/bin/python
PYTEST_EXECUTABLE=${VENV_PATH}/bin/pytest

# Used when not using the python runtime
INCLUDE_DIRS = -I/usr/include/python3.9

EXE = kdtree
CXX = g++
CPPFLAGS = -O2 -g -Wno-unused-result -Wsign-compare -pthread $(INCLUDE_DIRS) -fopenmp
LDFLAGS += -Wl,--unresolved-symbols=ignore-all
MACROS = -DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION
EXT_SUFFIX := $(shell python -c "import sysconfig; print(sysconfig.get_config_var('EXT_SUFFIX'))")
EXT = $(EXE)$(EXT_SUFFIX)

.DEFAULT_GOAL := all

## help: Display list of commands
.PHONY: help
help: Makefile
	@sed -n 's|^##||p' $< | column -t -s ':' | sed -e 's|^| |'

## all: Run the main targets
.PHONY: all
all: install benchmark

## install: Install conda env.
.PHONY: install
install: clean
	conda env create --force -f environment.yml
	${PIP_EXECUTABLE} install -e . -v

## nopython: Build without the Python runtime
.PHONY: nopython
nopython: $(EXE)

%.cpp: %.pyx
	@echo "[Cython Compiling $^ -> $@]"
	${PYTHON_EXECUTABLE} -c "from Cython.Compiler.Main import main; main(command_line=1)" $^ --cplus -3
	@rm -f $(subst .cpp,.h,$@)

%: %.cpp
	@echo "[C++ Compiling $^ -> $@]"
	$(LINK.cpp) $^ $(MACROS) -o $@

## runnopython: Run without Python runtime
.PHONY: runnopython
runnopython: $(EXE)
	# Information of the runtime is currently redirected to stderr.
	# This is just a simple way to mute it.
	./$(EXE) 2>/dev/null

## clean: Remove generated files from Cython and C/C++ compilation
.PHONY: clean
clean:
	-rm -f *.c *.cpp *.html
	-rm -f *.h
	...
	-rm -f -r build
	-rm -f *.json

.PRECIOUS: %.cpp

## benchmark: Run benchmarks
# Uses taskset to cap execution to a given set of CPUs
.PHONY: benchmark
benchmark:
	for i in {0..5}; do \
		taskset -c 0-$$((2**i-1)) ${PYTHON_EXECUTABLE} benchmarks/benchmark.py `git rev-parse --short HEAD`_$$((2**i-1))_thread ;\
	done

## test: Launch all the tests.
.PHONY: test
test:
	${PYTEST_EXECUTABLE} tests
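The benchmark target pins the process to 1, 2, 4, ... CPUs with taskset. A quick standalone check of the pitfall this works around (a hypothetical helper, not part of the MR): os.cpu_count() ignores the taskset mask, while the scheduler affinity, like the omp_get_max_threads() call used in kdtree.pyx below, reflects it.

# check_affinity.py, hypothetical helper, not part of the MR.
# Run as: taskset -c 0-3 python check_affinity.py
import os

# cpu_count() reports all online CPUs, regardless of any taskset mask
print("online CPUs:", os.cpu_count())

# sched_getaffinity() reflects the mask set by `taskset -c ...`
print("usable CPUs:", len(os.sched_getaffinity(0)))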
import argparse
import glob
import json
import os
import subprocess
import time
import kdtree
import numpy as np
import pandas as pd
import seaborn as sns
import threadpoolctl
import yaml
from pprint import pprint
from matplotlib import pyplot as plt
from sklearn import set_config
from sklearn.neighbors import KDTree
# Be gentle with the eyes
plt.rcParams["figure.dpi"] = 200
def benchmark(config, results_folder, bench_name):
datasets = config["datasets"]
estimators = config["estimators"]
leaf_sizes = config["leaf_sizes"]
n_neighbors = config.get("n_neighbors", [])
n_trials = config.get("n_trials", 3)
return_distance = config.get("return_distance", False)
    one_GB = 1e9  # matches the "GB/s" throughput unit used in report()
benchmarks = pd.DataFrame()
env_specs_file = f"{results_folder}/{bench_name}.json"
    commit = (
        subprocess.check_output(["git", "rev-parse", "--short", "HEAD"])
        .decode("ascii")
        .strip()
    )
env_specs = dict(
threadpool_info=threadpoolctl.threadpool_info(),
commit=commit,
config=config,
)
set_config(assume_finite=True)
with open(env_specs_file, "w") as outfile:
json.dump(env_specs, outfile)
for dataset in datasets:
for leaf_size in leaf_sizes:
for trial in range(n_trials):
                # PyYAML loads plain `1e6` as a string, hence the conversion
                dataset = {k: int(float(v)) for k, v in dataset.items()}
                ns_train, ns_test, n_features = dataset.values()
X_train = np.random.rand(ns_train, n_features)
X_test = np.random.rand(ns_test, n_features)
bytes_processed_data_init = X_train.nbytes
bytes_processed_data_query = X_test.nbytes
t0_ = time.perf_counter()
sk_tree = KDTree(X_train, leaf_size=leaf_size)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="init",
implementation="sklearn",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=np.nan,
time_elapsed=time_elapsed,
                    throughput=bytes_processed_data_init / time_elapsed / one_GB,
                )
                benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
t0_ = time.perf_counter()
tree = kdtree.KDTree(X_train, leaf_size=leaf_size)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="init",
implementation="kdtree",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=np.nan,
time_elapsed=time_elapsed,
                    throughput=bytes_processed_data_init / time_elapsed / one_GB,
                )
                benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
benchmarks.to_csv(
f"{results_folder}/{bench_name}.csv",
mode="w+",
index=False,
)
for k in n_neighbors:
t0_ = time.perf_counter()
sk_tree.query(X_test, k=k, return_distance=False)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="query",
implementation="sklearn",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=k,
time_elapsed=time_elapsed,
                        throughput=bytes_processed_data_query / time_elapsed / one_GB,
                    )
                    benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
closests = np.zeros((ns_test, k), dtype=np.int32)
t0_ = time.perf_counter()
tree.query(X_test, closests)
t1_ = time.perf_counter()
time_elapsed = round(t1_ - t0_, 5)
row = dict(
trial=trial,
func="query",
implementation="kdtree",
leaf_size=leaf_size,
n_samples_train=ns_train,
n_samples_test=ns_test,
n_features=n_features,
n_neighbors=k,
time_elapsed=time_elapsed,
                        throughput=bytes_processed_data_query / time_elapsed / one_GB,
                    )
                    benchmarks = pd.concat([benchmarks, pd.DataFrame([row])], ignore_index=True)
pprint(row)
print("---")
benchmarks.to_csv(
f"{results_folder}/{bench_name}.csv",
mode="w+",
index=False,
)
    # Overwrite the specs now that all the dynamic libraries have been loaded
env_specs["threadpool_info"] = threadpoolctl.threadpool_info()
with open(env_specs_file, "w") as outfile:
json.dump(env_specs, outfile)
def report(results_folder, bench_name):
df = pd.read_csv(glob.glob(f"{results_folder}/*.csv")[0])
with open(glob.glob(f"{results_folder}/*.json")[0], "r") as json_file:
env_specs = json.load(json_file)
cols = [
"n_samples_train",
"n_features",
"leaf_size",
]
df[cols] = df[cols].astype(np.uint32)
df['d'] = df.n_features.apply(str)
df['leaf'] = df.leaf_size.apply(str)
df_grouped = df.groupby(cols)
for i, (vals, df) in enumerate(df_grouped):
# 16:9 ratio
fig = plt.figure(figsize=(24, 13.5))
ax = plt.gca()
splot = sns.barplot(
y="leaf", x="throughput", hue="implementation", data=df, ax=ax
)
_ = ax.set_xlabel("Throughput (in GB/s)")
_ = ax.set_ylabel("Leaf Size")
_ = ax.tick_params(labelrotation=45)
        # Annotate each bar with its numerical width (the throughput value)
for p in splot.patches:
_ = splot.annotate(
f"{p.get_width():.4e}",
(p.get_width(), p.get_y() + p.get_height() / 2),
ha="center",
va="center",
size=10,
xytext=(0, -12),
textcoords="offset points",
)
title = (
f"KDTree@{env_specs['commit']} - "
f"Euclidean Distance, dtype=np.float64, {df.trial.max() + 1} trials - Bench. Name: {bench_name}\n"
)
title += (
"n_samples_train=%s - n_features=%s - leaf_size=%s" % vals
)
_ = fig.suptitle(title, fontsize=16)
plt.savefig(f"{results_folder}/{bench_name}_{i}.pdf", bbox_inches="tight")
    # Merge the per-group PDF files into a single one
pdf_files = sorted(glob.glob(f"{results_folder}/{bench_name}*.pdf"))
subprocess.check_output(
["pdfunite", *pdf_files, f"{results_folder}/{bench_name}.pdf"]
)
if __name__ == "__main__":
parser = argparse.ArgumentParser("benchmark")
parser.add_argument("bench_name")
args = parser.parse_args()
bench_name = args.bench_name
with open("benchmarks/config.yml", "r") as f:
config = yaml.full_load(f)
results_folder = f"benchmarks/results/{bench_name}"
os.makedirs(results_folder, exist_ok=True)
print(f"Benchmarking {bench_name}")
benchmark(config, results_folder, bench_name)
print(f"Benchmark results wrote in {results_folder}")
print(f"Reporting results for {bench_name}")
report(results_folder, bench_name)
print(f"Reporting results wrote in {results_folder}")
estimators:
- name: sklearn
estimator: sklearn.neighbors.KDTree
- name: cython+
estimator: kdtree.KDTree
n_trials: 3
datasets:
- n_samples_train: 1e6
n_samples_test: 1e6
n_features: 5
- n_samples_train: 1e6
n_samples_test: 1e6
n_features: 10
- n_samples_train: 1e6
n_samples_test: 1e6
n_features: 50
- n_samples_train: 1e6
n_samples_test: 1e6
n_features: 100
leaf_sizes:
- 64
- 128
- 256
- 512
- 1024
- 2048
- 4096
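A subtlety worth noting: PyYAML's YAML 1.1 resolver does not treat a bare 1e6 as a float (its regex requires a decimal point), so these dataset sizes load as strings; this is why benchmark.py converts them with int(float(v)). A minimal demonstration:

import yaml

doc = yaml.full_load("n_samples_train: 1e6")
print(type(doc["n_samples_train"]))         # <class 'str'>, not a float
print(int(float(doc["n_samples_train"])))   # 1000000, as done in benchmark.py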
name: cython+
channels:
- conda-forge
dependencies:
- python=3.9
- compilers
- jupyter
- numpy
- matplotlib
- seaborn
- pandas
- pyaml
- pip
- threadpoolctl
- pytest
- scikit-learn
- memory_profiler
- pip:
# Install cython+ from upstream directly
- -e git+https://gitlab.inria.fr/jjerphan/cython.git@b30eafec6a7b174afdc4f023b45b21f85104e2fe#egg=Cython
# The 'kdtree' module itself is then installed with pip (see the Makefile's install target)
import numpy as np

np.import_array()

from runtime.runtime cimport BatchMailBox, NullResult, Scheduler, WaitResult
from libc.stdio cimport printf
from libc.stdlib cimport malloc, free
from openmp cimport omp_get_max_threads

from cython.parallel import prange

## Types declaration
ctypedef int I_t
ctypedef double D_t

cdef lock Scheduler scheduler

cdef D_t INF = 1e38

# NOTE: The following extern definition is used to interface
# std::nth_element, a robust partitioning algorithm, in Cython
cdef extern from *:
    """
    #include <algorithm>
    ...
        I n_features
    ) nogil except +
cdef cypclass Counter activable:
""" A simple Counter.
This can be useful for synchronisation for the caller after
triggering the actors logic as it wait for the value of
the Coutner to reach a given one before moving on.
"""
I_t _n
__init__(self):
self._n = 0
self._active_result_class = WaitResult.construct
self._active_queue_class = consume BatchMailBox(scheduler)
void add(self, I_t value):
self._n += value
void reset(self):
self._n = 0
I_t value(self):
return self._n
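For intuition, a rough plain-Python analogue of how this Counter gets used in KDTree.__init__ further down; a lock stands in for the actor's serialized mailbox, and all names are illustrative only:

import threading
import time

class PlainCounter:
    # a lock plays the role of the actor's serialized message queue
    def __init__(self):
        self._n = 0
        self._lock = threading.Lock()

    def add(self, value):
        with self._lock:
            self._n += value

    def value(self):
        return self._n

counter, total = PlainCounter(), 100
for _ in range(10):
    # each "leaf" reports the number of samples it is responsible for
    threading.Thread(target=counter.add, args=(10,)).start()
while counter.value() < total:
    time.sleep(0.001)  # spin-wait, as KDTree.__init__ does below
print("tree construction finished")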
cdef cypclass Container activable:
""" A simple wrapper of an array.
The wrapped array is passed by the initial caller which then
trigger the actors logic modifying it.
The initial caller can wait for specific number of update
before proceeding.
"""
I_t * _array
I_t _size
I_t _n_updates
__init__(self, I_t *array, I_t size):
self._array = array
self._size = size
self._active_result_class = WaitResult.construct
self._active_queue_class = consume BatchMailBox(scheduler)
self._n_updates = 0
void update(self, I_t idx, I_t value):
self._n_updates += 1
self._array[idx] = value
int n_updates(self):
return self._n_updates
cdef inline void dual_swap(D_t* darr, I_t* iarr, I_t i1, I_t i2) nogil:
"""swap the values at inex i1 and i2 of both darr and iarr"""
cdef D_t dtmp = darr[i1]
darr[i1] = darr[i2]
darr[i2] = dtmp
cdef I_t itmp = iarr[i1]
iarr[i1] = iarr[i2]
iarr[i2] = itmp
cdef void _simultaneous_sort(
D_t* dist,
I_t* idx,
I_t size
) nogil:
"""
Perform a recursive quicksort on the dist array, simultaneously
performing the same swaps on the idx array. The equivalent in
numpy (though quite a bit slower) is
def simultaneous_sort(dist, idx):
i = np.argsort(dist)
return dist[i], idx[i]
"""
cdef I_t pivot_idx, i, store_idx
cdef D_t pivot_val
# in the small-array case, do things efficiently
if size <= 1:
pass
elif size == 2:
if dist[0] > dist[1]:
dual_swap(dist, idx, 0, 1)
elif size == 3:
if dist[0] > dist[1]:
dual_swap(dist, idx, 0, 1)
if dist[1] > dist[2]:
dual_swap(dist, idx, 1, 2)
if dist[0] > dist[1]:
dual_swap(dist, idx, 0, 1)
else:
# Determine the pivot using the median-of-three rule.
# The smallest of the three is moved to the beginning of the array,
# the middle (the pivot value) is moved to the end, and the largest
# is moved to the pivot index.
pivot_idx = size // 2
if dist[0] > dist[size - 1]:
dual_swap(dist, idx, 0, size - 1)
if dist[size - 1] > dist[pivot_idx]:
dual_swap(dist, idx, size - 1, pivot_idx)
if dist[0] > dist[size - 1]:
dual_swap(dist, idx, 0, size - 1)
pivot_val = dist[size - 1]
# partition indices about pivot. At the end of this operation,
# pivot_idx will contain the pivot value, everything to the left
# will be smaller, and everything to the right will be larger.
store_idx = 0
for i in range(size - 1):
if dist[i] < pivot_val:
dual_swap(dist, idx, i, store_idx)
store_idx += 1
dual_swap(dist, idx, store_idx, size - 1)
pivot_idx = store_idx
# recursively sort each side of the pivot
if pivot_idx > 1:
_simultaneous_sort(dist, idx, pivot_idx)
if pivot_idx + 2 < size:
_simultaneous_sort(dist + pivot_idx + 1,
idx + pivot_idx + 1,
size - pivot_idx - 1)
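The numpy equivalent quoted in the docstring, as a runnable sanity check (illustrative only):

import numpy as np

def simultaneous_sort(dist, idx):
    # same ordering contract as _simultaneous_sort above
    i = np.argsort(dist)
    return dist[i], idx[i]

dist = np.array([0.5, 0.1, 0.3])
idx = np.array([10, 20, 30])
print(simultaneous_sort(dist, idx))
# (array([0.1, 0.3, 0.5]), array([20, 30, 10]))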
cdef cypclass NeighborsHeap activable:
"""A max-heap structure to keep track of distances/indices of neighbors
This implements an efficient pre-allocated set of fixed-size heaps
for chasing neighbors, holding both an index and a distance.
When any row of the heap is full, adding an additional point will push
the furthest point off the heap.
Taken and adapted from:
https://github.com/scikit-learn/scikit-learn/blob/e4bb9fa86b0df873ad750b6d59090843d9d23d50/sklearn/neighbors/_binary_tree.pxi#L513
    The initial caller is responsible for providing the array of
    indices which will be modified in place by the actors' logic.

    n_pushes and is_sorted can be used by the initial caller to know
    when to proceed.
Parameters
----------
n_pts : int
the number of heaps to use
n_nbrs : int
the size of each heap.
"""
D_t *_distances
I_t *_indices
I_t _n_pts
I_t _n_nbrs
I_t _n_pushes
bint _sorted
__init__(self, I_t * indices, I_t n_pts, I_t n_nbrs):
cdef I_t i
self._n_pts = n_pts
self._n_nbrs = n_nbrs
self._active_result_class = WaitResult.construct
self._active_queue_class = consume BatchMailBox(scheduler)
self._distances = <D_t *> malloc(n_pts * n_nbrs * sizeof(D_t))
self._indices = indices
self._n_pushes = 0
self._sorted = False
        # We can't use memset here: it sets raw bytes, so it can't
        # initialise an array of doubles to INF.
for i in range(n_pts * n_nbrs):
self._distances[i] = INF
void __dealloc__(self):
free(self._distances)
void push(self, I_t row, D_t val, I_t i_val):
"""push (val, i_val) into the given row"""
cdef:
I_t i, left_child_idx, right_child_idx, swap_idx
# Getting the heap to use
I_t *indices = self._indices + row * self._n_nbrs
D_t *distances = self._distances + row * self._n_nbrs
self._n_pushes += 1
# check if val should be in heap
if val > distances[0]:
return
# insert val at position zero
distances[0] = val
indices[0] = i_val
# descend the heap, swapping values until the max heap criterion is met
i = 0
while True:
left_child_idx = 2 * i + 1
right_child_idx = left_child_idx + 1
if left_child_idx >= self._n_nbrs:
break
elif right_child_idx >= self._n_nbrs:
if distances[left_child_idx] > val:
swap_idx = left_child_idx
else:
break
elif distances[left_child_idx] >= distances[right_child_idx]:
if val < distances[left_child_idx]:
swap_idx = left_child_idx
else:
break
else:
if val < distances[right_child_idx]:
swap_idx = right_child_idx
else:
break
distances[i] = distances[swap_idx]
indices[i] = indices[swap_idx]
i = swap_idx
distances[i] = val
indices[i] = i_val
void sort(self):
        # NOTE: Ideally we could sort results in parallel, but the
        # OpenMP threadpool and this runtime's workers don't play
        # nicely together: using prange here would create OpenMP
        # threads within workers, causing unexpected behavior.
cdef I_t row
for row in range(self._n_pts):
# We use a function here to be able to recurse
_simultaneous_sort(
self._distances + row * self._n_nbrs,
self._indices + row * self._n_nbrs,
self._n_nbrs)
self._sorted = True
int n_pushes(self):
return self._n_pushes
int is_sorted(self):
        # TODO: is there support for returning a bool via
        # promises? For now an int is returned so that
        # getIntResult can be used.
return 1 if self._sorted else 0
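For intuition, a compact pure-Python model of one heap row (illustrative only, not the MR's code): keeping the k smallest distances in a max-heap means comparing each candidate against the current worst; heapq is a min-heap, so distances are negated to emulate the max-heap used above.

import heapq

def push(heap, k, dist, index):
    # keep the k smallest distances; heap[0] holds the current worst
    if len(heap) < k:
        heapq.heappush(heap, (-dist, index))
    elif -dist > heap[0][0]:  # dist is smaller than the current worst
        heapq.heapreplace(heap, (-dist, index))

heap = []
for i, d in enumerate([0.9, 0.2, 0.7, 0.1]):
    push(heap, 2, d, i)
print(sorted((-d, i) for d, i in heap))  # [(0.1, 3), (0.2, 1)]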
cdef cypclass Node activable:
    """A KDTree Node

    Nodes delegate tasks to their children Nodes.

    A Node is set as a Leaf when it is associated
    with ``leaf_size`` or fewer points.

    Leaves are terminal Nodes and do not have children.
    """
    # Reference to the head of the allocated arrays;
    # data is not modified via _data_ptr
    D_t * _data_ptr
    I_t * _indices_ptr

    # The point the Node splits on
    D_t * _point
    I_t _n_dims
    I_t _dim

    # Portion of _indices covered by the Node is:
    # _indices_ptr[_start:_end]
    I_t _start
    I_t _end

    bint _is_leaf

    active Node _left
    active Node _right

    __init__(self):
        self._active_result_class = WaitResult.construct
        self._active_queue_class = consume BatchMailBox(scheduler)
        self._left = NULL
        self._right = NULL
        self._is_leaf = False

    # We use this separate method to allow using actors for
    # initialisation because __init__ can't be reified.
    void build_node(
        self,
        D_t * data_ptr,
        I_t * indices_ptr,
        I_t leaf_size,
        I_t n_dims,
        I_t dim,
        I_t start,
        I_t end,
        active Counter counter,
    ):
        cdef I_t i
        cdef I_t next_dim = (dim + 1) % n_dims
        cdef I_t mid = (start + end) // 2

        self._data_ptr = data_ptr
        self._indices_ptr = indices_ptr
        self._dim = dim
        self._n_dims = n_dims
        self._start = start
        self._end = end

        if (end - start <= leaf_size):
            self._is_leaf = True
            # Add to the global counter the number
            # of samples the leaf is responsible for
            counter.add(NULL, end - start)
            return

        # We partition the samples in two nodes on a given dimension,
        # with the middle point as a pivot
        partition_node_indices(data_ptr, indices_ptr, start, mid, end, dim, n_dims)
        self._point = data_ptr + mid
        self._left = consume Node()
        self._right = consume Node()

        # Recurse on both partitions
        self._left.build_node(NULL,
                              data_ptr, indices_ptr,
                              leaf_size, n_dims, next_dim,
                              start, mid, counter)
        self._right.build_node(NULL,
                               data_ptr, indices_ptr,
                               leaf_size, n_dims, next_dim,
                               mid, end, counter)
void query(self,
D_t * query_points,
I_t i,
active NeighborsHeap heaps,
):
cdef:
I_t j, k
D_t dist
D_t tmp
if self._is_leaf:
            # Compute all the euclidean distances here
for j in range(self._start, self._end):
dist = 0
for k in range(self._n_dims):
tmp = (
query_points[i * self._n_dims + k] -
self._data_ptr[self._indices_ptr[j] * self._n_dims + k]
)
dist += tmp * tmp
# The heap is doing the smart work of keeping
# the closest points for each query point i
heaps.push(NULL, i, dist, self._indices_ptr[j])
return
        # TODO: one can implement a pruning strategy here
        # (a standalone sketch follows this listing)
self._left.query(NULL, query_points, i, heaps)
self._right.query(NULL, query_points, i, heaps)
cdef cypclass KDTree:
    ...
    This relies on a Cython+ runtime using actors.
    """
    I_t _n          # number of examples
    I_t _d          # number of dimensions / features
    I_t _leaf_size  # maximum number of vectors at a leaf
    I_t _n_leafs

    active Node _root

    D_t *_data_ptr
    I_t *_indices_ptr

    __init__(self,
        np.ndarray X,
        I_t leaf_size,
    ):
        cdef I_t i
        cdef I_t n = X.shape[0]
        cdef I_t d = X.shape[1]
        cdef I_t initialised = 0

        # Accessing _SC_NPROCESSORS_ONLN does not return the
        # effective number of threads which were assigned to
        # this task using `taskset(1)` for instance.
        #
        # This OpenMP API is a workable way to access it.
        cdef I_t num_workers = omp_get_max_threads()

        self._n = n
        self._d = d
        self._leaf_size = leaf_size

        self._data_ptr = <D_t *> X.data
        self._indices_ptr = <I_t *> malloc(n * sizeof(I_t))
        for i in range(n):
            self._indices_ptr[i] = i

        # Recursively building the tree here
        global scheduler
        scheduler = Scheduler(num_workers)

        cdef active Counter counter = consume Counter()

        self._root = consume Node()
        if self._root is NULL:
            printf("Error consuming node\n")

        # When objects are activated (set as Actors), methods
        # are reified. When using those reified methods,
        # a new argument is prepended for a predicate,
        # which we aren't using here, hence the extra NULL.
        #
        # Using this separate method also allows using actors
        # because __init__ can't be reified.
        self._root.build_node(NULL,
                  self._data_ptr,
                  self._indices_ptr,
                  self._leaf_size, n_dims=self._d,
                  dim=0, start=0, end=self._n,
                  counter=counter)

        # Waiting for the tree construction to end.
        # Somewhat similar to a thread barrier.
        while(initialised < self._n):
            initialised = counter.value(NULL).getIntResult()

        counter.reset(NULL)

    void __dealloc__(self):
        scheduler.finish()
        free(self._indices_ptr)

    void query(self,
        np.ndarray query_points,  # IN
        np.ndarray closests,      # OUT
    ):
        cdef:
            I_t completed_queries = 0
            I_t i
            I_t n_query = query_points.shape[0]
            I_t n_neighbors = closests.shape[1]
            I_t total_n_pushes = n_query * self._n
            active NeighborsHeap heaps

        heaps = consume NeighborsHeap(<I_t *> closests.data,
                                      n_query,
                                      n_neighbors)

        for i in range(n_query):
            self._root.query(NULL, <D_t *> query_points.data, i, heaps)

        while(completed_queries < total_n_pushes):
            completed_queries = heaps.n_pushes(NULL).getIntResult()

        heaps.sort(NULL)
        while not(heaps.is_sorted(NULL).getIntResult()):
            pass


cdef public int main() nogil:
    # Entry point for the compiled binary file
    ...
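The TODO in Node.query above means traversal is unpruned: every query visits every leaf, which is also why KDTree.query can wait on exactly n_query * n pushes. Below is a minimal, self-contained sketch (hypothetical, not the MR's code) of the classic pruning rule: skip a subtree when the splitting hyperplane lies farther away than the current best candidate.

import numpy as np

# Standalone 1-NN sketch of KD-tree pruning; illustrative only.
def query_1nn(points, q, depth=0, best=(np.inf, -1)):
    # points: list of (index, coordinates); brute-force small leaves
    if len(points) <= 4:
        for i, p in points:
            d = float(np.sum((q - p) ** 2))
            if d < best[0]:
                best = (d, i)
        return best
    dim = depth % q.shape[0]
    points = sorted(points, key=lambda ip: ip[1][dim])
    mid = len(points) // 2
    split = points[mid][1][dim]
    delta = q[dim] - split
    near, far = (points[:mid], points[mid:]) if delta < 0 else (points[mid:], points[:mid])
    best = query_1nn(near, q, depth + 1, best)
    if delta * delta < best[0]:  # otherwise the far subtree is pruned
        best = query_1nn(far, q, depth + 1, best)
    return best

rng = np.random.default_rng(0)
X = rng.random((200, 3))
q = rng.random(3)
d, i = query_1nn([(i, X[i]) for i in range(len(X))], q)
assert i == int(np.argmin(((X - q) ** 2).sum(axis=1)))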
import numpy as np
import kdtree
if __name__ == "__main__":
n, d = 1000000, 2
golden_ratio = (1 + 5 ** 0.5) / 2
X = np.zeros((n, d))
for i in range(n):
X[i, 0] = (i / golden_ratio) % 1
X[i, 1] = i / n
    tree = kdtree.KDTree(X, leaf_size=256)
del tree
...

cdef inline ActhonResultInterface NullResult() nogil:
    return NULL
# Taken from:
# https://lab.nexedi.com/nexedi/cython/blob/3.0a6-cypclass/tests/run/cypclass_acthon.pyx#L66
cdef cypclass WaitResult(ActhonResultInterface):
union result_t:
int int_val
void* ptr
result_t result
sem_t semaphore
__init__(self):
self.result.ptr = NULL
sem_init(&self.semaphore, 0, 0)
__dealloc__(self):
sem_destroy(&self.semaphore)
@staticmethod
ActhonResultInterface construct():
return WaitResult()
void pushVoidStarResult(self, void* result):
self.result.ptr = result
sem_post(&self.semaphore)
void pushIntResult(self, int result):
self.result.int_val = result
sem_post(&self.semaphore)
result_t _getRawResult(const self):
# We must ensure a result exists, but we can let others access it immediately
# The cast here is a way of const-casting (we're modifying the semaphore in a const method)
sem_wait(<sem_t*> &self.semaphore)
sem_post(<sem_t*> &self.semaphore)
return self.result
void* getVoidStarResult(const self):
res = self._getRawResult()
return res.ptr
int getIntResult(const self):
res = self._getRawResult()
return res.int_val
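For readers more at home in Python, a rough threading analogue of the semaphore dance in _getRawResult (a sketch under that assumption, not part of the MR): the first acquire blocks until a result is pushed, and the immediate release lets later getters through without waiting.

import threading

class PyWaitResult:
    def __init__(self):
        self._sem = threading.Semaphore(0)
        self._value = None

    def push_int_result(self, value):
        self._value = value
        self._sem.release()

    def get_int_result(self):
        self._sem.acquire()   # wait until a result exists
        self._sem.release()   # let subsequent getters pass immediately
        return self._value

r = PyWaitResult()
threading.Timer(0.05, r.push_int_result, args=(42,)).start()
print(r.get_int_result())  # 42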
from distutils.core import setup
from distutils.extension import Extension

import numpy

from Cython.Build import cythonize

extensions = [
    ...
        "kdtree",
        language="c++",
        define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")],
        include_dirs=[numpy.get_include()],
        extra_compile_args=["-fopenmp"],
        extra_link_args=["-fopenmp"],
        sources=["kdtree.pyx"],
    ),
]

setup(
    ext_modules=cythonize(extensions),
    name="kdtree",
    version="0.1",
)
import numpy as np
import pytest
import kdtree
from sklearn.neighbors import KDTree
@pytest.mark.parametrize("n", [10, 100, 1000, 10000])
@pytest.mark.parametrize("d", [10, 100])
@pytest.mark.parametrize("k", [1, 2, 5, 10])
@pytest.mark.parametrize("leaf_size", [256, 1024])
def test_against_sklearn(n, d, k, leaf_size):
np.random.seed(1)
X = np.random.rand(n, d)
query_points = np.random.rand(n, d)
    tree = kdtree.KDTree(X, leaf_size=leaf_size)
    skl_tree = KDTree(X, leaf_size=leaf_size)
closests = np.zeros((n, k), dtype=np.int32)
tree.query(query_points, closests)
skl_closests = skl_tree.query(query_points, k=k, return_distance=False).astype(np.int32)
np.testing.assert_equal(closests, skl_closests)
ipython==7.22.0
jupyter==1.0.0
line-profiler==3.1.0
matplotlib==3.4.1
numpy==1.20.2
-e git+https://lab.nexedi.com/nexedi/cython.git@608c29a9ab7b7803c900621424279a9e71fac106#egg=Cython&subdirectory=../../cython