Commit 2480b60b authored by unknown's avatar unknown

MDEV-5938: Exec_master_log_pos not updated at log rotate in parallel replication

The code did not correctly handle the update of position for Rotate events in the
binlog/relaylog when using parallel replication.
parent b1a1a79a
...@@ -801,11 +801,29 @@ a b ...@@ -801,11 +801,29 @@ a b
5 NULL 5 NULL
6 6 6 6
7 NULL 7 NULL
*** MDEV-5938: Exec_master_log_pos not updated at log rotate in parallel replication ***
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=1;
SET DEBUG_SYNC= 'RESET';
include/start_slave.inc
CREATE TABLE t5 (a INT PRIMARY KEY, b INT);
INSERT INTO t5 VALUES (1,1);
INSERT INTO t5 VALUES (2,2), (3,8);
INSERT INTO t5 VALUES (4,16);
test_check
OK
test_check
OK
FLUSH LOGS;
test_check
OK
test_check
OK
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
SET DEBUG_SYNC= 'RESET'; SET DEBUG_SYNC= 'RESET';
DROP function foo; DROP function foo;
DROP TABLE t1,t2,t3,t4; DROP TABLE t1,t2,t3,t4,t5;
SET DEBUG_SYNC= 'RESET'; SET DEBUG_SYNC= 'RESET';
include/rpl_end.inc include/rpl_end.inc
...@@ -1250,6 +1250,47 @@ SET debug_sync='RESET'; ...@@ -1250,6 +1250,47 @@ SET debug_sync='RESET';
SELECT * FROM t4 ORDER BY a; SELECT * FROM t4 ORDER BY a;
--echo *** MDEV-5938: Exec_master_log_pos not updated at log rotate in parallel replication ***
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=1;
SET DEBUG_SYNC= 'RESET';
--source include/start_slave.inc
--connection server_1
CREATE TABLE t5 (a INT PRIMARY KEY, b INT);
INSERT INTO t5 VALUES (1,1);
INSERT INTO t5 VALUES (2,2), (3,8);
INSERT INTO t5 VALUES (4,16);
--save_master_pos
--connection server_2
--sync_with_master
let $io_file= query_get_value(SHOW SLAVE STATUS, Master_Log_File, 1);
let $io_pos= query_get_value(SHOW SLAVE STATUS, Read_Master_Log_Pos, 1);
let $sql_file= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1);
let $sql_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1);
--disable_query_log
eval SELECT IF('$io_file' = '$sql_file', "OK", "Not ok, $io_file <> $sql_file") AS test_check;
eval SELECT IF('$io_pos' = '$sql_pos', "OK", "Not ok, $io_pos <> $sql_pos") AS test_check;
--enable_query_log
--connection server_1
FLUSH LOGS;
--save_master_pos
--connection server_2
--sync_with_master
let $io_file= query_get_value(SHOW SLAVE STATUS, Master_Log_File, 1);
let $io_pos= query_get_value(SHOW SLAVE STATUS, Read_Master_Log_Pos, 1);
let $sql_file= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1);
let $sql_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1);
--disable_query_log
eval SELECT IF('$io_file' = '$sql_file', "OK", "Not ok, $io_file <> $sql_file") AS test_check;
eval SELECT IF('$io_pos' = '$sql_pos', "OK", "Not ok, $io_pos <> $sql_pos") AS test_check;
--enable_query_log
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
...@@ -1258,7 +1299,7 @@ SET DEBUG_SYNC= 'RESET'; ...@@ -1258,7 +1299,7 @@ SET DEBUG_SYNC= 'RESET';
--connection server_1 --connection server_1
DROP function foo; DROP function foo;
DROP TABLE t1,t2,t3,t4; DROP TABLE t1,t2,t3,t4,t5;
SET DEBUG_SYNC= 'RESET'; SET DEBUG_SYNC= 'RESET';
--source include/rpl_end.inc --source include/rpl_end.inc
...@@ -1495,7 +1495,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1495,7 +1495,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
} }
else if (!is_group_event) else if (!is_group_event)
{ {
my_off_t log_pos;
int err; int err;
bool tmp; bool tmp;
/* /*
...@@ -1509,7 +1508,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1509,7 +1508,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
serial_rgi->is_parallel_exec= true; serial_rgi->is_parallel_exec= true;
err= rpt_handle_event(qev, NULL); err= rpt_handle_event(qev, NULL);
serial_rgi->is_parallel_exec= tmp; serial_rgi->is_parallel_exec= tmp;
log_pos= ev->log_pos; if (ev->is_relay_log_event())
qev->future_event_master_log_pos= 0;
else if (typ == ROTATE_EVENT)
qev->future_event_master_log_pos=
(static_cast<Rotate_log_event *>(ev))->pos;
else
qev->future_event_master_log_pos= ev->log_pos;
delete_or_keep_event_post_apply(serial_rgi, typ, ev); delete_or_keep_event_post_apply(serial_rgi, typ, ev);
if (err) if (err)
...@@ -1532,7 +1537,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1532,7 +1537,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
the current point. the current point.
*/ */
qev->ev= NULL; qev->ev= NULL;
qev->future_event_master_log_pos= log_pos;
} }
else else
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment