BUG#29968 (rpl_ndb_circular.test and rpl_ndb_log.test fail):

Removing unguarded read of slave_running field from inside
terminate_slave_threads(). This could cause premature exit in the event
that the slave thread already were shutting down, but isn't finished yet.

The fields slave_running, io_thd, and sql_thread are guarded by an
associated run_lock. A read of these fields were not guarded inside
terminate_slave_threads(), which caused an assertion to fire. The
assertion was removed, and the code reorganized slightly.
parent 3584a4d4
...@@ -132,6 +132,11 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, ...@@ -132,6 +132,11 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
const char* table_name, bool overwrite); const char* table_name, bool overwrite);
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi); static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
static Log_event* next_event(RELAY_LOG_INFO* rli); static Log_event* next_event(RELAY_LOG_INFO* rli);
static int terminate_slave_thread(THD *thd,
pthread_mutex_t* term_lock,
pthread_cond_t* term_cond,
volatile uint *slave_running,
bool skip_lock);
/* /*
Find out which replications threads are running Find out which replications threads are running
...@@ -312,35 +317,26 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) ...@@ -312,35 +317,26 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(0); /* successfully do nothing */ DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL); int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
pthread_mutex_t *sql_cond_lock,*io_cond_lock;
sql_cond_lock=sql_lock; if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
io_cond_lock=io_lock;
if (skip_lock)
{
sql_lock = io_lock = 0;
}
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
{ {
DBUG_PRINT("info",("Terminating IO thread")); DBUG_PRINT("info",("Terminating IO thread"));
mi->abort_slave=1; mi->abort_slave=1;
if ((error=terminate_slave_thread(mi->io_thd,io_lock, if ((error=terminate_slave_thread(mi->io_thd,io_lock,
io_cond_lock,
&mi->stop_cond, &mi->stop_cond,
&mi->slave_running)) && &mi->slave_running,
skip_lock)) &&
!force_all) !force_all)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running) if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
{ {
DBUG_PRINT("info",("Terminating SQL thread")); DBUG_PRINT("info",("Terminating SQL thread"));
DBUG_ASSERT(mi->rli.sql_thd != 0) ;
mi->rli.abort_slave=1; mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock, if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
sql_cond_lock,
&mi->rli.stop_cond, &mi->rli.stop_cond,
&mi->rli.slave_running)) && &mi->rli.slave_running,
skip_lock)) &&
!force_all) !force_all)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -348,23 +344,60 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) ...@@ -348,23 +344,60 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
} }
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, /**
pthread_mutex_t *cond_lock, Wait for a slave thread to terminate.
This function is called after requesting the thread to terminate
(by setting @c abort_slave member of @c Relay_log_info or @c
Master_info structure to 1). Termination of the thread is
controlled with the the predicate <code>*slave_running</code>.
Function will acquire @c term_lock before waiting on the condition
unless @c skip_lock is true in which case the mutex should be owned
by the caller of this function and will remain acquired after
return from the function.
@param term_lock
Associated lock to use when waiting for @c term_cond
@param term_cond
Condition that is signalled when the thread has terminated
@param slave_running
Pointer to predicate to check for slave thread termination
@param skip_lock
If @c true the lock will not be acquired before waiting on
the condition. In this case, it is assumed that the calling
function acquires the lock before calling this function.
@retval 0 All OK
*/
static int
terminate_slave_thread(THD *thd,
pthread_mutex_t* term_lock,
pthread_cond_t* term_cond, pthread_cond_t* term_cond,
volatile uint *slave_running) volatile uint *slave_running,
bool skip_lock)
{ {
int error;
DBUG_ENTER("terminate_slave_thread"); DBUG_ENTER("terminate_slave_thread");
if (term_lock)
{ if (!skip_lock)
pthread_mutex_lock(term_lock); pthread_mutex_lock(term_lock);
safe_mutex_assert_owner(term_lock);
if (!*slave_running) if (!*slave_running)
{ {
if (!skip_lock)
pthread_mutex_unlock(term_lock); pthread_mutex_unlock(term_lock);
DBUG_RETURN(ER_SLAVE_NOT_RUNNING); DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
} }
}
DBUG_ASSERT(thd != 0); DBUG_ASSERT(thd != 0);
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
/* /*
Is is critical to test if the slave is running. Otherwise, we might Is is critical to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it be referening freed memory trying to kick it
...@@ -380,9 +413,13 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, ...@@ -380,9 +413,13 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
*/ */
struct timespec abstime; struct timespec abstime;
set_timespec(abstime,2); set_timespec(abstime,2);
pthread_cond_timedwait(term_cond, cond_lock, &abstime); error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
DBUG_ASSERT(error == ETIMEDOUT || error == 0);
} }
if (term_lock)
DBUG_ASSERT(*slave_running == 0);
if (!skip_lock)
pthread_mutex_unlock(term_lock); pthread_mutex_unlock(term_lock);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -133,10 +133,6 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli); ...@@ -133,10 +133,6 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli);
int register_slave_on_master(MYSQL* mysql); int register_slave_on_master(MYSQL* mysql);
int terminate_slave_threads(MASTER_INFO* mi, int thread_mask, int terminate_slave_threads(MASTER_INFO* mi, int thread_mask,
bool skip_lock = 0); bool skip_lock = 0);
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_mutex,
pthread_mutex_t* cond_lock,
pthread_cond_t* term_cond,
volatile uint* slave_running);
int start_slave_threads(bool need_slave_mutex, bool wait_for_start, int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
MASTER_INFO* mi, const char* master_info_fname, MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname, int thread_mask); const char* slave_info_fname, int thread_mask);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment