Commit 80649572 authored by anozdrin/alik@alik's avatar anozdrin/alik@alik

Merge bk-internal.mysql.com:/home/bk/mysql-5.1-runtime

into  alik.:/mnt/raid/alik/MySQL/devel/5.1-rt-bug17486
parents eda71e24 c47c87ab
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
#events : BUG#17619 2006-02-21 andrey Race conditions #events : BUG#17619 2006-02-21 andrey Race conditions
#events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails on some platforms. Has to be checked. #events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails on some platforms. Has to be checked.
im_options : Bug#20294 2006-07-24 stewart Instance manager test im_options fails randomly im_options : Bug#20294 2006-07-24 stewart Instance manager test im_options fails randomly
#im_life_cycle : Bug#20368 2006-06-10 alik im_life_cycle test fails
im_daemon_life_cycle : BUG#22379 2006-09-15 ingo im_daemon_life_cycle.test fails on merge of 5.1 -> 5.1-engines
im_instance_conf : BUG#20294 2006-09-16 ingo Instance manager test im_instance_conf fails randomly im_instance_conf : BUG#20294 2006-09-16 ingo Instance manager test im_instance_conf fails randomly
concurrent_innodb : BUG#21579 2006-08-11 mleich innodb_concurrent random failures with varying differences concurrent_innodb : BUG#21579 2006-08-11 mleich innodb_concurrent random failures with varying differences
ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
......
...@@ -74,7 +74,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg, ...@@ -74,7 +74,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg,
uint monitoring_interval_arg) : uint monitoring_interval_arg) :
Guardian_thread_args(thread_registry_arg, instance_map_arg, Guardian_thread_args(thread_registry_arg, instance_map_arg,
monitoring_interval_arg), monitoring_interval_arg),
thread_info(pthread_self()), guarded_instances(0) thread_info(pthread_self(), TRUE), guarded_instances(0)
{ {
pthread_mutex_init(&LOCK_guardian, 0); pthread_mutex_init(&LOCK_guardian, 0);
pthread_cond_init(&COND_guardian, 0); pthread_cond_init(&COND_guardian, 0);
...@@ -250,6 +250,8 @@ void Guardian_thread::run() ...@@ -250,6 +250,8 @@ void Guardian_thread::run()
LIST *node; LIST *node;
struct timespec timeout; struct timespec timeout;
log_info("Guardian: started.");
thread_registry.register_thread(&thread_info); thread_registry.register_thread(&thread_info);
my_thread_init(); my_thread_init();
...@@ -277,12 +279,16 @@ void Guardian_thread::run() ...@@ -277,12 +279,16 @@ void Guardian_thread::run()
&LOCK_guardian, &timeout); &LOCK_guardian, &timeout);
} }
log_info("Guardian: stopped.");
stopped= TRUE; stopped= TRUE;
pthread_mutex_unlock(&LOCK_guardian); pthread_mutex_unlock(&LOCK_guardian);
/* now, when the Guardian is stopped we can stop the IM */ /* now, when the Guardian is stopped we can stop the IM */
thread_registry.unregister_thread(&thread_info); thread_registry.unregister_thread(&thread_info);
thread_registry.request_shutdown(); thread_registry.request_shutdown();
my_thread_end(); my_thread_end();
log_info("Guardian: finished.");
} }
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "mysql_manager_error.h" #include "mysql_manager_error.h"
#include "portability.h" #include "portability.h"
#include "priv.h" #include "priv.h"
#include "thread_registry.h"
const LEX_STRING const LEX_STRING
...@@ -44,7 +45,8 @@ static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length; ...@@ -44,7 +45,8 @@ static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
static void start_and_monitor_instance(Instance_options *old_instance_options, static void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map); Instance_map *instance_map,
Thread_registry *thread_registry);
#ifndef __WIN__ #ifndef __WIN__
typedef pid_t My_process_info; typedef pid_t My_process_info;
...@@ -63,7 +65,8 @@ pthread_handler_t proxy(void *arg) ...@@ -63,7 +65,8 @@ pthread_handler_t proxy(void *arg)
{ {
Instance *instance= (Instance *) arg; Instance *instance= (Instance *) arg;
start_and_monitor_instance(&instance->options, start_and_monitor_instance(&instance->options,
instance->get_map()); instance->get_map(),
&instance->thread_registry);
return 0; return 0;
} }
...@@ -99,6 +102,7 @@ static int wait_process(My_process_info *pi) ...@@ -99,6 +102,7 @@ static int wait_process(My_process_info *pi)
thread, but we don't know this one). Or we could use waitpid(), but thread, but we don't know this one). Or we could use waitpid(), but
couldn't use wait(), because it could return in any wait() in the program. couldn't use wait(), because it could return in any wait() in the program.
*/ */
if (linuxthreads) if (linuxthreads)
wait(NULL); /* LinuxThreads were detected */ wait(NULL); /* LinuxThreads were detected */
else else
...@@ -239,11 +243,28 @@ static int start_process(Instance_options *instance_options, ...@@ -239,11 +243,28 @@ static int start_process(Instance_options *instance_options,
*/ */
static void start_and_monitor_instance(Instance_options *old_instance_options, static void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map) Instance_map *instance_map,
Thread_registry *thread_registry)
{ {
Instance_name instance_name(&old_instance_options->instance_name); Instance_name instance_name(&old_instance_options->instance_name);
Instance *current_instance; Instance *current_instance;
My_process_info process_info; My_process_info process_info;
Thread_info thread_info(pthread_self(), FALSE);
log_info("Monitoring thread (instance: '%s'): started.",
(const char *) instance_name.get_c_str());
if (!old_instance_options->nonguarded)
{
/*
Register thread in Thread_registry to wait for it to stop on shutdown
only if instance is nuarded. If instance is guarded, the thread will not
finish, because nonguarded instances are not stopped on shutdown.
*/
thread_registry->register_thread(&thread_info);
my_thread_init();
}
/* /*
Lock instance map to guarantee that no instances are deleted during Lock instance map to guarantee that no instances are deleted during
...@@ -280,7 +301,14 @@ static void start_and_monitor_instance(Instance_options *old_instance_options, ...@@ -280,7 +301,14 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
instance_map->unlock(); instance_map->unlock();
return; if (!old_instance_options->nonguarded)
{
thread_registry->unregister_thread(&thread_info);
my_thread_end();
}
log_info("Monitoring thread (instance: '%s'): finished.",
(const char *) instance_name.get_c_str());
} }
...@@ -343,10 +371,6 @@ int Instance::start() ...@@ -343,10 +371,6 @@ int Instance::start()
{ {
remove_pid(); remove_pid();
/*
No need to monitor this thread in the Thread_registry, as all
instances are to be stopped during shutdown.
*/
pthread_t proxy_thd_id; pthread_t proxy_thd_id;
pthread_attr_t proxy_thd_attr; pthread_attr_t proxy_thd_attr;
int rc; int rc;
...@@ -404,7 +428,8 @@ void Instance::set_crash_flag_n_wake_all() ...@@ -404,7 +428,8 @@ void Instance::set_crash_flag_n_wake_all()
Instance::Instance(): crashed(FALSE), configured(FALSE) Instance::Instance(Thread_registry &thread_registry_arg):
crashed(FALSE), configured(FALSE), thread_registry(thread_registry_arg)
{ {
pthread_mutex_init(&LOCK_instance, 0); pthread_mutex_init(&LOCK_instance, 0);
pthread_cond_init(&COND_instance_stopped, 0); pthread_cond_init(&COND_instance_stopped, 0);
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#endif #endif
class Instance_map; class Instance_map;
class Thread_registry;
/* /*
...@@ -87,7 +88,7 @@ public: ...@@ -87,7 +88,7 @@ public:
static bool is_mysqld_compatible_name(const LEX_STRING *name); static bool is_mysqld_compatible_name(const LEX_STRING *name);
public: public:
Instance(); Instance(Thread_registry &thread_registry_arg);
~Instance(); ~Instance();
int init(const LEX_STRING *name_arg); int init(const LEX_STRING *name_arg);
...@@ -120,6 +121,7 @@ public: ...@@ -120,6 +121,7 @@ public:
public: public:
enum { DEFAULT_SHUTDOWN_DELAY= 35 }; enum { DEFAULT_SHUTDOWN_DELAY= 35 };
Instance_options options; Instance_options options;
Thread_registry &thread_registry;
private: private:
/* This attributes is a flag, specifies if the instance has been crashed. */ /* This attributes is a flag, specifies if the instance has been crashed. */
......
...@@ -169,7 +169,7 @@ int Instance_map::process_one_option(const LEX_STRING *group, ...@@ -169,7 +169,7 @@ int Instance_map::process_one_option(const LEX_STRING *group,
if (!(instance= (Instance *) hash_search(&hash, (byte *) group->str, if (!(instance= (Instance *) hash_search(&hash, (byte *) group->str,
group->length))) group->length)))
{ {
if (!(instance= new Instance())) if (!(instance= new Instance(thread_registry)))
return 1; return 1;
if (instance->init(group) || add_instance(instance)) if (instance->init(group) || add_instance(instance))
...@@ -213,8 +213,10 @@ int Instance_map::process_one_option(const LEX_STRING *group, ...@@ -213,8 +213,10 @@ int Instance_map::process_one_option(const LEX_STRING *group,
} }
Instance_map::Instance_map(const char *default_mysqld_path_arg): Instance_map::Instance_map(const char *default_mysqld_path_arg,
mysqld_path(default_mysqld_path_arg) Thread_registry &thread_registry_arg):
mysqld_path(default_mysqld_path_arg),
thread_registry(thread_registry_arg)
{ {
pthread_mutex_init(&LOCK_instance_map, 0); pthread_mutex_init(&LOCK_instance_map, 0);
} }
...@@ -333,7 +335,7 @@ int Instance_map::remove_instance(Instance *instance) ...@@ -333,7 +335,7 @@ int Instance_map::remove_instance(Instance *instance)
int Instance_map::create_instance(const LEX_STRING *instance_name, int Instance_map::create_instance(const LEX_STRING *instance_name,
const Named_value_arr *options) const Named_value_arr *options)
{ {
Instance *instance= new Instance(); Instance *instance= new Instance(thread_registry);
if (!instance) if (!instance)
{ {
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
class Guardian_thread; class Guardian_thread;
class Instance; class Instance;
class Named_value_arr; class Named_value_arr;
class Thread_registry;
extern int load_all_groups(char ***groups, const char *filename); extern int load_all_groups(char ***groups, const char *filename);
extern void free_groups(char **groups); extern void free_groups(char **groups);
...@@ -104,7 +105,8 @@ public: ...@@ -104,7 +105,8 @@ public:
int create_instance(const LEX_STRING *instance_name, int create_instance(const LEX_STRING *instance_name,
const Named_value_arr *options); const Named_value_arr *options);
Instance_map(const char *default_mysqld_path_arg); Instance_map(const char *default_mysqld_path_arg,
Thread_registry &thread_registry_arg);
~Instance_map(); ~Instance_map();
/* /*
...@@ -130,6 +132,8 @@ private: ...@@ -130,6 +132,8 @@ private:
enum { START_HASH_SIZE = 16 }; enum { START_HASH_SIZE = 16 };
pthread_mutex_t LOCK_instance_map; pthread_mutex_t LOCK_instance_map;
HASH hash; HASH hash;
Thread_registry &thread_registry;
}; };
#endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */ #endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */
...@@ -87,7 +87,7 @@ private: ...@@ -87,7 +87,7 @@ private:
Listener_thread::Listener_thread(const Listener_thread_args &args) : Listener_thread::Listener_thread(const Listener_thread_args &args) :
Listener_thread_args(args.thread_registry, args.user_map, args.instance_map) Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
,total_connection_count(0) ,total_connection_count(0)
,thread_info(pthread_self()) ,thread_info(pthread_self(), TRUE)
,num_sockets(0) ,num_sockets(0)
{ {
} }
...@@ -112,6 +112,8 @@ void Listener_thread::run() ...@@ -112,6 +112,8 @@ void Listener_thread::run()
{ {
int i, n= 0; int i, n= 0;
log_info("Listener_thread: started.");
#ifndef __WIN__ #ifndef __WIN__
/* we use this var to check whether we are running on LinuxThreads */ /* we use this var to check whether we are running on LinuxThreads */
pid_t thread_pid; pid_t thread_pid;
...@@ -164,7 +166,7 @@ void Listener_thread::run() ...@@ -164,7 +166,7 @@ void Listener_thread::run()
if (rc == 0 || rc == -1) if (rc == 0 || rc == -1)
{ {
if (rc == -1 && errno != EINTR) if (rc == -1 && errno != EINTR)
log_error("Listener_thread::run(): select() failed, %s", log_error("Listener_thread: select() failed, %s",
strerror(errno)); strerror(errno));
continue; continue;
} }
...@@ -198,7 +200,7 @@ void Listener_thread::run() ...@@ -198,7 +200,7 @@ void Listener_thread::run()
/* III. Release all resources and exit */ /* III. Release all resources and exit */
log_info("Listener_thread::run(): shutdown requested, exiting..."); log_info("Listener_thread: shutdown requested, exiting...");
for (i= 0; i < num_sockets; i++) for (i= 0; i < num_sockets; i++)
close(sockets[i]); close(sockets[i]);
...@@ -209,6 +211,8 @@ void Listener_thread::run() ...@@ -209,6 +211,8 @@ void Listener_thread::run()
thread_registry.unregister_thread(&thread_info); thread_registry.unregister_thread(&thread_info);
my_thread_end(); my_thread_end();
log_info("Listener_thread: finished.");
return; return;
err: err:
...@@ -230,7 +234,7 @@ int Listener_thread::create_tcp_socket() ...@@ -230,7 +234,7 @@ int Listener_thread::create_tcp_socket()
int ip_socket= socket(AF_INET, SOCK_STREAM, 0); int ip_socket= socket(AF_INET, SOCK_STREAM, 0);
if (ip_socket == INVALID_SOCKET) if (ip_socket == INVALID_SOCKET)
{ {
log_error("Listener_thead::run(): socket(AF_INET) failed, %s", log_error("Listener_thead: socket(AF_INET) failed, %s",
strerror(errno)); strerror(errno));
return -1; return -1;
} }
...@@ -261,7 +265,7 @@ int Listener_thread::create_tcp_socket() ...@@ -261,7 +265,7 @@ int Listener_thread::create_tcp_socket()
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address, if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
sizeof(ip_socket_address))) sizeof(ip_socket_address)))
{ {
log_error("Listener_thread::run(): bind(ip socket) failed, '%s'", log_error("Listener_thread: bind(ip socket) failed, '%s'",
strerror(errno)); strerror(errno));
close(ip_socket); close(ip_socket);
return -1; return -1;
...@@ -269,7 +273,7 @@ int Listener_thread::create_tcp_socket() ...@@ -269,7 +273,7 @@ int Listener_thread::create_tcp_socket()
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE)) if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
{ {
log_error("Listener_thread::run(): listen(ip socket) failed, %s", log_error("Listener_thread: listen(ip socket) failed, %s",
strerror(errno)); strerror(errno));
close(ip_socket); close(ip_socket);
return -1; return -1;
...@@ -294,7 +298,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -294,7 +298,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0); int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
if (unix_socket == INVALID_SOCKET) if (unix_socket == INVALID_SOCKET)
{ {
log_error("Listener_thead::run(): socket(AF_UNIX) failed, %s", log_error("Listener_thead: socket(AF_UNIX) failed, %s",
strerror(errno)); strerror(errno));
return -1; return -1;
} }
...@@ -314,7 +318,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -314,7 +318,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address, if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
sizeof(unix_socket_address))) sizeof(unix_socket_address)))
{ {
log_error("Listener_thread::run(): bind(unix socket) failed, " log_error("Listener_thread: bind(unix socket) failed, "
"socket file name is '%s', error '%s'", "socket file name is '%s', error '%s'",
unix_socket_address.sun_path, strerror(errno)); unix_socket_address.sun_path, strerror(errno));
close(unix_socket); close(unix_socket);
...@@ -325,7 +329,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address) ...@@ -325,7 +329,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE)) if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
{ {
log_error("Listener_thread::run(): listen(unix socket) failed, %s", log_error("Listener_thread: listen(unix socket) failed, %s",
strerror(errno)); strerror(errno));
close(unix_socket); close(unix_socket);
return -1; return -1;
......
...@@ -52,14 +52,16 @@ static inline void log(FILE *file, const char *format, va_list args) ...@@ -52,14 +52,16 @@ static inline void log(FILE *file, const char *format, va_list args)
struct tm bd_time; // broken-down time struct tm bd_time; // broken-down time
localtime_r(&now, &bd_time); localtime_r(&now, &bd_time);
char buff_date[32]; char buff_date[128];
sprintf(buff_date, "%02d%02d%02d %2d:%02d:%02d\t", sprintf(buff_date, "[%d/%lu] [%02d/%02d/%02d %02d:%02d:%02d] ",
bd_time.tm_year % 100, (int) getpid(),
bd_time.tm_mon + 1, (unsigned long) pthread_self(),
bd_time.tm_mday, bd_time.tm_year % 100,
bd_time.tm_hour, bd_time.tm_mon + 1,
bd_time.tm_min, bd_time.tm_mday,
bd_time.tm_sec); bd_time.tm_hour,
bd_time.tm_min,
bd_time.tm_sec);
/* Format the message */ /* Format the message */
char buff_stack[256]; char buff_stack[256];
......
...@@ -156,7 +156,8 @@ void manager() ...@@ -156,7 +156,8 @@ void manager()
*/ */
User_map user_map; User_map user_map;
Instance_map instance_map(Options::Main::default_mysqld_path); Instance_map instance_map(Options::Main::default_mysqld_path,
thread_registry);
Guardian_thread guardian_thread(thread_registry, Guardian_thread guardian_thread(thread_registry,
&instance_map, &instance_map,
Options::Main::monitoring_interval); Options::Main::monitoring_interval);
...@@ -308,6 +309,8 @@ void manager() ...@@ -308,6 +309,8 @@ void manager()
*/ */
pthread_cond_signal(&guardian_thread.COND_guardian); pthread_cond_signal(&guardian_thread.COND_guardian);
log_info("Main loop: started.");
while (!shutdown_complete) while (!shutdown_complete)
{ {
int signo; int signo;
...@@ -320,6 +323,20 @@ void manager() ...@@ -320,6 +323,20 @@ void manager()
goto err; goto err;
} }
/*
The general idea in this loop is the following:
- we are waiting for SIGINT, SIGTERM -- signals that mean we should
shutdown;
- as shutdown signal is caught, we stop Guardian thread (by calling
Guardian_thread::request_shutdown());
- as Guardian_thread is stopped, it sends SIGTERM to this thread
(by calling Thread_registry::request_shutdown()), so that the
my_sigwait() above returns;
- as we catch the second SIGTERM, we send signals to all threads
registered in Thread_registry (by calling
Thread_registry::deliver_shutdown()) and waiting for threads to stop;
*/
#ifndef __WIN__ #ifndef __WIN__
/* /*
On some Darwin kernels SIGHUP is delivered along with most On some Darwin kernels SIGHUP is delivered along with most
...@@ -336,6 +353,8 @@ void manager() ...@@ -336,6 +353,8 @@ void manager()
else else
#endif #endif
{ {
log_info("Main loop: got shutdown signal.");
if (!guardian_thread.is_stopped()) if (!guardian_thread.is_stopped())
{ {
guardian_thread.request_shutdown(); guardian_thread.request_shutdown();
...@@ -349,6 +368,8 @@ void manager() ...@@ -349,6 +368,8 @@ void manager()
} }
} }
log_info("Main loop: finished.");
err: err:
/* delete the pid file */ /* delete the pid file */
my_delete(Options::Main::pid_file_name, MYF(0)); my_delete(Options::Main::pid_file_name, MYF(0));
......
...@@ -97,7 +97,7 @@ Mysql_connection_thread::Mysql_connection_thread( ...@@ -97,7 +97,7 @@ Mysql_connection_thread::Mysql_connection_thread(
args.user_map, args.user_map,
args.connection_id, args.connection_id,
args.instance_map) args.instance_map)
,thread_info(pthread_self()) ,thread_info(pthread_self(), TRUE)
{ {
thread_registry.register_thread(&thread_info); thread_registry.register_thread(&thread_info);
} }
...@@ -165,7 +165,7 @@ Mysql_connection_thread::~Mysql_connection_thread() ...@@ -165,7 +165,7 @@ Mysql_connection_thread::~Mysql_connection_thread()
void Mysql_connection_thread::run() void Mysql_connection_thread::run()
{ {
log_info("accepted mysql connection %d", connection_id); log_info("accepted mysql connection %d", (int) connection_id);
my_thread_init(); my_thread_init();
...@@ -175,7 +175,7 @@ void Mysql_connection_thread::run() ...@@ -175,7 +175,7 @@ void Mysql_connection_thread::run()
return; return;
} }
log_info("connection %d is checked successfully", connection_id); log_info("connection %d is checked successfully", (int) connection_id);
vio_keepalive(vio, TRUE); vio_keepalive(vio, TRUE);
...@@ -315,7 +315,7 @@ int Mysql_connection_thread::do_command() ...@@ -315,7 +315,7 @@ int Mysql_connection_thread::do_command()
enum enum_server_command command= (enum enum_server_command) enum enum_server_command command= (enum enum_server_command)
(uchar) *packet; (uchar) *packet;
log_info("connection %d: packet_length=%d, command=%d", log_info("connection %d: packet_length=%d, command=%d",
connection_id, packet_length, command); (int) connection_id, (int) packet_length, (int) command);
return dispatch_command(command, packet + 1, packet_length - 1); return dispatch_command(command, packet + 1, packet_length - 1);
} }
} }
...@@ -325,27 +325,33 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, ...@@ -325,27 +325,33 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
{ {
switch (command) { switch (command) {
case COM_QUIT: // client exit case COM_QUIT: // client exit
log_info("query for connection %d received quit command", connection_id); log_info("query for connection %d received quit command",
(int) connection_id);
return 1; return 1;
case COM_PING: case COM_PING:
log_info("query for connection %d received ping command", connection_id); log_info("query for connection %d received ping command",
(int) connection_id);
net_send_ok(&net, connection_id, NULL); net_send_ok(&net, connection_id, NULL);
break; break;
case COM_QUERY: case COM_QUERY:
{ {
log_info("query for connection %d : ----\n%s\n-------------------------", log_info("query for connection %d : ----\n%s\n-------------------------",
connection_id,packet); (int) connection_id,
(const char *) packet);
if (Command *command= parse_command(&instance_map, packet)) if (Command *command= parse_command(&instance_map, packet))
{ {
int res= 0; int res= 0;
log_info("query for connection %d successfully parsed",connection_id); log_info("query for connection %d successfully parsed",
(int) connection_id);
res= command->execute(&net, connection_id); res= command->execute(&net, connection_id);
delete command; delete command;
if (!res) if (!res)
log_info("query for connection %d executed ok",connection_id); log_info("query for connection %d executed ok",
(int) connection_id);
else else
{ {
log_info("query for connection %d executed err=%d",connection_id,res); log_info("query for connection %d executed err=%d",
(int) connection_id, (int) res);
net_send_error(&net, res); net_send_error(&net, res);
return 0; return 0;
} }
...@@ -358,7 +364,8 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command, ...@@ -358,7 +364,8 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
break; break;
} }
default: default:
log_info("query for connection %d received unknown command",connection_id); log_info("query for connection %d received unknown command",
(int) connection_id);
net_send_error(&net, ER_UNKNOWN_COM_ERROR); net_send_error(&net, ER_UNKNOWN_COM_ERROR);
break; break;
} }
......
...@@ -43,8 +43,10 @@ static void handle_signal(int __attribute__((unused)) sig_no) ...@@ -43,8 +43,10 @@ static void handle_signal(int __attribute__((unused)) sig_no)
*/ */
Thread_info::Thread_info() {} Thread_info::Thread_info() {}
Thread_info::Thread_info(pthread_t thread_id_arg) : Thread_info::Thread_info(pthread_t thread_id_arg,
thread_id(thread_id_arg) {} bool send_signal_on_shutdown_arg) :
thread_id(thread_id_arg),
send_signal_on_shutdown(send_signal_on_shutdown_arg) {}
/* /*
TODO: think about moving signal information (now it's shutdown_in_progress) TODO: think about moving signal information (now it's shutdown_in_progress)
...@@ -86,6 +88,9 @@ Thread_registry::~Thread_registry() ...@@ -86,6 +88,9 @@ Thread_registry::~Thread_registry()
void Thread_registry::register_thread(Thread_info *info) void Thread_registry::register_thread(Thread_info *info)
{ {
log_info("Thread_registry: registering thread %d...",
(int) info->thread_id);
#ifndef __WIN__ #ifndef __WIN__
struct sigaction sa; struct sigaction sa;
sa.sa_handler= handle_signal; sa.sa_handler= handle_signal;
...@@ -112,11 +117,19 @@ void Thread_registry::register_thread(Thread_info *info) ...@@ -112,11 +117,19 @@ void Thread_registry::register_thread(Thread_info *info)
void Thread_registry::unregister_thread(Thread_info *info) void Thread_registry::unregister_thread(Thread_info *info)
{ {
log_info("Thread_registry: unregistering thread %d...",
(int) info->thread_id);
pthread_mutex_lock(&LOCK_thread_registry); pthread_mutex_lock(&LOCK_thread_registry);
info->prev->next= info->next; info->prev->next= info->next;
info->next->prev= info->prev; info->next->prev= info->prev;
if (head.next == &head) if (head.next == &head)
{
log_info("Thread_registry: thread registry is empty!");
pthread_cond_signal(&COND_thread_registry_is_empty); pthread_cond_signal(&COND_thread_registry_is_empty);
}
pthread_mutex_unlock(&LOCK_thread_registry); pthread_mutex_unlock(&LOCK_thread_registry);
} }
...@@ -181,11 +194,6 @@ int Thread_registry::cond_timedwait(Thread_info *info, pthread_cond_t *cond, ...@@ -181,11 +194,6 @@ int Thread_registry::cond_timedwait(Thread_info *info, pthread_cond_t *cond,
void Thread_registry::deliver_shutdown() void Thread_registry::deliver_shutdown()
{ {
Thread_info *info;
struct timespec shutdown_time;
int error;
set_timespec(shutdown_time, 1);
pthread_mutex_lock(&LOCK_thread_registry); pthread_mutex_lock(&LOCK_thread_registry);
shutdown_in_progress= TRUE; shutdown_in_progress= TRUE;
...@@ -199,29 +207,14 @@ void Thread_registry::deliver_shutdown() ...@@ -199,29 +207,14 @@ void Thread_registry::deliver_shutdown()
process_alarm(THR_SERVER_ALARM); process_alarm(THR_SERVER_ALARM);
#endif #endif
for (info= head.next; info != &head; info= info->next)
{
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
/*
sic: race condition here, the thread may not yet fall into
pthread_cond_wait.
*/
if (info->current_cond)
pthread_cond_signal(info->current_cond);
}
/* /*
The common practice is to test predicate before pthread_cond_wait. sic: race condition here, the thread may not yet fall into
I don't do that here because the predicate is practically always false pthread_cond_wait.
before wait - is_shutdown's been just set, and the lock's still not
released - the only case when the predicate is false is when no other
threads exist.
*/ */
while (((error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
&LOCK_thread_registry, interrupt_threads();
&shutdown_time)) != ETIMEDOUT &&
error != ETIME) && wait_for_threads_to_unregister();
head.next != &head)
;
/* /*
If previous signals did not reach some threads, they must be sleeping If previous signals did not reach some threads, they must be sleeping
...@@ -230,11 +223,28 @@ void Thread_registry::deliver_shutdown() ...@@ -230,11 +223,28 @@ void Thread_registry::deliver_shutdown()
so this time everybody should be informed (presumably each worker can so this time everybody should be informed (presumably each worker can
get CPU during shutdown_time.) get CPU during shutdown_time.)
*/ */
for (info= head.next; info != &head; info= info->next)
interrupt_threads();
/* Get the last chance to threads to stop. */
wait_for_threads_to_unregister();
/*
Print out threads, that didn't stopped. Thread_registry destructor will
probably abort the program if there is still any alive thread.
*/
if (head.next != &head)
{ {
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL); log_info("Thread_registry: non-stopped threads:");
if (info->current_cond)
pthread_cond_signal(info->current_cond); for (Thread_info *info= head.next; info != &head; info= info->next)
log_info(" - %ld", (long int) info->thread_id);
}
else
{
log_info("Thread_registry: all threads stopped.");
} }
pthread_mutex_unlock(&LOCK_thread_registry); pthread_mutex_unlock(&LOCK_thread_registry);
...@@ -245,3 +255,46 @@ void Thread_registry::request_shutdown() ...@@ -245,3 +255,46 @@ void Thread_registry::request_shutdown()
{ {
pthread_kill(sigwait_thread_pid, SIGTERM); pthread_kill(sigwait_thread_pid, SIGTERM);
} }
void Thread_registry::interrupt_threads()
{
for (Thread_info *info= head.next; info != &head; info= info->next)
{
if (!info->send_signal_on_shutdown)
continue;
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
if (info->current_cond)
pthread_cond_signal(info->current_cond);
}
}
void Thread_registry::wait_for_threads_to_unregister()
{
struct timespec shutdown_time;
set_timespec(shutdown_time, 1);
log_info("Thread_registry: joining threads...");
while (true)
{
if (head.next == &head)
{
log_info("Thread_registry: emptied.");
return;
}
int error= pthread_cond_timedwait(&COND_thread_registry_is_empty,
&LOCK_thread_registry,
&shutdown_time);
if (error == ETIMEDOUT || error == ETIME)
{
log_info("Thread_registry: threads shutdown timed out.");
return;
}
}
}
...@@ -67,13 +67,17 @@ ...@@ -67,13 +67,17 @@
class Thread_info class Thread_info
{ {
public: public:
Thread_info(); Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg);
Thread_info(pthread_t thread_id_arg);
friend class Thread_registry; friend class Thread_registry;
private:
Thread_info();
private: private:
pthread_cond_t *current_cond; pthread_cond_t *current_cond;
Thread_info *prev, *next; Thread_info *prev, *next;
pthread_t thread_id; pthread_t thread_id;
bool send_signal_on_shutdown;
}; };
...@@ -97,6 +101,10 @@ public: ...@@ -97,6 +101,10 @@ public:
pthread_mutex_t *mutex); pthread_mutex_t *mutex);
int cond_timedwait(Thread_info *info, pthread_cond_t *cond, int cond_timedwait(Thread_info *info, pthread_cond_t *cond,
pthread_mutex_t *mutex, struct timespec *wait_time); pthread_mutex_t *mutex, struct timespec *wait_time);
private:
void interrupt_threads();
void wait_for_threads_to_unregister();
private: private:
Thread_info head; Thread_info head;
bool shutdown_in_progress; bool shutdown_in_progress;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment