Commit 47f8e0ef authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit

Remove Relay_log_info::group_info. (It is not thread safe).
parent ba4b937a
...@@ -68,8 +68,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -68,8 +68,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
thd->rgi_slave= rgi; thd->rgi_slave= rgi;
thd->rpl_filter = rli->mi->rpl_filter; thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Get rid of rli->group_info, it is not thread safe. */
rli->group_info= rgi;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock); mysql_mutex_lock(&rli->data_lock);
...@@ -203,7 +201,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -203,7 +201,7 @@ handle_rpl_parallel_thread(void *arg)
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
/* ToDo: must use rgi here, not rli, for thread safety. */ /* ToDo: must use rgi here, not rli, for thread safety. */
delete_or_keep_event_post_apply(rgi->rli, event_type, events->ev); delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events); my_free(events);
if (end_of_group) if (end_of_group)
...@@ -661,7 +659,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev) ...@@ -661,7 +659,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
*/ */
qev->rgi= serial_rgi; qev->rgi= serial_rgi;
rpt_handle_event(qev, NULL); rpt_handle_event(qev, NULL);
delete_or_keep_event_post_apply(rli, typ, qev->ev); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev);
return false; return false;
} }
......
...@@ -59,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -59,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
group_info(0), tables_to_lock(0), tables_to_lock_count(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), m_flags(0), last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0) m_annotate_event(0)
...@@ -1559,7 +1559,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) ...@@ -1559,7 +1559,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
void void
delete_or_keep_event_post_apply(Relay_log_info *rli, delete_or_keep_event_post_apply(rpl_group_info *rgi,
Log_event_type typ, Log_event *ev) Log_event_type typ, Log_event *ev)
{ {
/* /*
...@@ -1583,7 +1583,7 @@ delete_or_keep_event_post_apply(Relay_log_info *rli, ...@@ -1583,7 +1583,7 @@ delete_or_keep_event_post_apply(Relay_log_info *rli,
The thd->query will be used to generate new Annotate_rows event The thd->query will be used to generate new Annotate_rows event
during applying the subsequent Rows events. during applying the subsequent Rows events.
*/ */
rli->set_annotate_event((Annotate_rows_log_event*) ev); rgi->rli->set_annotate_event((Annotate_rows_log_event*) ev);
break; break;
case DELETE_ROWS_EVENT: case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT: case UPDATE_ROWS_EVENT:
...@@ -1593,11 +1593,11 @@ delete_or_keep_event_post_apply(Relay_log_info *rli, ...@@ -1593,11 +1593,11 @@ delete_or_keep_event_post_apply(Relay_log_info *rli,
event (if any) is not needed anymore and can be deleted. event (if any) is not needed anymore and can be deleted.
*/ */
if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
rli->free_annotate_event(); rgi->rli->free_annotate_event();
/* fall through */ /* fall through */
default: default:
DBUG_PRINT("info", ("Deleting the event after it has been executed")); DBUG_PRINT("info", ("Deleting the event after it has been executed"));
if (!rli->group_info->is_deferred_event(ev)) if (!rgi->is_deferred_event(ev))
delete ev; delete ev;
break; break;
} }
......
...@@ -314,8 +314,6 @@ public: ...@@ -314,8 +314,6 @@ public:
char slave_patternload_file[FN_REFLEN]; char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size; size_t slave_patternload_file_size;
/* ToDo: We need to remove this, always use the per-transaction one to work with parallel replication. */
struct rpl_group_info *group_info;
rpl_parallel parallel; rpl_parallel parallel;
Relay_log_info(bool is_slave_recovery); Relay_log_info(bool is_slave_recovery);
...@@ -657,7 +655,7 @@ extern struct rpl_slave_state rpl_global_gtid_slave_state; ...@@ -657,7 +655,7 @@ extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd); int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev); int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
void delete_or_keep_event_post_apply(Relay_log_info *rli, void delete_or_keep_event_post_apply(rpl_group_info *rgi,
Log_event_type typ, Log_event *ev); Log_event_type typ, Log_event *ev);
#endif /* RPL_RLI_H */ #endif /* RPL_RLI_H */
...@@ -3264,7 +3264,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3264,7 +3264,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
delete_or_keep_event_post_apply(rli, typ, ev); delete_or_keep_event_post_apply(serial_rgi, typ, ev);
/* /*
update_log_pos failed: this should not happen, so we don't update_log_pos failed: this should not happen, so we don't
...@@ -4189,13 +4189,6 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4189,13 +4189,6 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
} }
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
/*
ToDo: Get rid of this, all accesses to rpl_group_info must be made
per-worker-thread to work with parallel replication.
*/
if (opt_slave_parallel_threads <= 0)
rli->group_info= &serial_rgi;
/* Read queries from the IO/THREAD until this thread is killed */ /* Read queries from the IO/THREAD until this thread is killed */
while (!sql_slave_killed(thd,rli)) while (!sql_slave_killed(thd,rli))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment