Commit b5a496a7 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

Fix some bugs around waiting for worker threads to end during SQL slave stop.

Free Log_event after parallel execution (still needs to be made thread-safe by
using rpl_group_info rather than rli).
parent a1cfd473
......@@ -16,6 +16,7 @@
- Error handling. If we fail in one of multiple parallel executions, we
need to make a best effort to complete prior transactions and roll back
following transactions, so slave binlog position will be correct.
And all the retry logic for temporary errors like deadlock.
- Stopping the slave needs to handle stopping all parallel executions. And
the logic in sql_slave_killed() that waits for current event group to
......@@ -73,7 +74,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
/* ToDo: error handling. */
/* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
}
......@@ -85,6 +85,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
uint64 event_gtid_sub_id= 0;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
......@@ -142,6 +143,7 @@ handle_rpl_parallel_thread(void *arg)
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
uint64 wait_for_sub_id;
bool end_of_group;
if (event_type == GTID_EVENT)
{
......@@ -150,6 +152,9 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
/* Save this, as it gets cleared once event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
/*
Register ourself to wait for the previous commit, if we need to do
such registration _and_ that previous commit has not already
......@@ -173,13 +178,19 @@ handle_rpl_parallel_thread(void *arg)
rpt_handle_event(events, thd, rpt);
if (in_event_group)
{
if ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))))
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
/* ToDo: must use rgi here, not rli, for thread safety. */
delete_or_keep_event_post_apply(rgi->rli, event_type, events->ev);
my_free(events);
if (end_of_group)
{
in_event_group= false;
......@@ -196,9 +207,9 @@ handle_rpl_parallel_thread(void *arg)
done and also no longer need waiting for.
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < rgi->gtid_sub_id)
if (entry->last_committed_sub_id < event_gtid_sub_id)
{
entry->last_committed_sub_id= rgi->gtid_sub_id;
entry->last_committed_sub_id= event_gtid_sub_id;
if (entry->need_signal)
mysql_cond_broadcast(&entry->COND_parallel_entry);
}
......@@ -207,9 +218,7 @@ handle_rpl_parallel_thread(void *arg)
rgi->commit_orderer.wakeup_subsequent_commits();
delete rgi;
}
}
my_free(events);
events= next;
}
......@@ -487,7 +496,7 @@ rpl_parallel::wait_for_done()
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
while (e->current_sub_id > e->last_commit_id)
while (e->current_sub_id > e->last_committed_sub_id)
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
mysql_mutex_unlock(&e->LOCK_parallel_entry);
}
......@@ -605,6 +614,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
*/
qev->rgi= serial_rgi;
rpt_handle_event(qev, parent_thd, NULL);
delete_or_keep_event_post_apply(rli, typ, qev->ev);
return false;
}
else
......
......@@ -1556,4 +1556,50 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
return 0;
}
void
delete_or_keep_event_post_apply(Relay_log_info *rli,
Log_event_type typ, Log_event *ev)
{
/*
ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
thread-safe for parallel replication.
*/
switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
/*
Format_description_log_event should not be deleted because it
will be used to read info about the relay log's format;
it will be deleted when the SQL thread does not need it,
i.e. when this thread terminates.
*/
break;
case ANNOTATE_ROWS_EVENT:
/*
Annotate_rows event should not be deleted because after it has
been applied, thd->query points to the string inside this event.
The thd->query will be used to generate new Annotate_rows event
during applying the subsequent Rows events.
*/
rli->set_annotate_event((Annotate_rows_log_event*) ev);
break;
case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case WRITE_ROWS_EVENT:
/*
After the last Rows event has been applied, the saved Annotate_rows
event (if any) is not needed anymore and can be deleted.
*/
if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
rli->free_annotate_event();
/* fall through */
default:
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
if (!rli->is_deferred_event(ev))
delete ev;
break;
}
}
#endif
......@@ -646,5 +646,7 @@ extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
void delete_or_keep_event_post_apply(Relay_log_info *rli,
Log_event_type typ, Log_event *ev);
#endif /* RPL_RLI_H */
......@@ -3264,41 +3264,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
switch (typ) {
case FORMAT_DESCRIPTION_EVENT:
/*
Format_description_log_event should not be deleted because it
will be used to read info about the relay log's format;
it will be deleted when the SQL thread does not need it,
i.e. when this thread terminates.
*/
break;
case ANNOTATE_ROWS_EVENT:
/*
Annotate_rows event should not be deleted because after it has
been applied, thd->query points to the string inside this event.
The thd->query will be used to generate new Annotate_rows event
during applying the subsequent Rows events.
*/
rli->set_annotate_event((Annotate_rows_log_event*) ev);
break;
case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case WRITE_ROWS_EVENT:
/*
After the last Rows event has been applied, the saved Annotate_rows
event (if any) is not needed anymore and can be deleted.
*/
if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
rli->free_annotate_event();
/* fall through */
default:
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
if (!rli->is_deferred_event(ev))
delete ev;
break;
}
delete_or_keep_event_post_apply(rli, typ, ev);
/*
update_log_pos failed: this should not happen, so we don't
......@@ -4363,6 +4329,14 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
err:
/*
Once again, in case we aborted with an error and skipped the first one.
(We want the first one to be before the printout of stop position to
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
rli->parallel.wait_for_done();
/*
Some events set some playgrounds, which won't be cleared because thread
stops. Stopping of this thread may not be known to these events ("stop"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment