Commit b2d2d85e authored by Rich Prohaska's avatar Rich Prohaska

#218 cursor set range with bound

parent cecd7481
......@@ -608,6 +608,7 @@ static void print_dbc_struct (void) {
"int (*c_getf_set)(DBC *, uint32_t, DBT *, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_getf_set_range)(DBC *, uint32_t, DBT *, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_getf_set_range_reverse)(DBC *, uint32_t, DBT *, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_getf_set_range_with_bound)(DBC *, uint32_t, DBT *k, DBT *k_bound, YDB_CALLBACK_FUNCTION, void *)",
"int (*c_set_bounds)(DBC*, const DBT*, const DBT*, bool pre_acquire, int out_of_range_error)",
"void (*c_set_check_interrupt_callback)(DBC*, bool (*)(void*), void *)",
"void (*c_remove_restriction)(DBC*)",
......
......@@ -4945,6 +4945,19 @@ ft_cursor_shortcut (
void **val
);
// Return true if this key is within the search bound. If there is no search bound then the tree search continues.
static bool search_continue(ft_search *search, void *key, uint32_t key_len) {
bool result = true;
if (search->direction == FT_SEARCH_LEFT && search->k_bound) {
FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
FAKE_DB(db, &ft_handle->ft->cmp_descriptor);
DBT this_key = { .data = key, .size = key_len };
// search continues if this key <= key bound
result = (ft_handle->ft->compare_fun(&db, &this_key, search->k_bound) <= 0);
}
return result;
}
// This is a bottom layer of the search functions.
static int
ft_search_basement_node(
......@@ -4984,12 +4997,14 @@ ok: ;
goto got_a_good_value; // leaf mode cursors see all leaf entries
if (is_le_val_del(le,ftcursor)) {
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
// So we need to scan in the direction to see if we can find something.
// Every 100 deleted leaf entries check if the leaf's key is within the search bounds.
for (uint n_deleted = 1; ; n_deleted++) {
switch (search->direction) {
case FT_SEARCH_LEFT:
idx++;
if (idx >= bn->data_buffer.num_klpairs()) {
if (idx >= bn->data_buffer.num_klpairs() ||
((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) {
if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra)) {
return TOKUDB_INTERRUPTED;
}
......@@ -5318,6 +5333,26 @@ maybe_search_save_bound(
}
}
// Returns true if there are still children left to search in this node within the search bound (if any).
static bool search_try_again(FTNODE node, int child_to_search, ft_search_t *search) {
bool try_again = false;
if (search->direction == FT_SEARCH_LEFT) {
if (child_to_search < node->n_children-1) {
try_again = true;
// if there is a search bound and the bound is within the search pivot then continue the search
if (search->k_bound) {
FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
FAKE_DB(db, &ft_handle->ft->cmp_descriptor);
try_again = (ft_handle->ft->compare_fun(&db, search->k_bound, &search->pivot_bound) > 0);
}
}
} else if (search->direction == FT_SEARCH_RIGHT) {
if (child_to_search > 0)
try_again = true;
}
return try_again;
}
static int
ft_search_node(
FT_HANDLE ft_handle,
......@@ -5405,12 +5440,12 @@ ft_search_node(
// If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
// So save the pivot key in the search object.
maybe_search_save_bound(node, child_to_search, search);
// as part of #5770, if we can continue searching,
// we MUST return TOKUDB_TRY_AGAIN,
// because there is no guarantee that messages have been applied
// on any other path.
if ((search->direction == FT_SEARCH_LEFT && child_to_search < node->n_children-1) ||
(search->direction == FT_SEARCH_RIGHT && child_to_search > 0)) {
if (search_try_again(node, child_to_search, search)) {
r = TOKUDB_TRY_AGAIN;
}
......@@ -5610,7 +5645,8 @@ toku_ft_cursor_current(FT_CURSOR cursor, int op, FT_GET_CALLBACK_FUNCTION getf,
cursor->direction = 0;
if (op == DB_CURRENT) {
struct ft_cursor_search_struct bcss = {getf, getf_v, cursor, 0};
ft_search_t search; ft_search_init(&search, ft_cursor_compare_set, FT_SEARCH_LEFT, &cursor->key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_set, FT_SEARCH_LEFT, &cursor->key, nullptr, cursor->ft_handle);
int r = toku_ft_search(cursor->ft_handle, &search, ft_cursor_current_getf, &bcss, cursor, false);
ft_search_finish(&search);
return r;
......@@ -5622,7 +5658,8 @@ int
toku_ft_cursor_first(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = 0;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_LEFT, 0, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_LEFT, nullptr, nullptr, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
ft_search_finish(&search);
return r;
......@@ -5632,7 +5669,8 @@ int
toku_ft_cursor_last(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = 0;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_RIGHT, 0, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_one, FT_SEARCH_RIGHT, nullptr, nullptr, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
ft_search_finish(&search);
return r;
......@@ -5708,7 +5746,8 @@ int
toku_ft_cursor_next(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = +1;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_next, FT_SEARCH_LEFT, &cursor->key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_next, FT_SEARCH_LEFT, &cursor->key, nullptr, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, true);
ft_search_finish(&search);
if (r == 0) ft_cursor_set_prefetching(cursor);
......@@ -5755,7 +5794,8 @@ int
toku_ft_cursor_prev(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = -1;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_prev, FT_SEARCH_RIGHT, &cursor->key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_prev, FT_SEARCH_RIGHT, &cursor->key, nullptr, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, true);
ft_search_finish(&search);
return r;
......@@ -5770,17 +5810,19 @@ int
toku_ft_cursor_set(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = 0;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, nullptr, cursor->ft_handle);
int r = ft_cursor_search_eq_k_x(cursor, &search, getf, getf_v);
ft_search_finish(&search);
return r;
}
int
toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, DBT *key_bound, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = 0;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_set_range, FT_SEARCH_LEFT, key, key_bound, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
ft_search_finish(&search);
return r;
......@@ -5795,7 +5837,8 @@ int
toku_ft_cursor_set_range_reverse(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v)
{
cursor->direction = 0;
ft_search_t search; ft_search_init(&search, ft_cursor_compare_set_range_reverse, FT_SEARCH_RIGHT, key, cursor->ft_handle);
ft_search_t search;
ft_search_init(&search, ft_cursor_compare_set_range_reverse, FT_SEARCH_RIGHT, key, nullptr, cursor->ft_handle);
int r = ft_cursor_search(cursor, &search, getf, getf_v, false);
ft_search_finish(&search);
return r;
......@@ -5834,7 +5877,7 @@ toku_ft_cursor_get (FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, v
case DB_SET:
return toku_ft_cursor_set(cursor, key, getf, getf_v);
case DB_SET_RANGE:
return toku_ft_cursor_set_range(cursor, key, getf, getf_v);
return toku_ft_cursor_set_range(cursor, key, nullptr, getf, getf_v);
default: ;// Fall through
}
return EINVAL;
......@@ -6310,7 +6353,7 @@ int toku_ft_get_key_after_bytes(FT_HANDLE ft_h, const DBT *start_key, uint64_t s
struct unlock_ftnode_extra unlock_extra = {ft_h, root, false};
struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS) nullptr};
ft_search_t search;
ft_search_init(&search, (start_key == nullptr ? ft_cursor_compare_one : ft_cursor_compare_set_range), FT_SEARCH_LEFT, start_key, ft_h);
ft_search_init(&search, (start_key == nullptr ? ft_cursor_compare_one : ft_cursor_compare_set_range), FT_SEARCH_LEFT, start_key, nullptr, ft_h);
int r;
// We can't do this because of #5768, there may be dictionaries in the wild that have negative stats. This won't affect mongo so it's ok:
......
......@@ -278,7 +278,7 @@ int toku_ft_cursor_next(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *g
int toku_ft_cursor_prev(FT_CURSOR cursor, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_current(FT_CURSOR cursor, int op, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range(FT_CURSOR cursor, DBT *key, DBT *key_bound, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_set_range_reverse(FT_CURSOR cursor, DBT *key, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_get_both_range(FT_CURSOR cursor, DBT *key, DBT *val, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
int toku_ft_cursor_get_both_range_reverse(FT_CURSOR cursor, DBT *key, DBT *val, FT_GET_CALLBACK_FUNCTION getf, void *getf_v) __attribute__ ((warn_unused_result));
......
......@@ -136,15 +136,18 @@ typedef struct ft_search {
// no guarantee that we will get everything pinned again. We ought to keep nodes pinned when we retry, except that on the
// way out with a DB_NOTFOUND we ought to unpin those nodes. See #3528.
DBT pivot_bound;
const DBT *k_bound;
} ft_search_t;
/* initialize the search compare object */
static inline ft_search_t *ft_search_init(ft_search_t *so, ft_search_compare_func_t compare, enum ft_search_direction_e direction, const DBT *k, void *context) {
static inline ft_search_t *ft_search_init(ft_search_t *so, ft_search_compare_func_t compare, enum ft_search_direction_e direction,
const DBT *k, const DBT *k_bound, void *context) {
so->compare = compare;
so->direction = direction;
so->k = k;
so->context = context;
toku_init_dbt(&so->pivot_bound);
so->k_bound = k_bound;
return so;
}
......
......@@ -230,13 +230,7 @@ test2(int fd, FT ft_h, FTNODE *dn) {
fill_bfe_for_subset_read(
&bfe_subset,
ft_h,
ft_search_init(
&search_t,
search_cmp,
FT_SEARCH_LEFT,
NULL,
NULL
),
ft_search_init(&search_t, search_cmp, FT_SEARCH_LEFT, nullptr, nullptr, nullptr),
&left,
&right,
true,
......
......@@ -587,7 +587,7 @@ c_getf_set_range(DBC *c, uint32_t flag, DBT *key, YDB_CALLBACK_FUNCTION f, void
query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
while (r == 0) {
//toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
r = toku_ft_cursor_set_range(dbc_struct_i(c)->c, key, c_getf_set_range_callback, &context);
r = toku_ft_cursor_set_range(dbc_struct_i(c)->c, key, nullptr, c_getf_set_range_callback, &context);
if (r == DB_LOCK_NOTGRANTED) {
r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
} else {
......@@ -631,6 +631,27 @@ c_getf_set_range_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec v
return r;
}
static int
c_getf_set_range_with_bound(DBC *c, uint32_t flag, DBT *key, DBT *key_bound, YDB_CALLBACK_FUNCTION f, void *extra) {
HANDLE_PANICKED_DB(c->dbp);
HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c);
int r = 0;
QUERY_CONTEXT_WITH_INPUT_S context; //Describes the context of this query.
query_context_with_input_init(&context, c, flag, key, NULL, f, extra);
while (r == 0) {
//toku_ft_cursor_set_range will call c_getf_set_range_callback(..., context) (if query is successful)
r = toku_ft_cursor_set_range(dbc_struct_i(c)->c, key, key_bound, c_getf_set_range_callback, &context);
if (r == DB_LOCK_NOTGRANTED) {
r = toku_db_wait_range_lock(context.base.db, context.base.txn, &context.base.request);
} else {
break;
}
}
query_context_base_destroy(&context.base);
return r;
}
static int c_getf_set_range_reverse_callback(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool);
static int
......@@ -834,6 +855,7 @@ toku_db_cursor_internal(DB * db, DB_TXN * txn, DBC ** c, uint32_t flags, int is_
SCRS(c_getf_current);
SCRS(c_getf_set_range);
SCRS(c_getf_set_range_reverse);
SCRS(c_getf_set_range_with_bound);
SCRS(c_set_bounds);
SCRS(c_remove_restriction);
SCRS(c_set_check_interrupt_callback);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment