Commit 665a9b6e authored by Rich Prohaska's avatar Rich Prohaska

DB-450 use interruptible cursors for index scans and info schema scans

parent b3601d02
......@@ -2983,12 +2983,7 @@ void ha_tokudb::init_hidden_prim_key_info(DB_TXN *txn) {
if (!(share->status & STATUS_PRIMARY_KEY_INIT)) {
int error = 0;
DBC* c = NULL;
error = share->key_file[primary_key]->cursor(
share->key_file[primary_key],
txn,
&c,
0
);
error = share->key_file[primary_key]->cursor(share->key_file[primary_key], txn, &c, 0);
assert(error == 0);
DBT key,val;
memset(&key, 0, sizeof(key));
......@@ -3208,11 +3203,12 @@ bool ha_tokudb::may_table_be_empty(DB_TXN *txn) {
error = share->file->cursor(share->file, txn, &tmp_cursor, 0);
if (error)
goto cleanup;
tmp_cursor->c_set_check_interrupt_callback(tmp_cursor, tokudb_killed_thd_callback, ha_thd());
if (empty_scan == TOKUDB_EMPTY_SCAN_LR)
error = tmp_cursor->c_getf_next(tmp_cursor, 0, smart_dbt_do_nothing, NULL);
else
error = tmp_cursor->c_getf_prev(tmp_cursor, 0, smart_dbt_do_nothing, NULL);
error = map_to_handler_error(error);
if (error == DB_NOTFOUND)
ret_val = true;
else
......@@ -3531,6 +3527,7 @@ int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint
goto cleanup;
}
else if (error) {
error = map_to_handler_error(error);
goto cleanup;
}
if (ir_info.cmp) {
......@@ -4514,6 +4511,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
cursor = NULL; // Safety
goto exit;
}
cursor->c_set_check_interrupt_callback(cursor, tokudb_killed_thd_callback, thd);
memset((void *) &last_key, 0, sizeof(last_key));
add_to_trx_handler_list();
......@@ -7025,17 +7023,17 @@ int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_nam
error = status_db->cursor(status_db, txn, &status_cursor, 0);
if (error) { goto cleanup; }
status_cursor->c_set_check_interrupt_callback(status_cursor, tokudb_killed_thd_callback, thd);
while (error != DB_NOTFOUND) {
error = status_cursor->c_get(
status_cursor,
&curr_key,
&curr_val,
DB_NEXT
);
if (error && error != DB_NOTFOUND) { goto cleanup; }
if (error == DB_NOTFOUND) { break; }
error = status_cursor->c_get(status_cursor, &curr_key, &curr_val, DB_NEXT);
if (error && error != DB_NOTFOUND) {
error = map_to_handler_error(error);
goto cleanup;
}
if (error == DB_NOTFOUND) {
break;
}
HA_METADATA_KEY mk = *(HA_METADATA_KEY *)curr_key.data;
if (mk != hatoku_key_name) {
continue;
......@@ -7904,23 +7902,30 @@ void ha_tokudb::restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_o
}
int ha_tokudb::map_to_handler_error(int error) {
if (error == DB_LOCK_DEADLOCK)
switch (error) {
case DB_LOCK_DEADLOCK:
error = HA_ERR_LOCK_DEADLOCK;
if (error == DB_LOCK_NOTGRANTED)
break;
case DB_LOCK_NOTGRANTED:
error = HA_ERR_LOCK_WAIT_TIMEOUT;
break;
#if defined(HA_ERR_DISK_FULL)
if (error == ENOSPC) {
case ENOSPC:
error = HA_ERR_DISK_FULL;
}
break;
#endif
if (error == DB_KEYEXIST) {
case DB_KEYEXIST:
error = HA_ERR_FOUND_DUPP_KEY;
}
break;
#if defined(HA_ALTER_ERROR)
if (error == HA_ALTER_ERROR) {
case HA_ALTER_ERROR:
error = HA_ERR_UNSUPPORTED;
}
break;
#endif
case TOKUDB_INTERRUPTED:
error = ER_QUERY_INTERRUPTED;
break;
}
return error;
}
......
......@@ -1522,6 +1522,8 @@ static int tokudb_file_map(TABLE *table, THD *thd) {
error = schema_table_store_record(thd, table);
}
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
error = 0;
......@@ -1667,6 +1669,8 @@ static int tokudb_fractal_tree_info(TABLE *table, THD *thd) {
if (!error) {
error = tokudb_report_fractal_tree_info_for_db(&curr_key, &curr_val, table, thd);
}
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
error = 0;
......@@ -1880,6 +1884,8 @@ static int tokudb_fractal_tree_block_map(TABLE *table, THD *thd) {
if (!error) {
error = tokudb_report_fractal_tree_block_map_for_db(&curr_key, &curr_val, table, thd);
}
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
}
if (error == DB_NOTFOUND) {
error = 0;
......@@ -2048,6 +2054,8 @@ static int tokudb_trx_callback(uint64_t txn_id, uint64_t client_id, iterate_row_
table->field[0]->store(txn_id, false);
table->field[1]->store(client_id, false);
int error = schema_table_store_record(thd, table);
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
return error;
}
......@@ -2131,6 +2139,10 @@ static int tokudb_lock_waits_callback(DB *db, uint64_t requesting_txnid, const D
table->field[8]->store(dictionary_name.c_ptr(), dictionary_name.length(), system_charset_info);
int error = schema_table_store_record(thd, table);
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
return error;
}
......@@ -2218,6 +2230,9 @@ static int tokudb_locks_callback(uint64_t txn_id, uint64_t client_id, iterate_ro
table->field[7]->store(dictionary_name.c_ptr(), dictionary_name.length(), system_charset_info);
error = schema_table_store_record(thd, table);
if (!error && thd->killed)
error = ER_QUERY_INTERRUPTED;
}
return error;
}
......
......@@ -484,6 +484,11 @@ static int tokudb_killed_callback(void) {
return thd->killed;
}
static bool tokudb_killed_thd_callback(void *extra) {
THD *thd = static_cast<THD *>(extra);
return thd->killed != 0;
}
enum {
TOKUDB_EMPTY_SCAN_DISABLED = 0,
TOKUDB_EMPTY_SCAN_LR = 1,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment