Commit d9edc3fb authored by Rich Prohaska's avatar Rich Prohaska

#150 cleanup indexing cursor when a txn is retired

parent 7d843822
drop table if exists t1,t2,t3;
SET NAMES latin1;
CREATE TABLE t1 (a INT) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1),(2);
CREATE TABLE t2 (b INT) ENGINE=TokuDB;
INSERT INTO t2 VALUES (3),(4);
CREATE TABLE t3 (c VARCHAR(3), INDEX(c)) ENGINE=TokuDB;
INSERT INTO t3 VALUES ('foo'),('bar');
EXPLAIN SELECT * FROM t1
WHERE a IN ( SELECT b FROM t2 ) OR ( 'qux' ) IN ( SELECT c FROM t3 );
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY t1 ALL NULL NULL NULL NULL 2 Using where
3 SUBQUERY t3 index_subquery c c 6 const 0 Using index; Using where
2 MATERIALIZED t2 ALL NULL NULL NULL NULL 2
drop table if exists t1,t2,t3;
drop table if exists t1,t2;
CREATE TABLE t1 (a VARCHAR(8), INDEX(a)) ENGINE=TokuDB;
INSERT INTO t1 VALUES ('foo'),('bar');
CREATE TABLE t2 AS SELECT ( 'qux' ) IN ( SELECT a FROM t1 ) AS f1;
drop table t1,t2;
# this test crashes because tokudb does not handle index_end after txn commit (MDEV-5396)
source include/have_tokudb.inc;
disable_warnings;
drop table if exists t1,t2,t3;
enable_warnings;
SET NAMES latin1;
CREATE TABLE t1 (a INT) ENGINE=TokuDB;
INSERT INTO t1 VALUES (1),(2);
CREATE TABLE t2 (b INT) ENGINE=TokuDB;
INSERT INTO t2 VALUES (3),(4);
CREATE TABLE t3 (c VARCHAR(3), INDEX(c)) ENGINE=TokuDB;
INSERT INTO t3 VALUES ('foo'),('bar');
EXPLAIN SELECT * FROM t1
WHERE a IN ( SELECT b FROM t2 ) OR ( 'qux' ) IN ( SELECT c FROM t3 );
drop table if exists t1,t2,t3;
\ No newline at end of file
# this test crashes because tokudb does not handle index_end after txn commit (MDEV-5396)
source include/have_tokudb.inc;
disable_warnings;
drop table if exists t1,t2;
enable_warnings;
CREATE TABLE t1 (a VARCHAR(8), INDEX(a)) ENGINE=TokuDB;
INSERT INTO t1 VALUES ('foo'),('bar');
CREATE TABLE t2 AS SELECT ( 'qux' ) IN ( SELECT a FROM t1 ) AS f1;
drop table t1,t2;
...@@ -1240,6 +1240,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1240,6 +1240,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
prelocked_right_range_size = 0; prelocked_right_range_size = 0;
tokudb_active_index = MAX_KEY; tokudb_active_index = MAX_KEY;
invalidate_icp(); invalidate_icp();
trx_handler_list.data = this;
} }
ha_tokudb::~ha_tokudb() { ha_tokudb::~ha_tokudb() {
...@@ -4436,6 +4437,7 @@ cleanup: ...@@ -4436,6 +4437,7 @@ cleanup:
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
assert(r==0); assert(r==0);
cursor = NULL; cursor = NULL;
remove_from_trx_handler_list();
} }
} }
return error; return error;
...@@ -4480,6 +4482,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4480,6 +4482,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
DBUG_PRINT("note", ("Closing active cursor")); DBUG_PRINT("note", ("Closing active cursor"));
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
assert(r==0); assert(r==0);
remove_from_trx_handler_list();
} }
active_index = keynr; active_index = keynr;
...@@ -4517,6 +4520,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -4517,6 +4520,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
} }
memset((void *) &last_key, 0, sizeof(last_key)); memset((void *) &last_key, 0, sizeof(last_key));
add_to_trx_handler_list();
if (thd_sql_command(thd) == SQLCOM_SELECT) { if (thd_sql_command(thd) == SQLCOM_SELECT) {
set_query_columns(keynr); set_query_columns(keynr);
unpack_entire_row = false; unpack_entire_row = false;
...@@ -4542,6 +4547,7 @@ int ha_tokudb::index_end() { ...@@ -4542,6 +4547,7 @@ int ha_tokudb::index_end() {
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
assert(r==0); assert(r==0);
cursor = NULL; cursor = NULL;
remove_from_trx_handler_list();
last_cursor_error = 0; last_cursor_error = 0;
} }
active_index = tokudb_active_index = MAX_KEY; active_index = tokudb_active_index = MAX_KEY;
...@@ -5546,7 +5552,7 @@ int ha_tokudb::rnd_end() { ...@@ -5546,7 +5552,7 @@ int ha_tokudb::rnd_end() {
// error otherwise // error otherwise
// //
int ha_tokudb::rnd_next(uchar * buf) { int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next"); TOKUDB_DBUG_ENTER("ha_tokudb::rnd_next");
ha_statistic_increment(&SSV::ha_read_rnd_next_count); ha_statistic_increment(&SSV::ha_read_rnd_next_count);
int error = get_next(buf, 1, NULL); int error = get_next(buf, 1, NULL);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -5706,6 +5712,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -5706,6 +5712,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
int r = cursor->c_close(cursor); int r = cursor->c_close(cursor);
assert(r==0); assert(r==0);
cursor = NULL; cursor = NULL;
remove_from_trx_handler_list();
} }
goto cleanup; goto cleanup;
} }
...@@ -8254,6 +8261,25 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) { ...@@ -8254,6 +8261,25 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
return idx_cond_arg; return idx_cond_arg;
} }
void ha_tokudb::cleanup_txn(DB_TXN *txn) {
if (transaction == txn && cursor) {
int r = cursor->c_close(cursor);
assert(r == 0);
cursor = NULL;
remove_from_trx_handler_list();
}
}
void ha_tokudb::add_to_trx_handler_list() {
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
trx->handlers = list_add(trx->handlers, &trx_handler_list);
}
void ha_tokudb::remove_from_trx_handler_list() {
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
trx->handlers = list_delete(trx->handlers, &trx_handler_list);
}
// table admin // table admin
#include "ha_tokudb_admin.cc" #include "ha_tokudb_admin.cc"
......
...@@ -787,6 +787,15 @@ private: ...@@ -787,6 +787,15 @@ private:
bool check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values); bool check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values);
int send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn); int send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn);
#endif #endif
public:
// mysql sometimes retires a txn before a cursor that references the txn is closed.
// for example, commit is sometimes called before index_end. the following methods
// put the handler on a list of handlers that get cleaned up when the txn is retired.
void cleanup_txn(DB_TXN *txn);
private:
LIST trx_handler_list;
void add_to_trx_handler_list();
void remove_from_trx_handler_list();
}; };
#if defined(MARIADB_BASE_VERSION) #if defined(MARIADB_BASE_VERSION)
......
...@@ -304,6 +304,7 @@ typedef struct st_tokudb_trx_data { ...@@ -304,6 +304,7 @@ typedef struct st_tokudb_trx_data {
uint tokudb_lock_count; uint tokudb_lock_count;
tokudb_stmt_progress stmt_progress; tokudb_stmt_progress stmt_progress;
bool checkpoint_lock_taken; bool checkpoint_lock_taken;
LIST *handlers;
} tokudb_trx_data; } tokudb_trx_data;
extern char *tokudb_data_dir; extern char *tokudb_data_dir;
......
...@@ -644,6 +644,14 @@ static void abort_txn_with_progress(DB_TXN* txn, THD* thd) { ...@@ -644,6 +644,14 @@ static void abort_txn_with_progress(DB_TXN* txn, THD* thd) {
assert(r == 0); assert(r == 0);
} }
static void tokudb_cleanup_handlers(tokudb_trx_data *trx, DB_TXN *txn) {
LIST *e;
while ((e = trx->handlers)) {
ha_tokudb *handler = (ha_tokudb *) e->data;
handler->cleanup_txn(txn);
}
}
static int tokudb_commit(handlerton * hton, THD * thd, bool all) { static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_commit"); TOKUDB_DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt")); DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
...@@ -657,6 +665,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -657,6 +665,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
} }
// test hook to induce a crash on a debug build // test hook to induce a crash on a debug build
DBUG_EXECUTE_IF("tokudb_crash_commit_before", DBUG_SUICIDE();); DBUG_EXECUTE_IF("tokudb_crash_commit_before", DBUG_SUICIDE(););
tokudb_cleanup_handlers(trx, this_txn);
commit_txn_with_progress(this_txn, syncflag, thd); commit_txn_with_progress(this_txn, syncflag, thd);
// test hook to induce a crash on a debug build // test hook to induce a crash on a debug build
DBUG_EXECUTE_IF("tokudb_crash_commit_after", DBUG_SUICIDE();); DBUG_EXECUTE_IF("tokudb_crash_commit_after", DBUG_SUICIDE(););
...@@ -683,6 +692,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { ...@@ -683,6 +692,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("rollback %u %p\n", all, this_txn); TOKUDB_TRACE("rollback %u %p\n", all, this_txn);
} }
tokudb_cleanup_handlers(trx, this_txn);
abort_txn_with_progress(this_txn, thd); abort_txn_with_progress(this_txn, thd);
if (this_txn == trx->sp_level) { if (this_txn == trx->sp_level) {
trx->sp_level = 0; trx->sp_level = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment