Commit cfa1ce81 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0

The problem occured when using parallel replication, and an error occured that
caused the SQL thread to stop when the IO thread had already reached a
following binlog file from the master (or otherwise performed a relay log
rotation).

In this case, the Rotate Event at the end of the relay log file could still be
executed, even though an earlier event in that relay log file had gotten an
error. This would cause the position to be incorrectly updated, so that upon
restart of the SQL thread, the event that had failed would be silently skipped
and ignored, causing replication corruption.

Fixed by checking before executing Rotate Event, whether an earlier event
has failed. If so, the Rotate Event is not executed, just dequeued, same as
for other normal events following a failing event.
parent 65ac881c
...@@ -882,6 +882,47 @@ a ...@@ -882,6 +882,47 @@ a
SELECT * FROM t6 ORDER BY a; SELECT * FROM t6 ORDER BY a;
a a
4 4
*** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***
INSERT INTO t2 VALUES (31);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads= 0;
include/start_slave.inc
SET sql_log_bin= 0;
INSERT INTO t2 VALUES (32);
SET sql_log_bin= 1;
INSERT INTO t2 VALUES (32);
FLUSH LOGS;
INSERT INTO t2 VALUES (33);
INSERT INTO t2 VALUES (34);
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
33
34
include/save_master_gtid.inc
include/wait_for_slave_sql_error.inc [errno=1062]
include/stop_slave_io.inc
SET GLOBAL slave_parallel_threads=10;
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1062]
START SLAVE SQL_THREAD;
include/wait_for_slave_sql_error.inc [errno=1062]
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
SET sql_slave_skip_counter= 1;
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
a
31
32
33
34
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
......
...@@ -1408,6 +1408,64 @@ SELECT * FROM t6 ORDER BY a; ...@@ -1408,6 +1408,64 @@ SELECT * FROM t6 ORDER BY a;
SELECT * FROM t6 ORDER BY a; SELECT * FROM t6 ORDER BY a;
--echo *** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***
--connection server_1
INSERT INTO t2 VALUES (31);
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads= 0;
--source include/start_slave.inc
# Force a duplicate key error on the slave.
SET sql_log_bin= 0;
INSERT INTO t2 VALUES (32);
SET sql_log_bin= 1;
--connection server_1
INSERT INTO t2 VALUES (32);
# Rotate the binlog; the bug is triggered when the master binlog file changes
# after the event group that causes the duplicate key error.
FLUSH LOGS;
INSERT INTO t2 VALUES (33);
INSERT INTO t2 VALUES (34);
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
--source include/save_master_gtid.inc
--connection server_2
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc
--connection server_2
--source include/stop_slave_io.inc
SET GLOBAL slave_parallel_threads=10;
START SLAVE;
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc
# Note: IO thread is still running at this point.
# The bug seems to have been that restarting the SQL thread after an error with
# the IO thread still running, somehow picks up a later relay log position and
# thus ends up skipping the failing event, rather than re-executing.
START SLAVE SQL_THREAD;
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
# Skip the duplicate error, so we can proceed.
SET sql_slave_skip_counter= 1;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
...@@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi; rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli; Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd; THD *thd= rgi->thd;
Log_event *ev;
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
ev= qev->ev;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
ev->thd= thd;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf; rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); mysql_mutex_lock(&rli->data_lock);
/* Mutex will be released in apply_event_and_update_pos(). */
err= apply_event_and_update_pos(ev, thd, rgi, rpt);
thread_safe_increment64(&rli->executed_entries, thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock); &slave_executed_entries_lock);
...@@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{ {
int cmp; int cmp;
Relay_log_info *rli; Relay_log_info *rli;
rpl_parallel_entry *e;
/* /*
Events that are not part of an event group, such as Format Description, Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread, Stop, GTID List and such, are executed directly in the driver SQL thread,
...@@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) && if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions) opt_using_transactions)
return; return;
/* Do not update position if an earlier event group caused an error abort. */
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
e= qev->entry_for_queued;
if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
return;
rli= qev->rgi->rli; rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock); mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
...@@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending; bool end_of_group, group_ending;
total_event_size+= events->event_size; total_event_size+= events->event_size;
if (!events->ev) if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{ {
handle_queued_pos_update(thd, events); handle_queued_pos_update(thd, events);
events->next= qevs_to_free; events->next= qevs_to_free;
...@@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg)
events= next; events= next;
continue; continue;
} }
DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi; thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco; gco= rgi->gco;
...@@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) ...@@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL; return NULL;
} }
qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev; qev->ev= ev;
qev->event_size= event_size; qev->event_size= event_size;
qev->next= NULL; qev->next= NULL;
...@@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1; return 1;
} }
/* /*
Queue an empty event, so that the position will be updated in a Queue a position update, so that the position will be updated in a
reasonable way relative to other events: reasonable way relative to other events:
- If the currently executing events are queued serially for a single - If the currently executing events are queued serially for a single
...@@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached least the position will not be updated until one of them has reached
the current point. the current point.
*/ */
qev->ev= NULL; qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
qev->entry_for_queued= e;
} }
else else
{ {
......
...@@ -72,7 +72,15 @@ struct rpl_parallel_thread { ...@@ -72,7 +72,15 @@ struct rpl_parallel_thread {
rpl_parallel_entry *current_entry; rpl_parallel_entry *current_entry;
struct queued_event { struct queued_event {
queued_event *next; queued_event *next;
Log_event *ev; /*
queued_event can hold either an event to be executed, or just a binlog
position to be updated without any associated event.
*/
enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ;
union {
Log_event *ev; /* QUEUED_EVENT */
rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */
};
rpl_group_info *rgi; rpl_group_info *rgi;
inuse_relaylog *ir; inuse_relaylog *ir;
ulonglong future_event_relay_log_pos; ulonglong future_event_relay_log_pos;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment