Fixed temp tables to work with replication. It will not work if you stop the slave

while slave thread has temp tables - not hard to fix though, but it is time to go to bed

added a new test case for temp table replication and Slave_open_temp_tables
status variable.

Misc fixes
parent acc8e0c9
...@@ -280,3 +280,5 @@ mysql-test/rpl000011.test ...@@ -280,3 +280,5 @@ mysql-test/rpl000011.test
mysql-test/var/lib/mysql-bin.007 mysql-test/var/lib/mysql-bin.007
sql/share/norwegian/errmsg.sys sql/share/norwegian/errmsg.sys
sql/share/norwegian-ny/errmsg.sys sql/share/norwegian-ny/errmsg.sys
mysql-test/r/3.23/rpl000001.b.result.reject
mysql-test/r/3.23/rpl000012.result.reject
...@@ -78,6 +78,7 @@ struct query ...@@ -78,6 +78,7 @@ struct query
}; };
static void die(const char* fmt, ...); static void die(const char* fmt, ...);
int close_connection(struct query* q);
int hex_val(int c) int hex_val(int c)
...@@ -155,6 +156,30 @@ int select_connection(struct query* q) ...@@ -155,6 +156,30 @@ int select_connection(struct query* q)
return 1; return 1;
} }
int close_connection(struct query* q)
{
char* p, *name;
struct connection *con;
p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++;
if(!*p)
die("Missing connection name in connect\n");
name = p;
while(*p && !isspace(*p))
p++;
*p = 0;
for(con = cons; con < next_con; con++)
if(!strcmp(con->name, name))
{
mysql_close(&con->mysql);
return 0;
}
die("connection '%s' not found in connection pool", name);
return 1;
}
/* this one now is a hack - we may want to improve in in the /* this one now is a hack - we may want to improve in in the
future to handle quotes. For now we assume that anything that is not future to handle quotes. For now we assume that anything that is not
...@@ -851,6 +876,8 @@ int main(int argc, char** argv) ...@@ -851,6 +876,8 @@ int main(int argc, char** argv)
do_connect(&q); do_connect(&q);
else if(check_first_word(&q, "connection", 10)) else if(check_first_word(&q, "connection", 10))
select_connection(&q); select_connection(&q);
else if(check_first_word(&q, "disconnect", 10))
close_connection(&q);
else if(check_first_word(&q, "source", 6)) else if(check_first_word(&q, "source", 6))
do_source(&q); do_source(&q);
else if(check_first_word(&q, "sleep", 5)) else if(check_first_word(&q, "sleep", 5))
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# to start mysqld yourself and run mysqltest -r # to start mysqld yourself and run mysqltest -r
RESULT_DIR=r/3.23 RESULT_DIR=r/3.23
if [ -z $EDITOR] then; if [ -z $EDITOR] ; then
EDITOR=vi EDITOR=vi
fi fi
...@@ -26,7 +26,7 @@ test_name=$1 ...@@ -26,7 +26,7 @@ test_name=$1
[ -z $test_name ] && usage [ -z $test_name ] && usage
result_file=$result_dir/$test_name.result result_file=$RESULT_DIR/$test_name.result
[ -f $result_file ] && die "result file $result_file has already been created" [ -f $result_file ] && die "result file $result_file has already been created"
...@@ -39,11 +39,11 @@ reject_file=$result_file.reject ...@@ -39,11 +39,11 @@ reject_file=$result_file.reject
if [ -f $reject_file ] ; then if [ -f $reject_file ] ; then
echo "Below are the contents of the reject file:" echo "Below are the contents of the reject file:"
echo "-----start---------------------" echo "-----start---------------------"
cat $result_file. cat $reject_file
echo "-----end-----------------------" echo "-----end-----------------------"
echo "Is this the output you expected from your test case?(y/n)[n]" echo "Is this the output you expected from your test case?(y/n)[n]"
read yes_no read yes_no
if [ x$yes_no = xy ] then; if [ x$yes_no = xy ] ; then
echo "Press any key to edit it in $EDITOR, or Ctrl-C to abort" echo "Press any key to edit it in $EDITOR, or Ctrl-C to abort"
read junk read junk
$EDITOR $reject_file $EDITOR $reject_file
......
Variable_name Value
Slave_open_temp_tables 0
source t/include/master-slave.inc;
connection master;
drop table if exists x;
create table x(n int);
create temporary table t(n int);
insert into t values(1),(2),(3);
insert into x select * from t;
connection master1;
create temporary table t (n int);
insert into t values (4),(5);
insert into x select * from t;
disconnect master;
connection master1;
insert into x values(6);
disconnect master1;
connection slave;
sleep 1;
@r/3.23/rpl000012.result select * from x;
@r/3.23/rpl000012.status.result show status like 'Slave_open_temp_tables';
connect (master,localhost,root,,test,0,var/tmp/mysql.sock); connect (master,localhost,root,,test,0,var/tmp/mysql.sock);
connect (master1,localhost,root,,test,0,var/tmp/mysql.sock);
connect (slave,localhost,root,,test,0,var/tmp/mysql-slave.sock); connect (slave,localhost,root,,test,0,var/tmp/mysql-slave.sock);
connect (slave1,localhost,root,,test,0,var/tmp/mysql-slave.sock);
connection slave; connection slave;
!slave stop; !slave stop;
connection master; connection master;
......
...@@ -397,7 +397,7 @@ void close_temporary_tables(THD *thd); ...@@ -397,7 +397,7 @@ void close_temporary_tables(THD *thd);
TABLE **find_temporary_table(THD *thd, const char *db, const char *table_name); TABLE **find_temporary_table(THD *thd, const char *db, const char *table_name);
bool close_temporary_table(THD *thd, const char *db, const char *table_name); bool close_temporary_table(THD *thd, const char *db, const char *table_name);
void close_temporary(TABLE *table, bool delete_table=1); void close_temporary(TABLE *table, bool delete_table=1);
bool rename_temporary_table(TABLE *table, const char *new_db, bool rename_temporary_table(THD* thd, TABLE *table, const char *new_db,
const char *table_name); const char *table_name);
void remove_db_from_cache(const my_string db); void remove_db_from_cache(const my_string db);
void flush_tables(); void flush_tables();
...@@ -451,7 +451,8 @@ extern ulong refresh_version,flush_version, thread_id,query_id,opened_tables, ...@@ -451,7 +451,8 @@ extern ulong refresh_version,flush_version, thread_id,query_id,opened_tables,
extern ulong filesort_rows, filesort_range_count, filesort_scan_count; extern ulong filesort_rows, filesort_range_count, filesort_scan_count;
extern ulong filesort_merge_passes; extern ulong filesort_merge_passes;
extern ulong select_range_check_count, select_range_count, select_scan_count; extern ulong select_range_check_count, select_range_count, select_scan_count;
extern ulong select_full_range_join_count,select_full_join_count; extern ulong select_full_range_join_count,select_full_join_count,
slave_open_temp_tables;
extern uint test_flags,select_errors,mysql_port,ha_open_options; extern uint test_flags,select_errors,mysql_port,ha_open_options;
extern ulong thd_startup_options, slow_launch_threads, slow_launch_time; extern ulong thd_startup_options, slow_launch_threads, slow_launch_time;
extern time_t start_time; extern time_t start_time;
......
...@@ -361,8 +361,7 @@ static void dump_local_log_entries(const char* logname) ...@@ -361,8 +361,7 @@ static void dump_local_log_entries(const char* logname)
die("Could not read entry at offset %ld : Error in log format or \ die("Could not read entry at offset %ld : Error in log format or \
read error", read error",
my_b_tell(file)); my_b_tell(file));
else // file->error == 0 means EOF, that's OK, we break in this case
die("Could not construct event object");
break; break;
} }
if (rec_count >= offset) if (rec_count >= offset)
......
...@@ -200,7 +200,7 @@ ulong keybuff_size,sortbuff_size,max_item_sort_length,table_cache_size, ...@@ -200,7 +200,7 @@ ulong keybuff_size,sortbuff_size,max_item_sort_length,table_cache_size,
thread_stack_min,net_wait_timeout,what_to_log= ~ (1L << (uint) COM_TIME), thread_stack_min,net_wait_timeout,what_to_log= ~ (1L << (uint) COM_TIME),
query_buff_size, lower_case_table_names, mysqld_net_retry_count, query_buff_size, lower_case_table_names, mysqld_net_retry_count,
net_interactive_timeout, slow_launch_time = 2L, net_interactive_timeout, slow_launch_time = 2L,
net_read_timeout,net_write_timeout; net_read_timeout,net_write_timeout,slave_open_temp_tables=0;
ulong thread_cache_size=0; ulong thread_cache_size=0;
volatile ulong cached_thread_count=0; volatile ulong cached_thread_count=0;
...@@ -2556,6 +2556,7 @@ struct show_var_st status_vars[]= { ...@@ -2556,6 +2556,7 @@ struct show_var_st status_vars[]= {
{"Select_range_check", (char*) &select_range_check_count, SHOW_LONG}, {"Select_range_check", (char*) &select_range_check_count, SHOW_LONG},
{"Select_scan", (char*) &select_scan_count, SHOW_LONG}, {"Select_scan", (char*) &select_scan_count, SHOW_LONG},
{"Slave_running", (char*) &slave_running, SHOW_BOOL}, {"Slave_running", (char*) &slave_running, SHOW_BOOL},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
{"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG}, {"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG},
{"Slow_queries", (char*) &long_query_count, SHOW_LONG}, {"Slow_queries", (char*) &long_query_count, SHOW_LONG},
{"Sort_merge_passes", (char*) &filesort_merge_passes, SHOW_LONG}, {"Sort_merge_passes", (char*) &filesort_merge_passes, SHOW_LONG},
......
...@@ -762,6 +762,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -762,6 +762,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->query_error = 0; // clear error thd->query_error = 0; // clear error
thd->net.last_errno = 0; thd->net.last_errno = 0;
thd->net.last_error[0] = 0; thd->net.last_error[0] = 0;
thd->slave_proxy_id = qev->thread_id; // for temp tables
mysql_parse(thd, thd->query, q_len); mysql_parse(thd, thd->query, q_len);
int expected_error,actual_error; int expected_error,actual_error;
if((expected_error = qev->error_code) != if((expected_error = qev->error_code) !=
...@@ -782,25 +783,18 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -782,25 +783,18 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->convert_set = 0; // assume no convert for next query thd->convert_set = 0; // assume no convert for next query
// unless set explictly // unless set explictly
close_thread_tables(thd); close_thread_tables(thd);
free_root(&thd->mem_root,0);
if (thd->query_error) if (thd->query_error || thd->fatal_error)
{ {
sql_print_error("Slave: error running query '%s' ", sql_print_error("Slave: error running query '%s' ",
qev->query); qev->query);
free_root(&thd->mem_root,0);
delete ev; delete ev;
return 1; return 1;
} }
free_root(&thd->mem_root,0);
delete ev; delete ev;
if(thd->fatal_error)
{
sql_print_error("Slave: Fatal error running query '%s' ",
thd->query);
return 1;
}
mi->inc_pos(event_len); mi->inc_pos(event_len);
flush_master_info(mi); flush_master_info(mi);
break; break;
...@@ -875,6 +869,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -875,6 +869,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
List<Item> fields; List<Item> fields;
lev->set_fields(fields); lev->set_fields(fields);
thd->slave_proxy_id = thd->thread_id;
thd->net.vio = net->vio; thd->net.vio = net->vio;
// mysql_load will use thd->net to read the file // mysql_load will use thd->net to read the file
thd->net.pkt_nr = net->pkt_nr; thd->net.pkt_nr = net->pkt_nr;
...@@ -920,11 +915,13 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -920,11 +915,13 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
} }
case START_EVENT: case START_EVENT:
close_temporary_tables(thd);
mi->inc_pos(event_len); mi->inc_pos(event_len);
flush_master_info(mi); flush_master_info(mi);
break; break;
case STOP_EVENT: case STOP_EVENT:
close_temporary_tables(thd);
mi->inc_pos(event_len); mi->inc_pos(event_len);
flush_master_info(mi); flush_master_info(mi);
break; break;
......
...@@ -98,6 +98,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), ...@@ -98,6 +98,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
start_time=(time_t) 0; start_time=(time_t) 0;
current_linfo = 0; current_linfo = 0;
slave_thread = 0; slave_thread = 0;
slave_proxy_id = 0;
last_nx_table = last_nx_db = 0; last_nx_table = last_nx_db = 0;
inactive_timeout=net_wait_timeout; inactive_timeout=net_wait_timeout;
open_options=ha_open_options; open_options=ha_open_options;
......
...@@ -274,6 +274,10 @@ public: ...@@ -274,6 +274,10 @@ public:
// if we do a purge of binary logs, log index info of the threads // if we do a purge of binary logs, log index info of the threads
// that are currently reading it needs to be adjusted. To do that // that are currently reading it needs to be adjusted. To do that
// each thread that is using LOG_INFO needs to adjust the pointer to it // each thread that is using LOG_INFO needs to adjust the pointer to it
ulong slave_proxy_id; // in slave thread we need to know in behalf of which
// thread the query is being run to replicate temp tables properly
THD(); THD();
~THD(); ~THD();
bool store_globals(); bool store_globals();
......
...@@ -860,7 +860,7 @@ mysql_execute_command(void) ...@@ -860,7 +860,7 @@ mysql_execute_command(void)
TABLE_LIST *tables=(TABLE_LIST*) lex->table_list.first; TABLE_LIST *tables=(TABLE_LIST*) lex->table_list.first;
DBUG_ENTER("mysql_execute_command"); DBUG_ENTER("mysql_execute_command");
if(thd->slave_thread && table_rules_on && tables && !tables_ok(thd,tables)) if(table_rules_on && thd->slave_thread && tables && !tables_ok(thd,tables))
DBUG_VOID_RETURN; // skip if we are in the slave thread, some table DBUG_VOID_RETURN; // skip if we are in the slave thread, some table
// rules have been given and the table list says the query should not be // rules have been given and the table list says the query should not be
// replicated // replicated
......
...@@ -1437,7 +1437,7 @@ int mysql_alter_table(THD *thd,char *new_db, char *new_name, ...@@ -1437,7 +1437,7 @@ int mysql_alter_table(THD *thd,char *new_db, char *new_name,
} }
/* Remove link to old table and rename the new one */ /* Remove link to old table and rename the new one */
close_temporary_table(thd,table->table_cache_key,table_name); close_temporary_table(thd,table->table_cache_key,table_name);
if (rename_temporary_table(new_table, new_db, new_name)) if (rename_temporary_table(thd, new_table, new_db, new_name))
{ // Fatal error { // Fatal error
close_temporary_table(thd,new_db,tmp_name); close_temporary_table(thd,new_db,tmp_name);
my_free((gptr) new_table,MYF(0)); my_free((gptr) new_table,MYF(0));
...@@ -1611,6 +1611,8 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -1611,6 +1611,8 @@ copy_data_between_tables(TABLE *from,TABLE *to,
(copy_end++)->set(*ptr,def->field,0); (copy_end++)->set(*ptr,def->field,0);
} }
found_count=delete_count=0;
if(order) { if(order) {
from->io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE), from->io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
MYF(MY_FAE | MY_ZEROFILL)); MYF(MY_FAE | MY_ZEROFILL));
...@@ -1628,7 +1630,6 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -1628,7 +1630,6 @@ copy_data_between_tables(TABLE *from,TABLE *to,
init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1); init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1);
found_count=delete_count=0;
next_field=to->next_number_field; next_field=to->next_number_field;
while (!(error=info.read_record(&info))) while (!(error=info.read_record(&info)))
{ {
......
...@@ -41,7 +41,8 @@ ...@@ -41,7 +41,8 @@
#define ERRMAPP 1 /* Errormap f|r my_error */ #define ERRMAPP 1 /* Errormap f|r my_error */
#define LIBLEN FN_REFLEN-FN_LEN /* Max l{ngd p} dev */ #define LIBLEN FN_REFLEN-FN_LEN /* Max l{ngd p} dev */
#define MAX_DBKEY_LENGTH (FN_LEN*2+2) #define MAX_DBKEY_LENGTH (FN_LEN*2+6) /* extra 4 bytes for slave tmp
* tables */
#define MAX_FIELD_NAME 34 /* Max colum name length +2 */ #define MAX_FIELD_NAME 34 /* Max colum name length +2 */
#define MAX_KEY 32 /* Max used keys */ #define MAX_KEY 32 /* Max used keys */
#define MAX_REF_PARTS 16 /* Max parts used as ref */ #define MAX_REF_PARTS 16 /* Max parts used as ref */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment