Commit 7278f45b authored by kostja@bodhi.local's avatar kostja@bodhi.local

Replace the approach using Foo_thread_args + Foo_thread and manually

spawned threads with a reusable class Thread.

This is the second idea implemented in the Alik's patch for
BUG#22306: STOP INSTANCE can not be applied for instances in Crashed,
Failed and Abandoned.
Commiting separately to ease review process. 
parent 2f69dfb2
......@@ -29,7 +29,6 @@
#include "guardian.h"
#include "instance_map.h"
#include "log.h"
#include "manager.h"
#include "messages.h"
#include "mysqld_error.h"
#include "mysql_manager_error.h"
......
......@@ -20,7 +20,6 @@
#endif
#include "guardian.h"
#include <string.h>
#include <sys/types.h>
#include <signal.h>
......@@ -30,15 +29,6 @@
#include "log.h"
#include "mysql_manager_error.h"
pthread_handler_t guardian_thread_func(void *arg)
{
Guardian *guardian= (Guardian *) arg;
guardian->run();
return 0;
}
const char *
Guardian::get_instance_state_name(enum_instance_state state)
{
......@@ -68,18 +58,19 @@ Guardian::get_instance_state_name(enum_instance_state state)
return NULL; /* just to ignore compiler warning. */
}
/* {{{ Constructor & destructor. */
Guardian::Guardian(Thread_registry &thread_registry_arg,
Guardian::Guardian(Thread_registry *thread_registry_arg,
Instance_map *instance_map_arg,
uint monitoring_interval_arg) :
Guardian_args(thread_registry_arg, instance_map_arg,
monitoring_interval_arg),
thread_info(pthread_self(), TRUE), guarded_instances(0)
uint monitoring_interval_arg)
:monitoring_interval(monitoring_interval_arg),
shutdown_requested(FALSE),
stopped(FALSE),
thread_registry(thread_registry_arg),
instance_map(instance_map_arg)
{
pthread_mutex_init(&LOCK_guardian, 0);
pthread_cond_init(&COND_guardian, 0);
shutdown_requested= FALSE;
stopped= FALSE;
init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0);
}
......@@ -94,6 +85,8 @@ Guardian::~Guardian()
pthread_cond_destroy(&COND_guardian);
}
/* }}} */
void Guardian::request_shutdown()
{
......@@ -117,7 +110,7 @@ void Guardian::process_instance(Instance *instance,
if (current_node->state == STOPPING)
{
/* this brach is executed during shutdown */
/* this branch is executed during shutdown */
if (instance->options.shutdown_delay)
{
/*
......@@ -235,7 +228,7 @@ void Guardian::process_instance(Instance *instance,
/*
Run guardian thread
SYNOPSYS
SYNOPSIS
run()
DESCRIPTION
......@@ -252,9 +245,8 @@ void Guardian::run()
log_info("Guardian: started.");
thread_registry.register_thread(&thread_info);
thread_registry->register_thread(&thread_info);
my_thread_init();
pthread_mutex_lock(&LOCK_guardian);
/* loop, until all instances were shut down at the end */
......@@ -275,7 +267,7 @@ void Guardian::run()
/* check the loop predicate before sleeping */
if (!(shutdown_requested && (!(guarded_instances))))
thread_registry.cond_timedwait(&thread_info, &COND_guardian,
thread_registry->cond_timedwait(&thread_info, &COND_guardian,
&LOCK_guardian, &timeout);
}
......@@ -284,9 +276,8 @@ void Guardian::run()
stopped= TRUE;
pthread_mutex_unlock(&LOCK_guardian);
/* now, when the Guardian is stopped we can stop the IM */
thread_registry.unregister_thread(&thread_info);
thread_registry.request_shutdown();
my_thread_end();
thread_registry->unregister_thread(&thread_info);
thread_registry->request_shutdown();
log_info("Guardian: finished.");
}
......@@ -306,7 +297,7 @@ int Guardian::is_stopped()
Initialize the list of guarded instances: loop through the Instance_map and
add all of the instances, which don't have 'nonguarded' option specified.
SYNOPSYS
SYNOPSIS
Guardian::init()
NOTE: The operation should be invoked with the following locks acquired:
......@@ -315,7 +306,7 @@ int Guardian::is_stopped()
RETURN
0 - ok
1 - error occured
1 - error occurred
*/
int Guardian::init()
......@@ -344,7 +335,7 @@ int Guardian::init()
/*
Add instance to the Guardian list
SYNOPSYS
SYNOPSIS
guard()
instance the instance to be guarded
nolock whether we prefer do not lock Guardian here,
......@@ -357,7 +348,7 @@ int Guardian::init()
RETURN
0 - ok
1 - error occured
1 - error occurred
*/
int Guardian::guard(Instance *instance, bool nolock)
......@@ -418,7 +409,7 @@ int Guardian::stop_guard(Instance *instance)
An internal method which is called at shutdown to unregister instances and
attempt to stop them if requested.
SYNOPSYS
SYNOPSIS
stop_instances()
DESCRIPTION
......@@ -431,7 +422,7 @@ int Guardian::stop_guard(Instance *instance)
RETURN
0 - ok
1 - error occured
1 - error occurred
*/
int Guardian::stop_instances()
......
......@@ -16,11 +16,10 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_sys.h>
#include <my_list.h>
#include "thread_registry.h"
#include <my_sys.h>
#include <my_list.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
......@@ -31,30 +30,12 @@ class Instance_map;
class Thread_registry;
struct GUARD_NODE;
pthread_handler_t guardian_thread_func(void *arg);
struct Guardian_args
{
Thread_registry &thread_registry;
Instance_map *instance_map;
int monitoring_interval;
Guardian_args(Thread_registry &thread_registry_arg,
Instance_map *instance_map_arg,
uint monitoring_interval_arg) :
thread_registry(thread_registry_arg),
instance_map(instance_map_arg),
monitoring_interval(monitoring_interval_arg)
{}
};
/*
/**
The guardian thread is responsible for monitoring and restarting of guarded
instances.
*/
class Guardian: public Guardian_args
class Guardian: public Thread
{
public:
/* states of an instance */
......@@ -82,12 +63,10 @@ public:
/* Return client state name. */
static const char *get_instance_state_name(enum_instance_state state);
Guardian(Thread_registry &thread_registry_arg,
Guardian(Thread_registry *thread_registry_arg,
Instance_map *instance_map_arg,
uint monitoring_interval_arg);
~Guardian();
/* Main funtion of the thread */
void run();
virtual ~Guardian();
/* Initialize or refresh the list of guarded instances */
int init();
/* Request guardian shutdown. Stop instances if needed */
......@@ -117,6 +96,9 @@ public:
a valid list node.
*/
inline enum_instance_state get_instance_state(LIST *instance_node);
protected:
/* Main funtion of the thread */
virtual void run();
public:
pthread_cond_t COND_guardian;
......@@ -133,6 +115,9 @@ private:
private:
pthread_mutex_t LOCK_guardian;
Thread_info thread_info;
int monitoring_interval;
Thread_registry *thread_registry;
Instance_map *instance_map;
LIST *guarded_instances;
MEM_ROOT alloc;
/* this variable is set to TRUE when we want to stop Guardian thread */
......
......@@ -44,9 +44,6 @@ static const char * const INSTANCE_NAME_PREFIX= Instance::DFLT_INSTANCE_NAME.str
static const int INSTANCE_NAME_PREFIX_LEN= Instance::DFLT_INSTANCE_NAME.length;
static void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map,
Thread_registry *thread_registry);
#ifndef __WIN__
typedef pid_t My_process_info;
......@@ -61,13 +58,24 @@ typedef PROCESS_INFORMATION My_process_info;
to do it in a portable way.
*/
pthread_handler_t proxy(void *arg)
class Instance_monitor: public Thread
{
public:
Instance_monitor(Instance *instance_arg) :instance(instance_arg) {}
protected:
virtual void run();
void start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map,
Thread_registry *thread_registry);
private:
Instance *instance;
};
void Instance_monitor::run()
{
Instance *instance= (Instance *) arg;
start_and_monitor_instance(&instance->options,
instance->get_map(),
start_and_monitor_instance(&instance->options, instance->get_map(),
&instance->thread_registry);
return 0;
delete this;
}
/*
......@@ -242,14 +250,16 @@ static int start_process(Instance_options *instance_options,
Function returns no value
*/
static void start_and_monitor_instance(Instance_options *old_instance_options,
void
Instance_monitor::
start_and_monitor_instance(Instance_options *old_instance_options,
Instance_map *instance_map,
Thread_registry *thread_registry)
{
Instance_name instance_name(&old_instance_options->instance_name);
Instance *current_instance;
My_process_info process_info;
Thread_info thread_info(pthread_self(), FALSE);
Thread_info thread_info;
log_info("Monitoring thread (instance: '%s'): started.",
(const char *) instance_name.get_c_str());
......@@ -258,12 +268,10 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
{
/*
Register thread in Thread_registry to wait for it to stop on shutdown
only if instance is nuarded. If instance is guarded, the thread will not
only if instance is guarded. If instance is guarded, the thread will not
finish, because nonguarded instances are not stopped on shutdown.
*/
thread_registry->register_thread(&thread_info);
my_thread_init();
thread_registry->register_thread(&thread_info, FALSE);
}
/*
......@@ -302,10 +310,7 @@ static void start_and_monitor_instance(Instance_options *old_instance_options,
instance_map->unlock();
if (!old_instance_options->nonguarded)
{
thread_registry->unregister_thread(&thread_info);
my_thread_end();
}
log_info("Monitoring thread (instance: '%s'): finished.",
(const char *) instance_name.get_c_str());
......@@ -369,22 +374,19 @@ int Instance::start()
if (configured && !is_running())
{
Instance_monitor *instance_monitor;
remove_pid();
pthread_t proxy_thd_id;
pthread_attr_t proxy_thd_attr;
int rc;
instance_monitor= new Instance_monitor(this);
pthread_attr_init(&proxy_thd_attr);
pthread_attr_setdetachstate(&proxy_thd_attr, PTHREAD_CREATE_DETACHED);
rc= pthread_create(&proxy_thd_id, &proxy_thd_attr, proxy,
this);
pthread_attr_destroy(&proxy_thd_attr);
if (rc)
if (instance_monitor == NULL || instance_monitor->start_detached())
{
log_error("Instance::start(): pthread_create(proxy) failed");
delete instance_monitor;
log_error("Instance::start(): failed to create the monitoring thread"
" to start an instance");
return ER_CANNOT_START_INSTANCE;
}
/* The monitoring thread will delete itself when it's finished. */
return 0;
}
......
......@@ -29,7 +29,6 @@
#include <sys/un.h>
#endif
#include "instance_map.h"
#include "log.h"
#include "mysql_connection.h"
#include "options.h"
......@@ -59,47 +58,18 @@ static void set_no_inherit(int socket)
}
/*
Listener_thread - incapsulates listening functionality
*/
class Listener_thread: public Listener_thread_args
{
public:
Listener_thread(const Listener_thread_args &args);
~Listener_thread();
void run();
private:
static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
ulong total_connection_count;
Thread_info thread_info;
int sockets[2];
int num_sockets;
fd_set read_fds;
private:
void handle_new_mysql_connection(Vio *vio);
int create_tcp_socket();
int create_unix_socket(struct sockaddr_un &unix_socket_address);
};
Listener_thread::Listener_thread(const Listener_thread_args &args) :
Listener_thread_args(args.thread_registry, args.user_map, args.instance_map)
,total_connection_count(0)
,thread_info(pthread_self(), TRUE)
,num_sockets(0)
{
}
Listener_thread::~Listener_thread()
Listener::Listener(Thread_registry *thread_registry_arg,
User_map *user_map_arg)
:thread_registry(thread_registry_arg),
user_map(user_map_arg),
total_connection_count(0),
num_sockets(0)
{
}
/*
Listener_thread::run() - listen all supported sockets and spawn a thread
Listener::run() - listen all supported sockets and spawn a thread
to handle incoming connection.
Using 'die' in case of syscall failure is OK now - we don't hold any
resources and 'die' kills the signal thread automatically. To be rewritten
......@@ -108,11 +78,11 @@ Listener_thread::~Listener_thread()
architecture.
*/
void Listener_thread::run()
void Listener::run()
{
int i, n= 0;
log_info("Listener_thread: started.");
log_info("Listener: started.");
#ifndef __WIN__
/* we use this var to check whether we are running on LinuxThreads */
......@@ -125,9 +95,7 @@ void Listener_thread::run()
linuxthreads= (thread_pid != manager_pid);
#endif
thread_registry.register_thread(&thread_info);
my_thread_init();
thread_registry->register_thread(&thread_info);
FD_ZERO(&read_fds);
......@@ -146,7 +114,7 @@ void Listener_thread::run()
n++;
timeval tv;
while (!thread_registry.is_shutdown())
while (!thread_registry->is_shutdown())
{
fd_set read_fds_arg= read_fds;
/*
......@@ -166,7 +134,7 @@ void Listener_thread::run()
if (rc == 0 || rc == -1)
{
if (rc == -1 && errno != EINTR)
log_error("Listener_thread: select() failed, %s",
log_error("Listener: select() failed, %s",
strerror(errno));
continue;
}
......@@ -200,7 +168,7 @@ void Listener_thread::run()
/* III. Release all resources and exit */
log_info("Listener_thread: shutdown requested, exiting...");
log_info("Listener: shutdown requested, exiting...");
for (i= 0; i < num_sockets; i++)
close(sockets[i]);
......@@ -209,10 +177,9 @@ void Listener_thread::run()
unlink(unix_socket_address.sun_path);
#endif
thread_registry.unregister_thread(&thread_info);
my_thread_end();
thread_registry->unregister_thread(&thread_info);
log_info("Listener_thread: finished.");
log_info("Listener: finished.");
return;
err:
......@@ -220,13 +187,12 @@ err:
for (i= 0; i < num_sockets; i++)
close(sockets[i]);
thread_registry.unregister_thread(&thread_info);
thread_registry.request_shutdown();
my_thread_end();
thread_registry->unregister_thread(&thread_info);
thread_registry->request_shutdown();
return;
}
int Listener_thread::create_tcp_socket()
int Listener::create_tcp_socket()
{
/* value to be set by setsockopt */
int arg= 1;
......@@ -265,7 +231,7 @@ int Listener_thread::create_tcp_socket()
if (bind(ip_socket, (struct sockaddr *) &ip_socket_address,
sizeof(ip_socket_address)))
{
log_error("Listener_thread: bind(ip socket) failed, '%s'",
log_error("Listener: bind(ip socket) failed, '%s'",
strerror(errno));
close(ip_socket);
return -1;
......@@ -273,7 +239,7 @@ int Listener_thread::create_tcp_socket()
if (listen(ip_socket, LISTEN_BACK_LOG_SIZE))
{
log_error("Listener_thread: listen(ip socket) failed, %s",
log_error("Listener: listen(ip socket) failed, %s",
strerror(errno));
close(ip_socket);
return -1;
......@@ -292,7 +258,7 @@ int Listener_thread::create_tcp_socket()
}
#ifndef __WIN__
int Listener_thread::
int Listener::
create_unix_socket(struct sockaddr_un &unix_socket_address)
{
int unix_socket= socket(AF_UNIX, SOCK_STREAM, 0);
......@@ -318,7 +284,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (bind(unix_socket, (struct sockaddr *) &unix_socket_address,
sizeof(unix_socket_address)))
{
log_error("Listener_thread: bind(unix socket) failed, "
log_error("Listener: bind(unix socket) failed, "
"socket file name is '%s', error '%s'",
unix_socket_address.sun_path, strerror(errno));
close(unix_socket);
......@@ -329,7 +295,7 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
if (listen(unix_socket, LISTEN_BACK_LOG_SIZE))
{
log_error("Listener_thread: listen(unix socket) failed, %s",
log_error("Listener: listen(unix socket) failed, %s",
strerror(errno));
close(unix_socket);
return -1;
......@@ -357,46 +323,16 @@ create_unix_socket(struct sockaddr_un &unix_socket_address)
handle_new_mysql_connection()
*/
void Listener_thread::handle_new_mysql_connection(Vio *vio)
void Listener::handle_new_mysql_connection(Vio *vio)
{
if (Mysql_connection_thread_args *mysql_thread_args=
new Mysql_connection_thread_args(vio, thread_registry, user_map,
++total_connection_count,
instance_map)
)
{
/*
Initialize thread attributes to create detached thread; it seems
easier to do it ad-hoc than have a global variable for attributes.
*/
pthread_t mysql_thd_id;
pthread_attr_t mysql_thd_attr;
pthread_attr_init(&mysql_thd_attr);
pthread_attr_setdetachstate(&mysql_thd_attr, PTHREAD_CREATE_DETACHED);
if (set_stacksize_n_create_thread(&mysql_thd_id, &mysql_thd_attr,
mysql_connection, mysql_thread_args))
Mysql_connection *mysql_connection=
new Mysql_connection(thread_registry, user_map,
vio, ++total_connection_count);
if (mysql_connection == NULL || mysql_connection->start_detached())
{
delete mysql_thread_args;
log_error("handle_one_mysql_connection() failed");
delete mysql_connection;
vio_delete(vio);
log_error("handle_one_mysql_connection():"
"set_stacksize_n_create_thread(mysql) failed");
}
pthread_attr_destroy(&mysql_thd_attr);
}
else
vio_delete(vio);
/* The connection will delete itself when the thread is finished */
}
pthread_handler_t listener(void *arg)
{
Listener_thread_args *args= (Listener_thread_args *) arg;
Listener_thread listener(*args);
listener.run();
/*
args is a stack variable because listener thread lives as long as the
manager process itself
*/
return 0;
}
......@@ -16,33 +16,39 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_pthread.h>
#include "thread_registry.h"
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
#endif
pthread_handler_t listener(void *arg);
class Thread_registry;
class User_map;
class Instance_map;
struct Listener_thread_args
/**
Listener - a thread listening on sockets and spawning
connection threads.
*/
class Listener: public Thread
{
Thread_registry &thread_registry;
const User_map &user_map;
Instance_map &instance_map;
Listener_thread_args(Thread_registry &thread_registry_arg,
const User_map &user_map_arg,
Instance_map &instance_map_arg) :
thread_registry(thread_registry_arg)
,user_map(user_map_arg)
,instance_map(instance_map_arg)
{}
public:
Listener(Thread_registry *thread_registry_arg, User_map *user_map_arg);
protected:
virtual void run();
private:
Thread_info thread_info;
Thread_registry *thread_registry;
User_map *user_map;
static const int LISTEN_BACK_LOG_SIZE= 5; /* standard backlog size */
ulong total_connection_count;
int sockets[2];
int num_sockets;
fd_set read_fds;
void handle_new_mysql_connection(struct st_vio *vio);
int create_tcp_socket();
int create_unix_socket(struct sockaddr_un &unix_socket_address);
};
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H
......@@ -139,10 +139,10 @@ int Manager::main()
User_map user_map;
Instance_map instance_map(Options::Main::default_mysqld_path,
thread_registry);
Guardian guardian(thread_registry, &instance_map,
Guardian guardian(&thread_registry, &instance_map,
Options::Main::monitoring_interval);
Listener_thread_args listener_args(thread_registry, user_map, instance_map);
Listener listener(&thread_registry, &user_map);
manager_pid= getpid();
p_instance_map= &instance_map;
......@@ -212,40 +212,29 @@ int Manager::main()
sigset_t mask;
set_signals(&mask);
/* create guardian thread */
{
pthread_t guardian_thd_id;
pthread_attr_t guardian_thd_attr;
int rc;
/*
Create the guardian thread. The newly started thread will block until
we actually load instances.
NOTE: Guardian should be shutdown first. Only then all other threads
need to be stopped. This should be done, as guardian is responsible
for shutting down the instances, and this is a long operation.
NOTE: Guardian uses thr_alarm() when detects current state of
instances (is_running()), but it is not interfere with
flush_instances() later in the code, because until flush_instances()
complete in the main thread, Guardian thread is not permitted to
process instances. And before flush_instances() there is no instances
to proceed.
can be stopped. This should be done in this order because the guardian
is responsible for shutting down all the guarded instances, and this
is a long operation.
NOTE: Guardian uses thr_alarm() when detects the current state of an
instance (is_running()), but this does not interfere with
flush_instances() call later in the code, because until
flush_instances() completes in the main thread, Guardian thread is not
permitted to process instances. And before flush_instances() has
completed, there are no instances to guard.
*/
pthread_attr_init(&guardian_thd_attr);
pthread_attr_setdetachstate(&guardian_thd_attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_n_create_thread(&guardian_thd_id, &guardian_thd_attr,
guardian_thread_func, &guardian);
pthread_attr_destroy(&guardian_thd_attr);
if (rc)
if (guardian.start_detached())
{
log_error("manager(): set_stacksize_n_create_thread(guardian) failed");
log_error("manager(): Failed to create the guardian thread");
goto err;
}
}
/* Load instances. */
{
instance_map.guardian->lock();
instance_map.lock();
......@@ -265,24 +254,13 @@ int Manager::main()
}
}
/* create the listener */
{
pthread_t listener_thd_id;
pthread_attr_t listener_thd_attr;
int rc;
pthread_attr_init(&listener_thd_attr);
pthread_attr_setdetachstate(&listener_thd_attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_n_create_thread(&listener_thd_id, &listener_thd_attr,
listener, &listener_args);
pthread_attr_destroy(&listener_thd_attr);
if (rc)
/* start the listener */
if (listener.start_detached())
{
log_error("manager(): set_stacksize_n_create_thread(listener) failed");
stop_all(&guardian, &thread_registry);
goto err;
}
}
/*
After the list of guarded instances have been initialized,
......
......@@ -23,7 +23,6 @@
#include <m_string.h>
#include <m_string.h>
#include <my_global.h>
#include <mysql_com.h>
#include <mysql.h>
#include <my_sys.h>
#include <violite.h>
......@@ -40,66 +39,15 @@
#include "user_map.h"
Mysql_connection_thread_args::Mysql_connection_thread_args(
struct st_vio *vio_arg,
Thread_registry &thread_registry_arg,
const User_map &user_map_arg,
ulong connection_id_arg,
Instance_map &instance_map_arg) :
vio(vio_arg)
,thread_registry(thread_registry_arg)
,user_map(user_map_arg)
,connection_id(connection_id_arg)
,instance_map(instance_map_arg)
{}
/*
MySQL connection - handle one connection with mysql command line client
See also comments in mysqlmanager.cc to picture general Instance Manager
architecture.
We use conventional technique to work with classes without exceptions:
class acquires all vital resource in init(); Thus if init() succeed,
a user must call cleanup(). All other methods are valid only between
init() and cleanup().
*/
class Mysql_connection_thread: public Mysql_connection_thread_args
{
public:
Mysql_connection_thread(const Mysql_connection_thread_args &args);
int init();
void cleanup();
void run();
~Mysql_connection_thread();
private:
Thread_info thread_info;
NET net;
struct rand_struct rand_st;
char scramble[SCRAMBLE_LENGTH + 1];
uint status;
ulong client_capabilities;
private:
/* Names are conventionally the same as in mysqld */
int check_connection();
int do_command();
int dispatch_command(enum enum_server_command command,
const char *text, uint len);
};
Mysql_connection_thread::Mysql_connection_thread(
const Mysql_connection_thread_args &args) :
Mysql_connection_thread_args(args.vio,
args.thread_registry,
args.user_map,
args.connection_id,
args.instance_map)
,thread_info(pthread_self(), TRUE)
Mysql_connection::Mysql_connection(Thread_registry *thread_registry_arg,
User_map *user_map_arg,
struct st_vio *vio_arg, ulong
connection_id_arg)
:vio(vio_arg),
connection_id(connection_id_arg),
thread_registry(thread_registry_arg),
user_map(user_map_arg)
{
thread_registry.register_thread(&thread_info);
}
......@@ -129,7 +77,7 @@ C_MODE_END
This function is complementary to cleanup().
*/
int Mysql_connection_thread::init()
int Mysql_connection::init()
{
/* Allocate buffers for network I/O */
if (my_net_init(&net, vio))
......@@ -145,52 +93,46 @@ int Mysql_connection_thread::init()
create_random_string(scramble, SCRAMBLE_LENGTH, &rand_st);
/* We don't support transactions, every query is atomic */
status= SERVER_STATUS_AUTOCOMMIT;
thread_registry->register_thread(&thread_info);
return 0;
}
void Mysql_connection_thread::cleanup()
void Mysql_connection::cleanup()
{
net_end(&net);
thread_registry->unregister_thread(&thread_info);
}
Mysql_connection_thread::~Mysql_connection_thread()
Mysql_connection::~Mysql_connection()
{
/* vio_delete closes the socket if necessary */
vio_delete(vio);
thread_registry.unregister_thread(&thread_info);
}
void Mysql_connection_thread::run()
void Mysql_connection::main()
{
log_info("accepted mysql connection %lu", (unsigned long) connection_id);
my_thread_init();
if (check_connection())
{
my_thread_end();
return;
}
log_info("connection %lu is checked successfully",
(unsigned long) connection_id);
vio_keepalive(vio, TRUE);
while (!net.error && net.vio && !thread_registry.is_shutdown())
while (!net.error && net.vio && !thread_registry->is_shutdown())
{
if (do_command())
break;
}
my_thread_end();
}
int Mysql_connection_thread::check_connection()
int Mysql_connection::check_connection()
{
ulong pkt_len=0; // to hold client reply length
......@@ -279,7 +221,7 @@ int Mysql_connection_thread::check_connection()
net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1;
}
if (user_map.authenticate(&user_name, password, scramble))
if (user_map->authenticate(&user_name, password, scramble))
{
net_send_error(&net, ER_ACCESS_DENIED_ERROR);
return 1;
......@@ -289,7 +231,7 @@ int Mysql_connection_thread::check_connection()
}
int Mysql_connection_thread::do_command()
int Mysql_connection::do_command()
{
char *packet;
ulong packet_length;
......@@ -302,7 +244,7 @@ int Mysql_connection_thread::do_command()
/* Check if we can continue without closing the connection */
if (net.error != 3) // what is 3 - find out
return 1;
if (thread_registry.is_shutdown())
if (thread_registry->is_shutdown())
return 1;
net_send_error(&net, net.last_errno);
net.error= 0;
......@@ -310,7 +252,7 @@ int Mysql_connection_thread::do_command()
}
else
{
if (thread_registry.is_shutdown())
if (thread_registry->is_shutdown())
return 1;
packet= (char*) net.read_pos;
enum enum_server_command command= (enum enum_server_command)
......@@ -321,7 +263,7 @@ int Mysql_connection_thread::do_command()
}
}
int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
int Mysql_connection::dispatch_command(enum enum_server_command command,
const char *packet, uint len)
{
switch (command) {
......@@ -374,19 +316,16 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
}
pthread_handler_t mysql_connection(void *arg)
void Mysql_connection::run()
{
Mysql_connection_thread_args *args= (Mysql_connection_thread_args *) arg;
Mysql_connection_thread mysql_connection_thread(*args);
delete args;
if (mysql_connection_thread.init())
log_info("mysql_connection(): error initializing thread");
if (init())
log_info("Mysql_connection::run(): error initializing thread");
else
{
mysql_connection_thread.run();
mysql_connection_thread.cleanup();
main();
cleanup();
}
return 0;
delete this;
}
/*
......
......@@ -16,33 +16,60 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_global.h>
#include <my_pthread.h>
#include "thread_registry.h"
#include <mysql_com.h>
#if defined(__GNUC__) && defined(USE_PRAGMA_INTERFACE)
#pragma interface
#endif
pthread_handler_t mysql_connection(void *arg);
class Thread_registry;
class User_map;
class Instance_map;
struct st_vio;
class User_map;
struct Mysql_connection_thread_args
/*
MySQL connection - handle one connection with mysql command line client
See also comments in mysqlmanager.cc to picture general Instance Manager
architecture.
We use conventional technique to work with classes without exceptions:
class acquires all vital resource in init(); Thus if init() succeed,
a user must call cleanup(). All other methods are valid only between
init() and cleanup().
*/
class Mysql_connection: public Thread
{
public:
Mysql_connection(Thread_registry *thread_registry_arg,
User_map *user_map_arg,
struct st_vio *vio_arg,
ulong connection_id_arg);
virtual ~Mysql_connection();
protected:
virtual void run();
private:
struct st_vio *vio;
Thread_registry &thread_registry;
const User_map &user_map;
ulong connection_id;
Instance_map &instance_map;
Thread_info thread_info;
Thread_registry *thread_registry;
User_map *user_map;
NET net;
struct rand_struct rand_st;
char scramble[SCRAMBLE_LENGTH + 1];
uint status;
ulong client_capabilities;
private:
/* The main loop implementation triad */
int init();
void main();
void cleanup();
Mysql_connection_thread_args(struct st_vio *vio_arg,
Thread_registry &thread_registry_arg,
const User_map &user_map_arg,
ulong connection_id_arg,
Instance_map &instance_map_arg);
/* Names are conventionally the same as in mysqld */
int check_connection();
int do_command();
int dispatch_command(enum enum_server_command command,
const char *text, uint len);
};
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MYSQL_CONNECTION_H
......@@ -22,17 +22,6 @@
#include "log.h"
#if defined(__ia64__) || defined(__ia64)
/*
We can live with 32K, but reserve 64K. Just to be safe.
On ia64 we need to reserve double of the size.
*/
#define IM_THREAD_STACK_SIZE (128*1024L)
#else
#define IM_THREAD_STACK_SIZE (64*1024)
#endif
/* the pid of the manager process (of the signal thread on the LinuxThreads) */
pid_t manager_pid;
......@@ -66,33 +55,6 @@ unsigned long bytes_sent = 0L, bytes_received = 0L;
unsigned long mysqld_net_retry_count = 10L;
unsigned long open_files_limit;
/*
Change the stack size and start a thread. Return an error if either
pthread_attr_setstacksize or pthread_create fails.
Arguments are the same as for pthread_create().
*/
int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg)
{
int rc= 0;
#ifndef __WIN__
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 32768
#endif
/*
Set stack size to be safe on the platforms with too small
default thread stack.
*/
rc= pthread_attr_setstacksize(attr,
(size_t) (PTHREAD_STACK_MIN +
IM_THREAD_STACK_SIZE));
#endif
if (!rc)
rc= pthread_create(thread, attr, start_routine, arg);
return rc;
}
int create_pid_file(const char *pid_file_name, int pid)
......
......@@ -105,10 +105,6 @@ extern unsigned long bytes_sent, bytes_received;
extern unsigned long mysqld_net_retry_count;
extern unsigned long open_files_limit;
int set_stacksize_n_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg);
int create_pid_file(const char *pid_file_name, int pid);
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_PRIV_H
......@@ -38,15 +38,13 @@ static void handle_signal(int __attribute__((unused)) sig_no)
}
#endif
/*
Thread_info initializer methods
*/
/* Thread_info initializer methods */
Thread_info::Thread_info() {}
Thread_info::Thread_info(pthread_t thread_id_arg,
bool send_signal_on_shutdown_arg) :
thread_id(thread_id_arg),
send_signal_on_shutdown(send_signal_on_shutdown_arg) {}
void Thread_info::init(bool send_signal_on_shutdown_arg)
{
thread_id= pthread_self();
send_signal_on_shutdown= send_signal_on_shutdown_arg;
}
/*
TODO: think about moving signal information (now it's shutdown_in_progress)
......@@ -86,11 +84,14 @@ Thread_registry::~Thread_registry()
points to the last node.
*/
void Thread_registry::register_thread(Thread_info *info)
void Thread_registry::register_thread(Thread_info *info,
bool send_signal_on_shutdown)
{
log_info("Thread_registry: registering thread %d...",
(int) info->thread_id);
info->init(send_signal_on_shutdown);
#ifndef __WIN__
struct sigaction sa;
sa.sa_handler= handle_signal;
......@@ -298,3 +299,80 @@ void Thread_registry::wait_for_threads_to_unregister()
}
}
}
/*********************************************************************
class Thread
*********************************************************************/
#if defined(__ia64__) || defined(__ia64)
/*
We can live with 32K, but reserve 64K. Just to be safe.
On ia64 we need to reserve double of the size.
*/
#define IM_THREAD_STACK_SIZE (128*1024L)
#else
#define IM_THREAD_STACK_SIZE (64*1024)
#endif
/*
Change the stack size and start a thread. Return an error if either
pthread_attr_setstacksize or pthread_create fails.
Arguments are the same as for pthread_create().
*/
static
int set_stacksize_and_create_thread(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg)
{
int rc= 0;
#ifndef __WIN__
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 32768
#endif
/*
Set stack size to be safe on the platforms with too small
default thread stack.
*/
rc= pthread_attr_setstacksize(attr,
(size_t) (PTHREAD_STACK_MIN +
IM_THREAD_STACK_SIZE));
#endif
if (!rc)
rc= pthread_create(thread, attr, start_routine, arg);
return rc;
}
Thread::~Thread()
{
}
void *Thread::thread_func(void *arg)
{
Thread *thread= (Thread *) arg;
my_thread_init();
thread->run();
my_thread_end();
return NULL;
}
bool Thread::start_detached()
{
pthread_t thd_id;
pthread_attr_t attr;
int rc;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_and_create_thread(&thd_id, &attr,
Thread::thread_func, this);
pthread_attr_destroy(&attr);
return rc != 0;
}
......@@ -57,7 +57,7 @@
#pragma interface
#endif
/*
/**
Thread_info - repository entry for each worker thread
All entries comprise double-linked list like:
0 -- entry -- entry -- entry - 0
......@@ -67,12 +67,10 @@
class Thread_info
{
public:
Thread_info(pthread_t thread_id_arg, bool send_signal_on_shutdown_arg);
Thread_info() {}
friend class Thread_registry;
private:
Thread_info();
void init(bool send_signal_on_shutdown);
private:
pthread_cond_t *current_cond;
Thread_info *prev, *next;
......@@ -81,7 +79,26 @@ private:
};
/*
/**
A base class for a detached thread.
*/
class Thread
{
public:
Thread() {}
bool start_detached();
protected:
virtual void run()= 0;
virtual ~Thread();
private:
static void *thread_func(void *arg);
Thread(const Thread & /* rhs */); /* not implemented */
Thread &operator=(const Thread & /* rhs */); /* not implemented */
};
/**
Thread_registry - contains handles for each worker thread to deliver
signal information to workers.
*/
......@@ -92,7 +109,7 @@ public:
Thread_registry();
~Thread_registry();
void register_thread(Thread_info *info);
void register_thread(Thread_info *info, bool send_signal_on_shutdown= TRUE);
void unregister_thread(Thread_info *info);
void deliver_shutdown();
void request_shutdown();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment