Commit 7681c6aa authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

Fix some part of update of old-style coordinates in parallel replication:

 - Ignore XtraDB request for old-style coordinates, not meaningful for
   parallel replication (must use GTID to get crash-safe parallel slave).

 - Only update relay log coordinates forward, not backwards, to ensure
   that parallel threads do not conflict with each other.

 - Move future_event_relay_log_pos to rgi.
parent fcaf1e6a
...@@ -3881,7 +3881,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, ...@@ -3881,7 +3881,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
future-change-proof addon, e.g if COMMIT handling will start checking future-change-proof addon, e.g if COMMIT handling will start checking
invariants like IN_STMT flag must be off at committing the transaction. invariants like IN_STMT flag must be off at committing the transaction.
*/ */
const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT); const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT);
} }
else else
...@@ -4249,7 +4249,6 @@ end: ...@@ -4249,7 +4249,6 @@ end:
int Query_log_event::do_update_pos(rpl_group_info *rgi) int Query_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli;
/* /*
Note that we will not increment group* positions if we are just Note that we will not increment group* positions if we are just
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
...@@ -4257,7 +4256,7 @@ int Query_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -4257,7 +4256,7 @@ int Query_log_event::do_update_pos(rpl_group_info *rgi)
*/ */
if (thd->one_shot_set) if (thd->one_shot_set)
{ {
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
return 0; return 0;
} }
else else
...@@ -4864,7 +4863,6 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -4864,7 +4863,6 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
int Format_description_log_event::do_update_pos(rpl_group_info *rgi) int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli;
if (server_id == (uint32) global_system_variables.server_id) if (server_id == (uint32) global_system_variables.server_id)
{ {
/* /*
...@@ -4880,7 +4878,7 @@ int Format_description_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -4880,7 +4878,7 @@ int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
Intvar_log_event instead of starting at a Table_map_log_event or Intvar_log_event instead of starting at a Table_map_log_event or
the Intvar_log_event respectively. the Intvar_log_event respectively.
*/ */
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
return 0; return 0;
} }
else else
...@@ -5955,7 +5953,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -5955,7 +5953,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
(ulong) rli->group_master_log_pos)); (ulong) rli->group_master_log_pos));
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
rli->notify_group_master_log_name_update(); rli->notify_group_master_log_name_update();
rli->inc_group_relay_log_pos(pos, TRUE /* skip_lock */); rli->inc_group_relay_log_pos(pos, rgi, TRUE /* skip_lock */);
DBUG_PRINT("info", ("new group_master_log_name: '%s' " DBUG_PRINT("info", ("new group_master_log_name: '%s' "
"new group_master_log_pos: %lu", "new group_master_log_pos: %lu",
rli->group_master_log_name, rli->group_master_log_name,
...@@ -5978,7 +5976,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -5978,7 +5976,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
thd->variables.auto_increment_offset= 1; thd->variables.auto_increment_offset= 1;
} }
else else
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -6290,8 +6288,7 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -6290,8 +6288,7 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
int int
Gtid_log_event::do_update_pos(rpl_group_info *rgi) Gtid_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
...@@ -6723,8 +6720,7 @@ int Intvar_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -6723,8 +6720,7 @@ int Intvar_log_event::do_apply_event(rpl_group_info *rgi)
int Intvar_log_event::do_update_pos(rpl_group_info *rgi) int Intvar_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
...@@ -6820,8 +6816,7 @@ int Rand_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -6820,8 +6816,7 @@ int Rand_log_event::do_apply_event(rpl_group_info *rgi)
int Rand_log_event::do_update_pos(rpl_group_info *rgi) int Rand_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
...@@ -7485,8 +7480,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -7485,8 +7480,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
int User_var_log_event::do_update_pos(rpl_group_info *rgi) int User_var_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
...@@ -7717,11 +7711,11 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -7717,11 +7711,11 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
the target position when in fact we have not. the target position when in fact we have not.
*/ */
if (rli->get_flag(Relay_log_info::IN_TRANSACTION)) if (rli->get_flag(Relay_log_info::IN_TRANSACTION))
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
else else
{ {
rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi); rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi);
rli->inc_group_relay_log_pos(0); rli->inc_group_relay_log_pos(0, rgi);
flush_relay_log_info(rli); flush_relay_log_info(rli);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -9543,7 +9537,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -9543,7 +9537,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
} }
else else
{ {
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
} }
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -9767,8 +9761,7 @@ int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -9767,8 +9761,7 @@ int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi) int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
#endif #endif
...@@ -10395,8 +10388,7 @@ Table_map_log_event::do_shall_skip(rpl_group_info *rgi) ...@@ -10395,8 +10388,7 @@ Table_map_log_event::do_shall_skip(rpl_group_info *rgi)
int Table_map_log_event::do_update_pos(rpl_group_info *rgi) int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
{ {
Relay_log_info *rli= rgi->rli; rgi->inc_event_relay_log_pos();
rli->inc_event_relay_log_pos();
return 0; return 0;
} }
...@@ -11930,11 +11922,21 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, ...@@ -11930,11 +11922,21 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
return FALSE; return FALSE;
#else #else
const Relay_log_info *rli= &(active_mi->rli); const Relay_log_info *rli= &(active_mi->rli);
*log_file_name= rli->group_master_log_name; if (opt_slave_parallel_threads == 0)
*log_pos= rli->group_master_log_pos + {
(rli->future_event_relay_log_pos - rli->group_relay_log_pos); *log_file_name= rli->group_master_log_name;
*group_relay_log_name= rli->group_relay_log_name; *log_pos= rli->group_master_log_pos +
*relay_log_pos= rli->future_event_relay_log_pos; (rli->future_event_relay_log_pos - rli->group_relay_log_pos);
*group_relay_log_name= rli->group_relay_log_name;
*relay_log_pos= rli->future_event_relay_log_pos;
}
else
{
*log_file_name= "";
*log_pos= 0;
*group_relay_log_name= "";
*relay_log_pos= 0;
}
return TRUE; return TRUE;
#endif #endif
} }
......
...@@ -1839,7 +1839,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -1839,7 +1839,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
} }
else else
{ {
rli->inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
} }
DBUG_RETURN(error); DBUG_RETURN(error);
......
...@@ -64,6 +64,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -64,6 +64,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
/* ToDo: Access to thd, and what about rli, split out a parallel part? */ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock); mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd; qev->ev->thd= thd;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL; thd->rgi_slave= NULL;
...@@ -659,6 +660,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) ...@@ -659,6 +660,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
} }
qev->ev= ev; qev->ev= ev;
qev->next= NULL; qev->next= NULL;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
if (typ == GTID_EVENT) if (typ == GTID_EVENT)
{ {
......
...@@ -23,6 +23,7 @@ struct rpl_parallel_thread { ...@@ -23,6 +23,7 @@ struct rpl_parallel_thread {
queued_event *next; queued_event *next;
Log_event *ev; Log_event *ev;
rpl_group_info *rgi; rpl_group_info *rgi;
ulonglong future_event_relay_log_pos;
} *event_queue, *last_in_queue; } *event_queue, *last_in_queue;
}; };
......
...@@ -869,17 +869,33 @@ improper_arguments: %d timed_out: %d", ...@@ -869,17 +869,33 @@ improper_arguments: %d timed_out: %d",
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
bool skip_lock) rpl_group_info *rgi,
bool skip_lock)
{ {
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
if (!skip_lock) if (!skip_lock)
mysql_mutex_lock(&data_lock); mysql_mutex_lock(&data_lock);
inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
group_relay_log_pos= event_relay_log_pos; if (opt_slave_parallel_threads > 0)
strmake_buf(group_relay_log_name,event_relay_log_name); {
/* In case of parallel replication, do not update the position backwards. */
notify_group_relay_log_name_update(); int cmp= strcmp(group_relay_log_name, event_relay_log_name);
if (cmp < 0)
{
group_relay_log_pos= event_relay_log_pos;
strmake_buf(group_relay_log_name, event_relay_log_name);
notify_group_relay_log_name_update();
} else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos)
group_relay_log_pos= event_relay_log_pos;
}
else
{
/* Non-parallel case. */
group_relay_log_pos= event_relay_log_pos;
strmake_buf(group_relay_log_name, event_relay_log_name);
notify_group_relay_log_name_update();
}
/* /*
If the slave does not support transactions and replicates a transaction, If the slave does not support transactions and replicates a transaction,
...@@ -1226,10 +1242,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, ...@@ -1226,10 +1242,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
*/ */
if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions) opt_using_transactions)
inc_event_relay_log_pos(); rgi->inc_event_relay_log_pos();
else else
{ {
inc_group_relay_log_pos(event_master_log_pos); inc_group_relay_log_pos(event_master_log_pos, rgi);
if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi)) if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
{ {
report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
......
...@@ -351,13 +351,9 @@ public: ...@@ -351,13 +351,9 @@ public:
if (until_condition==UNTIL_MASTER_POS) if (until_condition==UNTIL_MASTER_POS)
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
} }
inline void inc_event_relay_log_pos()
{
event_relay_log_pos= future_event_relay_log_pos;
}
void inc_group_relay_log_pos(ulonglong log_pos, void inc_group_relay_log_pos(ulonglong log_pos,
rpl_group_info *rgi,
bool skip_lock=0); bool skip_lock=0);
int wait_for_pos(THD* thd, String* log_name, longlong log_pos, int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
...@@ -561,6 +557,8 @@ struct rpl_group_info ...@@ -561,6 +557,8 @@ struct rpl_group_info
*/ */
time_t last_event_start_time; time_t last_event_start_time;
ulonglong future_event_relay_log_pos;
private: private:
/* /*
Runtime state for printing a note when slave is taking Runtime state for printing a note when slave is taking
...@@ -684,6 +682,13 @@ public: ...@@ -684,6 +682,13 @@ public:
{ {
return long_find_row_note_printed; return long_find_row_note_printed;
} }
inline void inc_event_relay_log_pos()
{
if (opt_slave_parallel_threads == 0 ||
rli->event_relay_log_pos < future_event_relay_log_pos)
rli->event_relay_log_pos= future_event_relay_log_pos;
}
}; };
......
...@@ -3331,7 +3331,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3331,7 +3331,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rli->abort_slave= 1; rli->abort_slave= 1;
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
delete ev; delete ev;
rli->inc_event_relay_log_pos(); serial_rgi->inc_event_relay_log_pos();
DBUG_RETURN(0); DBUG_RETURN(0);
};); };);
} }
...@@ -3360,6 +3360,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3360,6 +3360,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
delete_or_keep_event_post_apply(serial_rgi, typ, ev); delete_or_keep_event_post_apply(serial_rgi, typ, ev);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment