Commit 03f28863 authored by unknown's avatar unknown

MDEV-26: Global transaction commit. Intermediate commit.

Now slave records GTID in mysql.rpl_slave_state when applying XID log event.
parent ab8e8f4b
...@@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * ...@@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread(); static bool start_binlog_background_thread();
rpl_binlog_state rpl_global_gtid_binlog_state;
/** /**
purge logs, master and slave sides both, related error code purge logs, master and slave sides both, related error code
convertor. convertor.
...@@ -5334,7 +5336,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, ...@@ -5334,7 +5336,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
/* Update the replication state (last GTID in each replication domain). */ /* Update the replication state (last GTID in each replication domain). */
mysql_mutex_lock(&LOCK_rpl_gtid_state); mysql_mutex_lock(&LOCK_rpl_gtid_state);
global_rpl_gtid_state.update(&gtid); rpl_global_gtid_binlog_state.update(&gtid);
mysql_mutex_unlock(&LOCK_rpl_gtid_state); mysql_mutex_unlock(&LOCK_rpl_gtid_state);
return false; return false;
} }
......
...@@ -6055,28 +6055,247 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) ...@@ -6055,28 +6055,247 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
Global transaction ID stuff Global transaction ID stuff
**************************************************************************/ **************************************************************************/
/** rpl_slave_state::rpl_slave_state()
Current replication state (hash of last GTID executed, per replication : inited(false), loaded(false)
domain). {
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
}
rpl_slave_state::~rpl_slave_state()
{
}
#ifdef MYSQL_SERVER
void
rpl_slave_state::init()
{
DBUG_ASSERT(!inited);
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
inited= true;
}
void
rpl_slave_state::deinit()
{
uint32 i;
if (!inited)
return;
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
list_element *l= e->list;
list_element *next;
while (l)
{
next= l->next;
my_free(l);
l= next;
}
/* The element itself is freed by my_hash_free(). */
}
my_hash_free(&hash);
mysql_mutex_destroy(&LOCK_slave_state);
}
#endif
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
uint64 seq_no)
{
element *elem= NULL;
list_element *list_elem= NULL;
if (!(elem= get_element(domain_id)))
return 1;
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
elem->add(list_elem);
return 0;
}
struct rpl_slave_state::element *
rpl_slave_state::get_element(uint32 domain_id)
{
struct element *elem;
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (elem)
return elem;
if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
return NULL;
elem->list= NULL;
elem->last_sub_id= 0;
elem->domain_id= domain_id;
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
return NULL;
}
return elem;
}
#ifdef MYSQL_SERVER
#ifdef HAVE_REPLICATION
/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
transaction if !in_transaction (eg. MyISAM or DDL).
gtid The global transaction id for this event group.
sub_id Value allocated within the sub_id when the event group was
read (sub_id must be consistent with commit order in master binlog).
Note that caller must later ensure that the new gtid and sub_id is inserted
into the appropriate HASH element with rpl_slave_state.add(), so that it can
be deleted later. But this must only be done after COMMIT if in transaction.
*/ */
rpl_state global_rpl_gtid_state; int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction)
{
TABLE_LIST tlist;
int err= 0;
bool table_opened= false;
TABLE *table;
list_element *elist= 0, *next;
element *elem;
DBUG_ASSERT(in_transaction /* ToDo: new transaction for DDL etc. */);
rpl_state::rpl_state() mysql_reset_thd_for_next_command(thd, 0);
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_WRITE);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
/*
ToDo: Check the table definition, error if not as expected.
We need the correct first 4 columns with correct type, and the primary key.
*/
bitmap_set_bit(table->write_set, table->field[0]->field_index);
bitmap_set_bit(table->write_set, table->field[1]->field_index);
bitmap_set_bit(table->write_set, table->field[2]->field_index);
bitmap_set_bit(table->write_set, table->field[3]->field_index);
table->field[0]->store((ulonglong)gtid->domain_id, true);
table->field[1]->store(sub_id, true);
table->field[2]->store((ulonglong)gtid->server_id, true);
table->field[3]->store(gtid->seq_no, true);
if ((err= table->file->ha_write_row(table->record[0])))
goto end;
lock();
if ((elem= get_element(gtid->domain_id)) == NULL)
{
unlock();
err= 1;
goto end;
}
elist= elem->grab_list();
unlock();
if (!elist)
goto end;
/* Now delete any already committed rows. */
DBUG_ASSERT
((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
table->s->primary_key < MAX_KEY /* ToDo support all storage engines */);
bitmap_set_bit(table->read_set, table->field[0]->field_index);
bitmap_set_bit(table->read_set, table->field[1]->field_index);
while (elist)
{
next= elist->next;
table->field[1]->store(elist->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */
if ((err= table->file->ha_rnd_pos_by_record(table->record[0])) ||
(err= table->file->ha_delete_row(table->record[0])))
goto end;
my_free(elist);
elist= next;
}
end:
if (table_opened)
{
if (err)
{
/*
ToDo: If error, we need to put any remaining elist back into the HASH so
we can do another delete attempt later.
*/
ha_rollback_trans(thd, FALSE);
close_thread_tables(thd);
if (in_transaction)
ha_rollback_trans(thd, TRUE);
}
else
{
ha_commit_trans(thd, FALSE);
close_thread_tables(thd);
if (in_transaction)
ha_commit_trans(thd, TRUE);
}
}
return err;
}
uint64
rpl_slave_state::next_subid(uint32 domain_id)
{
uint32 sub_id= 0;
element *elem;
lock();
elem= get_element(domain_id);
if (elem)
sub_id= ++elem->last_sub_id;
unlock();
return sub_id;
}
#endif
rpl_binlog_state::rpl_binlog_state()
{ {
my_hash_init(&hash, &my_charset_bin, 32, my_hash_init(&hash, &my_charset_bin, 32,
offsetof(rpl_gtid, domain_id), sizeof(uint32), offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free,
NULL, my_free, HASH_UNIQUE); HASH_UNIQUE);
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW);
} }
rpl_state::~rpl_state() rpl_binlog_state::~rpl_binlog_state()
{ {
mysql_mutex_destroy(&LOCK_binlog_state);
my_hash_free(&hash); my_hash_free(&hash);
} }
#ifdef MYSQL_SERVER
/* /*
Update replication state with a new GTID. Update replication state with a new GTID.
...@@ -6086,7 +6305,7 @@ rpl_state::~rpl_state() ...@@ -6086,7 +6305,7 @@ rpl_state::~rpl_state()
Returns 0 for ok, 1 for error. Returns 0 for ok, 1 for error.
*/ */
int int
rpl_state::update(const struct rpl_gtid *gtid) rpl_binlog_state::update(const struct rpl_gtid *gtid)
{ {
uchar *rec; uchar *rec;
...@@ -6206,20 +6425,20 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol) ...@@ -6206,20 +6425,20 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
protocol->store(buf, p-buf, &my_charset_bin); protocol->store(buf, p-buf, &my_charset_bin);
} }
static char gtid_begin_string[5] = {'B','E','G','I','N'}; static char gtid_begin_string[] = "BEGIN";
int int
Gtid_log_event::do_apply_event(Relay_log_info const *rli) Gtid_log_event::do_apply_event(Relay_log_info const *rli)
{ {
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
/* ToDo: record the new GTID. */ thd->variables.gtid_seq_no= this->seq_no;
if (flags2 & FL_STANDALONE) if (flags2 & FL_STANDALONE)
return 0; return 0;
/* Execute this like a BEGIN query event. */ /* Execute this like a BEGIN query event. */
thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string), thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
&my_charset_bin, next_query_id()); &my_charset_bin, next_query_id());
Parser_state parser_state; Parser_state parser_state;
if (!parser_state.init(thd, thd->query(), thd->query_length())) if (!parser_state.init(thd, thd->query(), thd->query_length()))
...@@ -6350,7 +6569,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, ...@@ -6350,7 +6569,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set) Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
: count(gtid_set->count()), list(0) : count(gtid_set->count()), list(0)
{ {
DBUG_ASSERT(count != 0); DBUG_ASSERT(count != 0);
...@@ -6804,12 +7023,73 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -6804,12 +7023,73 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Xid_log_event::do_apply_event(Relay_log_info const *rli) int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{ {
bool res; bool res;
int err;
rpl_gtid gtid;
uint64 sub_id;
/*
Record any GTID in the same transaction, so slave state is transactionally
consistent.
*/
if ((sub_id= rli->gtid_sub_id))
{
/* Clear the GTID from the RLI so we don't accidentally reuse it. */
const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
gtid= rli->current_gtid;
err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
if (err)
{
trans_rollback(thd);
return err;
}
}
/* For a slave Xid_log_event is COMMIT */ /* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY, general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */"); "COMMIT /* implicit, from Xid_log_event */");
res= trans_commit(thd); /* Automatically rolls back on error. */ res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
if (sub_id)
{
/*
Add the gtid to the HASH in the replication slave state.
We must do this only here _after_ commit, so that for parallel
replication, there will not be an attempt to delete the corresponding
table row before it is even committed.
Even if commit fails, we still add the entry - in case the table
mysql.rpl_slave_state is non-transactional and the row is not removed
by rollback.
*/
rpl_slave_state::element *elem=
rpl_global_gtid_slave_state.get_element(gtid.domain_id);
rpl_slave_state::list_element *lelem=
(rpl_slave_state::list_element *)my_malloc(sizeof(*lelem), MYF(MY_WME));
if (elem && lelem)
{
lelem->sub_id= sub_id;
lelem->server_id= gtid.server_id;
lelem->seq_no= gtid.seq_no;
elem->add(lelem);
}
else
{
if (lelem)
my_free(lelem);
sql_print_warning("Slave: Out of memory during slave state maintenance. "
"Some no longer necessary rows in table "
"mysql.rpl_slave_state may be left undeleted.");
}
/*
Such failure is not fatal. We will fail to delete the row for this GTID,
but it will do no harm and will be removed automatically on next server
restart.
*/
}
/* /*
Increment the global status commit count variable Increment the global status commit count variable
*/ */
......
...@@ -2953,18 +2953,92 @@ struct rpl_gtid ...@@ -2953,18 +2953,92 @@ struct rpl_gtid
}; };
struct rpl_state /*
Replication slave state.
For every independent replication stream (identified by domain_id), this
remembers the last gtid applied on the slave within this domain.
Since events are always committed in-order within a single domain, this is
sufficient to maintain the state of the replication slave.
*/
struct rpl_slave_state
{ {
/* Elements in the list of GTIDs kept for each domain_id. */
struct list_element
{
struct list_element *next;
uint64 sub_id;
uint64 seq_no;
uint32 server_id;
};
/* Elements in the HASH that hold the state for one domain_id. */
struct element
{
struct list_element *list;
uint64 last_sub_id;
uint32 domain_id;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add (list_element *l)
{
l->next= list;
list= l;
if (last_sub_id < l->sub_id)
last_sub_id= l->sub_id;
}
};
/* Mapping from domain_id to its element. */
HASH hash; HASH hash;
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_slave_state;
bool inited;
bool loaded;
rpl_state(); rpl_slave_state();
~rpl_state(); ~rpl_slave_state();
void init();
void deinit();
ulong count() const { return hash.records; } ulong count() const { return hash.records; }
int update(const struct rpl_gtid *gtid); int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction);
uint64 next_subid(uint32 domain_id);
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
element *get_element(uint32 domain_id);
}; };
extern rpl_state global_rpl_gtid_state;
/*
Binlog state.
This keeps the last GTID written to the binlog for every distinct
(domain_id, server_id) pair.
This will be logged at the start of the next binlog file as a
Gtid_list_log_event; this way, it is easy to find the binlog file
containing a gigen GTID, by simply scanning backwards from the newest
one until a lower seq_no is found in the Gtid_list_log_event at the
start of a binlog for the given domain_id and server_id.
*/
struct rpl_binlog_state
{
/* Mapping from (domain_id,server_id) to its GTID. */
HASH hash;
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_binlog_state;
rpl_binlog_state();
~rpl_binlog_state();
ulong count() const { return hash.records; }
int update(const struct rpl_gtid *gtid);
};
/** /**
@class Gtid_log_event @class Gtid_log_event
...@@ -3129,7 +3203,7 @@ public: ...@@ -3129,7 +3203,7 @@ public:
static const uint element_size= 4+4+8; static const uint element_size= 4+4+8;
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
Gtid_list_log_event(rpl_state *gtid_set); Gtid_list_log_event(rpl_binlog_state *gtid_set);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol); void pack_info(THD *thd, Protocol *protocol);
#endif #endif
......
...@@ -766,6 +766,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, ...@@ -766,6 +766,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count, key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
key_PARTITION_LOCK_auto_inc; key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
PSI_mutex_key key_LOCK_stats, PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
...@@ -838,7 +839,9 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -838,7 +839,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0} { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
{ &key_LOCK_slave_state, "key_LOCK_slave_state", 0},
{ &key_LOCK_binlog_state, "key_LOCK_binlog_state", 0}
}; };
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
...@@ -1783,6 +1786,7 @@ static void mysqld_exit(int exit_code) ...@@ -1783,6 +1786,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently. spawn the kill_server_thread thread, which is running concurrently.
*/ */
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end(); wait_for_signal_thread_to_end();
mysql_audit_finalize(); mysql_audit_finalize();
clean_up_mutexes(); clean_up_mutexes();
...@@ -4064,6 +4068,10 @@ static int init_thread_environment() ...@@ -4064,6 +4068,10 @@ static int init_thread_environment()
PTHREAD_CREATE_DETACHED); PTHREAD_CREATE_DETACHED);
pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM); pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM);
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
#endif
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -247,6 +247,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, ...@@ -247,6 +247,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index; extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
extern PSI_mutex_key key_LOCK_stats, extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
......
...@@ -31,6 +31,16 @@ ...@@ -31,6 +31,16 @@
static int count_relay_log_space(Relay_log_info* rli); static int count_relay_log_space(Relay_log_info* rli);
/**
Current replication state (hash of last GTID executed, per replication
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
const LEX_STRING rpl_gtid_slave_state_table_name=
{ STRING_WITH_LEN("rpl_slave_state") };
// Defined in slave.cc // Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
...@@ -51,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -51,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
tables_to_lock(0), tables_to_lock_count(0), gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0), last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0) m_annotate_event(0)
......
...@@ -307,6 +307,14 @@ public: ...@@ -307,6 +307,14 @@ public:
char slave_patternload_file[FN_REFLEN]; char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size; size_t slave_patternload_file_size;
/*
Current GTID being processed.
The sub_id gives the binlog order within one domain_id. A zero sub_id
means that there is no active GTID.
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
Relay_log_info(bool is_slave_recovery); Relay_log_info(bool is_slave_recovery);
~Relay_log_info(); ~Relay_log_info();
...@@ -584,4 +592,8 @@ private: ...@@ -584,4 +592,8 @@ private:
int init_relay_log_info(Relay_log_info* rli, const char* info_fname); int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern const LEX_STRING rpl_gtid_slave_state_table_name;
extern struct rpl_slave_state rpl_global_gtid_slave_state;
#endif /* RPL_RLI_H */ #endif /* RPL_RLI_H */
...@@ -3724,6 +3724,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, ...@@ -3724,6 +3724,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
goto err; goto err;
} }
/* Load the set of seen GTIDs, if we did not already. */
if (rpl_load_gtid_slave_state(thd))
{
rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Unable to load replication GTID slave state from mysql.%s: %s",
rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
goto err;
}
/* execute init_slave variable */ /* execute init_slave variable */
if (opt_init_slave.length) if (opt_init_slave.length)
{ {
...@@ -5189,6 +5198,27 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5189,6 +5198,27 @@ static Log_event* next_event(Relay_log_info* rli)
inc_event_relay_log_pos() inc_event_relay_log_pos()
*/ */
rli->future_event_relay_log_pos= my_b_tell(cur_log); rli->future_event_relay_log_pos= my_b_tell(cur_log);
/*
For GTID, allocate a new sub_id for the given domain_id.
The sub_id must be allocated in increasing order of binlog order.
*/
if (ev->get_type_code() == GTID_EVENT)
{
Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
if (!sub_id)
{
errmsg = "slave SQL thread aborted because of out-of-memory error";
if (hot_log)
mysql_mutex_unlock(log_lock);
goto err;
}
rli->gtid_sub_id= sub_id;
rli->current_gtid.server_id= gev->server_id;
rli->current_gtid.domain_id= gev->domain_id;
rli->current_gtid.seq_no= gev->seq_no;
}
if (hot_log) if (hot_log)
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev); DBUG_RETURN(ev);
......
...@@ -16,10 +16,12 @@ ...@@ -16,10 +16,12 @@
#include "sql_priv.h" #include "sql_priv.h"
#include "unireg.h" #include "unireg.h"
#include "sql_base.h"
#include "sql_parse.h" // check_access #include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
#include "rpl_mi.h" #include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_repl.h" #include "sql_repl.h"
#include "sql_acl.h" // SUPER_ACL #include "sql_acl.h" // SUPER_ACL
#include "log_event.h" #include "log_event.h"
...@@ -748,7 +750,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -748,7 +750,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mariadb_slave_capability= get_mariadb_slave_capability(thd); mariadb_slave_capability= get_mariadb_slave_capability(thd);
if (global_system_variables.log_warnings > 1) if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
thd->variables.server_id, log_ident, (ulong)pos); (int)thd->variables.server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{ {
errmsg= "Failed to run hook 'transmit_start'"; errmsg= "Failed to run hook 'transmit_start'";
...@@ -2442,4 +2444,118 @@ int log_loaded_block(IO_CACHE* file) ...@@ -2442,4 +2444,118 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
Initialise the slave replication state from the mysql.rpl_slave_state table.
This is called each time an SQL thread starts, but the data is only actually
loaded on the first call.
The slave state is the last GTID applied on the slave within each
replication domain.
To avoid row lock contention, there are multiple rows for each domain_id.
The one containing the current slave state is the one with the maximal
sub_id value, within each domain_id.
CREATE TABLE mysql.rpl_slave_state (
domain_id INT UNSIGNED NOT NULL,
sub_id BIGINT UNSIGNED NOT NULL,
server_id INT UNSIGNED NOT NULL,
seq_no BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (domain_id, sub_id))
*/
void
rpl_init_gtid_slave_state()
{
rpl_global_gtid_slave_state.init();
}
void
rpl_deinit_gtid_slave_state()
{
rpl_global_gtid_slave_state.deinit();
}
int
rpl_load_gtid_slave_state(THD *thd)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
DBUG_ENTER("rpl_load_gtid_slave_state");
int err= 0;
rpl_global_gtid_slave_state.lock();
if (rpl_global_gtid_slave_state.loaded)
goto end;
mysql_reset_thd_for_next_command(thd, 0);
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
/*
ToDo: Check the table definition, error if not as expected.
We need the correct first 4 columns with correct type, and the primary key.
*/
bitmap_set_bit(table->read_set, table->field[0]->field_index);
bitmap_set_bit(table->read_set, table->field[1]->field_index);
bitmap_set_bit(table->read_set, table->field[2]->field_index);
bitmap_set_bit(table->read_set, table->field[3]->field_index);
if ((err= table->file->ha_rnd_init_with_error(1)))
goto end;
table_scanned= true;
for (;;)
{
uint32 domain_id, server_id;
uint64 sub_id, seq_no;
if ((err= table->file->ha_rnd_next(table->record[0])))
{
if (err == HA_ERR_RECORD_DELETED)
continue;
else if (err == HA_ERR_END_OF_FILE)
break;
else
goto end;
}
domain_id= (ulonglong)table->field[0]->val_int();
sub_id= (ulonglong)table->field[1]->val_int();
server_id= (ulonglong)table->field[2]->val_int();
seq_no= (ulonglong)table->field[3]->val_int();
DBUG_PRINT("info", ("Read slave state row: %u:%u-%lu sub_id=%lu\n",
(unsigned)domain_id, (unsigned)server_id,
(ulong)seq_no, (ulong)sub_id));
if ((err= rpl_global_gtid_slave_state.update(domain_id, server_id,
sub_id, seq_no)))
goto end;
}
err= 0; /* Clear HA_ERR_END_OF_FILE */
rpl_global_gtid_slave_state.loaded= true;
end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
close_thread_tables(thd);
rpl_global_gtid_slave_state.unlock();
DBUG_RETURN(err);
}
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
...@@ -65,6 +65,11 @@ int log_loaded_block(IO_CACHE* file); ...@@ -65,6 +65,11 @@ int log_loaded_block(IO_CACHE* file);
int init_replication_sys_vars(); int init_replication_sys_vars();
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int rpl_load_gtid_slave_state(THD *thd);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
#endif /* SQL_REPL_INCLUDED */ #endif /* SQL_REPL_INCLUDED */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment