Commit c390c645 authored by Darrick J. Wong

xfs: convert xfarray insertion sort to heapsort using scratchpad memory

In the previous patch, we created a very basic quicksort implementation
for xfile arrays.  While the use of an alternate sorting algorithm to
avoid quicksort recursion on very small subsets reduces the runtime
modestly, we could do better than a load and store-heavy insertion sort,
particularly since each load and store requires a page mapping lookup in
the xfile.

For a small increase in kernel memory requirements, we could instead
bulk load the xfarray records into memory, use the kernel's existing
heapsort implementation to sort the records, and bulk store the memory
buffer back into the xfile.  On the author's computer, this reduces the
runtime by about 5% on a 500,000 element array.
Signed-off-by: Darrick J. Wong <djwong@kernel.org>
Reviewed-by: Kent Overstreet <kent.overstreet@linux.dev>
Reviewed-by: Dave Chinner <dchinner@redhat.com>
parent 232ea052
...@@ -927,6 +927,7 @@ TRACE_EVENT(xfarray_sort_stats, ...@@ -927,6 +927,7 @@ TRACE_EVENT(xfarray_sort_stats,
__field(unsigned long long, loads) __field(unsigned long long, loads)
__field(unsigned long long, stores) __field(unsigned long long, stores)
__field(unsigned long long, compares) __field(unsigned long long, compares)
__field(unsigned long long, heapsorts)
#endif #endif
__field(unsigned int, max_stack_depth) __field(unsigned int, max_stack_depth)
__field(unsigned int, max_stack_used) __field(unsigned int, max_stack_used)
...@@ -938,6 +939,7 @@ TRACE_EVENT(xfarray_sort_stats, ...@@ -938,6 +939,7 @@ TRACE_EVENT(xfarray_sort_stats,
__entry->loads = si->loads; __entry->loads = si->loads;
__entry->stores = si->stores; __entry->stores = si->stores;
__entry->compares = si->compares; __entry->compares = si->compares;
__entry->heapsorts = si->heapsorts;
#endif #endif
__entry->max_stack_depth = si->max_stack_depth; __entry->max_stack_depth = si->max_stack_depth;
__entry->max_stack_used = si->max_stack_used; __entry->max_stack_used = si->max_stack_used;
...@@ -945,7 +947,7 @@ TRACE_EVENT(xfarray_sort_stats, ...@@ -945,7 +947,7 @@ TRACE_EVENT(xfarray_sort_stats,
), ),
TP_printk( TP_printk(
#ifdef DEBUG #ifdef DEBUG
"xfino 0x%lx loads %llu stores %llu compares %llu stack_depth %u/%u error %d", "xfino 0x%lx loads %llu stores %llu compares %llu heapsorts %llu stack_depth %u/%u error %d",
#else #else
"xfino 0x%lx stack_depth %u/%u error %d", "xfino 0x%lx stack_depth %u/%u error %d",
#endif #endif
...@@ -954,6 +956,7 @@ TRACE_EVENT(xfarray_sort_stats, ...@@ -954,6 +956,7 @@ TRACE_EVENT(xfarray_sort_stats,
__entry->loads, __entry->loads,
__entry->stores, __entry->stores,
__entry->compares, __entry->compares,
__entry->heapsorts,
#endif #endif
__entry->max_stack_used, __entry->max_stack_used,
__entry->max_stack_depth, __entry->max_stack_depth,
......
...@@ -374,10 +374,12 @@ xfarray_load_next( ...@@ -374,10 +374,12 @@ xfarray_load_next(
# define xfarray_sort_bump_loads(si) do { (si)->loads++; } while (0) # define xfarray_sort_bump_loads(si) do { (si)->loads++; } while (0)
# define xfarray_sort_bump_stores(si) do { (si)->stores++; } while (0) # define xfarray_sort_bump_stores(si) do { (si)->stores++; } while (0)
# define xfarray_sort_bump_compares(si) do { (si)->compares++; } while (0) # define xfarray_sort_bump_compares(si) do { (si)->compares++; } while (0)
# define xfarray_sort_bump_heapsorts(si) do { (si)->heapsorts++; } while (0)
#else #else
# define xfarray_sort_bump_loads(si) # define xfarray_sort_bump_loads(si)
# define xfarray_sort_bump_stores(si) # define xfarray_sort_bump_stores(si)
# define xfarray_sort_bump_compares(si) # define xfarray_sort_bump_compares(si)
# define xfarray_sort_bump_heapsorts(si)
#endif /* DEBUG */ #endif /* DEBUG */
/* Load an array element for sorting. */ /* Load an array element for sorting. */
...@@ -440,15 +442,19 @@ xfarray_sortinfo_alloc( ...@@ -440,15 +442,19 @@ xfarray_sortinfo_alloc(
/* /*
* Tail-call recursion during the partitioning phase means that * Tail-call recursion during the partitioning phase means that
* quicksort will never recurse more than log2(nr) times. We need one * quicksort will never recurse more than log2(nr) times. We need one
* extra level of stack to hold the initial parameters. * extra level of stack to hold the initial parameters. In-memory
* sort will always take care of the last few levels of recursion for
* us, so we can reduce the stack depth by that much.
*/ */
max_stack_depth = ilog2(array->nr) + 1; max_stack_depth = ilog2(array->nr) + 1 - (XFARRAY_ISORT_SHIFT - 1);
if (max_stack_depth < 1)
max_stack_depth = 1;
/* Each level of quicksort uses a lo and a hi index */ /* Each level of quicksort uses a lo and a hi index */
nr_bytes += max_stack_depth * sizeof(xfarray_idx_t) * 2; nr_bytes += max_stack_depth * sizeof(xfarray_idx_t) * 2;
/* One record for the pivot */ /* Scratchpad for in-memory sort, or one record for the pivot */
nr_bytes += array->obj_size; nr_bytes += (XFARRAY_ISORT_NR * array->obj_size);
si = kvzalloc(nr_bytes, XCHK_GFP_FLAGS); si = kvzalloc(nr_bytes, XCHK_GFP_FLAGS);
if (!si) if (!si)
...@@ -490,7 +496,7 @@ xfarray_sort_terminated( ...@@ -490,7 +496,7 @@ xfarray_sort_terminated(
return false; return false;
} }
/* Do we want an insertion sort? */ /* Do we want an in-memory sort? */
static inline bool static inline bool
xfarray_want_isort( xfarray_want_isort(
struct xfarray_sortinfo *si, struct xfarray_sortinfo *si,
...@@ -498,10 +504,10 @@ xfarray_want_isort( ...@@ -498,10 +504,10 @@ xfarray_want_isort(
xfarray_idx_t end) xfarray_idx_t end)
{ {
/* /*
* For array subsets smaller than 8 elements, it's slightly faster to * For array subsets that fit in the scratchpad, it's much faster to
* use insertion sort than quicksort's stack machine. * use the kernel's heapsort than quicksort's stack machine.
*/ */
return (end - start) < 8; return (end - start) < XFARRAY_ISORT_NR;
} }
/* Return the scratch space within the sortinfo structure. */ /* Return the scratch space within the sortinfo structure. */
...@@ -511,10 +517,8 @@ static inline void *xfarray_sortinfo_isort_scratch(struct xfarray_sortinfo *si) ...@@ -511,10 +517,8 @@ static inline void *xfarray_sortinfo_isort_scratch(struct xfarray_sortinfo *si)
} }
/* /*
* Perform an insertion sort on a subset of the array. * Sort a small number of array records using scratchpad memory. The records
* Though insertion sort is an O(n^2) algorithm, for small set sizes it's * need not be contiguous in the xfile's memory pages.
* faster than quicksort's stack machine, so we let it take over for that.
* This ought to be replaced with something more efficient.
*/ */
STATIC int STATIC int
xfarray_isort( xfarray_isort(
...@@ -522,114 +526,23 @@ xfarray_isort( ...@@ -522,114 +526,23 @@ xfarray_isort(
xfarray_idx_t lo, xfarray_idx_t lo,
xfarray_idx_t hi) xfarray_idx_t hi)
{ {
void *a = xfarray_sortinfo_isort_scratch(si); void *scratch = xfarray_sortinfo_isort_scratch(si);
void *b = xfarray_scratch(si->array); loff_t lo_pos = xfarray_pos(si->array, lo);
xfarray_idx_t tmp; loff_t len = xfarray_pos(si->array, hi - lo + 1);
xfarray_idx_t i;
xfarray_idx_t run;
int error; int error;
trace_xfarray_isort(si, lo, hi); trace_xfarray_isort(si, lo, hi);
/* xfarray_sort_bump_loads(si);
* Move the smallest element in a[lo..hi] to a[lo]. This error = xfile_obj_load(si->array->xfile, scratch, len, lo_pos);
* simplifies the loop control logic below.
*/
tmp = lo;
error = xfarray_sort_load(si, tmp, b);
if (error) if (error)
return error; return error;
for (run = lo + 1; run <= hi; run++) {
/* if a[run] < a[tmp], tmp = run */
error = xfarray_sort_load(si, run, a);
if (error)
return error;
if (xfarray_sort_cmp(si, a, b) < 0) {
tmp = run;
memcpy(b, a, si->array->obj_size);
}
if (xfarray_sort_terminated(si, &error)) xfarray_sort_bump_heapsorts(si);
return error; sort(scratch, hi - lo + 1, si->array->obj_size, si->cmp_fn, NULL);
}
/* xfarray_sort_bump_stores(si);
* The smallest element is a[tmp]; swap with a[lo] if tmp != lo. return xfile_obj_store(si->array->xfile, scratch, len, lo_pos);
* Recall that a[tmp] is already in *b.
*/
if (tmp != lo) {
error = xfarray_sort_load(si, lo, a);
if (error)
return error;
error = xfarray_sort_store(si, tmp, a);
if (error)
return error;
error = xfarray_sort_store(si, lo, b);
if (error)
return error;
}
/*
* Perform an insertion sort on a[lo+1..hi]. We already made sure
* that the smallest value in the original range is now in a[lo],
* so the inner loop should never underflow.
*
* For each a[lo+2..hi], make sure it's in the correct position
* with respect to the elements that came before it.
*/
for (run = lo + 2; run <= hi; run++) {
error = xfarray_sort_load(si, run, a);
if (error)
return error;
/*
* Find the correct place for a[run] by walking leftwards
* towards the start of the range until a[tmp] is no longer
* greater than a[run].
*/
tmp = run - 1;
error = xfarray_sort_load(si, tmp, b);
if (error)
return error;
while (xfarray_sort_cmp(si, a, b) < 0) {
tmp--;
error = xfarray_sort_load(si, tmp, b);
if (error)
return error;
if (xfarray_sort_terminated(si, &error))
return error;
}
tmp++;
/*
* If tmp != run, then a[tmp..run-1] are all less than a[run],
* so right barrel roll a[tmp..run] to get this range in
* sorted order.
*/
if (tmp == run)
continue;
for (i = run; i >= tmp; i--) {
error = xfarray_sort_load(si, i - 1, b);
if (error)
return error;
error = xfarray_sort_store(si, i, b);
if (error)
return error;
if (xfarray_sort_terminated(si, &error))
return error;
}
error = xfarray_sort_store(si, tmp, a);
if (error)
return error;
if (xfarray_sort_terminated(si, &error))
return error;
}
return 0;
} }
/* Return a pointer to the xfarray pivot record within the sortinfo struct. */ /* Return a pointer to the xfarray pivot record within the sortinfo struct. */
...@@ -783,9 +696,8 @@ xfarray_qsort_push( ...@@ -783,9 +696,8 @@ xfarray_qsort_push(
* current stack frame. This guarantees that we won't need more than * current stack frame. This guarantees that we won't need more than
* log2(nr) stack space. * log2(nr) stack space.
* *
* 4. Use insertion sort for small sets since since insertion sort is faster * 4. For small sets, load the records into the scratchpad and run heapsort on
* for small, mostly sorted array segments. In the author's experience, * them because that is very fast. In the author's experience, this yields
* substituting insertion sort for arrays smaller than 8 elements yields
* a ~10% reduction in runtime. * a ~10% reduction in runtime.
*/ */
......
...@@ -58,6 +58,10 @@ int xfarray_load_next(struct xfarray *array, xfarray_idx_t *idx, void *rec); ...@@ -58,6 +58,10 @@ int xfarray_load_next(struct xfarray *array, xfarray_idx_t *idx, void *rec);
typedef cmp_func_t xfarray_cmp_fn; typedef cmp_func_t xfarray_cmp_fn;
/* Perform an in-memory heapsort for small subsets. */
#define XFARRAY_ISORT_SHIFT (4)
#define XFARRAY_ISORT_NR (1U << XFARRAY_ISORT_SHIFT)
struct xfarray_sortinfo { struct xfarray_sortinfo {
struct xfarray *array; struct xfarray *array;
...@@ -81,6 +85,7 @@ struct xfarray_sortinfo { ...@@ -81,6 +85,7 @@ struct xfarray_sortinfo {
uint64_t loads; uint64_t loads;
uint64_t stores; uint64_t stores;
uint64_t compares; uint64_t compares;
uint64_t heapsorts;
#endif #endif
/* /*
...@@ -99,11 +104,10 @@ struct xfarray_sortinfo { ...@@ -99,11 +104,10 @@ struct xfarray_sortinfo {
* *
* union { * union {
* *
* If for a given subset we decide to use an insertion sort, we use the * If for a given subset we decide to use an in-memory sort, we use a
* scratchpad record after the xfarray and a second scratchpad record * block of scratchpad records here to compare items:
* here to compare items:
* *
* xfarray_rec_t scratch; * xfarray_rec_t scratch[ISORT_NR];
* *
* Otherwise, we want to partition the records to partition the array. * Otherwise, we want to partition the records to partition the array.
* We store the chosen pivot record here and use the xfarray scratchpad * We store the chosen pivot record here and use the xfarray scratchpad
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment