Commit 2c2478b8 authored by unknown's avatar unknown

MDEV-5804: If same GTID is received on multiple master connections in...

MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure

Before, the arrival of same GTID twice in multi-source replication
would cause double-apply or in gtid strict mode an error.

Keep the behaviour, but add an option --gtid-ignore-duplicates which
allows to correctly handle duplicates, ignoring all but the first.
This relies on the user ensuring correct configuration so that
sequence numbers are strictly increasing within each replication
domain; then duplicates can be detected simply by comparing the
sequence numbers against what is already applied.

Only one master connection (but possibly multiple parallel worker
threads within that connection) is allowed to apply events within
one replication domain at a time; any other connection that
receives a GTID in the same domain either discards it (if it is
already applied) or waits for the other connection to not have
any events to apply.

Intermediate patch, as proof-of-concept for testing. The main limitation
is that currently it is only implemented for parallel replication,
@@slave_parallel_threads > 0.
parent 5c31e79f
!include my.cnf
[mysqld.1]
log-slave-updates
loose-innodb
[mysqld.2]
log-slave-updates
loose-innodb
[mysqld.3]
log-bin=server3-bin
log-slave-updates
loose-innodb
[mysqld.4]
server-id=4
log-bin=server4-bin
log-slave-updates
loose-innodb
[ENV]
SERVER_MYPORT_4= @mysqld.4.port
SERVER_MYSOCK_4= @mysqld.4.socket
*** Test all-to-all replication with --gtid-ignore-duplicates ***
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
CHANGE MASTER 'b2a' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
CHANGE MASTER 'c2a' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'b2a';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = 'c2a';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = '';
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 2;
SET SESSION gtid_domain_id= 2;
CHANGE MASTER 'a2b' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
CHANGE MASTER 'c2b' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2b';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = 'c2b';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = '';
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 3;
SET SESSION gtid_domain_id= 3;
CHANGE MASTER 'a2c' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
CHANGE MASTER 'b2c' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2c';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = 'b2c';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = '';
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
CHANGE MASTER 'a2d' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2d';
START SLAVE;
include/wait_for_slave_to_start.inc
set default_master_connection = '';
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
BEGIN;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
COMMIT;
INSERT INTO t1 VALUES (4), (5);
INSERT INTO t1 VALUES (6);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
5
6
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
5
6
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
5
6
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
5
6
INSERT INTO t1 VALUES (10);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a
10
STOP SLAVE "c2b";
SET default_master_connection = "c2b";
include/wait_for_slave_to_stop.inc
STOP SLAVE "a2b";
SET default_master_connection = "a2b";
include/wait_for_slave_to_stop.inc
INSERT INTO t1 VALUES (11);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a
10
11
SET default_master_connection = "b2a";
STOP SLAVE;
include/wait_for_slave_to_stop.inc
INSERT INTO t1 VALUES (12);
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a
10
12
include/save_master_gtid.inc
START SLAVE "b2a";
SET default_master_connection = "b2a";
include/wait_for_slave_to_start.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a
10
11
12
START SLAVE "c2b";
SET default_master_connection = "c2b";
include/wait_for_slave_to_start.inc
START SLAVE "a2b";
SET default_master_connection = "a2b";
include/wait_for_slave_to_start.inc
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
a
10
11
12
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
Warnings:
Note 1938 SLAVE 'c2a' stopped
Note 1938 SLAVE 'b2a' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
Warnings:
Note 1938 SLAVE 'a2b' stopped
Note 1938 SLAVE 'c2b' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
Warnings:
Note 1938 SLAVE 'a2c' stopped
Note 1938 SLAVE 'b2c' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
Warnings:
Note 1938 SLAVE 'a2d' stopped
include/reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--source include/not_embedded.inc
--source include/have_innodb.inc
--echo *** Test all-to-all replication with --gtid-ignore-duplicates ***
--connect (server_1,127.0.0.1,root,,,$SERVER_MYPORT_1)
--connect (server_2,127.0.0.1,root,,,$SERVER_MYPORT_2)
--connect (server_3,127.0.0.1,root,,,$SERVER_MYPORT_3)
--connect (server_4,127.0.0.1,root,,,$SERVER_MYPORT_4)
# Setup A <-> B, B <-> C, C <-> A, and A -> D.
--connection server_1
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
--replace_result $SERVER_MYPORT_2 MYPORT_2
eval CHANGE MASTER 'b2a' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
--replace_result $SERVER_MYPORT_3 MYPORT_3
eval CHANGE MASTER 'c2a' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'b2a';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'c2a';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--connection server_2
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 2;
SET SESSION gtid_domain_id= 2;
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval CHANGE MASTER 'a2b' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
--replace_result $SERVER_MYPORT_3 MYPORT_3
eval CHANGE MASTER 'c2b' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2b';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'c2b';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--connection server_3
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 3;
SET SESSION gtid_domain_id= 3;
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval CHANGE MASTER 'a2c' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
--replace_result $SERVER_MYPORT_2 MYPORT_2
eval CHANGE MASTER 'b2c' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2c';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'b2c';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--connection server_4
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=5;
SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
SET GLOBAL gtid_ignore_duplicates=1;
SET GLOBAL gtid_domain_id= 1;
SET SESSION gtid_domain_id= 1;
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval CHANGE MASTER 'a2d' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
set default_master_connection = 'a2d';
START SLAVE;
--source include/wait_for_slave_to_start.inc
set default_master_connection = '';
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
BEGIN;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
COMMIT;
INSERT INTO t1 VALUES (4), (5);
INSERT INTO t1 VALUES (6);
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_3
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_4
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_1
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
# Test that we can connect at a GTID position that has not yet reached
# that master server.
# We stop the connections C->B and A->B, create an event on C, Check that
# the event has reached A (but not B). Then let A stop and re-connect to
# B, which will connect at the new event, which is in the future for B.
--connection server_3
INSERT INTO t1 VALUES (10);
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
STOP SLAVE "c2b";
SET default_master_connection = "c2b";
--source include/wait_for_slave_to_stop.inc
STOP SLAVE "a2b";
SET default_master_connection = "a2b";
--source include/wait_for_slave_to_stop.inc
--connection server_3
INSERT INTO t1 VALUES (11);
--source include/save_master_gtid.inc
--connection server_1
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
SET default_master_connection = "b2a";
STOP SLAVE;
--source include/wait_for_slave_to_stop.inc
--connection server_2
INSERT INTO t1 VALUES (12);
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
--source include/save_master_gtid.inc
--connection server_1
START SLAVE "b2a";
SET default_master_connection = "b2a";
--source include/wait_for_slave_to_start.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
--connection server_2
START SLAVE "c2b";
SET default_master_connection = "c2b";
--source include/wait_for_slave_to_start.inc
START SLAVE "a2b";
SET default_master_connection = "a2b";
--source include/wait_for_slave_to_start.inc
--connection server_1
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
# Clean up.
--connection server_1
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_1
--connection server_2
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_2
--connection server_3
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_3
--connection server_4
SET GLOBAL gtid_domain_id=0;
STOP ALL SLAVES;
--source reset_master_slave.inc
SET GLOBAL slave_parallel_threads= @old_parallel;
SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
DROP TABLE t1;
--disconnect server_4
......@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
end:
if (sub_id && !thd->is_slave_error)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
......@@ -6806,7 +6806,8 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
sub_id_list[i],
false, false)))
return ret;
rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i],
NULL);
}
}
ret= Log_event::do_apply_event(rgi);
......@@ -7326,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
/*
Increment the global status commit count variable
......
......@@ -553,6 +553,7 @@ ulong opt_slave_domain_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
my_bool opt_gtid_ignore_duplicates= FALSE;
const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
......@@ -987,7 +988,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
{
......@@ -1035,7 +1036,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
......
......@@ -184,6 +184,7 @@ extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern my_bool opt_gtid_ignore_duplicates;
extern ulong back_log;
extern ulong executed_events;
extern char language[FN_REFLEN];
......@@ -299,7 +300,7 @@ extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer;
extern PSI_cond_key key_COND_wait_gtid;
extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
......
......@@ -33,7 +33,8 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
const Relay_log_info *rli)
{
int err;
/*
......@@ -44,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
......@@ -76,17 +77,102 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid);
update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
}
DBUG_RETURN(0);
}
/*
Check GTID event execution when --gtid-ignore-duplicates.
The idea with --gtid-ignore-duplicates is that we allow multiple master
connections (in multi-source replication) to all receive the same GTIDs and
event groups. Only one instance of each is applied; we use the sequence
number in the GTID to decide whether a GTID has already been applied.
So if the seq_no of a GTID (or a higher sequence number) has already been
applied, then the event should be skipped. If not then the event should be
applied.
To avoid two master connections tring to apply the same event
simultaneously, only one is allowed to work in any given domain at any point
in time. The associated Relay_log_info object is called the owner of the
domain (and there can be multiple parallel worker threads working in that
domain for that Relay_log_info). Any other Relay_log_info/master connection
must wait for the domain to become free, or for their GTID to have been
applied, before being allowed to proceed.
Returns:
0 This GTID is already applied, it should be skipped.
1 The GTID is not yet applied; this rli is now the owner, and must apply
the event and release the domain afterwards.
-1 Error (out of memory to allocate a new element for the domain).
*/
int
rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
{
uint32 domain_id= gtid->domain_id;
uint32 seq_no= gtid->seq_no;
rpl_slave_state::element *elem;
int res;
mysql_mutex_lock(&LOCK_slave_state);
if (!(elem= get_element(domain_id)))
{
res= -1;
goto err;
}
/*
Note that the elem pointer does not change once inserted in the hash. So
we can re-use the pointer without looking it up again in the hash after
each lock release and re-take.
*/
/* ToDo: Make this wait killable. */
for (;;)
{
if (elem->highest_seq_no >= seq_no)
{
/* This sequence number is already applied, ignore it. */
res= 0;
break;
}
if (!elem->owner_rli)
{
/* The domain became free, grab it and apply the event. */
elem->owner_rli= rli;
elem->owner_count= 1;
res= 1;
break;
}
if (elem->owner_rli == rli)
{
/* Already own this domain, increment reference count and apply event. */
++elem->owner_count;
res= 1;
break;
}
/*
Someone else is currently processing this GTID (or an earlier one).
Wait for them to complete (or fail), and then check again.
*/
mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
&LOCK_slave_state);
}
err:
mysql_mutex_unlock(&LOCK_slave_state);
return res;
}
static void
rpl_slave_state_free_element(void *arg)
{
struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
mysql_cond_destroy(&elem->COND_wait_gtid);
mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
my_free(elem);
}
......@@ -147,7 +233,7 @@ rpl_slave_state::deinit()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no)
uint64 seq_no, const Relay_log_info *rli)
{
element *elem= NULL;
list_element *list_elem= NULL;
......@@ -170,6 +256,20 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
if (opt_gtid_ignore_duplicates && rli)
{
uint32 count= elem->owner_count;
DBUG_ASSERT(count > 0);
DBUG_ASSERT(elem->owner_rli == rli);
--count;
elem->owner_count= count;
if (count == 0)
{
elem->owner_rli= NULL;
mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
}
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
......@@ -199,7 +299,11 @@ rpl_slave_state::get_element(uint32 domain_id)
elem->domain_id= domain_id;
elem->highest_seq_no= 0;
elem->gtid_waiter= NULL;
elem->owner_rli= NULL;
elem->owner_count= 0;
mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
mysql_cond_init(key_COND_gtid_ignore_duplicates,
&elem->COND_gtid_ignore_duplicates, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
......@@ -821,7 +925,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
record_gtid(thd, &gtid, sub_id, false, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
return 1;
if (state_from_master == end)
break;
......
......@@ -91,6 +91,8 @@ struct gtid_waiting {
};
class Relay_log_info;
/*
Replication slave state.
......@@ -131,6 +133,19 @@ struct rpl_slave_state
uint64 min_wait_seq_no;
mysql_cond_t COND_wait_gtid;
/*
For --gtid-ignore-duplicates. The Relay_log_info that currently owns
this domain, and the number of worker threads that are active in it.
The idea is that only one of multiple master connections is allowed to
actively apply events for a given domain. Other connections must either
discard the events (if the seq_no in GTID shows they have already been
applied), or wait to see if the current owner will apply it.
*/
const Relay_log_info *owner_rli;
uint32 owner_count;
mysql_cond_t COND_gtid_ignore_duplicates;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
{
......@@ -155,7 +170,8 @@ struct rpl_slave_state
void deinit();
void truncate_hash();
ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no, const Relay_log_info *rli);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
......@@ -171,8 +187,10 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid,
const Relay_log_info *rli);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli);
};
......
......@@ -202,7 +202,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
bool group_skip_for_stop= false;
bool skip_event_group= false;
rpl_group_info *group_rgi= NULL;
group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
......@@ -385,13 +385,13 @@ handle_rpl_parallel_thread(void *arg)
point where we can safely stop. So set a flag that will cause us
to skip, rather than execute, the following events.
*/
group_skip_for_stop= true;
skip_event_group= true;
}
else
group_skip_for_stop= false;
skip_event_group= false;
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
group_skip_for_stop= true;
skip_event_group= true;
else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
......@@ -420,6 +420,16 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
}
thd->wait_for_commit_ptr= &rgi->commit_orderer;
if (opt_gtid_ignore_duplicates)
{
int res=
rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
rgi->rli);
/* ToDo: Handle res==-1 error. */
if (!res)
skip_event_group= true;
}
}
group_ending= event_type == XID_EVENT ||
......@@ -438,7 +448,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
if (!rgi->is_error && !group_skip_for_stop)
if (!rgi->is_error && !skip_event_group)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
......@@ -464,7 +474,7 @@ handle_rpl_parallel_thread(void *arg)
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
group_skip_for_stop= false;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
......@@ -526,7 +536,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
group_rgi= NULL;
group_skip_for_stop= false;
skip_event_group= false;
}
if (!in_event_group)
{
......
......@@ -1435,7 +1435,8 @@ rpl_load_gtid_slave_state(THD *thd)
if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id,
tmp_entry.gtid.server_id,
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
tmp_entry.gtid.seq_no,
NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
......
......@@ -2047,6 +2047,39 @@ after_set_capability:
}
}
query_str.length(0);
if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="),
system_charset_info) ||
query_str.append_ulonglong(opt_gtid_ignore_duplicates != false))
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory error "
"is encountered when it tries to set @slave_gtid_ignore_duplicates.";
sprintf(err_buff, "%s Error: Out of memory", errmsg);
goto err;
}
rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
if (rc)
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
mi->report(ERROR_LEVEL, err_code,
"Setting @slave_gtid_ignore_duplicates failed with "
"error: %s", mysql_error(mysql));
goto network_err;
}
else
{
/* Fatal error */
errmsg= "The slave I/O thread stops because a fatal error is "
"encountered when it tries to set @slave_gtid_ignore_duplicates.";
sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
goto err;
}
}
if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
{
query_str.length(0);
......
......@@ -114,6 +114,39 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
}
/*
Helper structure, used to pass miscellaneous info from mysql_binlog_send()
into the helper functions that it calls.
*/
struct binlog_send_info {
rpl_binlog_state until_binlog_state;
slave_connection_state gtid_state;
THD *thd;
NET *net;
String *packet;
char *log_file_name;
slave_connection_state *until_gtid_state;
Format_description_log_event *fdev;
int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group;
ushort flags;
uint8 current_checksum_alg;
bool slave_gtid_strict_mode;
bool send_fake_gtid_list;
bool slave_gtid_ignore_duplicates;
bool using_gtid_state;
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false),
slave_gtid_ignore_duplicates(false)
{ }
};
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
......@@ -132,16 +165,16 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
part.
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
ulonglong position, const char** errmsg,
uint8 checksum_alg_arg)
static int fake_rotate_event(binlog_send_info *info, ulonglong position,
const char** errmsg, uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
char buf[ROTATE_HEADER_LEN+100];
my_bool do_checksum;
int err;
char* p = log_file_name+dirname_length(log_file_name);
char* p = info->log_file_name+dirname_length(info->log_file_name);
uint ident_len = (uint) strlen(p);
String *packet= info->packet;
ha_checksum crc;
if ((err= fake_event_header(packet, ROTATE_EVENT,
......@@ -160,22 +193,23 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(net, packet, errmsg)))
(err= fake_event_write(info->net, packet, errmsg)))
DBUG_RETURN(err);
DBUG_RETURN(0);
}
static int fake_gtid_list_event(NET* net, String* packet,
static int fake_gtid_list_event(binlog_send_info *info,
Gtid_list_log_event *glev, const char** errmsg,
uint8 checksum_alg_arg, uint32 current_pos)
uint32 current_pos)
{
my_bool do_checksum;
int err;
ha_checksum crc;
char buf[128];
String str(buf, sizeof(buf), system_charset_info);
String* packet= info->packet;
str.length(0);
if (glev->to_packet(&str))
......@@ -185,7 +219,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc,
errmsg, checksum_alg_arg, current_pos)))
errmsg, info->current_checksum_alg, current_pos)))
return err;
packet->append(str);
......@@ -195,7 +229,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(net, packet, errmsg)))
(err= fake_event_write(info->net, packet, errmsg)))
return err;
return 0;
......@@ -627,6 +661,19 @@ get_slave_gtid_strict_mode(THD *thd)
}
static bool
get_slave_gtid_ignore_duplicates(THD *thd)
{
bool null_value;
const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
user_var_entry *entry=
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
name.length);
return entry && entry->val_int(&null_value) && !null_value;
}
/*
Get the value of the @slave_until_gtid user variable into the supplied
String (this is the GTID position specified for START SLAVE UNTIL
......@@ -914,16 +961,16 @@ give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
*/
static int
check_slave_start_position(THD *thd, slave_connection_state *st,
const char **errormsg, rpl_gtid *error_gtid,
slave_connection_state *until_gtid_state)
check_slave_start_position(binlog_send_info *info, const char **errormsg,
rpl_gtid *error_gtid)
{
uint32 i;
int err;
slave_connection_state::entry **delete_list= NULL;
uint32 delete_idx= 0;
slave_connection_state *st= &info->gtid_state;
if (rpl_load_gtid_slave_state(thd))
if (rpl_load_gtid_slave_state(info->thd))
{
*errormsg= "Failed to load replication slave GTID state";
err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
......@@ -963,6 +1010,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
if (!start_at_own_slave_pos)
{
rpl_gtid domain_gtid;
slave_connection_state *until_gtid_state= info->until_gtid_state;
rpl_gtid *until_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
......@@ -981,6 +1029,17 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
continue;
}
if (info->slave_gtid_ignore_duplicates &&
domain_gtid.seq_no < slave_gtid->seq_no)
{
/*
When --gtid-ignore-duplicates, it is ok for the slave to request
something that we do not have (yet) - they might already have gotten
it through another path in a multi-path replication hierarchy.
*/
continue;
}
if (until_gtid_state &&
( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
(mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
......@@ -1462,13 +1521,11 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
static bool
is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
enum_gtid_until_state gtid_until_group,
Log_event_type event_type, uint8 current_checksum_alg,
ushort flags, const char **errmsg,
rpl_binlog_state *until_binlog_state, uint32 current_pos)
is_until_reached(binlog_send_info *info, ulong *ev_offset,
Log_event_type event_type, const char **errmsg,
uint32 current_pos)
{
switch (gtid_until_group)
switch (info->gtid_until_group)
{
case GTID_UNTIL_NOT_DONE:
return false;
......@@ -1479,9 +1536,10 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
if (event_type != XID_EVENT &&
(event_type != QUERY_EVENT ||
!Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
packet->length()-*ev_offset,
current_checksum_alg)))
!Query_log_event::peek_is_commit_rollback
(info->packet->ptr()+*ev_offset,
info->packet->length()-*ev_offset,
info->current_checksum_alg)))
return false;
break;
}
......@@ -1493,12 +1551,11 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Send a last fake Gtid_list_log_event with a flag set to mark that we
stop due to UNTIL condition.
*/
if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
return true;
Gtid_list_log_event glev(until_binlog_state,
Gtid_list_log_event glev(&info->until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED);
if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
current_pos))
if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
return true;
*errmsg= NULL;
return true;
......@@ -1512,23 +1569,19 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Returns NULL on success, error message string on error.
*/
static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
IO_CACHE *log, int mariadb_slave_capability,
ulong ev_offset, uint8 current_checksum_alg,
bool using_gtid_state, slave_connection_state *gtid_state,
enum_gtid_skip_type *gtid_skip_group,
slave_connection_state *until_gtid_state,
enum_gtid_until_state *gtid_until_group,
rpl_binlog_state *until_binlog_state,
bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
bool *send_fake_gtid_list,
Format_description_log_event *fdev)
send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
{
my_off_t pos;
String* const packet= info->packet;
size_t len= packet->length();
int mariadb_slave_capability= info->mariadb_slave_capability;
uint8 current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state;
if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state)
if (event_type == GTID_LIST_EVENT &&
info->using_gtid_state && until_gtid_state)
{
rpl_gtid *gtid_list;
uint32 list_len;
......@@ -1537,12 +1590,12 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (ev_offset > len ||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&gtid_list, &list_len, fdev))
&gtid_list, &list_len, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
}
err= until_binlog_state->load(gtid_list, list_len);
err= info->until_binlog_state.load(gtid_list, list_len);
my_free(gtid_list);
if (err)
{
......@@ -1552,7 +1605,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && using_gtid_state)
if (event_type == GTID_EVENT && info->using_gtid_state)
{
uchar flags2;
slave_connection_state::entry *gtid_entry;
......@@ -1566,7 +1619,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &flags2, fdev))
&event_gtid.seq_no, &flags2, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
......@@ -1575,7 +1628,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
{
rpl_gtid *dbug_gtid;
if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) &&
if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
dbug_gtid->seq_no == 100)
{
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
......@@ -1585,7 +1638,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
});
if (until_binlog_state->update_nolock(&event_gtid, false))
if (info->until_binlog_state.update_nolock(&event_gtid, false))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
......@@ -1618,12 +1671,13 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
/* Skip this event group if we have not yet reached slave start pos. */
if (event_gtid.server_id != gtid->server_id ||
event_gtid.seq_no <= gtid->seq_no)
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
if (event_gtid.server_id == gtid->server_id &&
event_gtid.seq_no >= gtid->seq_no)
{
if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no &&
if (info->slave_gtid_strict_mode &&
event_gtid.seq_no > gtid->seq_no &&
!(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
{
/*
......@@ -1645,7 +1699,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
The fake event will be sent at the end of this event group.
*/
*send_fake_gtid_list= true;
info->send_fake_gtid_list= true;
/*
Delete this entry if we have reached slave start position (so we
......@@ -1666,7 +1720,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
This domain already reached the START SLAVE UNTIL stop condition,
so skip this event group.
*/
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
else if (event_gtid.server_id == gtid->server_id &&
......@@ -1681,9 +1735,9 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
uint64 until_seq_no= gtid->seq_no;
until_gtid_state->remove(gtid);
if (until_gtid_state->count() == 0)
*gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_UNTIL_STOP_AFTER_STANDALONE :
GTID_UNTIL_STOP_AFTER_TRANSACTION);
info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_UNTIL_STOP_AFTER_STANDALONE :
GTID_UNTIL_STOP_AFTER_TRANSACTION);
if (event_gtid.seq_no > until_seq_no)
{
/*
......@@ -1693,7 +1747,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
should be in, we can just stop now. And we also need to skip this
event group (as it is beyond the UNTIL condition).
*/
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
}
......@@ -1707,11 +1761,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Note that slave that understands GTID can also tolerate holes, so there is
no need to supply dummy event.
*/
switch (*gtid_skip_group)
switch (info->gtid_skip_group)
{
case GTID_SKIP_STANDALONE:
if (!Log_event::is_part_of_group(event_type))
*gtid_skip_group= GTID_SKIP_NOT;
info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT ||
......@@ -1719,14 +1773,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
len - ev_offset,
current_checksum_alg)))
*gtid_skip_group= GTID_SKIP_NOT;
info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_NOT:
break;
}
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
if (event_type == ANNOTATE_ROWS_EVENT &&
!(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
{
......@@ -1820,7 +1875,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Skip events with the @@skip_replication flag set, if slave requested
skipping of such events.
*/
if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
{
/*
The first byte of the packet is a '\0' to distinguish it from an error
......@@ -1831,17 +1886,17 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return NULL;
}
THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
(info->thd, info->flags, packet, info->log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
}
if (my_net_write(net, (uchar*) packet->ptr(), len))
if (my_net_write(info->net, (uchar*) packet->ptr(), len))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
......@@ -1850,14 +1905,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
if (send_file(info->thd))
{
my_errno= ER_UNKNOWN_ERROR;
return "failed in send_file()";
}
}
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
if (RUN_HOOK(binlog_transmit, after_send_event,
(info->thd, info->flags, packet)))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
......@@ -1878,31 +1934,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log;
File file = -1;
String* const packet = &thd->packet;
String* const packet= &thd->packet;
int error;
const char *errmsg = "Unknown error", *tmp_msg;
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
NET* net = &thd->net;
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
char str_buf2[128];
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
slave_connection_state gtid_state, until_gtid_state_obj;
slave_connection_state *until_gtid_state= NULL;
slave_connection_state until_gtid_state_obj;
rpl_gtid error_gtid;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
rpl_binlog_state until_binlog_state;
bool slave_gtid_strict_mode= false;
bool send_fake_gtid_list= false;
binlog_send_info info(thd, packet, flags, log_file_name);
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
Format_description_log_event *fdev= NULL;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
......@@ -1928,16 +1974,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
mariadb_slave_capability= get_mariadb_slave_capability(thd);
info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
connect_gtid_state.length(0);
using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
if (using_gtid_state)
info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
if (info.using_gtid_state)
{
slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
if(get_slave_until_gtid(thd, &slave_until_gtid_str))
until_gtid_state= &until_gtid_state_obj;
info.until_gtid_state= &until_gtid_state_obj;
}
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
......@@ -1978,7 +2025,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
if (!(fdev= new Format_description_log_event(3)))
if (!(info.fdev= new Format_description_log_event(3)))
{
errmsg= "Out of memory initializing format_description event";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
......@@ -1999,33 +2046,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
if (using_gtid_state)
if (info.using_gtid_state)
{
if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
connect_gtid_state.length()))
if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
connect_gtid_state.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining start "
"position from GTID state";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
if (until_gtid_state &&
until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
slave_until_gtid_str.length()))
if (info.until_gtid_state &&
info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
slave_until_gtid_str.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
"position sent from slave";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
&error_gtid, until_gtid_state)))
if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
{
my_errno= error;
goto err;
}
if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
until_gtid_state)))
if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
info.until_gtid_state)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
......@@ -2098,7 +2144,7 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
if (fake_rotate_event(&info, pos, &errmsg,
get_binlog_checksum_value_at_connect(thd)))
{
/*
......@@ -2150,14 +2196,14 @@ impossible position";
{
Format_description_log_event *tmp;
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
......@@ -2170,14 +2216,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
fdev)))
info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
delete fdev;
fdev= tmp;
delete info.fdev;
info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
......@@ -2194,12 +2240,12 @@ impossible position";
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */
if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
......@@ -2235,13 +2281,13 @@ impossible position";
We will send one event, the format_description, and then stop.
*/
if (until_gtid_state && until_gtid_state->count() == 0)
gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
if (info.until_gtid_state && info.until_gtid_state->count() == 0)
info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
while (!net->error && net->vio != 0 && !thd->killed)
while (!info.net->error && info.net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
killed_state killed;
......@@ -2254,14 +2300,14 @@ impossible position";
bool is_active_binlog= false;
while (!(killed= thd->killed) &&
!(error = Log_event::read_log_event(&log, packet, log_lock,
current_checksum_alg,
info.current_checksum_alg,
log_file_name,
&is_active_binlog)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
net_flush(net);
net_flush(info.net);
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
goto err;
......@@ -2279,7 +2325,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
net_flush(net);
net_flush(info.net);
const char act[]=
"now "
"wait_for signal.continue";
......@@ -2298,14 +2344,14 @@ impossible position";
{
Format_description_log_event *tmp;
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
......@@ -2318,14 +2364,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
fdev)))
info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
delete fdev;
fdev= tmp;
delete info.fdev;
info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
......@@ -2343,36 +2389,28 @@ impossible position";
}
#endif
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg, using_gtid_state,
&gtid_state, &gtid_skip_group,
until_gtid_state, &gtid_until_group,
&until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
&send_fake_gtid_list, fdev)))
if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
if (unlikely(info.send_fake_gtid_list) &&
info.gtid_skip_group == GTID_SKIP_NOT)
{
Gtid_list_log_event glev(&until_binlog_state, 0);
Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
fake_gtid_list_event(net, packet, &glev, &errmsg,
current_checksum_alg, my_b_tell(&log)))
fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
send_fake_gtid_list= false;
info.send_fake_gtid_list= false;
}
if (until_gtid_state &&
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
event_type, current_checksum_alg, flags, &errmsg,
&until_binlog_state, my_b_tell(&log)))
if (info.until_gtid_state &&
is_until_reached(&info, &ev_offset, event_type, &errmsg,
my_b_tell(&log)))
{
if (errmsg)
{
......@@ -2386,7 +2424,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
net_flush(net);
net_flush(info.net);
}
});
......@@ -2423,7 +2461,7 @@ impossible position";
/*
Block until there is more data in the log
*/
if (net_flush(net))
if (net_flush(info.net))
{
errmsg = "failed on net_flush()";
my_errno= ER_UNKNOWN_ERROR;
......@@ -2466,7 +2504,7 @@ impossible position";
mysql_mutex_lock(log_lock);
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
current_checksum_alg)) {
info.current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
mysql_mutex_unlock(log_lock);
......@@ -2524,7 +2562,8 @@ impossible position";
thd->EXIT_COND(&old_stage);
goto err;
}
if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
if (send_heartbeat_event(info.net, packet, p_coord,
info.current_checksum_alg))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
......@@ -2549,36 +2588,28 @@ impossible position";
if (read_packet)
{
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg,
using_gtid_state, &gtid_state,
&gtid_skip_group, until_gtid_state,
&gtid_until_group, &until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
&send_fake_gtid_list, fdev)))
if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
if (unlikely(info.send_fake_gtid_list)
&& info.gtid_skip_group == GTID_SKIP_NOT)
{
Gtid_list_log_event glev(&until_binlog_state, 0);
Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
fake_gtid_list_event(net, packet, &glev, &errmsg,
current_checksum_alg, my_b_tell(&log)))
fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
send_fake_gtid_list= false;
info.send_fake_gtid_list= false;
}
if (until_gtid_state &&
is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
event_type, current_checksum_alg, flags, &errmsg,
&until_binlog_state, my_b_tell(&log)))
if (info.until_gtid_state &&
is_until_reached(&info, &ev_offset, event_type, &errmsg,
my_b_tell(&log)))
{
if (errmsg)
{
......@@ -2633,8 +2664,8 @@ impossible position";
read and send is Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
&errmsg, current_checksum_alg))
fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
info.current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
......@@ -2655,7 +2686,7 @@ end:
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete fdev;
delete info.fdev;
DBUG_VOID_RETURN;
err:
......@@ -2731,7 +2762,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete fdev;
delete info.fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
......
......@@ -1819,6 +1819,54 @@ static Sys_var_ulong Sys_slave_parallel_max_queued(
"--slave-parallel-threads > 0.",
GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
static bool
check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
{
bool running;
mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
return true;
return false;
}
static bool
fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
{
bool running;
bool err= false;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
err= true;
mysql_mutex_lock(&LOCK_global_system_variables);
/* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */
return err;
}
static Sys_var_mybool Sys_gtid_ignore_duplicates(
"gtid_ignore_duplicates",
"When set, different master connections in multi-source replication are "
"allowed to receive and process event groups with the same GTID (when "
"using GTID mode). Only one will be applied, any others will be "
"ignored. Within a given replication domain, just the sequence number "
"will be used to decide whether a given GTID has been already applied; "
"this means it is the responsibility of the user to ensure that GTID "
"sequence numbers are strictly increasing.",
GLOBAL_VAR(opt_gtid_ignore_duplicates), CMD_LINE(OPT_ARG),
DEFAULT(FALSE), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(check_gtid_ignore_duplicates),
ON_UPDATE(fix_gtid_ignore_duplicates));
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment