Commit ed04c40b authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-5289: master server starts slave parallel threads

Delay spawning parallel replication worker threads until a slave SQL
thread is running, and de-spawn them when the last SQL thread stops.

This is especially useful to avoid needless threads on a master in a
setup where same my.cnf is used on masters and slaves.
parent a7fd11b3
...@@ -4,8 +4,22 @@ SET GLOBAL slave_parallel_threads=10; ...@@ -4,8 +4,22 @@ SET GLOBAL slave_parallel_threads=10;
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10; SET GLOBAL slave_parallel_threads=10;
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
OK
CHANGE MASTER TO master_use_gtid=slave_pos; CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc include/start_slave.inc
SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
OK
include/stop_slave.inc
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
OK
include/start_slave.inc
SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*)))
OK
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
......
...@@ -12,9 +12,20 @@ SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; ...@@ -12,9 +12,20 @@ SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10; SET GLOBAL slave_parallel_threads=10;
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10; SET GLOBAL slave_parallel_threads=10;
# Check that we do not spawn any worker threads when no slave is running.
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
CHANGE MASTER TO master_use_gtid=slave_pos; CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc --source include/start_slave.inc
# Check that worker threads get spawned when slave starts.
SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
# ... and that worker threads get removed when slave stops.
--source include/stop_slave.inc
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
--source include/start_slave.inc
SELECT IF(COUNT(*) >= 10, "OK", CONCAT("Found too few system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** --echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
......
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
must be zero because of default IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
0 OK
SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
must be 20 because of .cnf
20
SELECT @@SESSION.slave_parallel_threads as 'no session var'; SELECT @@SESSION.slave_parallel_threads as 'no session var';
ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable
SET GLOBAL slave_parallel_threads= 0; SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT; SET GLOBAL slave_parallel_threads= DEFAULT;
SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
must be 0 because of default
0
SET GLOBAL slave_parallel_threads= 10; SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads; SELECT @@GLOBAL.slave_parallel_threads;
@@GLOBAL.slave_parallel_threads @@GLOBAL.slave_parallel_threads
10 10
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*)))
OK
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
# Use default setting for mysqld processes
!include include/default_mysqld.cnf
[mysqld.1]
slave_parallel_threads=20
...@@ -2,13 +2,20 @@ ...@@ -2,13 +2,20 @@
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; # Check that we don't spawn worker threads at server startup, when no
# slave is configured (MDEV-5289).
SELECT IF(COUNT(*) < 20, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
SELECT @@GLOBAL.slave_parallel_threads as 'must be 20 because of .cnf';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR --error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.slave_parallel_threads as 'no session var'; SELECT @@SESSION.slave_parallel_threads as 'no session var';
SET GLOBAL slave_parallel_threads= 0; SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT; SET GLOBAL slave_parallel_threads= DEFAULT;
SELECT @@GLOBAL.slave_parallel_threads as 'must be 0 because of default';
SET GLOBAL slave_parallel_threads= 10; SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads; SELECT @@GLOBAL.slave_parallel_threads;
# Check that we don't spawn worker threads when no slave is started.
SELECT IF(COUNT(*) < 10, "OK", CONCAT("Found too many system user processes: ", COUNT(*))) FROM information_schema.processlist WHERE user = "system user";
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
...@@ -1248,7 +1248,7 @@ bool Master_info_index::remove_master_info(LEX_STRING *name) ...@@ -1248,7 +1248,7 @@ bool Master_info_index::remove_master_info(LEX_STRING *name)
bool Master_info_index::give_error_if_slave_running() bool Master_info_index::give_error_if_slave_running()
{ {
DBUG_ENTER("warn_if_slave_running"); DBUG_ENTER("give_error_if_slave_running");
mysql_mutex_assert_owner(&LOCK_active_mi); mysql_mutex_assert_owner(&LOCK_active_mi);
if (!this) // master_info_index is set to NULL on server shutdown if (!this) // master_info_index is set to NULL on server shutdown
return TRUE; return TRUE;
...@@ -1268,6 +1268,32 @@ bool Master_info_index::give_error_if_slave_running() ...@@ -1268,6 +1268,32 @@ bool Master_info_index::give_error_if_slave_running()
} }
/**
Master_info_index::any_slave_sql_running()
The LOCK_active_mi must be held while calling this function.
@return
TRUE If some slave SQL thread is running.
FALSE No slave SQL thread is running
*/
bool Master_info_index::any_slave_sql_running()
{
DBUG_ENTER("any_slave_sql_running");
if (!this) // master_info_index is set to NULL on server shutdown
return TRUE;
for (uint i= 0; i< master_info_hash.records; ++i)
{
Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i);
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
/** /**
Master_info_index::start_all_slaves() Master_info_index::start_all_slaves()
......
...@@ -218,6 +218,7 @@ public: ...@@ -218,6 +218,7 @@ public:
Master_info *get_master_info(LEX_STRING *connection_name, Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning); Sql_condition::enum_warning_level warning);
bool give_error_if_slave_running(); bool give_error_if_slave_running();
bool any_slave_sql_running();
bool start_all_slaves(THD *thd); bool start_all_slaves(THD *thd);
bool stop_all_slaves(THD *thd); bool stop_all_slaves(THD *thd);
}; };
......
...@@ -944,9 +944,9 @@ dealloc_gco(group_commit_orderer *gco) ...@@ -944,9 +944,9 @@ dealloc_gco(group_commit_orderer *gco)
} }
int static int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool skip_check) uint32 new_count)
{ {
uint32 i; uint32 i;
rpl_parallel_thread **new_list= NULL; rpl_parallel_thread **new_list= NULL;
...@@ -991,24 +991,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -991,24 +991,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
new_free_list= new_list[i]; new_free_list= new_list[i];
} }
if (!skip_check)
{
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index->give_error_if_slave_running())
{
mysql_mutex_unlock(&LOCK_active_mi);
goto err;
}
if (pool->changing)
{
mysql_mutex_unlock(&LOCK_active_mi);
my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0));
goto err;
}
pool->changing= true;
mysql_mutex_unlock(&LOCK_active_mi);
}
/* /*
Grab each old thread in turn, and signal it to stop. Grab each old thread in turn, and signal it to stop.
...@@ -1068,13 +1050,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, ...@@ -1068,13 +1050,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
} }
if (!skip_check)
{
mysql_mutex_lock(&LOCK_active_mi);
pool->changing= false;
mysql_mutex_unlock(&LOCK_active_mi);
}
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
mysql_cond_broadcast(&pool->COND_rpl_thread_pool); mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
...@@ -1101,16 +1076,26 @@ err: ...@@ -1101,16 +1076,26 @@ err:
} }
my_free(new_list); my_free(new_list);
} }
if (!skip_check)
{
mysql_mutex_lock(&LOCK_active_mi);
pool->changing= false;
mysql_mutex_unlock(&LOCK_active_mi);
}
return 1; return 1;
} }
int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{
if (!pool->count)
return rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads);
return 0;
}
int
rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
{
return rpl_parallel_change_thread_count(pool, 0);
}
void void
rpl_parallel_thread::batch_free() rpl_parallel_thread::batch_free()
{ {
...@@ -1354,7 +1339,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) ...@@ -1354,7 +1339,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
rpl_parallel_thread_pool::rpl_parallel_thread_pool() rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: count(0), threads(0), free_list(0), changing(false), inited(false) : count(0), threads(0), free_list(0), inited(false)
{ {
} }
...@@ -1369,10 +1354,14 @@ rpl_parallel_thread_pool::init(uint32 size) ...@@ -1369,10 +1354,14 @@ rpl_parallel_thread_pool::init(uint32 size)
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL);
changing= false;
inited= true; inited= true;
return rpl_parallel_change_thread_count(this, size, true); /*
The pool is initially empty. Threads will be spawned when a slave SQL
thread is started.
*/
return 0;
} }
...@@ -1381,7 +1370,7 @@ rpl_parallel_thread_pool::destroy() ...@@ -1381,7 +1370,7 @@ rpl_parallel_thread_pool::destroy()
{ {
if (!inited) if (!inited)
return; return;
rpl_parallel_change_thread_count(this, 0, true); rpl_parallel_change_thread_count(this, 0);
mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false; inited= false;
......
...@@ -204,7 +204,6 @@ struct rpl_parallel_thread_pool { ...@@ -204,7 +204,6 @@ struct rpl_parallel_thread_pool {
struct rpl_parallel_thread *free_list; struct rpl_parallel_thread *free_list;
mysql_mutex_t LOCK_rpl_thread_pool; mysql_mutex_t LOCK_rpl_thread_pool;
mysql_cond_t COND_rpl_thread_pool; mysql_cond_t COND_rpl_thread_pool;
bool changing;
bool inited; bool inited;
rpl_parallel_thread_pool(); rpl_parallel_thread_pool();
...@@ -314,9 +313,8 @@ struct rpl_parallel { ...@@ -314,9 +313,8 @@ struct rpl_parallel {
extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
uint32 new_count, extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
bool skip_check= false);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
#endif /* RPL_PARALLEL_H */ #endif /* RPL_PARALLEL_H */
...@@ -652,6 +652,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) ...@@ -652,6 +652,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
if (opt_slave_parallel_threads > 0 &&
!master_info_index->any_slave_sql_running())
rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
} }
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{ {
...@@ -958,7 +962,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, ...@@ -958,7 +962,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi); mi);
if (!error && (thread_mask & SLAVE_SQL)) if (!error && (thread_mask & SLAVE_SQL))
{ {
error= start_slave_thread( if (opt_slave_parallel_threads > 0)
error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
if (!error)
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql, key_thread_slave_sql,
#endif #endif
......
...@@ -1752,16 +1752,12 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) ...@@ -1752,16 +1752,12 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
static bool static bool
fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type) fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
{ {
bool running; bool err;
bool err= false;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running(); err= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
opt_slave_parallel_threads))
err= true;
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return err; return err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment