Commit c4e76b20 authored by unknown's avatar unknown

MDEV-5363: Make parallel replication waits killable

Add another test case. This one for killing the SQL driver thread while it is
waiting for room in the list of events queued for a worker thread.

Fix bugs found:

 - Several memory leaks in various error cases.

 - SQL error code was not set (for SHOW SLAVE STATUS etc.) when killed.
parent 86a2c03b
...@@ -630,6 +630,46 @@ SET GLOBAL binlog_format=@old_format; ...@@ -630,6 +630,46 @@ SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10; SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc include/start_slave.inc
*** 5. Test killing thread that is waiting for queue of max length to shorten ***
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued=9000;
SET binlog_format=statement;
INSERT INTO t3 VALUES (70, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
SET debug_sync='now WAIT_FOR query_waiting';
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
INSERT INTO t3 VALUES (72, 0);
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
a b
70 0
71 10000
72 0
SET debug_sync='now WAIT_FOR wait_queue_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR wait_queue_killed';
SET debug_sync='now SIGNAL query_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
a b
70 0
71 10000
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
INSERT INTO t3 VALUES (73,0);
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
a b
70 0
71 10000
72 0
73 0
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
......
...@@ -940,6 +940,75 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) ...@@ -940,6 +940,75 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
--delimiter ; --delimiter ;
SET sql_log_bin=1; SET sql_log_bin=1;
--connection server_2
# Respawn all worker threads to clear any left-over debug_sync or other stuff.
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
--source include/wait_condition.inc
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued=9000;
--connection server_1
--let bigstring= `SELECT REPEAT('x', 10000)`
SET binlog_format=statement;
# Create an event that will wait to be signalled.
INSERT INTO t3 VALUES (70, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
--disable_query_log
# Create an event that will fill up the queue.
eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
--enable_query_log
--connection server_2
SET debug_sync='now WAIT_FOR query_waiting';
# Inject that the SQL driver thread will signal `wait_queue_ready' to debug_sync
# as it goes to wait for the event queue to become smaller than the value of
# @@slave_parallel_max_queued.
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
--connection server_1
# This event will have to wait for the queue to become shorter before it can
# be queued. We will test that things work when we kill the SQL driver thread
# during this wait.
INSERT INTO t3 VALUES (72, 0);
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
--connection server_2
SET debug_sync='now WAIT_FOR wait_queue_ready';
--replace_result $thd_id THD_ID
eval KILL $thd_id;
SET debug_sync='now WAIT_FOR wait_queue_killed';
SET debug_sync='now SIGNAL query_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1
INSERT INTO t3 VALUES (73,0);
--save_master_pos
--connection server_2
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
......
...@@ -801,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -801,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
sql_thread_stopping= true; sql_thread_stopping= true;
if (sql_thread_stopping) if (sql_thread_stopping)
{ {
delete ev;
/* QQ: Need a better comment why we return false here */ /* QQ: Need a better comment why we return false here */
return false; return false;
} }
...@@ -809,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -809,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
MYF(0)))) MYF(0))))
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(0)); my_error(ER_OUT_OF_RESOURCES, MYF(0));
delete ev;
return true; return true;
} }
qev->ev= ev; qev->ev= ev;
...@@ -831,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -831,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete rgi; delete rgi;
my_free(qev);
delete ev;
return true; return true;
} }
rgi->is_parallel_exec = true; rgi->is_parallel_exec = true;
...@@ -903,6 +907,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -903,6 +907,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
my_error(ER_CONNECTION_KILLED, MYF(0)); my_error(ER_CONNECTION_KILLED, MYF(0));
delete rgi; delete rgi;
my_free(qev);
delete ev;
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
slave_output_error_info(rli, rli->sql_driver_thd);
return true; return true;
} }
else else
...@@ -918,6 +930,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -918,6 +930,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread, (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
"Waiting for room in worker thread event queue"); "Waiting for room in worker thread event queue");
did_enter_cond= true; did_enter_cond= true;
DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
{
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
};);
} }
mysql_cond_wait(&cur_thread->COND_rpl_thread, mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread); &cur_thread->LOCK_rpl_thread);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment