Commit 8b9b7ec3 authored by unknown's avatar unknown

MDEV-5804: If same GTID is received on multiple master connections in...

MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure

Some fixes, mainly to make it work in non-parallel replication mode also
(--slave-parallel-threads=0).

Patch should be fairly complete now.
parent 2c2478b8
...@@ -212,6 +212,16 @@ The following options may be given as the first argument: ...@@ -212,6 +212,16 @@ The following options may be given as the first argument:
multiple masters), each independent source server must multiple masters), each independent source server must
use a distinct domain_id. For simple tree-shaped use a distinct domain_id. For simple tree-shaped
replication topologies, it can be left at its default, 0. replication topologies, it can be left at its default, 0.
--gtid-ignore-duplicates
When set, different master connections in multi-source
replication are allowed to receive and process event
groups with the same GTID (when using GTID mode). Only
one will be applied, any others will be ignored. Within a
given replication domain, just the sequence number will
be used to decide whether a given GTID has been already
applied; this means it is the responsibility of the user
to ensure that GTID sequence numbers are strictly
increasing.
--gtid-strict-mode Enforce strict seq_no ordering of events in the binary --gtid-strict-mode Enforce strict seq_no ordering of events in the binary
log. Slave stops with an error if it encounters an event log. Slave stops with an error if it encounters an event
that would cause it to generate an out-of-order binlog if that would cause it to generate an out-of-order binlog if
...@@ -1094,6 +1104,7 @@ gdb FALSE ...@@ -1094,6 +1104,7 @@ gdb FALSE
general-log FALSE general-log FALSE
group-concat-max-len 1024 group-concat-max-len 1024
gtid-domain-id 0 gtid-domain-id 0
gtid-ignore-duplicates FALSE
gtid-strict-mode FALSE gtid-strict-mode FALSE
help TRUE help TRUE
histogram-size 0 histogram-size 0
......
...@@ -151,38 +151,129 @@ a ...@@ -151,38 +151,129 @@ a
10 10
11 11
12 12
*** Test also with not using parallel replication.
SET default_master_connection = "b2a";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET default_master_connection = "c2a";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "b2a";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "c2a";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "a2b";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET default_master_connection = "c2b";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2b";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "c2b";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "a2c";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET default_master_connection = "b2c";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2c";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "b2c";
START SLAVE;
include/wait_for_slave_to_start.inc
SET default_master_connection = "a2d";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2d";
START SLAVE;
include/wait_for_slave_to_start.inc
INSERT INTO t1 VALUES (21);
BEGIN;
INSERT INTO t1 VALUES (22);
INSERT INTO t1 VALUES (23);
COMMIT;
INSERT INTO t1 VALUES (24), (25);
INSERT INTO t1 VALUES (26);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
21
22
23
24
25
26
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
21
22
23
24
25
26
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
21
22
23
24
25
26
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
21
22
23
24
25
26
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
Warnings: Warnings:
Note 1938 SLAVE 'c2a' stopped Note 1938 SLAVE 'c2a' stopped
Note 1938 SLAVE 'b2a' stopped Note 1938 SLAVE 'b2a' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
Warnings: Warnings:
Note 1938 SLAVE 'a2b' stopped Note 1938 SLAVE 'a2b' stopped
Note 1938 SLAVE 'c2b' stopped Note 1938 SLAVE 'c2b' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
Warnings: Warnings:
Note 1938 SLAVE 'a2c' stopped Note 1938 SLAVE 'a2c' stopped
Note 1938 SLAVE 'b2c' stopped Note 1938 SLAVE 'b2c' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
Warnings: Warnings:
Note 1938 SLAVE 'a2d' stopped Note 1938 SLAVE 'a2d' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1; DROP TABLE t1;
include/reset_master_slave.inc
DROP TABLE t1;
include/reset_master_slave.inc
DROP TABLE t1;
include/reset_master_slave.inc
DROP TABLE t1;
include/reset_master_slave.inc
...@@ -170,39 +170,135 @@ SET default_master_connection = "a2b"; ...@@ -170,39 +170,135 @@ SET default_master_connection = "a2b";
SELECT * FROM t1 WHERE a >= 10 ORDER BY a; SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
--echo *** Test also with not using parallel replication.
--connection server_1
SET default_master_connection = "b2a";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET default_master_connection = "c2a";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "b2a";
START SLAVE;
--source include/wait_for_slave_to_start.inc
SET default_master_connection = "c2a";
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection server_2
SET default_master_connection = "a2b";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET default_master_connection = "c2b";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2b";
START SLAVE;
--source include/wait_for_slave_to_start.inc
SET default_master_connection = "c2b";
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection server_3
SET default_master_connection = "a2c";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET default_master_connection = "b2c";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2c";
START SLAVE;
--source include/wait_for_slave_to_start.inc
SET default_master_connection = "b2c";
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection server_4
SET default_master_connection = "a2d";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
SET GLOBAL slave_parallel_threads=0;
SET default_master_connection = "a2d";
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection server_2
INSERT INTO t1 VALUES (21);
BEGIN;
INSERT INTO t1 VALUES (22);
INSERT INTO t1 VALUES (23);
COMMIT;
INSERT INTO t1 VALUES (24), (25);
INSERT INTO t1 VALUES (26);
--source include/save_master_gtid.inc
--connection server_1
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
--connection server_3
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
--connection server_4
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
# Clean up. # Clean up.
--connection server_1 --connection server_1
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_1
--connection server_2 --connection server_2
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_2
--connection server_3 --connection server_3
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_3
--connection server_4 --connection server_4
SET GLOBAL gtid_domain_id=0; SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES; STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel; SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
--connection server_1
DROP TABLE t1;
--source reset_master_slave.inc
--disconnect server_1
--connection server_2
DROP TABLE t1; DROP TABLE t1;
--source reset_master_slave.inc
--disconnect server_2
--connection server_3
DROP TABLE t1;
--source reset_master_slave.inc
--disconnect server_3
--connection server_4
DROP TABLE t1;
--source reset_master_slave.inc
--disconnect server_4 --disconnect server_4
...@@ -38,6 +38,7 @@ order by name limit 10; ...@@ -38,6 +38,7 @@ order by name limit 10;
NAME ENABLED TIMED NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES wait/synch/cond/sql/COND_flush_thread_cache YES YES
wait/synch/cond/sql/COND_group_commit_orderer YES YES wait/synch/cond/sql/COND_group_commit_orderer YES YES
wait/synch/cond/sql/COND_gtid_ignore_duplicates YES YES
wait/synch/cond/sql/COND_manager YES YES wait/synch/cond/sql/COND_manager YES YES
wait/synch/cond/sql/COND_parallel_entry YES YES wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES wait/synch/cond/sql/COND_prepare_ordered YES YES
...@@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_queue_state YES YES ...@@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES wait/synch/cond/sql/COND_rpl_thread YES YES
wait/synch/cond/sql/COND_rpl_thread_pool YES YES wait/synch/cond/sql/COND_rpl_thread_pool YES YES
wait/synch/cond/sql/COND_rpl_thread_queue YES YES wait/synch/cond/sql/COND_rpl_thread_queue YES YES
wait/synch/cond/sql/COND_server_started YES YES
select * from performance_schema.setup_instruments select * from performance_schema.setup_instruments
where name='Wait'; where name='Wait';
select * from performance_schema.setup_instruments select * from performance_schema.setup_instruments
......
SET @save_gtid_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SELECT @@GLOBAL.gtid_ignore_duplicates as 'must be zero because of default';
must be zero because of default
0
SELECT @@SESSION.gtid_ignore_duplicates as 'no session var';
ERROR HY000: Variable 'gtid_ignore_duplicates' is a GLOBAL variable
SET GLOBAL gtid_ignore_duplicates= FALSE;
SET GLOBAL gtid_ignore_duplicates= DEFAULT;
SET GLOBAL gtid_ignore_duplicates= TRUE;
SELECT @@GLOBAL.gtid_ignore_duplicates;
@@GLOBAL.gtid_ignore_duplicates
1
SET GLOBAL gtid_ignore_duplicates = @save_gtid_ignore_duplicates;
--source include/not_embedded.inc
SET @save_gtid_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SELECT @@GLOBAL.gtid_ignore_duplicates as 'must be zero because of default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.gtid_ignore_duplicates as 'no session var';
SET GLOBAL gtid_ignore_duplicates= FALSE;
SET GLOBAL gtid_ignore_duplicates= DEFAULT;
SET GLOBAL gtid_ignore_duplicates= TRUE;
SELECT @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates = @save_gtid_ignore_duplicates;
...@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'", ...@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
end: end:
if (sub_id && !thd->is_slave_error) if (sub_id && !thd->is_slave_error)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
/* /*
Probably we have set thd->query, thd->db, thd->catalog to point to places Probably we have set thd->query, thd->db, thd->catalog to point to places
...@@ -7327,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -7327,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
if (!res && sub_id) if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rgi);
/* /*
Increment the global status commit count variable Increment the global status commit count variable
......
...@@ -9447,6 +9447,7 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for ...@@ -9447,6 +9447,7 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
...@@ -9565,7 +9566,8 @@ PSI_stage_info *all_server_stages[]= ...@@ -9565,7 +9566,8 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_finalize_termination, & stage_waiting_to_finalize_termination,
& stage_waiting_to_get_readlock, & stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary, & stage_master_gtid_wait_primary,
& stage_master_gtid_wait & stage_master_gtid_wait,
& stage_gtid_wait_other_connection
}; };
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection; PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
......
...@@ -441,6 +441,7 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit; ...@@ -441,6 +441,7 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
#ifdef HAVE_PSI_STATEMENT_INTERFACE #ifdef HAVE_PSI_STATEMENT_INTERFACE
/** /**
......
...@@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= ...@@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
const Relay_log_info *rli) rpl_group_info *rgi)
{ {
int err; int err;
/* /*
...@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, ...@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed. it is even committed.
*/ */
mysql_mutex_lock(&LOCK_slave_state); mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli); err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
mysql_mutex_unlock(&LOCK_slave_state); mysql_mutex_unlock(&LOCK_slave_state);
if (err) if (err)
{ {
...@@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) ...@@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if ((sub_id= rgi->gtid_sub_id)) if ((sub_id= rgi->gtid_sub_id))
{ {
rgi->gtid_sub_id= 0; rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
DBUG_RETURN(1); {
update_state_hash(sub_id, &rgi->current_gtid, rgi->rli); if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, rgi);
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) ...@@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
-1 Error (out of memory to allocate a new element for the domain). -1 Error (out of memory to allocate a new element for the domain).
*/ */
int int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{ {
uint32 domain_id= gtid->domain_id; uint32 domain_id= gtid->domain_id;
uint32 seq_no= gtid->seq_no; uint32 seq_no= gtid->seq_no;
rpl_slave_state::element *elem; rpl_slave_state::element *elem;
int res; int res;
bool did_enter_cond;
PSI_stage_info old_stage;
THD *thd;
Relay_log_info *rli= rgi->rli;
mysql_mutex_lock(&LOCK_slave_state); mysql_mutex_lock(&LOCK_slave_state);
if (!(elem= get_element(domain_id))) if (!(elem= get_element(domain_id)))
{ {
my_error(ER_OUT_OF_RESOURCES, MYF(0));
res= -1; res= -1;
goto err; goto err;
} }
...@@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) ...@@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
each lock release and re-take. each lock release and re-take.
*/ */
/* ToDo: Make this wait killable. */ did_enter_cond= false;
for (;;) for (;;)
{ {
if (elem->highest_seq_no >= seq_no) if (elem->highest_seq_no >= seq_no)
{ {
/* This sequence number is already applied, ignore it. */ /* This sequence number is already applied, ignore it. */
res= 0; res= 0;
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
break; break;
} }
if (!elem->owner_rli) if (!elem->owner_rli)
...@@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) ...@@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
/* The domain became free, grab it and apply the event. */ /* The domain became free, grab it and apply the event. */
elem->owner_rli= rli; elem->owner_rli= rli;
elem->owner_count= 1; elem->owner_count= 1;
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1; res= 1;
break; break;
} }
...@@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) ...@@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
{ {
/* Already own this domain, increment reference count and apply event. */ /* Already own this domain, increment reference count and apply event. */
++elem->owner_count; ++elem->owner_count;
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1; res= 1;
break; break;
} }
thd= rgi->thd;
if (thd->check_killed())
{
thd->send_kill_message();
res= -1;
break;
}
/* /*
Someone else is currently processing this GTID (or an earlier one). Someone else is currently processing this GTID (or an earlier one).
Wait for them to complete (or fail), and then check again. Wait for them to complete (or fail), and then check again.
*/ */
if (!did_enter_cond)
{
thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
&stage_gtid_wait_other_connection, &old_stage);
did_enter_cond= true;
}
mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
&LOCK_slave_state); &LOCK_slave_state);
} }
err: err:
mysql_mutex_unlock(&LOCK_slave_state); if (did_enter_cond)
thd->EXIT_COND(&old_stage);
else
mysql_mutex_unlock(&LOCK_slave_state);
return res; return res;
} }
void
rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
{
element *elem= NULL;
mysql_mutex_lock(&LOCK_slave_state);
if (!(elem= get_element(rgi->current_gtid.domain_id)))
{
/*
We cannot really deal with error here, as we are already called in an
error handling case (transaction failure and rollback).
However, get_element() only fails if the element did not exist already
and could not be allocated due to out-of-memory - and if it did not
exist, then we would not get here in the first place.
*/
mysql_mutex_unlock(&LOCK_slave_state);
return;
}
if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
{
uint32 count= elem->owner_count;
DBUG_ASSERT(count > 0);
DBUG_ASSERT(elem->owner_rli == rgi->rli);
--count;
elem->owner_count= count;
if (count == 0)
{
elem->owner_rli= NULL;
mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
}
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
mysql_mutex_unlock(&LOCK_slave_state);
}
static void static void
rpl_slave_state_free_element(void *arg) rpl_slave_state_free_element(void *arg)
{ {
...@@ -233,7 +299,7 @@ rpl_slave_state::deinit() ...@@ -233,7 +299,7 @@ rpl_slave_state::deinit()
int int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, const Relay_log_info *rli) uint64 seq_no, rpl_group_info *rgi)
{ {
element *elem= NULL; element *elem= NULL;
list_element *list_elem= NULL; list_element *list_elem= NULL;
...@@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, ...@@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
mysql_cond_broadcast(&elem->COND_wait_gtid); mysql_cond_broadcast(&elem->COND_wait_gtid);
} }
if (opt_gtid_ignore_duplicates && rli) if (rgi)
{ {
uint32 count= elem->owner_count; if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
DBUG_ASSERT(count > 0);
DBUG_ASSERT(elem->owner_rli == rli);
--count;
elem->owner_count= count;
if (count == 0)
{ {
elem->owner_rli= NULL; Relay_log_info *rli= rgi->rli;
mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); uint32 count= elem->owner_count;
DBUG_ASSERT(count > 0);
DBUG_ASSERT(elem->owner_rli == rli);
--count;
elem->owner_count= count;
if (count == 0)
{
elem->owner_rli= NULL;
mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
}
} }
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
} }
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
......
...@@ -92,6 +92,7 @@ struct gtid_waiting { ...@@ -92,6 +92,7 @@ struct gtid_waiting {
class Relay_log_info; class Relay_log_info;
struct rpl_group_info;
/* /*
Replication slave state. Replication slave state.
...@@ -171,7 +172,7 @@ struct rpl_slave_state ...@@ -171,7 +172,7 @@ struct rpl_slave_state
void truncate_hash(); void truncate_hash();
ulong count() const { return hash.records; } ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, const Relay_log_info *rli); uint64 seq_no, rpl_group_info *rgi);
int truncate_state_table(THD *thd); int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement); bool in_transaction, bool in_statement);
...@@ -187,10 +188,10 @@ struct rpl_slave_state ...@@ -187,10 +188,10 @@ struct rpl_slave_state
element *get_element(uint32 domain_id); element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list); int put_back_list(uint32 domain_id, list_element *list);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
const Relay_log_info *rli);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli); int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
}; };
......
...@@ -425,10 +425,22 @@ handle_rpl_parallel_thread(void *arg) ...@@ -425,10 +425,22 @@ handle_rpl_parallel_thread(void *arg)
{ {
int res= int res=
rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid, rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
rgi->rli); rgi);
/* ToDo: Handle res==-1 error. */ if (res < 0)
if (!res) {
/* Error. */
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
}
else if (!res)
{
/* GTID already applied by another master connection, skip. */
skip_event_group= true; skip_event_group= true;
}
else
{
/* We have to apply the event. */
}
} }
} }
......
...@@ -1493,6 +1493,7 @@ rpl_group_info::reinit(Relay_log_info *rli) ...@@ -1493,6 +1493,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
row_stmt_start_timestamp= 0; row_stmt_start_timestamp= 0;
long_find_row_note_printed= false; long_find_row_note_printed= false;
did_mark_start_commit= false; did_mark_start_commit= false;
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
commit_orderer.reinit(); commit_orderer.reinit();
} }
...@@ -1631,6 +1632,13 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) ...@@ -1631,6 +1632,13 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/*
Ensure we always release the domain for others to process, when using
--gtid-ignore-duplicates.
*/
if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
rpl_global_gtid_slave_state.release_domain_owner(this);
/* /*
Reset state related to long_find_row notes in the error log: Reset state related to long_find_row notes in the error log:
- timestamp - timestamp
......
...@@ -575,6 +575,20 @@ struct rpl_group_info ...@@ -575,6 +575,20 @@ struct rpl_group_info
counting one event group twice. counting one event group twice.
*/ */
bool did_mark_start_commit; bool did_mark_start_commit;
enum {
GTID_DUPLICATE_NULL=0,
GTID_DUPLICATE_IGNORE=1,
GTID_DUPLICATE_OWNER=2
};
/*
When --gtid-ignore-duplicates, this is set to one of the above three
values:
GTID_DUPLICATE_NULL - Not using --gtid-ignore-duplicates.
GTID_DUPLICATE_IGNORE - This gtid already applied, skip the event group.
GTID_DUPLICATE_OWNER - We are the current owner of the domain, and must
apply the event group and then release the domain.
*/
uint8 gtid_ignore_duplicate_state;
/* /*
Runtime state for printing a note when slave is taking Runtime state for printing a note when slave is taking
......
...@@ -3508,18 +3508,46 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3508,18 +3508,46 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/ */
} }
/* if (typ == GTID_EVENT)
For GTID, allocate a new sub_id for the given domain_id.
The sub_id must be allocated in increasing order of binlog order.
*/
if (typ == GTID_EVENT &&
event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev)))
{ {
sql_print_error("Error reading relay log event: %s", Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
"slave SQL thread aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock); /*
delete ev; For GTID, allocate a new sub_id for the given domain_id.
DBUG_RETURN(1); The sub_id must be allocated in increasing order of binlog order.
*/
if (event_group_new_gtid(serial_rgi, gev))
{
sql_print_error("Error reading relay log event: %s", "slave SQL thread "
"aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
}
if (opt_gtid_ignore_duplicates)
{
serial_rgi->current_gtid.domain_id= gev->domain_id;
serial_rgi->current_gtid.server_id= gev->server_id;
serial_rgi->current_gtid.seq_no= gev->seq_no;
int res= rpl_global_gtid_slave_state.check_duplicate_gtid
(&serial_rgi->current_gtid, serial_rgi);
if (res < 0)
{
sql_print_error("Error processing GTID event: %s", "slave SQL "
"thread aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(1);
}
/*
If we need to skip this event group (because the GTID was already
applied), then do it using the code for slave_skip_counter, which
is able to handle skipping until the end of the event group.
*/
if (!res)
rli->slave_skip_counter= 1;
}
} }
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
......
...@@ -1839,18 +1839,14 @@ static bool ...@@ -1839,18 +1839,14 @@ static bool
fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type) fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
{ {
bool running; bool running;
bool err= false;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running(); running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
if (running)
err= true;
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
/* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */ return running ? true : false;
return err;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment