Commit e7e91ab6 authored by Rich Prohaska's avatar Rich Prohaska

#202 allow non-null AND null key index cursor operations

parent cb41cf0d
...@@ -2843,7 +2843,7 @@ DBT *ha_tokudb::pack_key( ...@@ -2843,7 +2843,7 @@ DBT *ha_tokudb::pack_key(
int8_t inf_byte int8_t inf_byte
) )
{ {
TOKUDB_HANDLER_DBUG_ENTER(""); TOKUDB_HANDLER_DBUG_ENTER("%u null=%u inf=%d", key_length, key_length > 0 ? key_ptr[0] : 0, inf_byte);
#if TOKU_INCLUDE_EXTENDED_KEYS #if TOKU_INCLUDE_EXTENDED_KEYS
if (keynr != primary_key && !tokudb_test(hidden_primary_key)) { if (keynr != primary_key && !tokudb_test(hidden_primary_key)) {
DBUG_RETURN(pack_ext_key(key, keynr, buff, key_ptr, key_length, inf_byte)); DBUG_RETURN(pack_ext_key(key, keynr, buff, key_ptr, key_length, inf_byte));
...@@ -4411,6 +4411,19 @@ cleanup: ...@@ -4411,6 +4411,19 @@ cleanup:
TOKUDB_HANDLER_DBUG_RETURN(error); TOKUDB_HANDLER_DBUG_RETURN(error);
} }
static bool index_key_is_null(TABLE *table, uint keynr, const uchar *key, uint key_len) {
bool key_can_be_null = false;
KEY *key_info = &table->key_info[keynr];
KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + get_key_parts(key_info);
for (; key_part != end; key_part++) {
if (key_part->null_bit) {
key_can_be_null = true;
break;
}
}
return key_can_be_null && key_len > 0 && key[0] != 0;
}
// //
// Notification that a range query getting all elements that equal a key // Notification that a range query getting all elements that equal a key
...@@ -4443,6 +4456,7 @@ int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) { ...@@ -4443,6 +4456,7 @@ int ha_tokudb::prepare_index_key_scan(const uchar * key, uint key_len) {
} }
range_lock_grabbed = true; range_lock_grabbed = true;
range_lock_grabbed_null = index_key_is_null(table, tokudb_active_index, key, key_len);
doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT); doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
bulk_fetch_iteration = 0; bulk_fetch_iteration = 0;
rows_fetched_using_bulk_fetch = 0; rows_fetched_using_bulk_fetch = 0;
...@@ -4504,6 +4518,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4504,6 +4518,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
remove_from_trx_handler_list(); remove_from_trx_handler_list();
} }
active_index = keynr; active_index = keynr;
index_init_sorted = sorted;
if (active_index < MAX_KEY) { if (active_index < MAX_KEY) {
DBUG_ASSERT(keynr <= table->s->keys); DBUG_ASSERT(keynr <= table->s->keys);
...@@ -4515,6 +4530,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4515,6 +4530,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
last_cursor_error = 0; last_cursor_error = 0;
range_lock_grabbed = false; range_lock_grabbed = false;
range_lock_grabbed_null = false;
DBUG_ASSERT(share->key_file[keynr]); DBUG_ASSERT(share->key_file[keynr]);
cursor_flags = get_cursor_isolation_flags(lock.type, thd); cursor_flags = get_cursor_isolation_flags(lock.type, thd);
if (use_write_locks) { if (use_write_locks) {
...@@ -4562,6 +4578,7 @@ exit: ...@@ -4562,6 +4578,7 @@ exit:
int ha_tokudb::index_end() { int ha_tokudb::index_end() {
TOKUDB_HANDLER_DBUG_ENTER(""); TOKUDB_HANDLER_DBUG_ENTER("");
range_lock_grabbed = false; range_lock_grabbed = false;
range_lock_grabbed_null = false;
if (cursor) { if (cursor) {
DBUG_PRINT("enter", ("table: '%s'", table_share->table_name.str)); DBUG_PRINT("enter", ("table: '%s'", table_share->table_name.str));
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
...@@ -4807,7 +4824,7 @@ cleanup: ...@@ -4807,7 +4824,7 @@ cleanup:
// error otherwise // error otherwise
// //
int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) { int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_HANDLER_DBUG_ENTER("find %d", find_flag); TOKUDB_HANDLER_DBUG_ENTER("%p %u null=%u find=%u", key, key_len, key ? key[0] : 0, find_flag);
invalidate_bulk_fetch(); invalidate_bulk_fetch();
// TOKUDB_DBUG_DUMP("key=", key, key_len); // TOKUDB_DBUG_DUMP("key=", key, key_len);
DBT row; DBT row;
...@@ -4821,6 +4838,13 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4821,6 +4838,13 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
HANDLE_INVALID_CURSOR(); HANDLE_INVALID_CURSOR();
// if we locked a non-null key range and we now have a null key, then get a new cursor without any bounds on the cursor's key range
if (range_lock_grabbed && !range_lock_grabbed_null && index_key_is_null(table, tokudb_active_index, key, key_len)) {
error = index_init(active_index, index_init_sorted);
if (error)
goto cleanup;
}
ha_statistic_increment(&SSV::ha_read_key_count); ha_statistic_increment(&SSV::ha_read_key_count);
memset((void *) &row, 0, sizeof(row)); memset((void *) &row, 0, sizeof(row));
......
...@@ -371,6 +371,7 @@ private: ...@@ -371,6 +371,7 @@ private:
// know to limit the locking overhead in a call to the fractal tree // know to limit the locking overhead in a call to the fractal tree
// //
bool range_lock_grabbed; bool range_lock_grabbed;
bool range_lock_grabbed_null;
// //
// For bulk inserts, we want option of not updating auto inc // For bulk inserts, we want option of not updating auto inc
...@@ -493,6 +494,7 @@ private: ...@@ -493,6 +494,7 @@ private:
// 0 <= active_index < table_share->keys || active_index == MAX_KEY // 0 <= active_index < table_share->keys || active_index == MAX_KEY
// tokudb_active_index = active_index if active_index < table_share->keys, else tokudb_active_index = primary_key = table_share->keys // tokudb_active_index = active_index if active_index < table_share->keys, else tokudb_active_index = primary_key = table_share->keys
uint tokudb_active_index; uint tokudb_active_index;
bool index_init_sorted;
public: public:
ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg); ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment