Commit 704becf2 authored by unknown's avatar unknown

updates for LOAD DATA FROM MASTER + some cleanup of replication code


include/mysqld_error.h:
  new errors
mysql-test/r/rpl000009.result:
  test load data from master
mysql-test/t/rpl000009.test:
  test load data from master
sql/mini_client.cc:
  extra functionality needed for load data from master and other things
sql/mini_client.h:
  addition to API
sql/mysql_priv.h:
  mysql_create_db()/mysql_rm_db() now return a value
sql/share/english/errmsg.txt:
  more error messages
sql/slave.cc:
  cleanup of fetch_nx_table()
sql/slave.h:
  cleanup of fetch_nx_table()
sql/sql_base.cc:
  remove unused code originally written for retrieving a non-existent table in slave thread
sql/sql_class.cc:
  remove unused replication variables
sql/sql_class.h:
  remove unused replication variabled
sql/sql_db.cc:
  make mysql_create_db()/mysql_rm_db() work with thd == 0 
  (do not write messages to the net) and instead return success/error
sql/sql_lex.h:
  added SQLCOM_LOAD_MASTER_DATA
sql/sql_parse.cc:
  LOAD MASTER DATA, cleanup of LOAD TABLE FROM MASTER
sql/sql_repl.cc:
  LOAD DATA FROM MASTER
sql/sql_repl.h:
  LOAD DATA FROM MASTER
sql/sql_yacc.yy:
  LOAD DATA FROM MASTER
parent 403b38ee
......@@ -205,4 +205,6 @@
#define ER_SLAVE_THREAD 1202
#define ER_TOO_MANY_USER_CONNECTIONS 1203
#define ER_SET_CONSTANTS_ONLY 1204
#define ER_ERROR_MESSAGES 205
#define ER_CONNECT_TO_MASTER 1205
#define ER_QUERY_ON_MASTER 1206
#define ER_ERROR_MESSAGES 207
n m
4 15
Database
bar
foo
mysql
test
Database
mysql
test
Database
bar
foo
mysql
test
Tables_in_foo
Tables_in_bar
t1
t2
n s
1 one bar
2 two bar
3 three bar
n s
11 eleven bar
12 twelve bar
13 thirteen bar
n s
1 one bar
2 two bar
3 three bar
4 four bar
......@@ -31,3 +31,56 @@ connection slave;
sync_with_master;
drop database if exists bar;
drop database if exists foo;
#now let's test load data from master
#first create some databases and tables on the master
connection master;
set sql_log_bin = 0;
create database foo;
create database bar;
show databases;
create table foo.t1(n int, s char(20));
create table foo.t2(n int, s text);
insert into foo.t1 values (1, 'one'), (2, 'two'), (3, 'three');
insert into foo.t2 values (11, 'eleven'), (12, 'twelve'), (13, 'thirteen');
create table bar.t1(n int, s char(20));
create table bar.t2(n int, s text);
insert into bar.t1 values (1, 'one bar'), (2, 'two bar'), (3, 'three bar');
insert into bar.t2 values (11, 'eleven bar'), (12, 'twelve bar'),
(13, 'thirteen bar');
set sql_log_bin = 1;
save_master_pos;
connection slave;
sync_with_master;
#this should show that the slave is empty at this point
show databases;
load data from master;
#now let's check if we have the right tables and the right data in them
show databases;
use foo;
show tables;
use bar;
show tables;
select * from bar.t1;
select * from bar.t2;
#now let's see if replication works
connection master;
insert into bar.t1 values (4, 'four bar');
save_master_pos;
connection slave;
sync_with_master;
select * from bar.t1;
#now time for cleanup
connection master;
drop database bar;
drop database foo;
save_master_pos;
connection slave;
sync_with_master;
This diff is collapsed.
......@@ -42,6 +42,17 @@ char * STDCALL mc_mysql_error(MYSQL *mysql);
int STDCALL mc_mysql_errno(MYSQL *mysql);
my_bool STDCALL mc_mysql_reconnect(MYSQL* mysql);
int STDCALL mc_mysql_send_query(MYSQL* mysql, const char* query, uint length);
int STDCALL mc_mysql_read_query_result(MYSQL *mysql);
int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length);
MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql);
void STDCALL mc_mysql_free_result(MYSQL_RES *result);
void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row);
my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res);
unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res);
MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res);
int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db);
#endif
......@@ -223,7 +223,7 @@ inline THD *_current_thd(void)
#include "opt_range.h"
void mysql_create_db(THD *thd, char *db, uint create_info);
int mysql_create_db(THD *thd, char *db, uint create_info);
void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags);
int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists);
int quick_rm_table(enum db_type base,const char *db,
......@@ -245,7 +245,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length);
bool check_stack_overrun(THD *thd,char *dummy);
bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables);
void mysql_rm_db(THD *thd,char *db,bool if_exists);
int mysql_rm_db(THD *thd,char *db,bool if_exists);
void table_cache_init(void);
void table_cache_free(void);
uint cached_tables(void);
......
......@@ -153,7 +153,7 @@
"You have an error in your SQL syntax",
"Delayed insert thread couldn't get requested lock for table %-.64s",
"Too many delayed threads in use",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s)",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html",
"Got a packet bigger than 'max_allowed_packet'",
"Got a read error from the connection pipe",
"Got an error from fcntl()",
......@@ -185,7 +185,7 @@
"Got error %d during ROLLBACK",
"Got error %d during FLUSH_LOGS",
"Got error %d during CHECKPOINT",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s)",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html",
"The handler for the table does not support binary table dump",
"Binlog closed, cannot RESET MASTER",
"Failed rebuilding the index of dumped table '%-.64s'",
......@@ -206,3 +206,8 @@
"Could not create slave thread, check system resources",
"User %-.64s has already more than 'max_user_connections' active connections",
"You may only use constant expressions with SET",
"Error connecting to master: %-.128s",
"Error running query on master: %-.128s",
......@@ -20,6 +20,7 @@
#include <myisam.h>
#include "mini_client.h"
#include "slave.h"
#include "sql_repl.h"
#include <thr_alarm.h>
#include <my_dir.h>
......@@ -55,7 +56,7 @@ static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
inline char* rewrite_db(char* db);
......@@ -344,7 +345,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Creating table from master dump";
// save old db in case we are creating in a different database
char* save_db = thd->db;
thd->db = thd->last_nx_db;
thd->db = (char*)db;
mysql_parse(thd, thd->query, packet_len); // run create table
thd->db = save_db; // leave things the way the were before
......@@ -400,31 +401,39 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
return error;
}
int fetch_nx_table(THD* thd, MASTER_INFO* mi)
int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql)
{
MYSQL* mysql = mc_mysql_init(NULL);
int error = 1;
int nx_errno = 0;
if(!mysql)
bool called_connected = (mysql != NULL);
if(!called_connected && !(mysql = mc_mysql_init(NULL)))
{
sql_print_error("fetch_nx_table: Error in mysql_init()");
nx_errno = ER_GET_ERRNO;
goto err;
}
safe_connect(thd, mysql, mi);
if(slave_killed(thd))
goto err;
if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
if(!called_connected)
{
if(connect_to_master(thd, mysql, mi))
{
sql_print_error("Could not connect to master while fetching table\
'%-64s.%-64s'", db_name, table_name);
nx_errno = ER_CONNECT_TO_MASTER;
goto err;
}
}
if(request_table_dump(mysql, db_name, table_name))
{
nx_errno = ER_GET_ERRNO;
sql_print_error("fetch_nx_table: failed on table dump request ");
goto err;
}
if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
thd->last_nx_table))
if(create_table_from_dump(thd, &mysql->net, db_name,
table_name))
{
// create_table_from_dump will have sent the error alread
sql_print_error("fetch_nx_table: failed on create table ");
......@@ -434,7 +443,7 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi)
error = 0;
err:
if (mysql)
if (mysql && !called_connected)
mc_mysql_close(mysql);
if (nx_errno && thd->net.vio)
send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
......@@ -764,7 +773,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
return 0;
}
static int request_table_dump(MYSQL* mysql, char* db, char* table)
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
char buf[1024];
char * p = buf;
......@@ -901,7 +910,6 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->last_nx_table = thd->last_nx_db = 0;
thd->query_error = 0; // clear error
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
......
#ifndef SLAVE_H
#define SLAVE_H
#include "mysql.h"
typedef struct st_master_info
{
char log_file_name[FN_REFLEN];
......@@ -65,11 +67,14 @@ typedef struct st_table_rule_ent
int flush_master_info(MASTER_INFO* mi);
int mysql_table_dump(THD* thd, char* db, char* tbl_name, int fd = -1);
int mysql_table_dump(THD* thd, const char* db,
const char* tbl_name, int fd = -1);
// if fd is -1, dump to NET
int fetch_nx_table(THD* thd, MASTER_INFO* mi);
int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql);
// retrieve non-exitent table from master
// the caller must set thd->last_nx_table and thd->last_nx_db first
int show_master_info(THD* thd);
int show_binlog_info(THD* thd);
......
......@@ -837,25 +837,6 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name,
!(table->table_cache_key=memdup_root(&table->mem_root,(char*) key,
key_length)))
{
MEM_ROOT* glob_alloc;
LINT_INIT(glob_alloc);
if (errno == ENOENT &&
(glob_alloc = my_pthread_getspecific_ptr(MEM_ROOT*,THR_MALLOC)))
// Sasha: needed for replication
// remember the name of the non-existent table
// so we can try to download it from the master
{
int table_name_len = (uint) strlen(table_name);
int db_len = (uint) strlen(db);
thd->last_nx_db = alloc_root(glob_alloc,db_len + table_name_len + 2);
if(thd->last_nx_db)
{
thd->last_nx_table = thd->last_nx_db + db_len + 1;
memcpy(thd->last_nx_table, table_name, table_name_len + 1);
memcpy(thd->last_nx_db, db, db_len + 1);
}
}
table->next=table->prev=table;
free_cache_entry(table);
VOID(pthread_mutex_unlock(&LOCK_open));
......
......@@ -96,7 +96,6 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
current_linfo = 0;
slave_thread = 0;
slave_proxy_id = 0;
last_nx_table = last_nx_db = 0;
cond_count=0;
convert_set=0;
mysys_var=0;
......
......@@ -242,8 +242,6 @@ public:
enum enum_server_command command;
uint32 server_id;
const char *where;
char* last_nx_table; // last non-existent table, we need this for replication
char* last_nx_db; // database of the last nx table
time_t start_time,time_after_lock,user_time;
time_t connect_time,thr_create_time; // track down slow pthread_create
thr_lock_type update_lock_default;
......
......@@ -30,11 +30,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *path,
/* db-name is already validated when we come here */
void mysql_create_db(THD *thd, char *db, uint create_options)
int mysql_create_db(THD *thd, char *db, uint create_options)
{
char path[FN_REFLEN+16];
MY_DIR *dirp;
long result=1;
int error = 0;
DBUG_ENTER("mysql_create_db");
VOID(pthread_mutex_lock(&LOCK_mysql_create_db));
......@@ -47,7 +48,9 @@ void mysql_create_db(THD *thd, char *db, uint create_options)
my_dirend(dirp);
if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
{
net_printf(&thd->net,ER_DB_CREATE_EXISTS,db);
if(thd)
net_printf(&thd->net,ER_DB_CREATE_EXISTS,db);
error = 1;
goto exit;
}
result = 0;
......@@ -57,34 +60,39 @@ void mysql_create_db(THD *thd, char *db, uint create_options)
strend(path)[-1]=0; // Remove last '/' from path
if (my_mkdir(path,0777,MYF(0)) < 0)
{
net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno);
if(thd)
net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno);
error = 1;
goto exit;
}
}
if (!thd->query)
{
thd->query = path;
thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)-
path);
}
if(thd)
{
mysql_update_log.write(thd,thd->query, thd->query_length);
if (mysql_bin_log.is_open())
if (!thd->query)
{
Query_log_event qinfo(thd, thd->query);
mysql_bin_log.write(&qinfo);
thd->query = path;
thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)-
path);
}
{
mysql_update_log.write(thd,thd->query, thd->query_length);
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, thd->query);
mysql_bin_log.write(&qinfo);
}
}
if (thd->query == path)
{
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net, result);
}
if (thd->query == path)
{
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net, result);
exit:
VOID(pthread_mutex_unlock(&LOCK_mysql_create_db));
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
const char *del_exts[]=
......@@ -94,10 +102,14 @@ static TYPELIB deletable_extentions=
/* db-name is already validated when we come here */
void mysql_rm_db(THD *thd,char *db,bool if_exists)
/* If thd == 0, do not write any messages
This is useful in replication when we want to remove
a stale database before replacing it with the new one
*/
int mysql_rm_db(THD *thd,char *db,bool if_exists)
{
long deleted=0;
int error = 0;
char path[FN_REFLEN+16];
MY_DIR *dirp;
DBUG_ENTER("mysql_rm_db");
......@@ -110,15 +122,19 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists)
/* See if the directory exists */
if (!(dirp = my_dir(path,MYF(MY_WME | MY_DONT_SORT))))
{
if (!if_exists)
net_printf(&thd->net,ER_DB_DROP_EXISTS,db);
else
send_ok(&thd->net,0);
if(thd)
{
if (!if_exists)
net_printf(&thd->net,ER_DB_DROP_EXISTS,db);
else
send_ok(&thd->net,0);
}
error = !if_exists;
goto exit;
}
remove_db_from_cache(db);
if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0)
if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0 && thd)
{
if (!thd->query)
{
......@@ -137,13 +153,14 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists)
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net,(ulong) deleted);
}
exit:
VOID(pthread_mutex_unlock(&LOCK_open));
VOID(pthread_mutex_unlock(&LOCK_mysql_create_db));
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
/*
......@@ -151,6 +168,7 @@ exit:
are 2 digits (raid directories).
*/
/* This one also needs to work with thd == 0 for replication */
static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
uint level)
{
......@@ -162,7 +180,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* remove all files with known extensions */
for (uint idx=2 ;
idx < (uint) dirp->number_off_files && !thd->killed ;
idx < (uint) dirp->number_off_files && (!thd || !thd->killed) ;
idx++)
{
FILEINFO *file=dirp->dir_entry+idx;
......@@ -196,7 +214,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
unpack_filename(filePath,filePath);
if (my_delete(filePath,MYF(MY_WME)))
{
net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error);
if(thd)
net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error);
my_dirend(dirp);
DBUG_RETURN(-1);
}
......@@ -205,7 +224,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
my_dirend(dirp);
if (thd->killed)
if (thd && thd->killed)
{
send_error(&thd->net,ER_SERVER_SHUTDOWN);
DBUG_RETURN(-1);
......@@ -229,7 +248,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* Don't give errors if we can't delete 'RAID' directory */
if (level)
DBUG_RETURN(deleted);
send_error(&thd->net);
if(thd)
send_error(&thd->net);
DBUG_RETURN(-1);
}
path=filePath;
......@@ -242,7 +262,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* Don't give errors if we can't delete 'RAID' directory */
if (rmdir(path) < 0 && !level)
{
net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno);
if(thd)
net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno);
DBUG_RETURN(-1);
}
}
......
......@@ -53,7 +53,7 @@ enum enum_sql_command {
SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_CHANGE_MASTER,
SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE,
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS,
SQLCOM_SHOW_OPEN_TABLES,
SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA,
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ
};
......
......@@ -1203,6 +1203,13 @@ mysql_execute_command(void)
res = show_binlog_info(thd);
break;
}
case SQLCOM_LOAD_MASTER_DATA: // sync with master
if(check_process_priv(thd))
goto error;
res = load_master_data(thd);
break;
case SQLCOM_LOAD_MASTER_TABLE:
if (!tables->db)
......@@ -1226,9 +1233,7 @@ mysql_execute_command(void)
break;
}
thd->last_nx_table = tables->real_name;
thd->last_nx_db = tables->db;
if(fetch_nx_table(thd, &glob_mi))
if(fetch_nx_table(thd, tables->db, tables->real_name, &glob_mi, 0))
// fetch_nx_table is responsible for sending
// the error
{
......
......@@ -21,6 +21,7 @@
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
#include "mini_client.h"
#include <thr_alarm.h>
#include <my_dir.h>
......@@ -845,5 +846,220 @@ err:
return 1;
}
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
if(!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Connection to master failed: %s",
mc_mysql_error(mysql));
return 1;
}
return 0;
}
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
MYSQL_RES** cur, MYSQL_RES** start)
{
for( ; cur >= start; --cur)
if(*cur)
mc_mysql_free_result(*cur);
mc_mysql_free_result(db_res);
}
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
MYSQL_RES* table_res)
{
MYSQL_ROW row;
for( row = mc_mysql_fetch_row(table_res); row;
row = mc_mysql_fetch_row(table_res))
{
TABLE_LIST table;
const char* table_name = row[0];
int error;
if(table_rules_on)
{
table.next = 0;
table.db = (char*)db;
table.real_name = (char*)table_name;
if(!tables_ok(thd, &table))
continue;
}
if((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
return error;
}
return 0;
}
int load_master_data(THD* thd)
{
MYSQL mysql;
MYSQL_RES* master_status_res = 0;
bool slave_was_running = 0;
int error = 0;
mc_mysql_init(&mysql);
pthread_mutex_lock(&LOCK_slave);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
// first, kill the slave
if((slave_was_running = slave_running))
{
abort_slave = 1;
thr_alarm_kill(slave_real_id);
thd->proc_info = "waiting for slave to die";
while(slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
}
if(connect_to_master(thd, &mysql, &glob_mi))
{
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
uint num_dbs;
MYSQL_ROW row;
if(mc_mysql_query(&mysql, "show databases", 0) ||
!(db_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
if(!(num_dbs = mc_mysql_num_rows(db_res)))
goto err;
// in theory, the master could have no databases at all
// and run with skip-grant
if(!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{
net_printf(&thd->net, error = ER_OUTOFMEMORY);
goto err;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if(mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0)
|| mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end = table_res + num_dbs;
for(cur_table_res = table_res; cur_table_res < table_res_end;
++cur_table_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(db_res);
// since we know how many rows we have, this can never be NULL
char* db = row[0];
int drop_error = 0;
// do not replicate databases excluded by rules
// also skip mysql database - in most cases the user will
// mess up and not exclude mysql database with the rules when
// he actually means to - in this case, he is up for a surprise if
// his priv tables get dropped and downloaded from master
// TO DO - add special option, not enabled
// by default, to allow inclusion of mysql database into load
// data from master
if(!db_ok(db, replicate_do_db, replicate_ignore_db) ||
!strcmp(db,"mysql"))
{
*cur_table_res = 0;
continue;
}
if((drop_error = mysql_rm_db(0, db, 1)) ||
mysql_create_db(0, db, 0))
{
error = (drop_error) ? ER_DB_DROP_DELETE : ER_CANT_CREATE_DB;
net_printf(&thd->net, error, db, my_error);
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if(mc_mysql_select_db(&mysql, db) ||
mc_mysql_query(&mysql, "show tables", 0) ||
!(*cur_table_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results(db_res, cur_table_res, table_res);
goto err;
}
}
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
// adjust position in the master
if(master_status_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
// we need this check because the master may not be running with
// log-bin, but it will still allow us to do all the steps
// of LOAD DATA FROM MASTER - no reason to forbid it, really,
// although it does not make much sense for the user to do it
if(row[0] && row[1])
{
strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
if(glob_mi.pos < 4)
glob_mi.pos = 4; // don't hit the magic number
glob_mi.pending = 0;
flush_master_info(&glob_mi);
}
mc_mysql_free_result(master_status_res);
}
if(mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
}
err:
pthread_mutex_unlock(&LOCK_slave);
if(slave_was_running)
start_slave(0, 0);
mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
if(!error)
send_ok(&thd->net);
return error;
}
......@@ -14,6 +14,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd);
void reset_slave();
void reset_master();
......
......@@ -2401,6 +2401,11 @@ load: LOAD DATA_SYM opt_low_priority opt_local INFILE TEXT_STRING
YYABORT;
}
|
LOAD DATA_SYM FROM MASTER_SYM
{
Lex->sql_command = SQLCOM_LOAD_MASTER_DATA;
}
opt_local:
/* empty */ { $$=0;}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment