Commit 640fadf2 authored by unknown's avatar unknown

work to enable reading 3.23 logs - not yet finished

moved fail-safe replication routines from sql_repl.cc to repl_failsafe.cc
write start event only in the first log


client/mysqlbinlog.cc:
  work to enable reading 3.23 logs
libmysql/Makefile.shared:
  added mf_iocache2 to libmysqlclient - needed for mysqlbinlog
mysql-test/mysql-test-run.sh:
  added --start-and-exit
mysql-test/r/rpl000002.result:
  result clean-up
mysql-test/r/rpl000016.result:
  result update
mysql-test/r/rpl_log.result:
  result update
mysql-test/t/rpl000016.test:
  test cleanup
mysys/mf_iocache.c:
  fixed new bug
sql/log.cc:
  write start event only on server start or after reset master
sql/log_event.cc:
  work to enable reading 3.23 log format
sql/log_event.h:
  work to enable reading 3.23 format
sql/repl_failsafe.cc:
  code restructuring
sql/repl_failsafe.h:
  re-organized code
sql/slave.cc:
  check master version
sql/slave.h:
  old_format member
sql/sql_class.h:
  allow user to specify io cache type
  need_start_event member to allow writing start event only in the first log
sql/sql_parse.cc:
  code re-organization
sql/sql_repl.cc:
  code reorganization
sql/sql_repl.h:
  reorganized code
parent 83aeee64
......@@ -21,6 +21,8 @@
#include <time.h>
#include "log_event.h"
#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET)
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
char server_version[SERVER_VERSION_LENGTH];
......@@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table)
}
}
static int check_master_version(MYSQL* mysql)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
const char* version;
int old_format = 0;
if (mysql_query(mysql, "SELECT VERSION()")
|| !(res = mysql_store_result(mysql)))
{
mysql_close(mysql);
die("Error checking master version: %s",
mysql_error(mysql));
}
if (!(row = mysql_fetch_row(res)))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master returned no rows for SELECT VERSION()");
return 1;
}
if (!(version = row[0]))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master reported NULL for the version");
}
switch (*version)
{
case '3':
old_format = 1;
break;
case '4':
old_format = 0;
break;
default:
sql_print_error("Master reported unrecognized MySQL version '%s'",
version);
mysql_free_result(res);
mysql_close(mysql);
return 1;
}
mysql_free_result(res);
return old_format;
}
static void dump_remote_log_entries(const char* logname)
{
......@@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname)
char last_db[FN_REFLEN+1] = "";
uint len;
NET* net = &mysql->net;
int old_format;
old_format = check_master_version(mysql);
if(!position) position = 4; // protect the innocent from spam
if (position < 4)
{
......@@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname)
len = (uint) strlen(logname);
int4store(buf + 6, 0);
memcpy(buf + 10, logname,len);
if(simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
die("Error sending the log dump command");
for(;;)
......@@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname)
len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 ,
len - 1, &error);
len - 1, &error, old_format);
if (ev)
{
ev->print(result_file, short_form, last_db);
......@@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname)
}
}
static int check_header (IO_CACHE* file)
{
char buf[PROBE_HEADER_LEN];
int old_format;
my_off_t pos = my_b_tell(file);
my_b_seek(file, (my_off_t)0);
if (my_b_read(file, buf, sizeof(buf)))
die("Failed reading header");
if (buf[EVENT_TYPE_OFFSET+4] == START_EVENT)
{
uint event_len;
event_len = uint4korr(buf + EVENT_LEN_OFFSET + 4);
old_format = (event_len < LOG_EVENT_HEADER_LEN + START_HEADER_LEN);
}
else
old_format = 0;
my_b_seek(file, pos);
return old_format;
}
static void dump_local_log_entries(const char* logname)
{
File fd = -1;
IO_CACHE cache,*file= &cache;
ulonglong rec_count = 0;
char last_db[FN_REFLEN+1] = "";
bool old_format = 0;
if (logname && logname[0] != '-')
{
......@@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname)
if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
MYF(MY_WME | MY_NABP)))
exit(1);
old_format = check_header(file);
}
else
{
if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
exit(1);
old_format = check_header(file);
if (position)
{
/* skip 'position' characters from stdout */
......@@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname)
char llbuff[21];
my_off_t old_off = my_b_tell(file);
Log_event* ev = Log_event::read_log_event(file);
Log_event* ev = Log_event::read_log_event(file, old_format);
if (!ev)
{
if (file->error)
......
......@@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \
mf_loadpath.lo my_pthread.lo my_thr_init.lo \
thr_mutex.lo mulalloc.lo string.lo default.lo \
my_compress.lo array.lo my_once.lo list.lo my_net.lo \
charset.lo hash.lo mf_iocache.lo my_seek.lo \
charset.lo hash.lo mf_iocache.lo \
mf_iocache2.lo my_seek.lo \
my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo
# Not needed in the minimum library
......
......@@ -168,6 +168,9 @@ while test $# -gt 0; do
USE_MANAGER=1
USE_RUNNING_SERVER=
;;
--start-and-exit)
START_AND_EXIT=1
;;
--skip-innobase)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;;
......@@ -1091,6 +1094,10 @@ then
mysql_loadstd
fi
if [ "x$START_AND_EXIT" = "x1" ] ; then
echo "Servers started, exiting"
exit
fi
$ECHO "Starting Tests"
......
......@@ -16,7 +16,7 @@ n
2002
show slave hosts;
Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 9307 2 1
2 127.0.0.1 $SLAVE_MYPORT 2 1
drop table t1;
slave stop;
drop table if exists t2;
......
......@@ -15,7 +15,7 @@ create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard');
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 3
127.0.0.1 root $MASTER_MYPORT 60 master-bin.001 234 Yes 0 0 3
select * from t1;
s
Could not break slave
......@@ -42,7 +42,7 @@ master-bin.003
insert into t2 values (65);
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 3
127.0.0.1 root $MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2
select * from t2;
m
34
......@@ -60,12 +60,12 @@ master-bin.005
master-bin.006
show master status;
File Position Binlog_do_db Binlog_ignore_db
master-bin.006 710
master-bin.006 382
slave stop;
slave start;
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.006 710 Yes 0 0 11
127.0.0.1 root $MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6
lock tables t3 read;
select count(*) from t3 where n >= 4;
count(*)
......
......@@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1;
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
......@@ -41,7 +41,7 @@ insert into t1 values (1);
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
......@@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 668 Stop 1 11
show binlog events in 'master-bin.002';
Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.002 79 Query 1 2 use test; create table t1 (n int)
master-bin.002 137 Query 1 3 use test; insert into t1 values (1)
master-bin.002 197 Query 1 4 use test; drop table t1
master-bin.002 4 Query 1 1 use test; create table t1 (n int)
master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
master-bin.002 122 Query 1 3 use test; drop table t1
show master logs;
Log_name
master-bin.001
......@@ -69,7 +68,7 @@ slave-bin.001
slave-bin.002
show binlog events in 'slave-bin.001' from 4;
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.001 4 Start 2 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1
......@@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 729 Stop 2 5
show binlog events in 'slave-bin.002' from 4;
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 132 Query 1 2 use test; create table t1 (n int)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1)
slave-bin.002 250 Query 1 4 use test; drop table t1
slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
slave-bin.002 175 Query 1 3 use test; drop table t1
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 245 Yes 0 0 4
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3
show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
......@@ -106,8 +104,8 @@ slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
slave-bin.002 132
slave-bin.002 57
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=137 and master_log_seq=3 and master_server_id=1;
Log_name Log_pos
slave-bin.002 250
slave-bin.002 223
......@@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos;
connection slave;
sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status;
select * from t1;
connection master;
......@@ -70,7 +69,6 @@ insert into t2 values (65);
save_master_pos;
connection slave;
sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status;
select * from t2;
connection master;
......@@ -92,7 +90,6 @@ connection slave;
slave stop;
slave start;
sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status;
# because of concurrent insert, the table may not be up to date
# if we do not lock
......
......@@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer;
info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
info->write_pos = info->write_end = 0;
if (type == SEQ_READ_APPEND)
{
info->append_read_pos = info->write_pos = info->append_buffer;
......@@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
info->append_read_pos = info->write_pos = info->append_buffer;
}
if (!info->write_pos)
info->write_pos = info->buffer;
if (!info->write_end)
info->write_end = info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->type=type;
info->error=0;
init_read_function(info,type);
......
......@@ -81,7 +81,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0),
inited(0), log_seq(1), file_id(1),no_rotate(0)
inited(0), log_seq(1), file_id(1),no_rotate(0),
need_start_event(1)
{
/*
We don't want to intialize LOCK_Log here as the thread system may
......@@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options)
MYF(MY_WME))) < 0);
}
void MYSQL_LOG::init(enum_log_type log_type_arg)
void MYSQL_LOG::init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg)
{
log_type = log_type_arg;
io_cache_type = io_cache_type_arg;
if (!inited)
{
inited=1;
......@@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE,
init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
goto err;
......@@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
else if (log_type == LOG_BIN)
{
bool error;
/*
Explanation of the boolean black magic:
if we are supposed to write magic number try write
......@@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err;
log_seq = 1;
Start_log_event s;
bool error;
s.set_log_seq(0, this);
s.write(&log_file);
if (need_start_event)
{
Start_log_event s;
s.set_log_seq(0, this);
s.write(&log_file);
need_start_event=0;
}
flush_io_cache(&log_file);
pthread_mutex_lock(&LOCK_index);
error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name),
......@@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info)
file == &log_file && flush_io_cache(file))
goto err;
error=0;
should_rotate = (file == &log_file && my_b_tell(file) >= max_binlog_size);
should_rotate = (file == &log_file &&
(uint)my_b_tell(file) >= max_binlog_size);
err:
if (error)
{
......
......@@ -149,12 +149,21 @@ static void cleanup_load_tmpdir()
#endif
Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0)
Log_event::Log_event(const char* buf, bool old_format):
cached_event_len(0),temp_buf(0)
{
when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET);
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
if (old_format)
{
log_seq=0;
flags=0;
}
else
{
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT
thd = 0;
#endif
......@@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define UNLOCK_MUTEX
#endif
#ifndef MYSQL_CLIENT
#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file)
Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
#endif
{
char head[LOG_EVENT_HEADER_LEN];
#ifndef MYSQL_CLIENT
if(log_lock) pthread_mutex_lock(log_lock);
#endif
LOCK_MUTEX;
if (my_b_read(file, (byte *) head, sizeof(head)))
{
UNLOCK_MUTEX;
......@@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error = "read error";
goto err;
}
if ((res = read_log_event(buf, data_len, &error)))
if ((res = read_log_event(buf, data_len, &error, old_format)))
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
......@@ -502,7 +518,7 @@ err:
}
Log_event* Log_event::read_log_event(const char* buf, int event_len,
const char **error)
const char **error, bool old_format)
{
if (event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
......@@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
switch(buf[EVENT_TYPE_OFFSET])
{
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len);
ev = new Query_log_event(buf, event_len, old_format);
break;
case LOAD_EVENT:
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len);
ev = new Load_log_event(buf, event_len, old_format);
break;
case ROTATE_EVENT:
ev = new Rotate_log_event(buf, event_len);
ev = new Rotate_log_event(buf, event_len, old_format);
break;
case SLAVE_EVENT:
ev = new Slave_log_event(buf, event_len);
......@@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Execute_load_log_event(buf, event_len);
break;
case START_EVENT:
ev = new Start_log_event(buf);
ev = new Start_log_event(buf, old_format);
break;
case STOP_EVENT:
ev = new Stop_log_event(buf);
ev = new Stop_log_event(buf, old_format);
break;
case INTVAR_EVENT:
ev = new Intvar_log_event(buf);
ev = new Intvar_log_event(buf, old_format);
break;
default:
break;
......@@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
#endif /* #ifdef MYSQL_CLIENT */
Start_log_event::Start_log_event(const char* buf) :Log_event(buf)
Start_log_event::Start_log_event(const char* buf,
bool old_format) :Log_event(buf, old_format)
{
binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN +
ST_BINLOG_VER_OFFSET);
......@@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file)
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
}
Rotate_log_event::Rotate_log_event(const char* buf, int event_len):
Log_event(buf),new_log_ident(NULL),alloced(0)
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
bool old_format):
Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{
// the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET
......@@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
}
#endif
Query_log_event::Query_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0), query(NULL), db(NULL)
Query_log_event::Query_log_event(const char* buf, int event_len,
bool old_format):
Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{
if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
......@@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file)
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
}
Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
Log_event(buf, old_format)
{
buf += LOG_EVENT_HEADER_LEN;
type = buf[I_TYPE_OFFSET];
......@@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// the caller must do buf[event_len] = 0 before he starts using the
// constructed event
Load_log_event::Load_log_event(const char* buf, int event_len):
Log_event(buf),num_fields(0),fields(0),
Load_log_event::Load_log_event(const char* buf, int event_len,
bool old_format):
Log_event(buf, old_format),num_fields(0),fields(0),
field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0)
{
......@@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
}
Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf),mem_pool(0),master_host(0)
Log_event(buf,0),mem_pool(0),master_host(0)
{
event_len -= LOG_EVENT_HEADER_LEN;
if(event_len < 0)
......@@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file)
}
Create_file_log_event::Create_file_log_event(const char* buf, int len):
Load_log_event(buf,0),fake_base(0),block(0)
Load_log_event(buf,0,0),fake_base(0),block(0)
{
int block_offset;
if (copy_log_event(buf,len))
......@@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
#endif
Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf),block(0)
Log_event(buf, 0),block(0)
{
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return;
......@@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
#endif
Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf),file_id(0)
Log_event(buf, 0),file_id(0)
{
if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return;
......@@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
#endif
Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf),file_id(0)
Log_event(buf, 0),file_id(0)
{
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return;
......@@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int Start_log_event::exec_event(struct st_master_info* mi)
{
#ifdef TO_BE_DELETED
/*
We can't close temporary files or cleanup the tmpdir here, becasue
someone may have just rotated the logs on the master.
We should only do this cleanup when we know the master restarted.
*/
close_temporary_tables(thd);
cleanup_load_tmpdir();
#endif
if (!mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
return Log_event::exec_event(mi);
}
......@@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0))
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
(pthread_mutex_t*)0,
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
slave_print_error(0, "File '%s' appears corrupted", fname);
......
......@@ -242,7 +242,7 @@ public:
virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() = 0;
virtual bool get_cache_stmt() { return 0; }
Log_event(const char* buf);
Log_event(const char* buf, bool old_format);
#ifndef MYSQL_CLIENT
Log_event(THD* thd_arg, uint16 flags_arg = 0);
#endif
......@@ -268,12 +268,14 @@ public:
#ifndef MYSQL_CLIENT
// if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock);
static Log_event* read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format);
#else // avoid having to link mysqlbinlog against libpthread
static Log_event* read_log_event(IO_CACHE* file);
static Log_event* read_log_event(IO_CACHE* file, bool old_format);
#endif
static Log_event* read_log_event(const char* buf, int event_len,
const char **error);
const char **error, bool old_format);
const char* get_type_str();
#ifndef MYSQL_CLIENT
......@@ -317,7 +319,7 @@ public:
bool get_cache_stmt() { return cache_stmt; }
#endif
Query_log_event(const char* buf, int event_len);
Query_log_event(const char* buf, int event_len, bool old_format);
~Query_log_event()
{
if (data_buf)
......@@ -411,7 +413,7 @@ public:
int exec_event(NET* net, struct st_master_info* mi);
#endif
Load_log_event(const char* buf, int event_len);
Load_log_event(const char* buf, int event_len, bool old_format);
~Load_log_event()
{
}
......@@ -451,7 +453,7 @@ public:
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
#endif
Start_log_event(const char* buf);
Start_log_event(const char* buf, bool old_format);
~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;}
int write_data(IO_CACHE* file);
......@@ -479,7 +481,7 @@ public:
:Log_event(thd_arg),val(val_arg),type(type_arg)
{}
#endif
Intvar_log_event(const char* buf);
Intvar_log_event(const char* buf, bool old_format);
~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name();
......@@ -503,7 +505,8 @@ public:
Stop_log_event() :Log_event((THD*)0)
{}
#endif
Stop_log_event(const char* buf):Log_event(buf)
Stop_log_event(const char* buf, bool old_format):Log_event(buf,
old_format)
{
}
~Stop_log_event() {}
......@@ -534,7 +537,7 @@ public:
alloced(0)
{}
#endif
Rotate_log_event(const char* buf, int event_len);
Rotate_log_event(const char* buf, int event_len, bool old_format);
~Rotate_log_event()
{
if (alloced)
......@@ -686,7 +689,6 @@ public:
#endif
};
#endif
......
This diff is collapsed.
......@@ -2,6 +2,8 @@
#define REPL_FAILSAFE_H
#include "mysql.h"
#include "my_sys.h"
#include "slave.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
......@@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql);
extern HASH slave_list;
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
#endif
......@@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db);
static void free_table_ent(TABLE_RULE_ENT* e)
......@@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1;
}
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
MYSQL_RES* res;
MYSQL_ROW row;
const char* version;
const char* errmsg = 0;
if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
|| !(res = mc_mysql_store_result(mysql)))
{
sql_print_error("Error checking master version: %s",
mc_mysql_error(mysql));
return 1;
}
if (!(row = mc_mysql_fetch_row(res)))
{
errmsg = "Master returned no rows for SELECT VERSION()";
goto err;
}
if (!(version = row[0]))
{
errmsg = "Master reported NULL for the version";
goto err;
}
switch (*version)
{
case '3':
mi->old_format = 1;
break;
case '4':
mi->old_format = 0;
break;
default:
errmsg = "Master reported unrecognized MySQL version";
goto err;
}
err:
if (res)
mc_mysql_free_result(res);
if (errmsg)
{
sql_print_error(errmsg);
return 1;
}
return 0;
}
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name)
......@@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi)
mi->inited = 1;
// now change the cache from READ to WRITE - must do this
// before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
pthread_mutex_unlock(&mi->lock);
return error;
......@@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
const char *error_msg;
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
event_len, &error_msg);
event_len, &error_msg,
mi->old_format);
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
if (ev->server_id == ::server_id || slave_skip_counter)
if (ev->server_id == ::server_id ||
(slave_skip_counter && ev->get_type_code() != ROTATE_EVENT))
{
if(type_code == LOAD_EVENT)
skip_load_data_infile(net);
......@@ -1070,9 +1122,17 @@ connected:
// register ourselves with the master
// if fails, this is not fatal - we just print the error message and go
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
update_slave_list(mysql);
thd->proc_info = "Checking master version";
if (check_master_version(mysql, &glob_mi))
{
goto err;
}
if (!glob_mi.old_format)
{
thd->proc_info = "Registering slave on master";
if (register_slave_on_master(mysql) || update_slave_list(mysql))
goto err;
}
while (!slave_killed(thd))
{
......
......@@ -23,8 +23,10 @@ typedef struct st_master_info
pthread_mutex_t lock;
pthread_cond_t cond;
bool inited;
bool old_format; /* master binlog is in 3.23 format */
st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0)
st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0),
old_format(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST);
......
......@@ -72,15 +72,18 @@ class MYSQL_LOG {
// we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for
// purging
enum cache_type io_cache_type;
bool need_start_event;
friend class Log_event;
public:
MYSQL_LOG();
~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void set_need_start_event() { need_start_event = 1; }
void set_index_file_name(const char* index_file_name = 0);
void init(enum_log_type log_type_arg);
void init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg = WRITE_CACHE);
void open(const char *log_name,enum_log_type log_type,
const char *new_name=0);
void new_file(bool inside_mutex = 0);
......
......@@ -22,6 +22,7 @@
#include "mysql_priv.h"
#include "sql_acl.h"
#include "sql_repl.h"
#include "repl_failsafe.h"
#include <m_ctype.h>
#include <thr_alarm.h>
#include <myisam.h>
......
This diff is collapsed.
......@@ -15,7 +15,6 @@ typedef struct st_slave_info
} SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
extern HASH slave_list;
extern char* master_host;
extern my_string opt_bin_logname, master_info_file;
extern uint32 server_id;
......@@ -27,26 +26,24 @@ extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail;
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg);
int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int show_binlog_events(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2);
void reset_slave();
void reset_master();
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment