sql/log.cc

    Added magic number to binlog
sql/log_event.cc
    distinquish bogus data from truncated logs
sql/log_event.h
    added magic number
    added LOG_READ_TRUNC error
sql/mysqlbinlog.cc
    fixed to handle magic number
    added O_BINARY to my_fopen
sql/mysqld.cc
    added code for replicate-rewrite-db
sql/slave.cc
    replicate-rewrite-db
    O_BINARY
    handle magic
sql/sql_class.h
    added i_string_pair class
sql/sql_repl.cc
    added magic
    better error messages
support-files/magic
    added magic for binlog

Added test case for replication of queries with error
parent a434c8d2
source ../include/master-slave.inc;
connection master;
drop table if exists x;
create table x(n int primary key);
!insert into x values (1),(2),(2);
insert into x values (3);
connection slave;
sleep 3;
@x.master select * from x;
......@@ -149,6 +149,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
fn_format(index_file_name, name, mysql_data_home, ".index", 6);
db[0]=0;
MY_STAT tmp_stat;
bool do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0)));
file=my_fopen(log_file_name,O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG));
if (!file)
......@@ -187,10 +191,18 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
else if (log_type == LOG_BIN)
{
Start_log_event s;
if(!index_file &&
// Explanation of the boolean black magic:
//
// if we are supposed to write magic number try write
// clean up if failed
// then if index_file has not been previously opened, try to open it
// clean up if failed
if((do_magic && my_fwrite(file, (byte*)BINLOG_MAGIC, 4,
MYF(MY_NABP|MY_WME)) ||
(!index_file &&
!(index_file = my_fopen(index_file_name,O_APPEND | O_BINARY | O_RDWR,
MYF(MY_WME))))
MYF(MY_WME))))))
{
my_fclose(file,MYF(MY_WME));
my_free(name,MYF(0));
......@@ -199,6 +211,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
log_type=LOG_CLOSED;
return;
}
Start_log_event s;
s.write(file);
pthread_mutex_lock(&LOCK_index);
my_fseek(index_file, 0L, MY_SEEK_END, MYF(MY_WME));
......
......@@ -100,7 +100,7 @@ int Log_event::read_log_event(FILE* file, String* packet,
{
if(log_lock)
pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO;
return feof(file) ? LOG_READ_TRUNC: LOG_READ_IO;
}
if(log_lock) pthread_mutex_unlock(log_lock);
......
......@@ -26,6 +26,7 @@
#define LOG_READ_BOGUS -2
#define LOG_READ_IO -3
#define LOG_READ_MEM -5
#define LOG_READ_TRUNC -6
#define LOG_EVENT_OFFSET 4
#define BINLOG_VERSION 1
......@@ -42,6 +43,7 @@
#define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info))
#define BINLOG_MAGIC "\xfe\x62\x69\x6e"
enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2,
STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5,
......
......@@ -267,6 +267,15 @@ static void dump_remote_log_entries(const char* logname)
char buf[128];
uint len;
NET* net = &mysql->net;
if(!position) position = 4; // protect the innocent from spam
if(position < 4)
{
position = 4;
// warn the guity
fprintf(stderr,
"Warning: with the position so small you would hit the magic number\n\
Unfortunately, no sweepstakes today, adjusted position to 4\n");
}
int4store(buf, position);
int2store(buf + 4, binlog_flags);
len = (uint) strlen(logname);
......@@ -305,7 +314,7 @@ static void dump_local_log_entries(const char* logname)
int rec_count = 0;
if(logname && logname[0] != '-')
file = my_fopen(logname, O_RDONLY, MYF(MY_WME));
file = my_fopen(logname, O_RDONLY|O_BINARY, MYF(MY_WME));
else
file = stdin;
......@@ -314,6 +323,15 @@ static void dump_local_log_entries(const char* logname)
if(my_fseek(file, position, MY_SEEK_SET, MYF(MY_WME)))
die("failed on my_fseek()");
if(!position)
{
char magic[4];
if(my_fread(file, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
die("I/O error reading binlog magic number");
if(memcmp(magic, BINLOG_MAGIC, 4))
die("Bad magic number");
}
while(1)
{
......
......@@ -177,6 +177,7 @@ static VioSSLAcceptorFd* ssl_acceptor_fd = 0;
extern bool slave_running;
I_List <i_string_pair> replicate_rewrite_db;
I_List<i_string> replicate_do_db, replicate_ignore_db;
// allow the user to tell us which db to replicate and which to ignore
I_List<i_string> binlog_do_db, binlog_ignore_db;
......@@ -2854,6 +2855,38 @@ static void get_options(int argc,char **argv)
replicate_do_db.push_back(db);
break;
}
case (int)OPT_REPLICATE_REWRITE_DB:
{
char* key = optarg,*p, *val;
p = strstr(optarg, "->");
if(!p)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - missing ->\n");
exit(1);
}
val = p--;
while(isspace(*p) && p > optarg) *p-- = 0;
if(p == optarg)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - empty FROM db\n");
exit(1);
}
*val = 0;
val += 2;
while(*val && isspace(*val)) *val++;
if(!*val)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - empty TO db\n");
exit(1);
}
i_string_pair* db_pair = new i_string_pair(key, val);
replicate_rewrite_db.push_back(db_pair);
break;
}
case (int)OPT_BINLOG_IGNORE_DB:
{
......
......@@ -30,6 +30,7 @@ extern my_string master_user, master_password, master_host,
master_info_file;
extern I_List<i_string> replicate_do_db, replicate_ignore_db;
extern I_List<i_string_pair> replicate_rewrite_db;
extern I_List<THD> threads;
bool slave_running = 0;
pthread_t slave_real_id;
......@@ -48,6 +49,7 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
static inline char* rewrite_db(char* db);
static inline bool slave_killed(THD* thd)
{
......@@ -62,6 +64,20 @@ static inline void skip_load_data_infile(NET* net)
send_ok(net); // the master expects it
}
static inline char* rewrite_db(char* db)
{
if(replicate_rewrite_db.is_empty() || !db) return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db);
i_string_pair* tmp;
while((tmp=it++))
{
if(!strcmp(tmp->key, db))
return tmp->val;
}
return db;
}
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
......@@ -278,11 +294,11 @@ int init_master_info(MASTER_INFO* mi)
if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages
// if the file does not exist
{
file = my_fopen(fname, O_CREAT|O_RDWR, MYF(MY_WME));
file = my_fopen(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME));
if(!file)
return 1;
mi->log_file_name[0] = 0;
mi->pos = 0;
mi->pos = 4; // skip magic number
mi->file = file;
if(master_host)
......@@ -299,7 +315,7 @@ int init_master_info(MASTER_INFO* mi)
}
else
{
file = my_fopen(fname, O_RDWR, MYF(MY_WME));
file = my_fopen(fname, O_RDWR|O_BINARY, MYF(MY_WME));
if(!file)
return 1;
......@@ -589,7 +605,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
Query_log_event* qev = (Query_log_event*)ev;
int q_len = qev->q_len;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = (char*)qev->db;
thd->db = rewrite_db((char*)qev->db);
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)qev->query;
......@@ -645,7 +661,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
Load_log_event* lev = (Load_log_event*)ev;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = (char*)lev->db;
thd->db = rewrite_db((char*)lev->db);
thd->query = 0;
thd->query_error = 0;
......@@ -766,7 +782,8 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
int ident_len = rev->ident_len;
memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
mi->log_file_name[ident_len] = 0;
mi->pos = 0;
mi->pos = 4; // skip magic number
flush_master_info(mi);
break;
}
......
......@@ -218,6 +218,17 @@ public:
i_string(char* s) : ptr(s) {}
};
//needed for linked list of two strings for replicate-rewrite-db
class i_string_pair: public ilink
{
public:
char* key;
char* val;
i_string_pair():key(0),val(0) { }
i_string_pair(char* key, char* val) : key(key),val(val) {}
};
/****************************************************************************
** every connection is handle by a thread with a THD
****************************************************************************/
......
......@@ -98,6 +98,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN];
char magic[4];
FILE* log = NULL;
String* packet = &thd->packet;
int error;
......@@ -129,7 +130,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
errmsg = "Could not open log file";
goto err;
}
if(my_fread(log, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
{
errmsg = "I/O error reading binlog magic number";
goto err;
}
if(memcmp(magic, BINLOG_MAGIC, 4))
{
errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
if(pos < 4)
{
errmsg = "Contratulations! You have hit the magic number and can win \
sweepstakes if you report the bug";
goto err;
}
if(my_fseek(log, pos, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR )
{
errmsg = "Error on fseek()";
......@@ -168,7 +187,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
}
if(error != LOG_READ_EOF)
{
errmsg = "error reading log event";
switch(error)
{
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
}
goto err;
}
......@@ -261,7 +294,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
}
else
{
bool loop_breaker = 0; // need this to break out of the for loop from switch
bool loop_breaker = 0;
// need this to break out of the for loop from switch
thd->proc_info = "switching to next log";
switch(mysql_bin_log.find_next_log(&linfo))
{
......@@ -281,14 +315,31 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
(void) my_fclose(log, MYF(MY_WME));
log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
if(!log)
goto err;
{
errmsg = "Could not open next log";
goto err;
}
//check the magic
if(my_fread(log, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
{
errmsg = "I/O error reading binlog magic number";
goto err;
}
if(memcmp(magic, BINLOG_MAGIC, 4))
{
errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
// fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset
{
char header[LOG_EVENT_HEADER_LEN];
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR); // find the last slash
char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p)
p++;
else
......
......@@ -12,3 +12,4 @@
>3 byte x Version %d
0 belong&0xffffff00 0xfefe0600 MySQL ISAM compressed data file
>3 byte x Version %d
0 string \376bin MySQL replication log
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment