SCRUM:

Task 499 'init_connect, init_slave options'
parent 39e7db9f
......@@ -775,7 +775,7 @@ manager_launch()
ident=$1
shift
if [ $USE_MANAGER = 0 ] ; then
$@ >> $CUR_MYERR 2>&1 &
echo $@ | /bin/sh >> $CUR_MYERR 2>&1 &
sleep 2 #hack
return
fi
......
select hex(@a);
hex(@a)
NULL
select hex(@a);
hex(@a)
610063
set global init_connect="set @a=2;set @b=3";
select @a, @b;
@a @b
2 3
set GLOBAL init_connect=DEFAULT;
select @a;
@a
NULL
set global init_connect="create table t1(a char(10));\
insert into t1 values ('\0');insert into t1 values('abc')";
select hex(a) from t1;
hex(a)
00
616263
set GLOBAL init_connect="adsfsdfsdfs";
select @a;
ERROR HY000: Lost connection to MySQL server during query
drop table t1;
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
reset master;
create table t1(n int);
insert into t1 values (@a), (@b);
select * from t1;
n
NULL
NULL
select * from t1;
n
1
2
set global init_connect="set @c=1";
show variables like 'init_connect';
Variable_name Value
init_connect set @c=1
drop table t1;
stop slave;
--set-variable=init_connect="set @a='a\0c'"
#
# Test of init_connect variable
#
connect (con0,localhost,root,,);
connection con0;
select hex(@a);
connect (con1,localhost,user_1,,);
connection con1;
select hex(@a);
connection con0;
set global init_connect="set @a=2;set @b=3";
connect (con2,localhost,user_1,,);
connection con2;
select @a, @b;
connection con0;
set GLOBAL init_connect=DEFAULT;
connect (con3,localhost,user_1,,);
connection con3;
select @a;
connection con0;
set global init_connect="create table t1(a char(10));\
insert into t1 values ('\0');insert into t1 values('abc')";
connect (con4,localhost,user_1,,);
connection con4;
select hex(a) from t1;
connection con0;
set GLOBAL init_connect="adsfsdfsdfs";
connect (con5,localhost,user_1,,);
connection con5;
--error 2013
select @a;
connection con0;
drop table t1;
--init-slave="set @a=1;set @b=2"
source include/master-slave.inc;
#
# Test of init_slave variable
#
save_master_pos;
connection slave;
sync_with_master;
reset master;
connection master;
create table t1(n int);
insert into t1 values (@a), (@b);
select * from t1;
save_master_pos;
connection slave;
sync_with_master;
select * from t1;
set global init_connect="set @c=1";
show variables like 'init_connect';
connection master;
drop table t1;
save_master_pos;
connection slave;
sync_with_master;
stop slave;
......@@ -769,7 +769,7 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables;
extern rw_lock_t LOCK_grant;
extern rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
extern pthread_attr_t connection_attrib;
extern I_List<THD> threads;
......
......@@ -300,7 +300,8 @@ char log_error_file[FN_REFLEN], glob_hostname[FN_REFLEN];
char* log_error_file_ptr= log_error_file;
char mysql_real_data_home[FN_REFLEN],
language[LIBLEN],reg_ext[FN_EXTLEN], mysql_charsets_dir[FN_REFLEN],
max_sort_char,*mysqld_user,*mysqld_chroot, *opt_init_file;
max_sort_char,*mysqld_user,*mysqld_chroot, *opt_init_file,
*opt_init_connect, *opt_init_slave;
char *language_ptr, *default_collation_name, *default_character_set_name;
char mysql_data_home_buff[2], *mysql_data_home=mysql_real_data_home;
char server_version[SERVER_VERSION_LENGTH]=MYSQL_SERVER_VERSION;
......@@ -344,7 +345,7 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count,
LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi;
rw_lock_t LOCK_grant;
rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
pthread_cond_t COND_refresh,COND_thread_count, COND_slave_stopped,
COND_slave_start;
pthread_cond_t COND_thread_cache,COND_flush_thread_cache;
......@@ -883,6 +884,8 @@ void clean_up(bool print_message)
#endif
if (defaults_argv)
free_defaults(defaults_argv);
my_free(sys_init_connect.value, MYF(MY_ALLOW_ZERO_PTR));
my_free(sys_init_slave.value, MYF(MY_ALLOW_ZERO_PTR));
free_tmpdir(&mysql_tmpdir_list);
#ifdef HAVE_REPLICATION
my_free(slave_load_tmpdir,MYF(MY_ALLOW_ZERO_PTR));
......@@ -949,6 +952,8 @@ static void clean_up_mutexes()
(void) pthread_cond_destroy(&COND_rpl_status);
#endif
(void) pthread_mutex_destroy(&LOCK_active_mi);
(void) rwlock_destroy(&LOCK_sys_init_connect);
(void) rwlock_destroy(&LOCK_sys_init_slave);
(void) pthread_mutex_destroy(&LOCK_global_system_variables);
(void) pthread_cond_destroy(&COND_thread_count);
(void) pthread_cond_destroy(&COND_refresh);
......@@ -2056,6 +2061,14 @@ static int init_common_variables(const char *conf_file_name, int argc,
global_system_variables.character_set_client= default_charset_info;
global_system_variables.collation_connection= default_charset_info;
sys_init_connect.value_length= 0;
if ((sys_init_connect.value= opt_init_connect))
sys_init_connect.value_length= strlen(opt_init_connect);
sys_init_slave.value_length= 0;
if ((sys_init_slave.value= opt_init_slave))
sys_init_slave.value_length= strlen(opt_init_slave);
if (use_temp_pool && bitmap_init(&temp_pool,1024,1))
return 1;
return 0;
......@@ -2081,6 +2094,8 @@ static int init_thread_environment()
(void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST);
(void) pthread_mutex_init(&LOCK_global_system_variables, MY_MUTEX_INIT_FAST);
(void) my_rwlock_init(&LOCK_sys_init_connect, NULL);
(void) my_rwlock_init(&LOCK_sys_init_slave, NULL);
(void) my_rwlock_init(&LOCK_grant, NULL);
(void) pthread_cond_init(&COND_thread_count,NULL);
(void) pthread_cond_init(&COND_refresh,NULL);
......@@ -3472,7 +3487,9 @@ enum options
OPT_EXPIRE_LOGS_DAYS,
OPT_DEFAULT_WEEK_FORMAT,
OPT_GROUP_CONCAT_MAX_LEN,
OPT_DEFAULT_COLLATION
OPT_DEFAULT_COLLATION,
OPT_INIT_CONNECT,
OPT_INIT_SLAVE
};
......@@ -3638,6 +3655,12 @@ Disable with --skip-bdb (will save memory).",
#endif /* End HAVE_INNOBASE_DB */
{"help", '?', "Display this help and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0,
0, 0, 0, 0, 0},
{"init-connect", OPT_INIT_CONNECT, "Command what executes for all new connections",
(gptr*) &opt_init_connect, (gptr*) &opt_init_connect, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
{"init-slave", OPT_INIT_SLAVE, "Command what is executed when replication is starting",
(gptr*) &opt_init_slave, (gptr*) &opt_init_slave, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
{"init-file", OPT_INIT_FILE, "Read SQL commands from this file at startup.",
(gptr*) &opt_init_file, (gptr*) &opt_init_file, 0, GET_STR, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
......
......@@ -92,6 +92,7 @@ void send_error(THD *thd, uint sql_errno, const char *err)
/* In bootstrap it's ok to print on stderr */
fprintf(stderr,"ERROR: %d %s\n",sql_errno,err);
}
thd->init_connect_error= 1;
DBUG_VOID_RETURN;
}
......@@ -210,6 +211,7 @@ net_printf(THD *thd, uint errcode, ...)
fprintf(stderr,"ERROR: %d %s\n",errcode,text_pos);
thd->fatal_error();
}
thd->init_connect_error= 1;
DBUG_VOID_RETURN;
}
......
......@@ -75,6 +75,10 @@ TYPELIB delay_key_write_typelib=
static bool sys_check_charset(THD *thd, set_var *var);
static bool sys_update_charset(THD *thd, set_var *var);
static void sys_set_default_charset(THD *thd, enum_var_type type);
static bool sys_update_init_connect(THD*, set_var*);
static void sys_default_init_connect(THD*, enum_var_type type);
static bool sys_update_init_slave(THD*, set_var*);
static void sys_default_init_slave(THD*, enum_var_type type);
static bool set_option_bit(THD *thd, set_var *var);
static bool set_option_autocommit(THD *thd, set_var *var);
static bool set_log_update(THD *thd, set_var *var);
......@@ -109,6 +113,12 @@ sys_var_str sys_charset_system("character_set_system",
sys_check_charset,
sys_update_charset,
sys_set_default_charset);
sys_var_str sys_init_connect("init_connect", 0,
sys_update_init_connect,
sys_default_init_connect);
sys_var_str sys_init_slave("init_slave", 0,
sys_update_init_slave,
sys_default_init_slave);
sys_var_character_set_database sys_character_set_database("character_set_database");
sys_var_character_set_client sys_character_set_client("character_set_client");
sys_var_character_set_connection sys_character_set_connection("character_set_connection");
......@@ -386,6 +396,8 @@ sys_var *sys_variables[]=
&sys_foreign_key_checks,
&sys_group_concat_max_len,
&sys_identity,
&sys_init_connect,
&sys_init_slave,
&sys_insert_id,
&sys_interactive_timeout,
&sys_join_buffer_size,
......@@ -522,6 +534,8 @@ struct show_var_st init_vars[]= {
{"have_openssl", (char*) &have_openssl, SHOW_HAVE},
{"have_query_cache", (char*) &have_query_cache, SHOW_HAVE},
{"init_file", (char*) &opt_init_file, SHOW_CHAR_PTR},
{"init_connect", (char*) &sys_init_connect, SHOW_SYS},
{"init_slave", (char*) &sys_init_slave, SHOW_SYS},
#ifdef HAVE_INNOBASE_DB
{"innodb_additional_mem_pool_size", (char*) &innobase_additional_mem_pool_size, SHOW_LONG },
{"innodb_buffer_pool_size", (char*) &innobase_buffer_pool_size, SHOW_LONG },
......@@ -659,6 +673,83 @@ struct show_var_st init_vars[]= {
Functions to check and update variables
*/
static bool sys_update_init_connect(THD *thd, set_var *var)
{
char *res= 0, *old_value;
uint new_length;
/* If the string is "", delete old init command */
if ((new_length= var->value->str_value.length()))
{
if (!(res= my_strdup_with_length(var->value->str_value.c_ptr(),
new_length,
MYF(0))))
return 1;
}
/*
Replace the old value in such a way that the any thread using
the value will work.
*/
rw_wrlock(&LOCK_sys_init_connect);
old_value= sys_init_connect.value;
sys_init_connect.value= res;
sys_init_connect.value_length= new_length;
rw_unlock(&LOCK_sys_init_connect);
my_free(old_value, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
static void sys_default_init_connect(THD* thd, enum_var_type type)
{
char *old_value;
rw_wrlock(&LOCK_sys_init_connect);
old_value= sys_init_connect.value;
sys_init_connect.value= 0;
sys_init_connect.value_length= 0;
rw_unlock(&LOCK_sys_init_connect);
my_free(old_value, MYF(MY_ALLOW_ZERO_PTR));
}
static bool sys_update_init_slave(THD *thd, set_var *var)
{
char *res= 0, *old_value;
uint new_length;
/* If the string is "", delete old init command */
if ((new_length= var->value->str_value.length()))
{
if (!(res= my_strdup_with_length(var->value->str_value.c_ptr(),
new_length,
MYF(0))))
return 1;
}
/*
Replace the old value in such a way that the any thread using
the value will work.
*/
rw_wrlock(&LOCK_sys_init_slave);
old_value= sys_init_slave.value;
sys_init_slave.value= res;
sys_init_slave.value_length= new_length;
rw_unlock(&LOCK_sys_init_slave);
my_free(old_value, MYF(MY_ALLOW_ZERO_PTR));
return 0;
}
static void sys_default_init_slave(THD* thd, enum_var_type type)
{
char *old_value;
rw_wrlock(&LOCK_sys_init_slave);
old_value= sys_init_slave.value;
sys_init_slave.value= 0;
sys_init_slave.value_length= 0;
rw_unlock(&LOCK_sys_init_slave);
my_free(old_value, MYF(MY_ALLOW_ZERO_PTR));
}
/*
The following 3 functions need to be changed in 4.1 when we allow
one to change character sets
......
......@@ -137,6 +137,7 @@ class sys_var_str :public sys_var
{
public:
char *value; // Pointer to allocated string
uint value_length;
sys_check_func check_func;
sys_update_func update_func;
sys_set_default_func set_default_func;
......@@ -704,6 +705,8 @@ int sql_set_variables(THD *thd, List<set_var_base> *var_list);
void fix_delay_key_write(THD *thd, enum_var_type type);
ulong fix_sql_mode(ulong sql_mode);
extern sys_var_str sys_charset_system;
extern sys_var_str sys_init_connect;
extern sys_var_str sys_init_slave;
CHARSET_INFO *get_old_charset_by_name(const char *old_name);
gptr find_named(I_List<NAMED_LIST> *list, const char *name, uint length);
void delete_elements(I_List<NAMED_LIST> *list, void (*free_element)(gptr));
......
......@@ -2448,6 +2448,24 @@ err:
}
void init_slave_execute(THD *thd, sys_var_str *init_slave_var)
{
Vio* save_vio;
ulong save_client_capabilities;
thd->proc_info= "Execution of init_slave";
thd->query= init_slave_var->value;
thd->query_length= init_slave_var->value_length;
save_client_capabilities= thd->client_capabilities;
thd->client_capabilities|= CLIENT_MULTI_QUERIES;
save_vio= thd->net.vio;
thd->net.vio= 0;
dispatch_command(COM_QUERY, thd, thd->query, thd->query_length+1);
thd->client_capabilities= save_client_capabilities;
thd->net.vio= save_vio;
}
/* Slave SQL Thread entry point */
extern "C" pthread_handler_decl(handle_slave_sql,arg)
......@@ -2530,6 +2548,20 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
llstr(rli->group_relay_log_pos,llbuff1));
/* execute init_slave variable */
if (sys_init_slave.value)
{
rw_wrlock(&LOCK_sys_init_slave);
init_slave_execute(thd, &sys_init_slave);
rw_unlock(&LOCK_sys_init_slave);
if (thd->query_error)
{
sql_print_error("\
Slave SQL thread aborted. Can't execute init_slave query");
goto err;
}
}
/* Read queries from the IO/THREAD until this thread is killed */
while (!sql_slave_killed(thd,rli))
......
......@@ -121,6 +121,7 @@ THD::THD():user_time(0), is_fatal_error(0),
system_thread=cleanup_done=0;
peer_port= 0; // For SHOW PROCESSLIST
transaction.changed_tables = 0;
init_connect_error= 0;
#ifdef __WIN__
real_id = 0;
#endif
......
......@@ -560,6 +560,7 @@ public:
bool prepare_command;
bool tmp_table_used;
bool init_connect_error;
/*
If we do a purge of binary logs, log index info of the threads
that are currently reading it needs to be adjusted. To do that
......
......@@ -794,6 +794,24 @@ check_connections(THD *thd)
}
void init_connect_execute(THD *thd, sys_var_str *init_connect_var)
{
Vio* save_vio;
ulong save_client_capabilities;
thd->proc_info= "Execution of init_connect";
thd->query= init_connect_var->value;
thd->query_length= init_connect_var->value_length;
save_client_capabilities= thd->client_capabilities;
thd->client_capabilities|= CLIENT_MULTI_QUERIES;
save_vio= thd->net.vio;
thd->net.vio= 0;
dispatch_command(COM_QUERY, thd, thd->query, thd->query_length+1);
thd->client_capabilities= save_client_capabilities;
thd->net.vio= save_vio;
}
pthread_handler_decl(handle_one_connection,arg)
{
THD *thd=(THD*) arg;
......@@ -866,9 +884,35 @@ pthread_handler_decl(handle_one_connection,arg)
if (thd->client_capabilities & CLIENT_COMPRESS)
net->compress=1; // Use compression
thd->proc_info=0; // Remove 'login'
thd->command=COM_SLEEP;
thd->version=refresh_version;
thd->version= refresh_version;
if (sys_init_connect.value && !(thd->master_access & SUPER_ACL))
{
rw_wrlock(&LOCK_sys_init_connect);
init_connect_execute(thd, &sys_init_connect);
rw_unlock(&LOCK_sys_init_connect);
if (thd->init_connect_error)
{
if (thd->user_connect)
decrease_user_connections(thd->user_connect);
free_root(&thd->mem_root,MYF(0));
if (!thd->killed && thd->variables.log_warnings)
{
sql_print_error(ER(ER_NEW_ABORTING_CONNECTION),
thd->thread_id,(thd->db ? thd->db : "unconnected"),
thd->user ? thd->user : "unauthenticated",
thd->host_or_ip,
"Can't execute init_connect query");
statistic_increment(aborted_threads,&LOCK_status);
}
else if (thd->killed)
{
statistic_increment(aborted_threads,&LOCK_status);
}
goto end_thread;
}
}
thd->proc_info=0;
thd->set_time();
thd->init_for_queries();
while (!net->error && net->vio != 0 && !thd->killed)
......
......@@ -1587,9 +1587,12 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables,
break;
}
case SHOW_CHAR:
pos= value;
end= strend(pos);
{
if (!(pos= value))
pos= "";
end= strend(pos);
break;
}
case SHOW_STARTTIME:
nr= (long) (thd->query_start() - start_time);
end= int10_to_str(nr, buff, 10);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment