Commit 592e464a authored by unknown's avatar unknown

MDEV-4506: Parallel replication. Intermediate commit.

Pass down rpl_group_info * to remove one instance of non-threadsafe
use of rli->group_info.
parent 31a5edb5
......@@ -937,8 +937,9 @@ Log_event::Log_event(const char* buf,
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
int Log_event::do_update_pos(Relay_log_info *rli)
int Log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
rli is null when (as far as I (Guilhem) know) the caller is
Load_log_event::do_apply_event *and* that one is called from
......@@ -967,7 +968,7 @@ int Log_event::do_update_pos(Relay_log_info *rli)
(is_artificial_event() &&
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
0 : when),
thd);
thd, rgi);
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
......@@ -4243,8 +4244,9 @@ end:
DBUG_RETURN(thd->is_slave_error);
}
int Query_log_event::do_update_pos(Relay_log_info *rli)
int Query_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
Note that we will not increment group* positions if we are just
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
......@@ -4256,7 +4258,7 @@ int Query_log_event::do_update_pos(Relay_log_info *rli)
return 0;
}
else
return Log_event::do_update_pos(rli);
return Log_event::do_update_pos(rgi);
}
......@@ -4865,8 +4867,9 @@ int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(ret);
}
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
int Format_description_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
if (server_id == (uint32) global_system_variables.server_id)
{
/*
......@@ -4887,7 +4890,7 @@ int Format_description_log_event::do_update_pos(Relay_log_info *rli)
}
else
{
return Log_event::do_update_pos(rli);
return Log_event::do_update_pos(rgi);
}
}
......@@ -5916,8 +5919,9 @@ bool Rotate_log_event::write(IO_CACHE* file)
@retval
0 ok
*/
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
int Rotate_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rotate_log_event::do_update_pos");
#ifndef DBUG_OFF
char buf[32];
......@@ -5962,7 +5966,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
mysql_mutex_unlock(&rli->data_lock);
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi);
flush_relay_log_info(rli);
/*
......@@ -6291,8 +6295,9 @@ Gtid_log_event::do_apply_event(struct rpl_group_info *rgi)
int
Gtid_log_event::do_update_pos(Relay_log_info *rli)
Gtid_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......@@ -6726,8 +6731,9 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
int Intvar_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......@@ -6829,8 +6835,9 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
int Rand_log_event::do_update_pos(Relay_log_info *rli)
int Rand_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......@@ -7498,8 +7505,9 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(0);
}
int User_var_log_event::do_update_pos(Relay_log_info *rli)
int User_var_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......@@ -7718,8 +7726,9 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
*/
int Stop_log_event::do_update_pos(Relay_log_info *rli)
int Stop_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
We do not want to update master_log pos because we get a rotate event
before stop, so by now group_master_log_name is set to the next log.
......@@ -7731,7 +7740,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli)
rli->inc_event_relay_log_pos();
else
{
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi);
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
......@@ -9529,8 +9538,9 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
@retval non-zero Error in the statement commit
*/
int
Rows_log_event::do_update_pos(Relay_log_info *rli)
Rows_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rows_log_event::do_update_pos");
int error= 0;
......@@ -9544,7 +9554,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when, thd);
rli->stmt_done(log_pos, when, thd, rgi);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
......@@ -9777,8 +9787,9 @@ int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli)
int Annotate_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......@@ -10404,8 +10415,9 @@ Table_map_log_event::do_shall_skip(Relay_log_info *rli)
return continue_group(rli);
}
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
int Table_map_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
return 0;
}
......
......@@ -1331,9 +1331,9 @@ public:
@see do_update_pos
*/
int update_pos(Relay_log_info *rli)
int update_pos(struct rpl_group_info *rgi)
{
return do_update_pos(rli);
return do_update_pos(rgi);
}
/**
......@@ -1461,7 +1461,7 @@ protected:
1). Observe that handler errors are returned by the
do_apply_event() function, and not by this one.
*/
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
/**
......@@ -1987,7 +1987,7 @@ public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
int do_apply_event(struct rpl_group_info *rgi,
const char *query_arg,
......@@ -2597,7 +2597,7 @@ public:
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
......@@ -2676,7 +2676,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
......@@ -2755,7 +2755,7 @@ class Rand_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
......@@ -2871,7 +2871,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
......@@ -2905,7 +2905,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli)
{
/*
......@@ -3007,7 +3007,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
......@@ -3120,7 +3120,7 @@ public:
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
#else
......@@ -3636,7 +3636,7 @@ public:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
private:
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info*);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*);
#endif
......@@ -4051,7 +4051,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
......@@ -4279,7 +4279,7 @@ private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
......
......@@ -36,12 +36,13 @@
// Old implementation of do_apply_event()
int
Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
int error= 0;
THD *ev_thd= ev->thd;
uchar const *row_start= ev->m_rows_buf;
const Relay_log_info *rli= rgi->rli;
/*
If m_table_id == ~0UL, then we have a dummy event that does not
......@@ -1450,10 +1451,11 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
Relay_log_info const *rli= rgi->rli;
/*
If m_table_id == ~0UL, then we have a dummy event that does not
......@@ -1832,8 +1834,9 @@ Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
}
int
Old_rows_log_event::do_update_pos(Relay_log_info *rli)
Old_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Old_rows_log_event::do_update_pos");
int error= 0;
......@@ -1847,7 +1850,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when, thd);
rli->stmt_done(log_pos, when, thd, rgi);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
......
......@@ -214,8 +214,8 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
virtual int do_update_pos(Relay_log_info *rli);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
......@@ -275,7 +275,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int do_apply_event(Old_rows_log_event*,const Relay_log_info*);
int do_apply_event(Old_rows_log_event*, struct rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
......@@ -403,8 +403,8 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(struct rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
......@@ -481,8 +481,8 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(struct rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
......@@ -556,8 +556,8 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(struct rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
......
......@@ -62,16 +62,15 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
int
rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
rpl_slave_state::record_and_update_gtid(THD *thd, struct rpl_group_info *rgi)
{
uint64 sub_id;
struct rpl_group_info *rgi;
/*
Update the GTID position, if we have it and did not already update
it in a GTID transaction.
*/
if ((rgi= rli->group_info) && (sub_id= rgi->gtid_sub_id))
if ((sub_id= rgi->gtid_sub_id))
{
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
......
......@@ -108,7 +108,7 @@ struct rpl_slave_state
int put_back_list(uint32 domain_id, list_element *list);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
int record_and_update_gtid(THD *thd, Relay_log_info *rli);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
};
......
......@@ -1194,13 +1194,15 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
time_t event_creation_time, THD *thd)
time_t event_creation_time, THD *thd,
struct rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
clear_flag(IN_STMT);
DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
......@@ -1229,7 +1231,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
else
{
inc_group_relay_log_pos(event_master_log_pos);
if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this))
if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
{
report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
"Failed to update GTID state in %s.%s, slave state may become "
......
......@@ -458,7 +458,8 @@ public:
the <code>Seconds_behind_master</code> field.
*/
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd);
time_t event_creation_time, THD *thd,
struct rpl_group_info *rgi);
/**
......
......@@ -3108,7 +3108,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0)
{
int error= ev->update_pos(rli);
int error= ev->update_pos(rgi);
#ifdef HAVE_valgrind
if (!rli->is_fake)
#endif
......@@ -3262,7 +3262,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(1);
}
exec_res= apply_event_and_update_pos(ev, thd, rli->group_info, NULL);
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment