Commit 8cdb118a authored by Michael Widenius's avatar Michael Widenius

Fixed: MDEV-4352; LOAD DATA was not multi-source safe

- Calls to cleanup_load_tmpdir() could delete temporary files for another master connection
- Concurrent LOAD DATA commands from two master connections could use the same file name

Other bug fixes:
- Enlarge buffer for connection names with 'special characters' one can't store in filenames

Optimization:
- Don't do 'lower case' of connection names. We can use cmp_connection_name, where we already have the connection name in lower case.


mysql-test/suite/multi_source/load_data.result:
  Test case for MDEV-4352
mysql-test/suite/multi_source/load_data.test:
  Test case for MDEV-4352
sql/log_event.cc:
  Fixed: MDEV-4352
  - Calls to cleanup_load_tmpdir() could delete temporary files for another master connection
  - Concurrent LOAD DATA commands from two master connections could use the same file name
  
  The fix was to add the connection name (if one exists) to all slave temporary files used by LOAD DATA
sql/rpl_mi.cc:
  Enlarge buffer for connection names with 'special characters' one can't store in filenames
  Use mi->cmp_connection_name for connection file names.
sql/rpl_rli.cc:
  Use mi->cmp_connection_name for connection file names.
sql/slave.cc:
  Removed not needed empty line
sql/sql_const.h:
  Added MAX_FILENAME_MBWIDTH to be able to calculate buffer length for connection_names stored in file names
sql/sql_repl.cc:
  Use mi->cmp_connection_name for connection file names.
parent 59830e1a
change master '' to master_port=MYPORT_1, master_host='127.0.0.1', master_user='root';
change master 'master2' to master_port=MYPORT_2, master_host='127.0.0.1', master_user='root';
start all slaves;
Warnings:
Note 1937 SLAVE 'master2' started
Note 1937 SLAVE '' started
set default_master_connection = '';
include/wait_for_slave_to_start.inc
set default_master_connection = 'master2';
include/wait_for_slave_to_start.inc
set default_master_connection = '';
create table t1 (a varchar(10) character set utf8);
load data infile '../../std_data/loaddata6.dat' into table t1;
create table t2 (a varchar(10) character set utf8);
load data infile '../../std_data/loaddata6.dat' into table t2;
select count(*) from t1;
count(*)
1
select count(*) from t2;
count(*)
1
drop table t1;
drop table t2;
stop all slaves;
Warnings:
Note 1938 SLAVE 'master2' stopped
Note 1938 SLAVE '' stopped
include/reset_master_slave.inc
include/reset_master_slave.inc
include/reset_master_slave.inc
#
# Simple multi-master test
#
--source include/not_embedded.inc
--let $rpl_server_count= 0
--connect (slave,127.0.0.1,root,,,$SERVER_MYPORT_3)
--connect (master1,127.0.0.1,root,,,$SERVER_MYPORT_1)
--connect (master2,127.0.0.1,root,,,$SERVER_MYPORT_2)
--connection slave
--replace_result $SERVER_MYPORT_1 MYPORT_1
eval change master '' to master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root';
--replace_result $SERVER_MYPORT_2 MYPORT_2
eval change master 'master2' to master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root';
start all slaves;
set default_master_connection = '';
--source include/wait_for_slave_to_start.inc
set default_master_connection = 'master2';
--source include/wait_for_slave_to_start.inc
#
# Now test doing a load data infile from both connections
#
set default_master_connection = '';
--connection master1
create table t1 (a varchar(10) character set utf8);
load data infile '../../std_data/loaddata6.dat' into table t1;
--save_master_pos
--connection slave
--sync_with_master 0,''
--connection master2
create table t2 (a varchar(10) character set utf8);
load data infile '../../std_data/loaddata6.dat' into table t2;
--save_master_pos
--connection slave
--sync_with_master 0,'master2'
select count(*) from t1;
select count(*) from t2;
--connection master1
drop table t1;
--connection master2
drop table t2;
#
# clean up
#
--connection master1
--save_master_pos
--connection slave
--sync_with_master 0,''
--connection master2
--save_master_pos
--connection slave
--sync_with_master 0,'master2'
--connection slave
stop all slaves;
--source reset_master_slave.inc
--disconnect slave
--connection master1
--source reset_master_slave.inc
--disconnect master1
--connection master2
--source reset_master_slave.inc
--disconnect master2
......@@ -47,6 +47,7 @@
#include "transaction.h"
#include <my_dir.h>
#include "sql_show.h" // append_identifier
#include <strfunc.h>
#endif /* MYSQL_CLIENT */
......@@ -518,11 +519,59 @@ pretty_print_str(String *packet, const char *str, int len)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
Creates a temporary name for load data infile:.
Create a prefix for the temporary files that is to be used for
load data file name for this master
@param name Store prefix of name here
@param connection_name Connection name
@return pointer to end of name
@description
We assume that FN_REFLEN is big enough to hold
MAX_CONNECTION_NAME * MAX_FILENAME_MBWIDTH characters + 2 numbers +
a short extension.
The resulting file name has the following parts, each separated with a '-'
- PREFIX_SQL_LOAD (SQL_LOAD-)
- If a connection name is given (multi-master setup):
- Add an extra '-' to mark that this is a multi-master file
- connection name in lower case, converted to safe file characters.
(see create_logfile_name_with_suffix()).
- server_id
- A last '-' (after server_id).
*/
static char *load_data_tmp_prefix(char *name,
LEX_STRING *connection_name)
{
name= strmov(name, PREFIX_SQL_LOAD);
if (connection_name->length)
{
uint buf_length;
uint errors;
/* Add marker that this is a multi-master-file */
*name++='-';
/* Convert connection_name to a safe filename */
buf_length= strconvert(system_charset_info, connection_name->str,
&my_charset_filename, name, FN_REFLEN,
&errors);
name+= buf_length;
*name++= '-';
}
name= int10_to_str(global_system_variables.server_id, name, 10);
*name++ = '-';
*name= '\0'; // For testing prefixes
return name;
}
/**
Creates a temporary name for LOAD DATA INFILE
@param buf Store new filename here
@param file_id File_id (part of file name)
@param event_server_id Event_id (part of file name)
@param event_server_id Event_id (part of file name)
@param ext Extension for file name
@return
......@@ -530,16 +579,14 @@ pretty_print_str(String *packet, const char *str, int len)
*/
static char *slave_load_file_stem(char *buf, uint file_id,
int event_server_id, const char *ext)
int event_server_id, const char *ext,
LEX_STRING *connection_name)
{
char *res;
fn_format(buf,PREFIX_SQL_LOAD,slave_load_tmpdir, "", MY_UNPACK_FILENAME);
res= buf+ unpack_dirname(buf, slave_load_tmpdir);
to_unix_path(buf);
buf = strend(buf);
buf = int10_to_str(global_system_variables.server_id, buf, 10);
*buf++ = '-';
buf = int10_to_str(event_server_id, buf, 10);
buf= load_data_tmp_prefix(res, connection_name);
buf= int10_to_str(event_server_id, buf, 10);
*buf++ = '-';
res= int10_to_str(file_id, buf, 10);
strmov(res, ext); // Add extension last
......@@ -554,14 +601,17 @@ static char *slave_load_file_stem(char *buf, uint file_id,
Delete all temporary files used for SQL_LOAD.
*/
static void cleanup_load_tmpdir()
static void cleanup_load_tmpdir(LEX_STRING *connection_name)
{
MY_DIR *dirp;
FILEINFO *file;
uint i;
char fname[FN_REFLEN], prefbuf[31], *p;
char dir[FN_REFLEN], fname[FN_REFLEN];
char prefbuf[31 + MAX_CONNECTION_NAME* MAX_FILENAME_MBWIDTH + 1];
DBUG_ENTER("cleanup_load_tmpdir");
if (!(dirp=my_dir(slave_load_tmpdir,MYF(0))))
unpack_dirname(dir, slave_load_tmpdir);
if (!(dirp=my_dir(dir, MYF(MY_WME))))
return;
/*
......@@ -572,10 +622,9 @@ static void cleanup_load_tmpdir()
we cannot meet Start_log event in the middle of events from one
LOAD DATA.
*/
p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD));
p= int10_to_str(global_system_variables.server_id, p, 10);
*(p++)= '-';
*p= 0;
load_data_tmp_prefix(prefbuf, connection_name);
DBUG_PRINT("enter", ("dir: '%s' prefix: '%s'", dir, prefbuf));
for (i=0 ; i < (uint)dirp->number_of_files; i++)
{
......@@ -588,6 +637,7 @@ static void cleanup_load_tmpdir()
}
my_dirend(dirp);
DBUG_VOID_RETURN;
}
#endif
......@@ -4417,7 +4467,11 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
if (created)
{
error= close_temporary_tables(thd);
cleanup_load_tmpdir();
/*
The following is only false if we get here with a BINLOG statement
*/
if (rli->mi)
cleanup_load_tmpdir(&rli->mi->cmp_connection_name);
}
else
{
......@@ -7784,7 +7838,8 @@ int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
bzero((char*)&file, sizeof(file));
fname_buf= strmov(proc_info, "Making temp file ");
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info",
&rli->mi->connection_name);
thd_proc_info(thd, proc_info);
/* old copy may exist already */
mysql_file_delete(key_file_log_event_info, fname_buf, MYF(0));
......@@ -7962,7 +8017,8 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
DBUG_ENTER("Append_block_log_event::do_apply_event");
fname= strmov(proc_info, "Making temp file ");
slave_load_file_stem(fname, file_id, server_id, ".data");
slave_load_file_stem(fname, file_id, server_id, ".data",
&rli->mi->cmp_connection_name);
thd_proc_info(thd, proc_info);
if (get_create_or_append())
{
......@@ -8106,7 +8162,8 @@ void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol)
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli)
{
char fname[FN_REFLEN+10];
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data",
&rli->mi->cmp_connection_name);
mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME));
strmov(ext, ".info");
mysql_file_delete(key_file_log_event_info, fname, MYF(MY_WME));
......@@ -8210,7 +8267,8 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
IO_CACHE file;
Load_log_event *lev= 0;
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
ext= slave_load_file_stem(fname, file_id, server_id, ".info",
&rli->mi->cmp_connection_name);
if ((fd= mysql_file_open(key_file_log_event_info,
fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
MYF(MY_WME))) < 0 ||
......@@ -8497,7 +8555,8 @@ Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
memcpy(p, query, fn_pos_start);
p+= fn_pos_start;
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
p= slave_load_file_stem(p, file_id, server_id, ".data");
p= slave_load_file_stem(p, file_id, server_id, ".data",
&rli->mi->cmp_connection_name);
fname_end= p= strend(p); // Safer than p=p+5
*(p++)='\'';
switch (dup_handling) {
......
......@@ -687,10 +687,11 @@ bool check_master_connection_name(LEX_STRING *name)
*/
void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file, bool append,
LEX_STRING *suffix)
const char *info_file, bool append,
LEX_STRING *suffix)
{
char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p;
char buff[MAX_CONNECTION_NAME+1],
res[MAX_CONNECTION_NAME * MAX_FILENAME_MBWIDTH+1], *p;
p= strmake(res_file_name, info_file, length);
/* If not empty suffix and there is place left for some part of the suffix */
......@@ -703,8 +704,6 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
/* Create null terminated string */
strmake(buff, suffix->str, suffix->length);
/* Convert to lower case */
my_casedn_str(system_charset_info, buff);
/* Convert to characters usable in a file name */
res_length= strconvert(system_charset_info, buff,
&my_charset_filename, res, sizeof(res), &errors);
......@@ -820,7 +819,7 @@ bool Master_info_index::init_all_master_info()
{
int thread_mask;
int err_num= 0, succ_num= 0; // The number of success read Master_info
char sign[MAX_CONNECTION_NAME];
char sign[MAX_CONNECTION_NAME+1];
File index_file_nr;
DBUG_ENTER("init_all_master_info");
......@@ -872,11 +871,14 @@ bool Master_info_index::init_all_master_info()
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
create_logfile_name_with_suffix(buf_master_info_file, sizeof(buf_master_info_file),
master_info_file, 0, &connection_name);
create_logfile_name_with_suffix(buf_master_info_file,
sizeof(buf_master_info_file),
master_info_file, 0,
&mi->cmp_connection_name);
create_logfile_name_with_suffix(buf_relay_log_info_file,
sizeof(buf_relay_log_info_file),
relay_log_info_file, 0, &connection_name);
sizeof(buf_relay_log_info_file),
relay_log_info_file, 0,
&mi->cmp_connection_name);
if (global_system_variables.log_warnings > 1)
sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
buf_master_info_file, buf_relay_log_info_file);
......
......@@ -213,17 +213,18 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
char *buf_relaylog_index_name= opt_relaylog_index_name;
create_logfile_name_with_suffix(buf_relay_logname, sizeof(buf_relay_logname),
ln, 1, &mi->connection_name);
create_logfile_name_with_suffix(buf_relay_logname,
sizeof(buf_relay_logname),
ln, 1, &mi->cmp_connection_name);
ln= buf_relay_logname;
if (opt_relaylog_index_name)
{
buf_relaylog_index_name= buf_relaylog_index_name_buff;
create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
sizeof(buf_relaylog_index_name_buff),
opt_relaylog_index_name, 0,
&mi->connection_name);
sizeof(buf_relaylog_index_name_buff),
opt_relaylog_index_name, 0,
&mi->cmp_connection_name);
}
/*
......
......@@ -3712,7 +3712,6 @@ int check_temp_dir(char* tmp_file)
MY_DIR *dirp;
char tmp_dir[FN_REFLEN];
size_t tmp_dir_size;
DBUG_ENTER("check_temp_dir");
mysql_mutex_lock(&LOCK_thread_count);
......
......@@ -41,6 +41,7 @@
#define MAX_CONNECTION_NAME NAME_LEN
#define MAX_MBWIDTH 3 /* Max multibyte sequence */
#define MAX_FILENAME_MBWIDTH 5
#define MAX_FIELD_CHARLENGTH 255
#define MAX_FIELD_VARCHARLENGTH 65535
#define MAX_FIELD_BLOBLENGTH UINT_MAX32 /* cf field_blob::get_length() */
......
......@@ -2128,11 +2128,12 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
create_logfile_name_with_suffix(master_info_file_tmp,
sizeof(master_info_file_tmp),
master_info_file, 0, &mi->connection_name);
master_info_file, 0,
&mi->cmp_connection_name);
create_logfile_name_with_suffix(relay_log_info_file_tmp,
sizeof(relay_log_info_file_tmp),
relay_log_info_file, 0,
&mi->connection_name);
&mi->cmp_connection_name);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
......@@ -2378,11 +2379,13 @@ int reset_slave(THD *thd, Master_info* mi)
// and delete these two files
create_logfile_name_with_suffix(master_info_file_tmp,
sizeof(master_info_file_tmp),
master_info_file, 0, &mi->connection_name);
sizeof(master_info_file_tmp),
master_info_file, 0,
&mi->cmp_connection_name);
create_logfile_name_with_suffix(relay_log_info_file_tmp,
sizeof(relay_log_info_file_tmp),
relay_log_info_file, 0, &mi->connection_name);
sizeof(relay_log_info_file_tmp),
relay_log_info_file, 0,
&mi->cmp_connection_name);
fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
......@@ -2549,11 +2552,13 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
thd_proc_info(thd, "Changing master");
create_logfile_name_with_suffix(master_info_file_tmp,
sizeof(master_info_file_tmp),
master_info_file, 0, &mi->connection_name);
sizeof(master_info_file_tmp),
master_info_file, 0,
&mi->cmp_connection_name);
create_logfile_name_with_suffix(relay_log_info_file_tmp,
sizeof(relay_log_info_file_tmp),
relay_log_info_file, 0, &mi->connection_name);
sizeof(relay_log_info_file_tmp),
relay_log_info_file, 0,
&mi->cmp_connection_name);
/* if new Master_info doesn't exists, add it */
if (!master_info_index->get_master_info(&mi->connection_name,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment