Commit ba4b937a authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit

Move the deferred event stuff from Relay_log_info to rpl_group_info
to make it thread safe for parallel replication.
parent 6d5f237e
......@@ -6716,8 +6716,8 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
*/
rli->set_flag(Relay_log_info::IN_STMT);
if (rli->deferred_events_collecting)
return rli->deferred_events->add(this);
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
switch (type) {
case LAST_INSERT_ID_EVENT:
......@@ -6827,8 +6827,8 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
if (rli->deferred_events_collecting)
return rli->deferred_events->add(this);
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2;
......@@ -6868,14 +6868,14 @@ Rand_log_event::do_shall_skip(Relay_log_info *rli)
bool slave_execute_deferred_events(THD *thd)
{
bool res= false;
Relay_log_info *rli= thd->rli_slave;
rpl_group_info *rgi= thd->rgi_slave;
DBUG_ASSERT(rli && (!rli->deferred_events_collecting || rli->deferred_events));
DBUG_ASSERT(rgi && (!rgi->deferred_events_collecting || rgi->deferred_events));
if (!rli->deferred_events_collecting || rli->deferred_events->is_empty())
if (!rgi->deferred_events_collecting || rgi->deferred_events->is_empty())
return res;
res= rli->deferred_events->execute(rli->group_info);
res= rgi->deferred_events->execute(rgi);
return res;
}
......@@ -7423,10 +7423,10 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("User_var_log_event::do_apply_event");
if (rli->deferred_events_collecting)
if (rgi->deferred_events_collecting)
{
set_deferred();
DBUG_RETURN(rli->deferred_events->add(this));
DBUG_RETURN(rgi->deferred_events->add(this));
}
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
......
......@@ -4698,16 +4698,6 @@ bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;
#ifndef MYSQL_CLIENT
/**
The function is called by slave applier in case there are
active table filtering rules to force gathering events associated
with Query-log-event into an array to execute
them once the fate of the Query is determined for execution.
*/
bool slave_execute_deferred_events(THD *thd);
#endif
/**
@} (end of group Replication)
*/
......
......@@ -66,7 +66,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
thd->rli_slave= rli;
thd->rgi_slave= rgi;
thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Get rid of rli->group_info, it is not thread safe. */
rli->group_info= rgi;
......@@ -574,6 +574,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true;
}
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
rgi->deferred_events= new Deferred_log_events(rli);
if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
e->last_commit_id == gtid_ev->commit_id)
......
......@@ -60,7 +60,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
group_info(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0)
{
......@@ -1535,7 +1535,8 @@ end:
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0)
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL)
{
bzero(&current_gtid, sizeof(current_gtid));
}
......@@ -1596,7 +1597,7 @@ delete_or_keep_event_post_apply(Relay_log_info *rli,
/* fall through */
default:
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
if (!rli->is_deferred_event(ev))
if (!rli->group_info->is_deferred_event(ev))
delete ev;
break;
}
......
......@@ -403,41 +403,6 @@ public:
The timestamp is set and reset in @c sql_slave_killed().
*/
time_t last_event_start_time;
/*
A container to hold on Intvar-, Rand-, Uservar- log-events in case
the slave is configured with table filtering rules.
The withhold events are executed when their parent Query destiny is
determined for execution as well.
*/
Deferred_log_events *deferred_events;
/*
State of the container: true stands for IRU events gathering,
false does for execution, either deferred or direct.
*/
bool deferred_events_collecting;
/*
Returns true if the argument event resides in the containter;
more specifically, the checking is done against the last added event.
*/
bool is_deferred_event(Log_event * ev)
{
return deferred_events_collecting ? deferred_events->is_last(ev) : false;
};
/* The general cleanup that slave applier may need at the end of query. */
inline void cleanup_after_query()
{
if (deferred_events)
deferred_events->rewind();
};
/* The general cleanup that slave applier may need at the end of session. */
void cleanup_after_session()
{
if (deferred_events)
delete deferred_events;
};
/**
Helper function to do after statement completion.
......@@ -581,6 +546,7 @@ public:
private:
/* ToDo: This must be moved to rpl_group_info. */
uint32 m_flags;
/*
......@@ -645,6 +611,41 @@ struct rpl_group_info
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info() { };
/*
A container to hold on Intvar-, Rand-, Uservar- log-events in case
the slave is configured with table filtering rules.
The withhold events are executed when their parent Query destiny is
determined for execution as well.
*/
Deferred_log_events *deferred_events;
/*
State of the container: true stands for IRU events gathering,
false does for execution, either deferred or direct.
*/
bool deferred_events_collecting;
/*
Returns true if the argument event resides in the containter;
more specifically, the checking is done against the last added event.
*/
bool is_deferred_event(Log_event * ev)
{
return deferred_events_collecting ? deferred_events->is_last(ev) : false;
};
/* The general cleanup that slave applier may need at the end of query. */
inline void cleanup_after_query()
{
if (deferred_events)
deferred_events->rewind();
};
/* The general cleanup that slave applier may need at the end of session. */
void cleanup_after_session()
{
if (deferred_events)
delete deferred_events;
};
};
......
......@@ -1146,18 +1146,17 @@ bool Deferred_log_events::is_empty()
bool Deferred_log_events::execute(struct rpl_group_info *rgi)
{
bool res= false;
Relay_log_info *rli= rgi->rli;
DBUG_ASSERT(rli->deferred_events_collecting);
DBUG_ASSERT(rgi->deferred_events_collecting);
rli->deferred_events_collecting= false;
rgi->deferred_events_collecting= false;
for (uint i= 0; !res && i < array.elements; i++)
{
Log_event *ev= (* (Log_event **)
dynamic_array_ptr(&array, i));
res= ev->apply_event(rgi);
}
rli->deferred_events_collecting= true;
rgi->deferred_events_collecting= true;
return res;
}
......
......@@ -4025,10 +4025,10 @@ pthread_handler_t handle_slave_sql(void *arg)
goto err_during_init;
}
thd->init_for_queries();
thd->rli_slave= rli;
if ((rli->deferred_events_collecting= mi->rpl_filter->is_on()))
thd->rgi_slave= &serial_rgi;
if ((serial_rgi.deferred_events_collecting= mi->rpl_filter->is_on()))
{
rli->deferred_events= new Deferred_log_events(rli);
serial_rgi.deferred_events= new Deferred_log_events(rli);
}
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
......@@ -6302,10 +6302,10 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
*/
bool rpl_master_erroneous_autoinc(THD *thd)
{
if (thd->rli_slave)
if (thd->rgi_slave)
{
DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;);
return rpl_master_has_bug(thd->rli_slave, 33029, FALSE, NULL, NULL);
return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL);
}
return FALSE;
}
......
......@@ -80,6 +80,8 @@ void mysql_client_binlog_statement(THD* thd)
my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
struct rpl_group_info *rgi;
rli= thd->rli_fake;
if (!rli)
{
......@@ -95,11 +97,12 @@ void mysql_client_binlog_statement(THD* thd)
new Format_description_log_event(4);
have_fd_event= FALSE;
}
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
const char *error= 0;
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
Log_event *ev = 0;
struct rpl_group_info rgi(rli);
/*
Out of memory check
......@@ -197,8 +200,8 @@ void mysql_client_binlog_statement(THD* thd)
}
}
rgi.rli= rli;
rgi.thd= thd;
rgi->rli= rli;
rgi->thd= thd;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
0);
......@@ -235,7 +238,7 @@ void mysql_client_binlog_statement(THD* thd)
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
OPTION_SKIP_REPLICATION : 0);
err= ev->apply_event(&rgi);
err= ev->apply_event(rgi);
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
......
......@@ -769,7 +769,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
THD::THD()
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rli_slave(NULL),
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
......@@ -1490,6 +1490,11 @@ THD::~THD()
dbug_sentry= THD_SENTRY_GONE;
#endif
#ifndef EMBEDDED_LIBRARY
if (rgi_fake)
{
delete rgi_fake;
rgi_fake= NULL;
}
if (rli_fake)
{
delete rli_fake;
......@@ -1497,8 +1502,8 @@ THD::~THD()
}
mysql_audit_free_thd(this);
if (rli_slave)
rli_slave->cleanup_after_session();
if (rgi_slave)
rgi_slave->cleanup_after_session();
#endif
free_root(&main_mem_root, MYF(0));
......@@ -1883,7 +1888,7 @@ void THD::cleanup_after_query()
which is intended to consume its event (there can be other
SET statements between them).
*/
if ((rli_slave || rli_fake) && is_update_query(lex->sql_command))
if ((rgi_slave || rli_fake) && is_update_query(lex->sql_command))
auto_inc_intervals_forced.empty();
#endif
}
......@@ -1905,8 +1910,8 @@ void THD::cleanup_after_query()
m_binlog_invoker= FALSE;
#ifndef EMBEDDED_LIBRARY
if (rli_slave)
rli_slave->cleanup_after_query();
if (rgi_slave)
rgi_slave->cleanup_after_query();
#endif
DBUG_VOID_RETURN;
......
......@@ -47,6 +47,7 @@
class Reprepare_observer;
class Relay_log_info;
struct rpl_group_info;
class Rpl_filter;
class Query_log_event;
......@@ -1697,8 +1698,9 @@ public:
/* Used to execute base64 coded binlog events in MySQL server */
Relay_log_info* rli_fake;
rpl_group_info* rgi_fake;
/* Slave applier execution context */
Relay_log_info* rli_slave;
rpl_group_info* rgi_slave;
/* Used to SLAVE SQL thread */
Rpl_filter* rpl_filter;
......
......@@ -810,10 +810,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
table->next_number_field=table->found_next_number_field;
#ifdef HAVE_REPLICATION
if (thd->rli_slave &&
if (thd->rgi_slave &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL))
rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
goto abort;
#endif
......@@ -3464,10 +3464,10 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->next_number_field=table->found_next_number_field;
#ifdef HAVE_REPLICATION
if (thd->rli_slave &&
if (thd->rgi_slave &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL))
rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
DBUG_RETURN(1);
#endif
......
......@@ -362,11 +362,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
MY_RETURN_REAL_PATH);
}
if (thd->rli_slave)
if (thd->rgi_slave)
{
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
if (strncmp(thd->rli_slave->slave_patternload_file, name,
thd->rli_slave->slave_patternload_file_size))
if (strncmp(thd->rgi_slave->rli->slave_patternload_file, name,
thd->rgi_slave->rli->slave_patternload_file_size))
{
/*
LOAD DATA INFILE in the slave SQL Thread can only read from
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment