Commit a1cfd473 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

Wait for all worker threads to finish when stopping the SQL thread.
(Only a basic wait; this still needs to be fixed to include timeout
logic as in sql_slave_killed()).
parent 592e464a
...@@ -44,10 +44,9 @@ query_vertical SHOW SLAVE STATUS; ...@@ -44,10 +44,9 @@ query_vertical SHOW SLAVE STATUS;
--source include/start_slave.inc --source include/start_slave.inc
SELECT * FROM t1; SELECT * FROM t1;
--sleep 1
SELECT * FROM t1;
--source include/stop_slave.inc --source include/stop_slave.inc
SELECT * FROM t1;
--connection s1 --connection s1
SET sql_log_bin=0; SET sql_log_bin=0;
......
...@@ -894,7 +894,8 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, ...@@ -894,7 +894,8 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit; key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
static PSI_cond_info all_server_conds[]= static PSI_cond_info all_server_conds[]=
{ {
...@@ -938,7 +939,8 @@ static PSI_cond_info all_server_conds[]= ...@@ -938,7 +939,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0}, { &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0} { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0}
}; };
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
......
...@@ -283,7 +283,8 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, ...@@ -283,7 +283,8 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit; key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool; extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main, key_thread_handle_manager, key_thread_kill_server, key_thread_main,
......
...@@ -21,6 +21,9 @@ ...@@ -21,6 +21,9 @@
the logic in sql_slave_killed() that waits for current event group to the logic in sql_slave_killed() that waits for current event group to
complete needs to be extended appropriately... complete needs to be extended appropriately...
- Audit the use of Relay_log_info::data_lock. Make sure it is held
correctly in all needed places also when using parallel replication.
- We need some user-configurable limit on how far ahead the SQL thread will - We need some user-configurable limit on how far ahead the SQL thread will
fetch and queue events for parallel execution (otherwise if slave gets fetch and queue events for parallel execution (otherwise if slave gets
behind we will fill up memory with pending malloc()'ed events). behind we will fill up memory with pending malloc()'ed events).
...@@ -194,7 +197,11 @@ handle_rpl_parallel_thread(void *arg) ...@@ -194,7 +197,11 @@ handle_rpl_parallel_thread(void *arg)
*/ */
mysql_mutex_lock(&entry->LOCK_parallel_entry); mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < rgi->gtid_sub_id) if (entry->last_committed_sub_id < rgi->gtid_sub_id)
{
entry->last_committed_sub_id= rgi->gtid_sub_id; entry->last_committed_sub_id= rgi->gtid_sub_id;
if (entry->need_signal)
mysql_cond_broadcast(&entry->COND_parallel_entry);
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
rgi->commit_orderer.wakeup_subsequent_commits(); rgi->commit_orderer.wakeup_subsequent_commits();
...@@ -463,12 +470,30 @@ rpl_parallel::find(uint32 domain_id) ...@@ -463,12 +470,30 @@ rpl_parallel::find(uint32 domain_id)
} }
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
MY_MUTEX_INIT_FAST); MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
} }
return e; return e;
} }
void
rpl_parallel::wait_for_done()
{
struct rpl_parallel_entry *e;
uint32 i;
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
while (e->current_sub_id > e->last_commit_id)
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
mysql_mutex_unlock(&e->LOCK_parallel_entry);
}
}
bool bool
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
THD *parent_thd) THD *parent_thd)
......
...@@ -50,6 +50,7 @@ struct rpl_parallel_entry { ...@@ -50,6 +50,7 @@ struct rpl_parallel_entry {
uint64 last_seq_no; uint64 last_seq_no;
uint64 last_commit_id; uint64 last_commit_id;
bool active; bool active;
bool need_signal;
rpl_parallel_thread *rpl_thread; rpl_parallel_thread *rpl_thread;
/* /*
The sub_id of the last transaction to commit within this domain_id. The sub_id of the last transaction to commit within this domain_id.
...@@ -57,6 +58,7 @@ struct rpl_parallel_entry { ...@@ -57,6 +58,7 @@ struct rpl_parallel_entry {
*/ */
uint64 last_committed_sub_id; uint64 last_committed_sub_id;
mysql_mutex_t LOCK_parallel_entry; mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry;
uint64 current_sub_id; uint64 current_sub_id;
struct rpl_group_info *current_group_info; struct rpl_group_info *current_group_info;
}; };
...@@ -67,6 +69,7 @@ struct rpl_parallel { ...@@ -67,6 +69,7 @@ struct rpl_parallel {
rpl_parallel(); rpl_parallel();
~rpl_parallel(); ~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id); rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd); bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd);
}; };
......
...@@ -4342,6 +4342,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ ...@@ -4342,6 +4342,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
rli->executed_entries++; rli->executed_entries++;
} }
if (opt_slave_parallel_threads > 0)
rli->parallel.wait_for_done();
/* Thread stopped. Print the current replication position to the log */ /* Thread stopped. Print the current replication position to the log */
{ {
String tmp; String tmp;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment