Commit 17aff4b1 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge MDEV-7936 into 10.0.

Conflicts:
	sql/sql_base.cc
parents 50d98e9c 60d094ae
...@@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; ...@@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value Variable_name Value
Slave_open_temp_tables 0 Slave_open_temp_tables 0
FLUSH LOGS; FLUSH LOGS;
*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/save_master_pos.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
......
...@@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; ...@@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS; FLUSH LOGS;
--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
--connection server_1
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_pos.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
# Clean up.
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
...@@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl) ...@@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
} }
static bool
use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
{
*out_table= table;
if (!table)
return false;
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return true;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
return false;
}
bool
find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
out_table);
}
bool
find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
}
/** /**
Find a temporary table specified by a key in the THD::temporary_tables list. Find a temporary table specified by a key in the THD::temporary_tables list.
...@@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd, ...@@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length && if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length)) !memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{ {
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table; result= table;
break; break;
} }
...@@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) ...@@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
if (!(table= find_temporary_table(thd, tl))) if (find_and_use_temporary_table(thd, tl, &table))
DBUG_RETURN(TRUE);
if (!table)
{ {
if (tl->open_type == OT_TEMPORARY_ONLY && if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL) tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
......
...@@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table, ...@@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name, const char *db_name,
const char *table_name); const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name); TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
bool find_and_use_temporary_table(THD *thd, const char *db,
const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key, TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length); uint table_key_length);
void close_thread_tables(THD *thd); void close_thread_tables(THD *thd);
......
...@@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd, ...@@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table()) if (create_info->tmp_table())
{ {
TABLE *tmp_table; TABLE *tmp_table;
if ((tmp_table= find_temporary_table(thd, db, table_name))) if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
goto err;
if (tmp_table)
{ {
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged; bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (create_info->options & HA_LEX_CREATE_REPLACE) if (create_info->options & HA_LEX_CREATE_REPLACE)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment