Commit 9b840986 authored by Julien Jerphanion

No use of Cython+ for KMeans

parent 5967987f
Makefile

all: mrproper build bench

# Remove benchmark fixtures
clean:
	-rm -Rf bench_res
	-rm -f *.json
	-rm -f *.html
	-rm -f perf.data
	-rm -f perf.data.old

# Remove all but sources
mrproper: clean
	rm -Rf build
	rm -f *.cpp
	rm -f *.c
	rm -f *.so

build:
	python setup.py build_ext --inplace

# Create a full tracing html report via VizTracer.
# For reference, see: https://www.maartenbreddels.com/perf/jupyter/python/tracing/gil/2021/01/14/Tracing-the-Python-GIL.html
bench: clean
	@ mkdir -p bench_res
	@ perf record -e sched:sched_switch -e sched:sched_process_fork -e 'sched:sched_wak*' \
	-k CLOCK_MONOTONIC \
	--call-graph dwarf -- viztracer -o bench_res/bench.json --ignore_frozen kmeans.py
	@ perf script --no-inline | per4m perf2trace sched -o bench_res/perf.json
	@ viztracer --combine bench_res/perf.json bench_res/bench.json -o bench.html
	@ open bench.html
_kmeans.pyx

# cython: profile=True
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True
# cython: language_level = 3
# cython: linetrace=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
#
# Licence: BSD 3 clause

# TODO: We still need to use ndarrays instead of typed memoryviews when using
# fused types and when the array may be read-only (for instance when it's
# provided by the user). This is fixed in Cython >= 3.0.
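# A minimal sketch of the limitation (hypothetical snippet, not from this
# repo): with Cython < 3.0, coercing a read-only ndarray into a non-const
# typed memoryview raises at call time.
#
#     cdef floating first(floating[::1] x):
#         return x[0]
#
#     X = np.ones(3)
#     X.setflags(write=False)
#     first(X)  # ValueError: buffer source array is read-only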
import numpy as np
cimport numpy as np
from cython cimport floating
from cython.parallel import prange, parallel
from libc.stdlib cimport malloc, calloc, free
from libc.string cimport memset
from libc.math cimport sqrt

from scipy.linalg.cython_blas cimport sgemm, dgemm

np.import_array()


ctypedef fused integral:
    int
    long long

ctypedef np.float64_t DOUBLE
ctypedef np.int32_t INT

# Number of samples per data chunk defined as a global constant.
_CHUNK_SIZE = 256


cdef enum _BLAS_Order:
    RowMajor  # C contiguous
    ColMajor  # Fortran contiguous


cdef enum _BLAS_Trans:
    NoTrans = 110  # corresponds to 'n'
    Trans = 116    # corresponds to 't'
def lloyd_iter_chunked_dense(
        np.ndarray[floating, ndim=2, mode='c'] X,  # IN
        floating[::1] sample_weight,               # IN
        floating[::1] x_squared_norms,             # IN
        floating[:, ::1] centers_old,              # IN
        floating[:, ::1] centers_new,              # OUT
        floating[::1] weight_in_clusters,          # OUT
        int[::1] labels,                           # OUT
        floating[::1] center_shift,                # OUT
        int n_threads,
        bint update_centers=True):
    """Single iteration of the K-means Lloyd algorithm with dense input.

    Update labels and centers (inplace), for one iteration, distributed
    over data chunks.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features), dtype=floating
        The observations to cluster.

    sample_weight : ndarray of shape (n_samples,), dtype=floating
        The weights for each observation in X.

    x_squared_norms : ndarray of shape (n_samples,), dtype=floating
        Squared L2 norm of X.

    centers_old : ndarray of shape (n_clusters, n_features), dtype=floating
        Centers before previous iteration, placeholder for the centers after
        previous iteration.

    centers_new : ndarray of shape (n_clusters, n_features), dtype=floating
        Centers after previous iteration, placeholder for the new centers
        computed during this iteration.

    weight_in_clusters : ndarray of shape (n_clusters,), dtype=floating
        Placeholder for the sums of the weights of every observation assigned
        to each center.

    labels : ndarray of shape (n_samples,), dtype=int
        Labels assignment.

    center_shift : ndarray of shape (n_clusters,), dtype=floating
        Distance between old and new centers.

    n_threads : int
        The number of threads to be used by OpenMP.

    update_centers : bool
        - If True, the labels and the new centers will be computed, i.e. runs
          the E-step and the M-step of the algorithm.
        - If False, only the labels will be computed, i.e. runs the E-step of
          the algorithm. This is useful especially when calling predict on a
          fitted model.
    """
    cdef:
        int n_samples = X.shape[0]
        int n_features = X.shape[1]
        int n_clusters = centers_new.shape[0]

        # hard-coded number of samples per chunk. Appeared to be close to
        # optimal in all situations.
        int n_samples_chunk = _CHUNK_SIZE if n_samples > _CHUNK_SIZE else n_samples
        int n_chunks = n_samples // n_samples_chunk
        int n_samples_rem = n_samples % n_samples_chunk
        int chunk_idx, n_samples_chunk_eff
        int start, end

        int j, k

        floating[::1] centers_squared_norms = row_norms(centers_old, squared=True)

        floating *centers_new_chunk
        floating *weight_in_clusters_chunk
        floating *pairwise_distances_chunk

    # count remainder chunk in total number of chunks
    n_chunks += n_samples != n_chunks * n_samples_chunk

    # number of threads should not be bigger than number of chunks
    n_threads = min(n_threads, n_chunks)
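    # For example, with n_samples=1000 and _CHUNK_SIZE=256 there are 3 full
    # chunks plus a remainder chunk of 232 samples (1000 = 3*256 + 232),
    # hence n_chunks = 4 and at most 4 threads are used.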
    if update_centers:
        memset(&centers_new[0, 0], 0, n_clusters * n_features * sizeof(floating))
        memset(&weight_in_clusters[0], 0, n_clusters * sizeof(floating))

    with nogil, parallel(num_threads=n_threads):
        # thread local buffers
        centers_new_chunk = <floating*> calloc(n_clusters * n_features, sizeof(floating))
        weight_in_clusters_chunk = <floating*> calloc(n_clusters, sizeof(floating))
        pairwise_distances_chunk = <floating*> malloc(n_samples_chunk * n_clusters * sizeof(floating))

        for chunk_idx in prange(n_chunks, schedule='static'):
            start = chunk_idx * n_samples_chunk
            if chunk_idx == n_chunks - 1 and n_samples_rem > 0:
                end = start + n_samples_rem
            else:
                end = start + n_samples_chunk

            _update_chunk_dense(
                &X[start, 0],
                sample_weight[start: end],
                x_squared_norms[start: end],
                centers_old,
                centers_squared_norms,
                labels[start: end],
                centers_new_chunk,
                weight_in_clusters_chunk,
                pairwise_distances_chunk,
                update_centers)

        # reduction from local buffers. The GIL is necessary here to avoid
        # race conditions.
        if update_centers:
            with gil:
                for j in range(n_clusters):
                    weight_in_clusters[j] += weight_in_clusters_chunk[j]
                    for k in range(n_features):
                        centers_new[j, k] += centers_new_chunk[j * n_features + k]

        free(centers_new_chunk)
        free(weight_in_clusters_chunk)
        free(pairwise_distances_chunk)

    if update_centers:
        _relocate_empty_clusters_dense(X, sample_weight, centers_old,
                                       centers_new, weight_in_clusters, labels)

        _average_centers(centers_new, weight_in_clusters)
        _center_shift(centers_old, centers_new, center_shift)
# Helper functions
cdef void _gemm(_BLAS_Order order, _BLAS_Trans ta, _BLAS_Trans tb, int m, int n,
                int k, floating alpha, floating *A, int lda, floating *B,
                int ldb, floating beta, floating *C, int ldc) nogil:
    """C := alpha * op(A).op(B) + beta * C"""
    cdef:
        char ta_ = ta
        char tb_ = tb
    if order == RowMajor:
        if floating is float:
            sgemm(&tb_, &ta_, &n, &m, &k, &alpha, B,
                  &ldb, A, &lda, &beta, C, &ldc)
        else:
            dgemm(&tb_, &ta_, &n, &m, &k, &alpha, B,
                  &ldb, A, &lda, &beta, C, &ldc)
    else:
        if floating is float:
            sgemm(&ta_, &tb_, &m, &n, &k, &alpha, A,
                  &lda, B, &ldb, &beta, C, &ldc)
        else:
            dgemm(&ta_, &tb_, &m, &n, &k, &alpha, A,
                  &lda, B, &ldb, &beta, C, &ldc)
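
# Note on the RowMajor branch of _gemm above: Fortran BLAS routines expect
# column-major storage, and a row-major C is laid out in memory exactly like
# column-major C^T. Since C^T = (alpha * op(A).op(B))^T
# = alpha * op(B)^T.op(A)^T, swapping the operands and the m/n dimensions
# computes the row-major result without any transposition copy.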
cpdef void _relocate_empty_clusters_dense(
        np.ndarray[floating, ndim=2, mode='c'] X,  # IN
        floating[::1] sample_weight,               # IN
        floating[:, ::1] centers_old,              # IN
        floating[:, ::1] centers_new,              # INOUT
        floating[::1] weight_in_clusters,          # INOUT
        int[::1] labels):                          # IN
    """Relocate centers which have no sample assigned to them."""
    cdef:
        int[::1] empty_clusters = np.where(np.equal(weight_in_clusters, 0))[0].astype(np.int32)
        int n_empty = empty_clusters.shape[0]

    if n_empty == 0:
        return

    cdef:
        int n_features = X.shape[1]

        floating[::1] distances = ((np.asarray(X) - np.asarray(centers_old)[labels])**2).sum(axis=1)
        int[::1] far_from_centers = np.argpartition(distances, -n_empty)[:-n_empty-1:-1].astype(np.int32)

        int new_cluster_id, old_cluster_id, far_idx, idx, k

        floating weight

    for idx in range(n_empty):
        new_cluster_id = empty_clusters[idx]

        far_idx = far_from_centers[idx]
        weight = sample_weight[far_idx]

        old_cluster_id = labels[far_idx]

        for k in range(n_features):
            centers_new[old_cluster_id, k] -= X[far_idx, k] * weight
            centers_new[new_cluster_id, k] = X[far_idx, k] * weight

        weight_in_clusters[new_cluster_id] = weight
        weight_in_clusters[old_cluster_id] -= weight
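
# Each empty cluster is thus re-seeded with one of the n_empty points lying
# farthest from their currently assigned centers, and that point's weighted
# contribution is moved from its old cluster's accumulators to the new one.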
def row_norms(X, squared=False):
    """Row-wise (squared) Euclidean norm of X.

    Equivalent to np.sqrt((X * X).sum(axis=1)), but does not create
    an X.shape-sized temporary.

    Performs no input validation.

    Parameters
    ----------
    X : array-like
        The input array.

    squared : bool, default=False
        If True, return squared norms.

    Returns
    -------
    array-like
        The row-wise (squared) Euclidean norm of X.
    """
    norms = np.einsum('ij,ij->i', X, X)

    if not squared:
        np.sqrt(norms, norms)
    return norms
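
# A quick sanity check (hypothetical usage, not part of the original file):
#   >>> row_norms(np.array([[3.0, 4.0], [0.0, 0.0]]))
#   array([5., 0.])
#   >>> row_norms(np.array([[3.0, 4.0]]), squared=True)
#   array([25.])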
cdef floating _euclidean_dense_dense(
        floating* a,  # IN
        floating* b,  # IN
        int n_features,
        bint squared) nogil:
    """Euclidean distance between a dense and b dense"""
    cdef:
        int i
        int n = n_features // 4
        int rem = n_features % 4
        floating result = 0

    # We manually unroll the loop for better cache optimization.
    for i in range(n):
        result += ((a[0] - b[0]) * (a[0] - b[0])
                   +(a[1] - b[1]) * (a[1] - b[1])
                   +(a[2] - b[2]) * (a[2] - b[2])
                   +(a[3] - b[3]) * (a[3] - b[3]))
        a += 4; b += 4

    for i in range(rem):
        result += (a[i] - b[i]) * (a[i] - b[i])

    return result if squared else sqrt(result)
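
# E.g. with n_features = 10, the unrolled loop consumes 2 blocks of 4 features
# and the tail loop handles the remaining 2.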
cpdef floating _inertia_dense(
        np.ndarray[floating, ndim=2, mode='c'] X,  # IN
        floating[::1] sample_weight,               # IN
        floating[:, ::1] centers,                  # IN
        int[::1] labels):                          # IN
    """Compute inertia for dense input data.

    Sum of squared distance between each sample and its assigned center.
    """
    cdef:
        int n_samples = X.shape[0]
        int n_features = X.shape[1]
        int i, j

        floating sq_dist = 0.0
        floating inertia = 0.0

    for i in range(n_samples):
        j = labels[i]
        sq_dist = _euclidean_dense_dense(&X[i, 0], &centers[j, 0],
                                         n_features, True)
        inertia += sq_dist * sample_weight[i]

    return inertia
cdef void _average_centers(
        floating[:, ::1] centers,           # INOUT
        floating[::1] weight_in_clusters):  # IN
    """Average new centers wrt weights."""
    cdef:
        int n_clusters = centers.shape[0]
        int n_features = centers.shape[1]
        int j, k
        floating alpha

    for j in range(n_clusters):
        if weight_in_clusters[j] > 0:
            alpha = 1.0 / weight_in_clusters[j]
            for k in range(n_features):
                centers[j, k] *= alpha
cdef void _center_shift(
        floating[:, ::1] centers_old,   # IN
        floating[:, ::1] centers_new,   # IN
        floating[::1] center_shift):    # OUT
    """Compute shift between old and new centers."""
    cdef:
        int n_clusters = centers_old.shape[0]
        int n_features = centers_old.shape[1]
        int j

    for j in range(n_clusters):
        center_shift[j] = _euclidean_dense_dense(
            &centers_new[j, 0], &centers_old[j, 0], n_features, False)
cdef void _update_chunk_dense(
        floating *X,                          # IN
        # expecting C aligned 2D array. XXX: Can be
        # replaced by const memoryview when cython min
        # version is >= 3.0
        floating[::1] sample_weight,          # IN
        floating[::1] x_squared_norms,        # IN
        floating[:, ::1] centers_old,         # IN
        floating[::1] centers_squared_norms,  # IN
        int[::1] labels,                      # OUT
        floating *centers_new,                # OUT
        floating *weight_in_clusters,         # OUT
        floating *pairwise_distances,         # OUT
        bint update_centers) nogil:
    """K-means combined EM step for one dense data chunk.

    Compute the partial contribution of a single data chunk to the labels and
    centers.
    """
    cdef:
        int n_samples = labels.shape[0]
        int n_clusters = centers_old.shape[0]
        int n_features = centers_old.shape[1]

        floating sq_dist, min_sq_dist
        int i, j, k, label

    # Instead of computing the full pairwise squared distances matrix,
    # ||X - C||² = ||X||² - 2 X.C^T + ||C||², we only need to store
    # the - 2 X.C^T + ||C||² term since the argmin for a given sample only
    # depends on the centers.
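    # For example, for a sample x = [1, 2] and centers c0 = [0, 0] and
    # c1 = [1, 1], the full squared distances are (5, 1) while the stored
    # terms -2 x.c + ||c||² are (0, -4): both select c1, because ||x||² is
    # the same constant for every center.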
    # pairwise_distances = ||C||²
    for i in range(n_samples):
        for j in range(n_clusters):
            pairwise_distances[i * n_clusters + j] = centers_squared_norms[j]

    # pairwise_distances += -2 * X.dot(C.T)
    _gemm(RowMajor, NoTrans, Trans, n_samples, n_clusters, n_features,
          -2.0, X, n_features, &centers_old[0, 0], n_features,
          1.0, pairwise_distances, n_clusters)

    for i in range(n_samples):
        min_sq_dist = pairwise_distances[i * n_clusters]
        label = 0
        for j in range(1, n_clusters):
            sq_dist = pairwise_distances[i * n_clusters + j]
            if sq_dist < min_sq_dist:
                min_sq_dist = sq_dist
                label = j
        labels[i] = label

        if update_centers:
            weight_in_clusters[label] += sample_weight[i]
            for k in range(n_features):
                centers_new[label * n_features + k] += X[i * n_features + k] * sample_weight[i]
kmeans.py

import time

from viztracer import VizTracer
import numpy as np
from threadpoolctl import threadpool_limits
from sklearn.datasets import make_classification

from _kmeans import lloyd_iter_chunked_dense, _inertia_dense
def _kmeans_single_lloyd(X, centers_init, sample_weight=None, max_iter=300,
                         verbose=False, x_squared_norms=None, tol=1e-4,
                         n_threads=1):
    """A single run of k-means Lloyd, assumes preparation completed prior.

    Parameters
    ----------
    X : {ndarray, sparse matrix} of shape (n_samples, n_features)
        The observations to cluster. If sparse matrix, must be in CSR format.

    centers_init : ndarray of shape (n_clusters, n_features)
        The initial centers.

    sample_weight : ndarray of shape (n_samples,)
        The weights for each observation in X.

    max_iter : int, default=300
        Maximum number of iterations of the k-means algorithm to run.

    verbose : bool, default=False
        Verbosity mode.

    x_squared_norms : ndarray of shape (n_samples,), default=None
        Precomputed x_squared_norms.

    tol : float, default=1e-4
        Relative tolerance with regards to Frobenius norm of the difference
        in the cluster centers of two consecutive iterations to declare
        convergence.
        It's not advised to set `tol=0` since convergence might never be
        declared due to rounding errors. Use a very small number instead.

    n_threads : int, default=1
        The number of OpenMP threads to use for the computation. Parallelism
        is sample-wise on the main Cython loop which assigns each sample to
        its closest center.

    Returns
    -------
    labels : ndarray of shape (n_samples,)
        labels[i] is the index of the centroid the i-th observation is
        closest to.

    inertia : float
        The final value of the inertia criterion (sum of squared distances to
        the closest centroid for all observations in the training set).

    centers : ndarray of shape (n_clusters, n_features)
        Centroids found at the last iteration of k-means.

    n_iter : int
        Number of iterations run.
    """
    n_clusters = centers_init.shape[0]

    # Buffers to avoid new allocations at each iteration.
    centers = centers_init
    centers_new = np.zeros_like(centers)
    labels = np.full(X.shape[0], -1, dtype=np.int32)
    labels_old = labels.copy()
    weight_in_clusters = np.zeros(n_clusters, dtype=X.dtype)
    center_shift = np.zeros(n_clusters, dtype=X.dtype)

    # NOTE: We only rely on dense arrays for this analysis.
    if sample_weight is None:
        sample_weight = np.ones((X.shape[0],), dtype=X.dtype)
    if x_squared_norms is None:
        # lloyd_iter_chunked_dense expects a typed memoryview here and cannot
        # take None, so precompute the squared row norms in that case.
        x_squared_norms = (X ** 2).sum(axis=1)

    strict_convergence = False

    # Threadpoolctl context to limit the number of threads in second level of
    # nested parallelism (i.e. BLAS) to avoid oversubscription.
    with threadpool_limits(limits=1, user_api="blas"):
        for i in range(max_iter):
            lloyd_iter_chunked_dense(X, sample_weight, x_squared_norms, centers,
                                     centers_new, weight_in_clusters, labels,
                                     center_shift, n_threads)

            if verbose:
                inertia = _inertia_dense(X, sample_weight, centers, labels)
                print(f"Iteration {i}, inertia {inertia}.")

            centers, centers_new = centers_new, centers

            if np.array_equal(labels, labels_old):
                # First check the labels for strict convergence.
                if verbose:
                    print(f"Converged at iteration {i}: strict convergence.")
                strict_convergence = True
                break
            else:
                # No strict convergence, check for tol based convergence.
                center_shift_tot = (center_shift**2).sum()
                if center_shift_tot <= tol:
                    if verbose:
                        print(f"Converged at iteration {i}: center shift "
                              f"{center_shift_tot} within tolerance {tol}.")
                    break

            labels_old[:] = labels

        if not strict_convergence:
            # rerun E-step so that predicted labels match cluster centers
            lloyd_iter_chunked_dense(X, sample_weight, x_squared_norms,
                                     centers, centers, weight_in_clusters,
                                     labels, center_shift, n_threads,
                                     update_centers=False)

    inertia = _inertia_dense(X, sample_weight, centers, labels)

    return labels, inertia, centers, i + 1
if __name__ == "__main__":
    np.random.seed(1337)
    n_classes = 2
    X, y = make_classification(n_samples=1000,
                               n_classes=n_classes,
                               n_clusters_per_class=1,
                               n_informative=10)
    # Copy the initial centers: centers_init is used as a writable buffer by
    # _kmeans_single_lloyd, and a plain view of X would let the in-place
    # center updates overwrite rows of X.
    centers_init = X[:n_classes].copy()

    start = time.time()
    labels, inertia, centers, _ = _kmeans_single_lloyd(X,
                                                       centers_init,
                                                       max_iter=100000,
                                                       n_threads=100)
    print(time.time() - start)
setup.py

# Cython compile instructions
import numpy
from setuptools import setup, Extension
from Cython.Build import build_ext

# To compile, use
# python setup.py build_ext --inplace

extensions = [
    Extension("_kmeans",
              sources=["_kmeans.pyx"],
              include_dirs=[numpy.get_include()],
              define_macros=[("NPY_NO_DEPRECATED_API",
                              "NPY_1_7_API_VERSION"),
                             # ("CYTHON_TRACE_NOGIL", "1")
                             ],
              )
]

setup(
    name="kmeans",
    cmdclass={'build_ext': build_ext},
    ext_modules=extensions,
    install_requires=[
        'setuptools>=18.0',
        'cython>=0.27.3',
        'numpy'
    ],
)