Commit f36f9d00 authored by monty@hundin.mysql.fi's avatar monty@hundin.mysql.fi

Fixed bug in wait_for_update() that I had introduced.

Changed option variables to my_bool (to avoid bugs in my_getopt())
Added new thread specific mutex LOCK_delete to be able to free LOCK_thread_count early.
Changed usage of LOCK_thread_count -> LOCK_status for statistics variables
parent 4ab6d8c4
......@@ -384,7 +384,6 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
......
......@@ -343,6 +343,7 @@ SLAVE_MYPID="$MYRUN_DIR/slave.pid"
SLAVE_MYLOG="$MYSQL_TEST_DIR/var/log/slave.log"
SLAVE_MYERR="$MYSQL_TEST_DIR/var/log/slave.err"
CURRENT_TEST="$MYSQL_TEST_DIR/var/log/current_test"
SMALL_SERVER="-O key_buffer_size=1M -O sort_buffer=256K -O max_heap_table_size=1M"
export MASTER_MYPORT
......@@ -1034,6 +1035,7 @@ run_testcase ()
master_init_script=$TESTDIR/$tname-master.sh
slave_init_script=$TESTDIR/$tname-slave.sh
slave_master_info_file=$TESTDIR/$tname-slave-master-info.opt
echo $tname > $CURRENT_TEST
SKIP_SLAVE=`$EXPR \( $tname : rpl \) = 0`
if [ $USE_MANAGER = 1 ] ; then
many_slaves=`$EXPR \( $tname : rpl_failsafe \) != 0`
......
......@@ -4,7 +4,6 @@ reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
drop table if exists t1,t2;
create table t2(n int);
create table t1(n int not null auto_increment primary key);
insert into t1 values (NULL),(NULL);
......
......@@ -2,7 +2,6 @@
# COM_BINLOG_DUMP and additionally limits the number of events per dump
source include/master-slave.inc;
drop table if exists t1,t2;
create table t2(n int);
create table t1(n int not null auto_increment primary key);
......
......@@ -810,12 +810,8 @@ void MYSQL_LOG::new_file(bool need_lock)
if (log_type == LOG_BIN)
{
if (generate_new_name(new_name, name))
{
/* Error; Continue using old log file */
if (need_lock)
VOID(pthread_mutex_unlock(&LOCK_log));
return; // Something went wrong
}
goto end; /* Error; Continue using old log file */
new_name_ptr=new_name;
if (!no_auto_events)
{
......@@ -853,6 +849,7 @@ void MYSQL_LOG::new_file(bool need_lock)
no_auto_events);
my_free(old_name,MYF(0));
end:
if (need_lock)
{
pthread_mutex_unlock(&LOCK_index);
......@@ -1358,16 +1355,24 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
NOTES
One must have a lock on LOCK_log before calling this function.
This lock will be freed before return!
The reason for the above is that for enter_cond() / exit_cond() to
work the mutex must be got before enter_cond() but releases before
exit_cond().
If you don't do it this way, you will get a deadlock in THD::awake()
*/
void MYSQL_LOG:: wait_for_update(THD* thd)
{
safe_mutex_assert_owner(&LOCK_log);
const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
"Slave: waiting for binlog update");
pthread_cond_wait(&update_cond, &LOCK_log);
pthread_mutex_unlock(&LOCK_log); // See NOTES
thd->exit_cond(old_msg);
}
}
/*
......
......@@ -353,7 +353,7 @@ mc_net_safe_read(MYSQL *mysql)
else
strmov(net->last_error, "Packet too large - increase \
max_allowed_packet on this server");
}
}
return(packet_error);
}
if (net->read_pos[0] == 255)
......@@ -671,7 +671,7 @@ mc_mysql_connect(MYSQL *mysql,const char *host, const char *user,
if ((pkt_length=mc_net_safe_read(mysql)) == packet_error)
goto error;
/* Check if version of protocoll matches current one */
/* Check if version of protocol matches current one */
mysql->protocol_version= net->read_pos[0];
DBUG_DUMP("packet",(char*) net->read_pos,10);
......
......@@ -637,15 +637,16 @@ extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables;
extern uint delay_key_write_options;
extern bool opt_endinfo, using_udf_functions, locked_in_memory;
extern bool opt_using_transactions, use_temp_pool, mysql_embedded;
extern bool opt_using_transactions, mysql_embedded;
extern bool using_update_log, opt_large_files;
extern bool opt_log, opt_update_log, opt_bin_log, opt_slow_log;
extern bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern bool opt_disable_networking, opt_skip_show_db, opt_enable_named_pipe;
extern bool opt_disable_networking, opt_skip_show_db;
extern bool volatile abort_loop, shutdown_in_progress, grant_option;
extern uint volatile thread_count, thread_running, global_read_lock;
extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern my_bool opt_safe_show_db, opt_local_infile, lower_case_table_names;
extern my_bool opt_slave_compressed_protocol;
extern my_bool opt_slave_compressed_protocol, use_temp_pool;
extern my_bool opt_enable_named_pipe;
extern char f_fyllchar;
extern MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log;
......@@ -656,7 +657,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status,
LOCK_grant, LOCK_error_log, LOCK_delayed_insert,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_server_id, LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables;
extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
extern pthread_attr_t connection_attrib;
......
......@@ -246,18 +246,18 @@ bool opt_large_files= sizeof(my_off_t) > 4;
Variables to store startup options
*/
bool opt_skip_slave_start = 0; // If set, slave is not autostarted
my_bool opt_skip_slave_start = 0; // If set, slave is not autostarted
/*
If set, some standard measures to enforce slave data integrity will not
be performed
*/
bool opt_reckless_slave = 0;
my_bool opt_reckless_slave = 0;
ulong back_log, connect_timeout, concurrency;
char mysql_home[FN_REFLEN], pidfile_name[FN_REFLEN], time_zone[30];
bool opt_log, opt_update_log, opt_bin_log, opt_slow_log;
bool opt_disable_networking=0, opt_skip_show_db=0;
bool opt_enable_named_pipe= 0;
my_bool opt_enable_named_pipe= 0;
my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol;
uint delay_key_write_options= (uint) DELAY_KEY_WRITE_ON;
......@@ -272,11 +272,12 @@ static my_string opt_logname=0,opt_update_logname=0,
static char* mysql_home_ptr= mysql_home;
static char* pidfile_name_ptr= pidfile_name;
static pthread_t select_thread;
static bool opt_noacl, opt_bootstrap=0, opt_myisam_log=0;
bool opt_sql_bin_update = 0, opt_log_slave_updates = 0;
bool opt_safe_user_create = 0, opt_no_mix_types = 0;
static my_bool opt_noacl=0, opt_bootstrap=0, opt_myisam_log=0;
my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_safe_show_db=0, lower_case_table_names, opt_old_rpl_compat;
my_bool opt_show_slave_auth_info;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
volatile bool mqh_used = 0;
FILE *bootstrap_file=0;
int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice
......@@ -408,7 +409,7 @@ TYPELIB sql_mode_typelib= {array_elements(sql_mode_names)-1,"",
sql_mode_names};
MY_BITMAP temp_pool;
bool use_temp_pool=0;
my_bool use_temp_pool=0;
pthread_key(MEM_ROOT*,THR_MALLOC);
pthread_key(THD*, THR_THD);
......@@ -418,7 +419,7 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
LOCK_error_log,
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
LOCK_server_id, LOCK_global_system_variables,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi;
pthread_cond_t COND_refresh,COND_thread_count, COND_slave_stopped,
......@@ -1553,10 +1554,16 @@ static void *signal_hand(void *arg __attribute__((unused)))
}
#endif /* HAVE_STACK_TRACE_ON_SEGV */
// signal to start_signal_handler that we are ready
/*
signal to start_signal_handler that we are ready
This works by waiting for start_signal_handler to free mutex,
after which we signal it that we are ready.
At this pointer there is no other threads running, so there
should not be any other pthread_cond_signal() calls.
*/
(void) pthread_mutex_lock(&LOCK_thread_count);
(void) pthread_cond_signal(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_sigmask(SIG_BLOCK,&set,NULL);
for (;;)
......@@ -1860,7 +1867,6 @@ int main(int argc, char **argv)
(void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
......@@ -2170,9 +2176,7 @@ The server will not act as a slave.");
}
}
while (handler_count > 0)
{
pthread_cond_wait(&COND_handler_count,&LOCK_thread_count);
}
}
pthread_mutex_unlock(&LOCK_thread_count);
}
......@@ -2194,8 +2198,8 @@ The server will not act as a slave.");
(void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex"));
select_thread_in_use=0; // For close_connections
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
#ifdef EXTRA_DEBUG2
sql_print_error("After lock_thread_count");
#endif
......@@ -2204,9 +2208,7 @@ The server will not act as a slave.");
/* Wait until cleanup is done */
(void) pthread_mutex_lock(&LOCK_thread_count);
while (!ready_to_exit)
{
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
}
(void) pthread_mutex_unlock(&LOCK_thread_count);
#if defined(__WIN__) && !defined(EMBEDDED_LIBRARY)
......@@ -2794,8 +2796,8 @@ pthread_handler_decl(handle_connections_namedpipes,arg)
pthread_mutex_lock(&LOCK_thread_count);
handler_count--;
pthread_cond_signal(&COND_handler_count);
pthread_mutex_unlock(&LOCK_thread_count);
pthread_cond_signal(&COND_handler_count);
DBUG_RETURN(0);
}
#endif /* __NT__ */
......@@ -3144,25 +3146,23 @@ struct my_option my_long_options[] =
"Syntax: myisam-recover[=option[,option...]], where option can be DEFAULT, BACKUP or FORCE.",
(gptr*) &myisam_recover_options_str, (gptr*) &myisam_recover_options_str, 0,
GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
/*
Option needs to be available for the test case to pass in non-debugging
mode. is a no-op.
*/
{"memlock", OPT_MEMLOCK, "Lock mysqld in memory", (gptr*) &locked_in_memory,
(gptr*) &locked_in_memory, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"disconnect-slave-event-count", OPT_DISCONNECT_SLAVE_EVENT_COUNT,
"Undocumented: Meant for debugging and testing of replication",
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &disconnect_slave_event_count,
(gptr*) &disconnect_slave_event_count, 0, GET_INT, REQUIRED_ARG, 0, 0, 0,
0, 0, 0},
{"abort-slave-event-count", OPT_ABORT_SLAVE_EVENT_COUNT,
"Undocumented: Meant for debugging and testing of replication",
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &abort_slave_event_count, (gptr*) &abort_slave_event_count,
0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"max-binlog-dump-events", OPT_MAX_BINLOG_DUMP_EVENTS, "Undocumented",
{"max-binlog-dump-events", OPT_MAX_BINLOG_DUMP_EVENTS,
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &max_binlog_dump_events, (gptr*) &max_binlog_dump_events, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"sporadic-binlog-dump-fail", OPT_SPORADIC_BINLOG_DUMP_FAIL, "Undocumented",
{"sporadic-binlog-dump-fail", OPT_SPORADIC_BINLOG_DUMP_FAIL,
"Option used by mysql-test for debugging and testing of replication",
(gptr*) &opt_sporadic_binlog_dump_fail,
(gptr*) &opt_sporadic_binlog_dump_fail, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0},
......@@ -3176,7 +3176,7 @@ struct my_option my_long_options[] =
(gptr*) &opt_no_mix_types, (gptr*) &opt_no_mix_types, 0, GET_BOOL, NO_ARG,
0, 0, 0, 0, 0, 0},
#endif
{"old-protocol", 'o', "Use the old (3.20) protocol",
{"old-protocol", 'o', "Use the old (3.20) protocol client/server protocol",
(gptr*) &protocol_version, (gptr*) &protocol_version, 0, GET_UINT, NO_ARG,
PROTOCOL_VERSION, 0, 0, 0, 0, 0},
{"old-rpl-compat", OPT_OLD_RPL_COMPAT,
......
......@@ -276,9 +276,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
return 1;
}
linfo.index_file_offset = 0;
if (mysql_bin_log.find_log_pos(&linfo, NullS))
if (mysql_bin_log.find_log_pos(&linfo, NullS, 1))
{
strmov(errmsg,"Could not find first log");
return 1;
......@@ -332,7 +330,7 @@ int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
strmov(last_log_name, linfo.log_file_name);
last_pos = my_b_tell(&log);
switch (mysql_bin_log.find_next_log(&linfo)) {
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case LOG_INFO_EOF:
if (last_file >= 0)
(void)my_close(last_file, MYF(MY_WME));
......
......@@ -230,7 +230,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
Test to see if the previous run was with the skip of purging
If yes, we do not purge when we restart
*/
if (rli->relay_log.find_log_pos(&rli->linfo,NullS))
if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
{
*errmsg="Could not find first log during relay log initialization";
goto err;
......@@ -240,7 +240,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
{
if (strcmp(log, rli->linfo.log_file_name))
rli->skip_log_purge=1; // Different name; Don't purge
if (rli->relay_log.find_log_pos(&rli->linfo, log))
if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{
*errmsg="Could not find target log during relay log initialization";
goto err;
......@@ -1201,7 +1201,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
rli->log_space_total = 0;
if (rli->relay_log.find_log_pos(&linfo, NullS))
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{
sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1);
......@@ -1210,7 +1210,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
if (add_relay_log(rli,&linfo))
DBUG_RETURN(1);
} while (!rli->relay_log.find_next_log(&linfo));
} while (!rli->relay_log.find_next_log(&linfo, 1));
DBUG_RETURN(0);
}
......@@ -2538,9 +2538,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
{
last_errno=mc_mysql_errno(mysql);
suppress_warnings= 0;
sql_print_error("Slave I/O thread: error connecting to master \
sql_print_error("Slave I/O thread: error %s to master \
'%s@%s:%d': \
Error: '%s' errno: %d retry-time: %d retries: %d",
(reconnect ? "reconnecting" : "connecting"),
mi->user,mi->host,mi->port,
mc_mysql_error(mysql), last_errno,
mi->connect_retry,
......@@ -2771,8 +2772,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
update. If we do not, show slave status will block
*/
pthread_mutex_unlock(&rli->data_lock);
/* Note that wait_for_update unlocks lock_log ! */
rli->relay_log.wait_for_update(rli->sql_thd);
pthread_mutex_unlock(log_lock);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
......
......@@ -30,7 +30,8 @@ extern bool use_slave_mask;
extern char* slave_load_tmpdir;
extern my_string master_info_file,relay_log_info_file;
extern my_string opt_relay_logname, opt_relaylog_index_name;
extern bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
extern ulong relay_log_space_limit;
struct st_master_info;
......@@ -408,7 +409,6 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg);
extern bool opt_log_slave_updates ;
pthread_handler_decl(handle_slave_io,arg);
pthread_handler_decl(handle_slave_sql,arg);
extern bool volatile abort_loop;
......
......@@ -116,8 +116,8 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
active_vio = 0;
pthread_mutex_init(&active_vio_lock, MY_MUTEX_INIT_FAST);
#endif
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
/* Variables with default values */
proc_info="login";
......@@ -189,6 +189,10 @@ THD::~THD()
{
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
/* Ensure that no one is using THD */
pthread_mutex_lock(&LOCK_delete);
pthread_mutex_unlock(&LOCK_delete);
/* Close connection */
if (net.vio)
{
......@@ -217,18 +221,19 @@ THD::~THD()
free_root(&mem_root,MYF(0));
free_root(&transaction.mem_root,MYF(0));
mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
pthread_mutex_destroy(&active_vio_lock);
#endif
pthread_mutex_destroy(&LOCK_delete);
#ifndef DBUG_OFF
dbug_sentry = THD_SENTRY_GONE;
#endif
DBUG_VOID_RETURN;
}
void THD::awake(bool prepare_to_die)
{
THD_CHECK_SENTRY(this);
safe_mutex_assert_owner(&LOCK_delete);
if (prepare_to_die)
killed = 1;
thr_alarm_kill(real_id);
......
......@@ -141,8 +141,8 @@ public:
// iterating through the log index file
int find_log_pos(LOG_INFO* linfo, const char* log_name,
bool need_mutex=1);
int find_next_log(LOG_INFO* linfo, bool need_mutex=1);
bool need_mutex);
int find_next_log(LOG_INFO* linfo, bool need_mutex);
int get_current_log(LOG_INFO* linfo);
uint next_file_id();
......@@ -330,7 +330,8 @@ public:
struct sockaddr_in remote; // client socket address
struct rand_struct rand; // used for authentication
struct system_variables variables; // Changeable local variables
pthread_mutex_t LOCK_delete; // Locked before thd is deleted
char *query; // Points to the current query,
/*
A pointer to the stack frame of handle_one_connection(),
......@@ -410,7 +411,6 @@ public:
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
Vio* active_vio;
pthread_mutex_t active_vio_lock;
#endif
ulonglong next_insert_id,last_insert_id,current_insert_id,
limit_found_rows;
......@@ -465,25 +465,25 @@ public:
#ifdef SIGNAL_WITH_VIO_CLOSE
inline void set_active_vio(Vio* vio)
{
pthread_mutex_lock(&active_vio_lock);
pthread_mutex_lock(&LOCK_delete);
active_vio = vio;
pthread_mutex_unlock(&active_vio_lock);
pthread_mutex_unlock(&LOCK_delete);
}
inline void clear_active_vio()
{
pthread_mutex_lock(&active_vio_lock);
pthread_mutex_lock(&LOCK_delete);
active_vio = 0;
pthread_mutex_unlock(&active_vio_lock);
pthread_mutex_unlock(&LOCK_delete);
}
inline void close_active_vio()
{
pthread_mutex_lock(&active_vio_lock);
pthread_mutex_lock(&LOCK_delete);
if (active_vio)
{
vio_close(active_vio);
active_vio = 0;
}
pthread_mutex_unlock(&active_vio_lock);
pthread_mutex_unlock(&LOCK_delete);
}
#endif
void awake(bool prepare_to_die);
......
......@@ -562,8 +562,8 @@ public:
thd.user=thd.host=0;
thread_count--;
delayed_insert_threads--;
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
VOID(pthread_mutex_unlock(&LOCK_thread_count));
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
}
/* The following is for checking when we can delete ourselves */
......
......@@ -585,7 +585,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (!(test_flags & TEST_NO_THREADS) & my_thread_init())
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_thread_count);
statistic_increment(aborted_connects,&LOCK_status);
end_thread(thd,0);
return 0;
}
......@@ -612,7 +612,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (thd->store_globals())
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_thread_count);
statistic_increment(aborted_connects,&LOCK_status);
end_thread(thd,0);
return 0;
}
......@@ -634,7 +634,7 @@ pthread_handler_decl(handle_one_connection,arg)
if (vio_type(net->vio) == VIO_TYPE_NAMEDPIPE)
sleep(1); /* must wait after eof() */
#endif
statistic_increment(aborted_connects,&LOCK_thread_count);
statistic_increment(aborted_connects,&LOCK_status);
goto end_thread;
}
......@@ -668,7 +668,7 @@ pthread_handler_decl(handle_one_connection,arg)
(net->last_errno ? ER(net->last_errno) :
ER(ER_UNKNOWN_ERROR)));
send_error(net,net->last_errno,NullS);
thread_safe_increment(aborted_threads,&LOCK_thread_count);
thread_safe_increment(aborted_threads,&LOCK_status);
}
end_thread:
......@@ -757,8 +757,8 @@ pthread_handler_decl(handle_bootstrap,arg)
end:
(void) pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
(void) pthread_cond_broadcast(&COND_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
(void) pthread_cond_broadcast(&COND_thread_count);
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); // Never reached
......@@ -883,7 +883,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->lex.select_lex.options=0; // We store status here
switch (command) {
case COM_INIT_DB:
thread_safe_increment(com_stat[SQLCOM_CHANGE_DB],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_CHANGE_DB],&LOCK_status);
if (!mysql_change_db(thd,packet))
mysql_log.write(thd,command,"%s",thd->db);
break;
......@@ -895,7 +895,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
case COM_TABLE_DUMP:
{
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other, &LOCK_status);
slow_command = TRUE;
uint db_len = *(uchar*)packet;
uint tbl_len = *(uchar*)(packet + db_len + 1);
......@@ -912,7 +912,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
case COM_CHANGE_USER:
{
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other,&LOCK_status);
char *user= (char*) packet;
char *passwd= strend(user)+1;
char *db= strend(passwd)+1;
......@@ -992,7 +992,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
{
char *fields;
TABLE_LIST table_list;
thread_safe_increment(com_stat[SQLCOM_SHOW_FIELDS],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_SHOW_FIELDS],&LOCK_status);
bzero((char*) &table_list,sizeof(table_list));
if (!(table_list.db=thd->db))
{
......@@ -1027,7 +1027,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
case COM_CREATE_DB: // QQ: To be removed
{
thread_safe_increment(com_stat[SQLCOM_CREATE_DB],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_CREATE_DB],&LOCK_status);
char *db=thd->strdup(packet);
// null test to handle EOM
if (!db || !strip_sp(db) || check_db_name(db))
......@@ -1045,7 +1045,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
case COM_DROP_DB: // QQ: To be removed
{
thread_safe_increment(com_stat[SQLCOM_DROP_DB],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_DROP_DB],&LOCK_status);
char *db=thd->strdup(packet);
// null test to handle EOM
if (!db || !strip_sp(db) || check_db_name(db))
......@@ -1066,7 +1066,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
case COM_BINLOG_DUMP:
{
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other,&LOCK_status);
slow_command = TRUE;
if (check_global_access(thd, REPL_SLAVE_ACL))
break;
......@@ -1078,11 +1078,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* TODO: The following has to be changed to an 8 byte integer */
pos = uint4korr(packet);
flags = uint2korr(packet + 4);
pthread_mutex_lock(&LOCK_server_id);
thd->server_id=0; /* avoid suicide */
kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6));
thd->server_id = slave_server_id;
pthread_mutex_unlock(&LOCK_server_id);
mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags);
unregister_slave(thd,1,1);
// fake COM_QUIT -- if we get here, the thread needs to terminate
......@@ -1092,7 +1090,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
case COM_REFRESH:
{
thread_safe_increment(com_stat[SQLCOM_FLUSH],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_FLUSH],&LOCK_status);
ulong options= (ulong) (uchar) packet[0];
if (check_global_access(thd,RELOAD_ACL))
break;
......@@ -1104,7 +1102,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break;
}
case COM_SHUTDOWN:
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other,&LOCK_status);
if (check_global_access(thd,SHUTDOWN_ACL))
break; /* purecov: inspected */
DBUG_PRINT("quit",("Got shutdown command"));
......@@ -1127,7 +1125,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
case COM_STATISTICS:
{
mysql_log.write(thd,command,NullS);
thread_safe_increment(com_stat[SQLCOM_SHOW_STATUS],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_SHOW_STATUS],&LOCK_status);
char buff[200];
ulong uptime = (ulong) (thd->start_time - start_time);
sprintf((char*) buff,
......@@ -1146,11 +1144,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break;
}
case COM_PING:
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other,&LOCK_status);
send_ok(net); // Tell client we are alive
break;
case COM_PROCESS_INFO:
thread_safe_increment(com_stat[SQLCOM_SHOW_PROCESSLIST],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_SHOW_PROCESSLIST],&LOCK_status);
if (!thd->priv_user[0] && check_global_access(thd,PROCESS_ACL))
break;
mysql_log.write(thd,command,NullS);
......@@ -1159,13 +1157,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break;
case COM_PROCESS_KILL:
{
thread_safe_increment(com_stat[SQLCOM_KILL],&LOCK_thread_count);
thread_safe_increment(com_stat[SQLCOM_KILL],&LOCK_status);
ulong id=(ulong) uint4korr(packet);
kill_one_thread(thd,id);
break;
}
case COM_DEBUG:
thread_safe_increment(com_other,&LOCK_thread_count);
thread_safe_increment(com_other,&LOCK_status);
if (check_global_access(thd, SUPER_ACL))
break; /* purecov: inspected */
mysql_print_status(thd);
......@@ -1265,7 +1263,7 @@ mysql_execute_command(void)
!tables_ok(thd,tables)))
DBUG_VOID_RETURN;
thread_safe_increment(com_stat[lex->sql_command],&LOCK_thread_count);
thread_safe_increment(com_stat[lex->sql_command],&LOCK_status);
switch (lex->sql_command) {
case SQLCOM_SELECT:
{
......@@ -3393,28 +3391,46 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
}
/*
kill on thread
SYNOPSIS
kill_one_thread()
thd Thread class
id Thread id
NOTES
This is written such that we have a short lock on LOCK_thread_count
*/
void kill_one_thread(THD *thd, ulong id)
{
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
THD *tmp;
uint error=ER_NO_SUCH_THREAD;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (tmp->thread_id == id)
{
if ((thd->master_access & SUPER_ACL) ||
!strcmp(thd->user,tmp->user))
{
tmp->awake(1 /*prepare to die*/);
error=0;
}
else
error=ER_KILL_DENIED_ERROR;
break; // Found thread
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
break;
}
}
VOID(pthread_mutex_unlock(&LOCK_thread_count));
if (tmp)
{
if ((thd->master_access & SUPER_ACL) ||
!strcmp(thd->user,tmp->user))
{
tmp->awake(1 /*prepare to die*/);
error=0;
}
else
error=ER_KILL_DENIED_ERROR;
pthread_mutex_unlock(&tmp->LOCK_delete);
}
if (!error)
send_ok(&thd->net);
else
......
......@@ -27,7 +27,7 @@
extern const char* any_db;
int max_binlog_dump_events = 0; // unlimited
bool opt_sporadic_binlog_dump_fail = 0;
my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
......@@ -247,7 +247,8 @@ bool log_in_use(const char* log_name)
pthread_mutex_lock(&linfo->lock);
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
pthread_mutex_unlock(&linfo->lock);
if (result) break;
if (result)
break;
}
}
......@@ -346,7 +347,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
if (mysql_bin_log.find_log_pos(&linfo, name))
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find first log file name in binary log index file";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
......@@ -496,22 +497,28 @@ Increase max_allowed_packet on master";
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
break;
case LOG_READ_EOF:
DBUG_PRINT("wait",("waiting for data in binary log"));
if (!thd->killed)
{
/* Note that the following call unlocks lock_log */
mysql_bin_log.wait_for_update(thd);
}
else
pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
}
pthread_mutex_unlock(log_lock);
if (read_packet)
{
thd->proc_info = "sending update to slave";
......@@ -552,7 +559,7 @@ Increase max_allowed_packet on master";
bool loop_breaker = 0;
// need this to break out of the for loop from switch
thd->proc_info = "switching to next log";
switch (mysql_bin_log.find_next_log(&linfo)) {
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case LOG_INFO_EOF:
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
break;
......@@ -739,17 +746,21 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
if (tmp->command == COM_BINLOG_DUMP &&
tmp->server_id == slave_server_id)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. Plus it double-locks LOCK_tread_count, which
make safe_mutex complain and abort.
We just to do kill the thread ourselves.
*/
tmp->awake(1/*prepare to die*/);
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
break;
}
}
pthread_mutex_unlock(&LOCK_thread_count);
if (tmp)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. We just to do kill the thread ourselves.
*/
tmp->awake(1/*prepare to die*/);
pthread_mutex_unlock(&tmp->LOCK_delete);
}
}
......@@ -927,9 +938,10 @@ int show_binlog_events(THD* thd)
my_off_t pos = lex_mi->pos;
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
LOG_INFO linfo;
Log_event* ev;
limit_start = thd->lex.select->offset_limit;
limit_end = thd->lex.select->select_limit + limit_start;
......@@ -942,7 +954,7 @@ int show_binlog_events(THD* thd)
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
if (mysql_bin_log.find_log_pos(&linfo, name))
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find target log";
goto err;
......@@ -957,7 +969,7 @@ int show_binlog_events(THD* thd)
goto err;
}
pthread_mutex_lock(mysql_bin_log.get_log_lock());
pthread_mutex_lock(log_lock);
my_b_seek(&log, pos);
for (event_count = 0;
......@@ -968,7 +980,7 @@ int show_binlog_events(THD* thd)
{
errmsg = "Net error";
delete ev;
pthread_mutex_unlock(mysql_bin_log.get_log_lock());
pthread_mutex_unlock(log_lock);
goto err;
}
......@@ -982,11 +994,11 @@ int show_binlog_events(THD* thd)
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
pthread_mutex_unlock(mysql_bin_log.get_log_lock());
pthread_mutex_unlock(log_lock);
goto err;
}
pthread_mutex_unlock(mysql_bin_log.get_log_lock());
pthread_mutex_unlock(log_lock);
}
err:
......
......@@ -18,9 +18,9 @@ extern bool server_id_supplied;
extern I_List<i_string> binlog_do_db, binlog_ignore_db;
extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail;
extern my_bool opt_sporadic_binlog_dump_fail;
#define KICK_SLAVE(thd) thd->awake(0 /* do not prepare to die*/);
#define KICK_SLAVE(thd) { pthread_mutex_lock(&(thd)->LOCK_delete); (thd)->awake(0 /* do not prepare to die*/); pthread_mutex_unlock(&(thd)->LOCK_delete); }
File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment