Commit 60d094ae authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on...

MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode

Make sure that in parallel replication, we execute wait_for_prior_commit()
before setting table->in_use for a temporary table. Otherwise we can end up
with two parallel replication worker threads competing with each other for
use of a temporary table.

Re-factor the use of find_temporary_table() to be able to handle errors
in the caller (as wait_for_prior_commit() can return error in case of
deadlock kill).
parent c47fe0e9
......@@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/save_master_pos.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
......
......@@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS;
--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
--connection server_1
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_pos.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
# Clean up.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
......@@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
}
static bool
use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
{
*out_table= table;
if (!table)
return false;
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return true;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
return false;
}
bool
find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
out_table);
}
bool
find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
}
/**
Find a temporary table specified by a key in the THD::temporary_tables list.
......@@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
......@@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}
if (!(table= find_temporary_table(thd, tl)))
if (find_and_use_temporary_table(thd, tl, &table))
DBUG_RETURN(TRUE);
if (!table)
{
if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
......@@ -5833,25 +5878,6 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
DBUG_RETURN(true);
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (tl->partition_names)
{
......
......@@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name,
const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
bool find_and_use_temporary_table(THD *thd, const char *db,
const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length);
void close_thread_tables(THD *thd);
......
......@@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table())
{
TABLE *tmp_table;
if ((tmp_table= find_temporary_table(thd, db, table_name)))
if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
goto err;
if (tmp_table)
{
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (create_info->options & HA_LEX_CREATE_REPLACE)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment