Commit 51eaa7fe authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-8193: UNTIL clause in START SLAVE is sporadically disobeyed by parallel replication

The code was using the wrong variable when comparing the binlog name
for the UNTIL position. This could cause the comparison to fail after
binlog rotation, in turn causing the UNTIL clause to not trigger slave
stop.
parent 09bfaf3a
include/master-slave.inc
[connection master]
include/stop_slave_sql.inc
CALL mtr.add_suppression("Statement is unsafe because it uses a system function that may return a different value on the slave");
create table t1 (i int);
insert into t1 values (1),(2);
insert into t1 values (3),(4);
insert into t1 select i+20+0*sleep(1) from t1 where i=1;
Warnings:
Note 1592 Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement is unsafe because it uses a system function that may return a different value on the slave.
flush logs;
insert into t1 values (5),(6);
insert into t1 values (7),(8);
insert into t1 values (9),(10);
insert into t1 values (11),(12);
insert into t1 values (13),(14);
insert into t1 values (15),(16);
set global slave_parallel_threads = 1;
start slave until master_log_file='MASTER_FILE', master_log_pos=MASTER_POS;
drop table t1;
include/stop_slave_io.inc
set global slave_parallel_threads = DEFAULT;
drop table t1;
include/rpl_end.inc
--source include/master-slave.inc
--source include/have_binlog_format_statement.inc
--connection slave
--source include/stop_slave_sql.inc
--connection master
CALL mtr.add_suppression("Statement is unsafe because it uses a system function that may return a different value on the slave");
create table t1 (i int);
insert into t1 values (1),(2);
insert into t1 values (3),(4);
# This sleep() helps trigger the failure more reliably.
insert into t1 select i+20+0*sleep(1) from t1 where i=1;
flush logs;
insert into t1 values (5),(6);
insert into t1 values (7),(8);
insert into t1 values (9),(10);
--let $master_file = query_get_value(show master status,File,1)
--let $master_pos = query_get_value(show master status,Position,1)
insert into t1 values (11),(12);
insert into t1 values (13),(14);
insert into t1 values (15),(16);
--connection slave
set global slave_parallel_threads = 1;
--replace_result $master_file MASTER_FILE $master_pos MASTER_POS
eval start slave until master_log_file='$master_file', master_log_pos=$master_pos;
--let $show_statement = SHOW SLAVE STATUS
--let $field = Slave_SQL_Running
--let $condition = = 'No'
--let $wait_timeout = 10
--source include/wait_show_condition.inc
if (`select COUNT(*) <> 11 from t1`)
{
SELECT * FROM t1;
query_vertical show slave status;
die "Wrong number of rows in the table";
}
drop table t1;
--source include/stop_slave_io.inc
set global slave_parallel_threads = DEFAULT;
--connection master
drop table t1;
--let $rpl_only_running_threads= 1
--source include/rpl_end.inc
...@@ -95,7 +95,6 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -95,7 +95,6 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if (cmp < 0) if (cmp < 0)
{ {
strcpy(rli->group_master_log_name, qev->future_event_master_log_name); strcpy(rli->group_master_log_name, qev->future_event_master_log_name);
rli->notify_group_master_log_name_update();
rli->group_master_log_pos= qev->future_event_master_log_pos; rli->group_master_log_pos= qev->future_event_master_log_pos;
} }
else if (cmp == 0 else if (cmp == 0
...@@ -2065,6 +2064,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -2065,6 +2064,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{ {
memcpy(rli->future_event_master_log_name, memcpy(rli->future_event_master_log_name,
rev->new_log_ident, rev->ident_len+1); rev->new_log_ident, rev->ident_len+1);
rli->notify_group_master_log_name_update();
} }
} }
......
...@@ -996,7 +996,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, ...@@ -996,7 +996,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
if (cmp < 0) if (cmp < 0)
{ {
strcpy(group_master_log_name, rgi->future_event_master_log_name); strcpy(group_master_log_name, rgi->future_event_master_log_name);
notify_group_master_log_name_update();
group_master_log_pos= log_pos; group_master_log_pos= log_pos;
} }
else if (group_master_log_pos < log_pos) else if (group_master_log_pos < log_pos)
...@@ -1218,7 +1217,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ...@@ -1218,7 +1217,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
if (ev && ev->server_id == (uint32) global_system_variables.server_id && if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
!replicate_same_server_id) !replicate_same_server_id)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
log_name= group_master_log_name; log_name= (opt_slave_parallel_threads > 0 ?
future_event_master_log_name : group_master_log_name);
log_pos= ((!ev)? group_master_log_pos : log_pos= ((!ev)? group_master_log_pos :
(get_flag(IN_TRANSACTION) || !ev->log_pos) ? (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
group_master_log_pos : ev->log_pos - ev->data_written); group_master_log_pos : ev->log_pos - ev->data_written);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment