more predicatable slave behaviour with wait_for_slave_stop in mysqltest

fixed a couple of bugs with SEQ_READ_APPEND cache
rpl000016 still has non-deterministic result, but I am going to commit and
push since what I have is now better than what is in the main repository
parent 90a77b4f
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* mysqltest test tool /* mysqltest test tool
* See man page for more information. * See the manual for more information
* TODO: document better how mysqltest works
* *
* Written by: * Written by:
* Sasha Pachev <sasha@mysql.com> * Sasha Pachev <sasha@mysql.com>
...@@ -26,9 +27,6 @@ ...@@ -26,9 +27,6 @@
/********************************************************************** /**********************************************************************
TODO: TODO:
- Print also the queries that returns a result to the log file; This makes
it much easier to find out what's wrong.
- Do comparison line by line, instead of doing a full comparison of - Do comparison line by line, instead of doing a full comparison of
the text file. This will save space as we don't need to keep many the text file. This will save space as we don't need to keep many
results in memory. It will also make it possible to do simple results in memory. It will also make it possible to do simple
...@@ -43,7 +41,7 @@ ...@@ -43,7 +41,7 @@
**********************************************************************/ **********************************************************************/
#define MTEST_VERSION "1.13" #define MTEST_VERSION "1.14"
#include <my_global.h> #include <my_global.h>
#include <mysql_embed.h> #include <mysql_embed.h>
...@@ -88,6 +86,12 @@ ...@@ -88,6 +86,12 @@
#define CON_RETRY_SLEEP 2 #define CON_RETRY_SLEEP 2
#define MAX_CON_TRIES 5 #define MAX_CON_TRIES 5
#ifndef OS2
#define SLAVE_POLL_INTERVAL 300000 /* 0.3 of a sec */
#else
#defile SLAVE_POLL_INTERVAL 0.3
#endif
enum {OPT_MANAGER_USER=256,OPT_MANAGER_HOST,OPT_MANAGER_PASSWD, enum {OPT_MANAGER_USER=256,OPT_MANAGER_HOST,OPT_MANAGER_PASSWD,
OPT_MANAGER_PORT,OPT_MANAGER_WAIT_TIMEOUT}; OPT_MANAGER_PORT,OPT_MANAGER_WAIT_TIMEOUT};
...@@ -187,6 +191,7 @@ Q_DISABLE_RPL_PARSE, Q_EVAL_RESULT, ...@@ -187,6 +191,7 @@ Q_DISABLE_RPL_PARSE, Q_EVAL_RESULT,
Q_ENABLE_QUERY_LOG, Q_DISABLE_QUERY_LOG, Q_ENABLE_QUERY_LOG, Q_DISABLE_QUERY_LOG,
Q_ENABLE_RESULT_LOG, Q_DISABLE_RESULT_LOG, Q_ENABLE_RESULT_LOG, Q_DISABLE_RESULT_LOG,
Q_SERVER_START, Q_SERVER_STOP,Q_REQUIRE_MANAGER, Q_SERVER_START, Q_SERVER_STOP,Q_REQUIRE_MANAGER,
Q_WAIT_FOR_SLAVE_TO_STOP,
Q_UNKNOWN, /* Unknown command. */ Q_UNKNOWN, /* Unknown command. */
Q_COMMENT, /* Comments, ignored. */ Q_COMMENT, /* Comments, ignored. */
Q_COMMENT_WITH_COMMAND Q_COMMENT_WITH_COMMAND
...@@ -222,7 +227,7 @@ const char *command_names[] = { ...@@ -222,7 +227,7 @@ const char *command_names[] = {
"enable_query_log", "disable_query_log", "enable_query_log", "disable_query_log",
"enable_result_log", "disable_result_log", "enable_result_log", "disable_result_log",
"server_start", "server_stop", "server_start", "server_stop",
"require_manager", "require_manager", "wait_for_slave_to_stop",
0 0
}; };
...@@ -653,6 +658,45 @@ int open_file(const char* name) ...@@ -653,6 +658,45 @@ int open_file(const char* name)
return 0; return 0;
} }
/* ugly long name, but we are following the convention */
int do_wait_for_slave_to_stop(struct st_query* __attribute__((unused)) q)
{
MYSQL* mysql = &cur_con->mysql;
#ifndef OS2
struct timeval t;
#endif
for (;;)
{
MYSQL_RES* res;
MYSQL_ROW row;
int done;
LINT_INIT(res);
if (mysql_query(mysql,"show status like 'Slave_running'")
|| !(res=mysql_store_result(mysql)))
die("Query failed while probing slave for stop: %s",
mysql_error(mysql));
if (!(row=mysql_fetch_row(res)) || !row[1])
{
mysql_free_result(res);
die("Strange result from query while probing slave for stop");
}
done = !strcmp(row[1],"OFF");
mysql_free_result(res);
if (done)
break;
#ifndef OS2
t.tv_sec=0;
t.tv_usec=SLAVE_POLL_INTERVAL;
select(0,0,0,0,&t); /* sleep */
#else
DosSleep(OS2_SLAVE_POLL_INTERVAL);
#endif
}
return 0;
}
int do_require_manager(struct st_query* __attribute__((unused)) q) int do_require_manager(struct st_query* __attribute__((unused)) q)
{ {
if (!manager) if (!manager)
...@@ -2335,6 +2379,7 @@ int main(int argc, char** argv) ...@@ -2335,6 +2379,7 @@ int main(int argc, char** argv)
case Q_DISABLE_RESULT_LOG: disable_result_log=1; break; case Q_DISABLE_RESULT_LOG: disable_result_log=1; break;
case Q_SOURCE: do_source(q); break; case Q_SOURCE: do_source(q); break;
case Q_SLEEP: do_sleep(q); break; case Q_SLEEP: do_sleep(q); break;
case Q_WAIT_FOR_SLAVE_TO_STOP: do_wait_for_slave_to_stop(q); break;
case Q_REQUIRE_MANAGER: do_require_manager(q); break; case Q_REQUIRE_MANAGER: do_require_manager(q); break;
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
case Q_SERVER_START: do_server_start(q); break; case Q_SERVER_START: do_server_start(q); break;
......
...@@ -643,7 +643,10 @@ extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); ...@@ -643,7 +643,10 @@ extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_block_write(IO_CACHE *info, const byte *Buffer, extern int my_block_write(IO_CACHE *info, const byte *Buffer,
uint Count, my_off_t pos); uint Count, my_off_t pos);
extern int flush_io_cache(IO_CACHE *info); extern int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock);
#define flush_io_cache(info) _flush_io_cache((info),1)
extern int end_io_cache(IO_CACHE *info); extern int end_io_cache(IO_CACHE *info);
extern uint my_b_fill(IO_CACHE *info); extern uint my_b_fill(IO_CACHE *info);
extern void my_b_seek(IO_CACHE *info,my_off_t pos); extern void my_b_seek(IO_CACHE *info,my_off_t pos);
......
...@@ -33,7 +33,6 @@ master-bin.003 ...@@ -33,7 +33,6 @@ master-bin.003
insert into t2 values(1234); insert into t2 values(1234);
set insert_id=1234; set insert_id=1234;
insert into t2 values(NULL); insert into t2 values(NULL);
slave stop;
set sql_slave_skip_counter=1; set sql_slave_skip_counter=1;
slave start; slave start;
purge master logs to 'master-bin.003'; purge master logs to 'master-bin.003';
...@@ -66,7 +65,7 @@ slave stop; ...@@ -66,7 +65,7 @@ slave stop;
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1312 master-bin.006 Yes Yes 0 0 445 127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445
lock tables t3 read; lock tables t3 read;
select count(*) from t3 where n >= 4; select count(*) from t3 where n >= 4;
count(*) count(*)
......
...@@ -51,9 +51,7 @@ insert into t2 values(NULL); ...@@ -51,9 +51,7 @@ insert into t2 values(NULL);
connection slave; connection slave;
sync_with_master; sync_with_master;
#the slave may have already stopped, so we ignore the error wait_for_slave_to_stop;
--error 0,1199
!slave stop;
#restart slave skipping one event #restart slave skipping one event
set sql_slave_skip_counter=1; set sql_slave_skip_counter=1;
......
...@@ -808,13 +808,19 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) ...@@ -808,13 +808,19 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=rest_length; Buffer+=rest_length;
Count-=rest_length; Count-=rest_length;
info->write_pos+=rest_length; info->write_pos+=rest_length;
if (flush_io_cache(info)) if (_flush_io_cache(info,0))
{
unlock_append_buffer(info);
return 1; return 1;
}
if (Count >= IO_SIZE) if (Count >= IO_SIZE)
{ /* Fill first intern buffer */ { /* Fill first intern buffer */
length=Count & (uint) ~(IO_SIZE-1); length=Count & (uint) ~(IO_SIZE-1);
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
{
unlock_append_buffer(info);
return info->error= -1; return info->error= -1;
}
Count-=length; Count-=length;
Buffer+=length; Buffer+=length;
} }
...@@ -883,14 +889,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, ...@@ -883,14 +889,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
/* Flush write cache */ /* Flush write cache */
int flush_io_cache(IO_CACHE *info) int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{ {
uint length; uint length;
my_bool append_cache; my_bool append_cache;
my_off_t pos_in_file; my_off_t pos_in_file;
DBUG_ENTER("flush_io_cache"); DBUG_ENTER("flush_io_cache");
append_cache = (info->type == SEQ_READ_APPEND); if (!(append_cache = (info->type == SEQ_READ_APPEND)))
need_append_buffer_lock=0;
if (info->type == WRITE_CACHE || append_cache) if (info->type == WRITE_CACHE || append_cache)
{ {
if (info->file == -1) if (info->file == -1)
...@@ -898,6 +906,8 @@ int flush_io_cache(IO_CACHE *info) ...@@ -898,6 +906,8 @@ int flush_io_cache(IO_CACHE *info)
if (real_open_cached_file(info)) if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1)); DBUG_RETURN((info->error= -1));
} }
if (need_append_buffer_lock)
lock_append_buffer(info);
if ((length=(uint) (info->write_pos - info->write_buffer))) if ((length=(uint) (info->write_pos - info->write_buffer)))
{ {
pos_in_file=info->pos_in_file; pos_in_file=info->pos_in_file;
...@@ -909,6 +919,8 @@ int flush_io_cache(IO_CACHE *info) ...@@ -909,6 +919,8 @@ int flush_io_cache(IO_CACHE *info)
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR) MY_FILEPOS_ERROR)
{ {
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN((info->error= -1)); DBUG_RETURN((info->error= -1));
} }
if (!append_cache) if (!append_cache)
...@@ -932,6 +944,8 @@ int flush_io_cache(IO_CACHE *info) ...@@ -932,6 +944,8 @@ int flush_io_cache(IO_CACHE *info)
info->end_of_file+=(info->write_pos-info->append_read_pos); info->end_of_file+=(info->write_pos-info->append_read_pos);
info->append_read_pos=info->write_pos=info->write_buffer; info->append_read_pos=info->write_pos=info->write_buffer;
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN(info->error); DBUG_RETURN(info->error);
} }
} }
...@@ -942,6 +956,8 @@ int flush_io_cache(IO_CACHE *info) ...@@ -942,6 +956,8 @@ int flush_io_cache(IO_CACHE *info)
info->inited=0; info->inited=0;
} }
#endif #endif
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -703,12 +703,37 @@ void MYSQL_LOG::new_file(bool inside_mutex) ...@@ -703,12 +703,37 @@ void MYSQL_LOG::new_file(bool inside_mutex)
} }
} }
bool MYSQL_LOG::append(Log_event* ev)
{
bool error = 0;
pthread_mutex_lock(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
// Log_event::write() is smart enough to use my_b_write() or
// my_b_append() depending on the kind of cache we have
if (ev->write(&log_file))
{
error=1;
goto err;
}
if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
{
new_file(1);
}
signal_update();
err:
pthread_mutex_unlock(&LOCK_log);
return error;
}
bool MYSQL_LOG::appendv(const char* buf, uint len,...) bool MYSQL_LOG::appendv(const char* buf, uint len,...)
{ {
bool error = 0; bool error = 0;
va_list(args); va_list(args);
va_start(args,len); va_start(args,len);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
pthread_mutex_lock(&LOCK_log); pthread_mutex_lock(&LOCK_log);
do do
{ {
......
...@@ -26,6 +26,18 @@ ...@@ -26,6 +26,18 @@
#include <assert.h> #include <assert.h>
inline int my_b_safe_write(IO_CACHE* file, const char* buf,
int len)
{
// Sasha: We are not writing this with the ? operator to avoid hitting
// a possible compiler bug. At least gcc 2.95 cannot deal with
// several layers of ternary operators that evaluated comma(,) operator
// expressions inside - I do have a test case if somebody wants it
if (file->type == SEQ_READ_APPEND)
return my_b_append(file,buf,len);
return my_b_write(file,buf,len);
}
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len) static void pretty_print_str(FILE* file, char* str, int len)
{ {
...@@ -403,7 +415,7 @@ int Log_event::write_header(IO_CACHE* file) ...@@ -403,7 +415,7 @@ int Log_event::write_header(IO_CACHE* file)
pos += 4; pos += 4;
int2store(pos, flags); int2store(pos, flags);
pos += 2; pos += 2;
return (my_b_write(file, (byte*) buf, (uint) (pos - buf))); return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
...@@ -677,7 +689,7 @@ int Start_log_event::write_data(IO_CACHE* file) ...@@ -677,7 +689,7 @@ int Start_log_event::write_data(IO_CACHE* file)
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
int4store(buff + ST_CREATED_OFFSET,created); int4store(buff + ST_CREATED_OFFSET,created);
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
} }
Rotate_log_event::Rotate_log_event(const char* buf, int event_len, Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
...@@ -714,8 +726,8 @@ int Rotate_log_event::write_data(IO_CACHE* file) ...@@ -714,8 +726,8 @@ int Rotate_log_event::write_data(IO_CACHE* file)
{ {
char buf[ROTATE_HEADER_LEN]; char buf[ROTATE_HEADER_LEN];
int8store(buf, pos + R_POS_OFFSET); int8store(buf, pos + R_POS_OFFSET);
return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) || return my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
my_b_write(file, (byte*)new_log_ident, (uint) ident_len); my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len);
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
...@@ -812,9 +824,9 @@ int Query_log_event::write_data(IO_CACHE* file) ...@@ -812,9 +824,9 @@ int Query_log_event::write_data(IO_CACHE* file)
buf[Q_DB_LEN_OFFSET] = (char)db_len; buf[Q_DB_LEN_OFFSET] = (char)db_len;
int2store(buf + Q_ERR_CODE_OFFSET, error_code); int2store(buf + Q_ERR_CODE_OFFSET, error_code);
return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) || return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
my_b_write(file, (byte*) query, q_len)) ? -1 : 0; my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
} }
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format): Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
...@@ -840,7 +852,7 @@ int Intvar_log_event::write_data(IO_CACHE* file) ...@@ -840,7 +852,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
char buf[9]; char buf[9];
buf[I_TYPE_OFFSET] = type; buf[I_TYPE_OFFSET] = type;
int8store(buf + I_VAL_OFFSET, val); int8store(buf + I_VAL_OFFSET, val);
return my_b_write(file, (byte*) buf, sizeof(buf)); return my_b_safe_write(file, (byte*) buf, sizeof(buf));
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -878,7 +890,7 @@ int Load_log_event::write_data_header(IO_CACHE* file) ...@@ -878,7 +890,7 @@ int Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len; buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN); return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
} }
int Load_log_event::write_data_body(IO_CACHE* file) int Load_log_event::write_data_body(IO_CACHE* file)
...@@ -886,20 +898,20 @@ int Load_log_event::write_data_body(IO_CACHE* file) ...@@ -886,20 +898,20 @@ int Load_log_event::write_data_body(IO_CACHE* file)
if (sql_ex.write_data(file)) return 1; if (sql_ex.write_data(file)) return 1;
if (num_fields && fields && field_lens) if (num_fields && fields && field_lens)
{ {
if (my_b_write(file, (byte*)field_lens, num_fields) || if (my_b_safe_write(file, (byte*)field_lens, num_fields) ||
my_b_write(file, (byte*)fields, field_block_len)) my_b_safe_write(file, (byte*)fields, field_block_len))
return 1; return 1;
} }
return (my_b_write(file, (byte*)table_name, table_name_len + 1) || return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) ||
my_b_write(file, (byte*)db, db_len + 1) || my_b_safe_write(file, (byte*)db, db_len + 1) ||
my_b_write(file, (byte*)fname, fname_len)); my_b_safe_write(file, (byte*)fname, fname_len));
} }
static bool write_str(IO_CACHE *file, char *str, byte length) static bool write_str(IO_CACHE *file, char *str, byte length)
{ {
return (my_b_write(file, &length, 1) || return (my_b_safe_write(file, &length, 1) ||
my_b_write(file, (byte*) str, (int) length)); my_b_safe_write(file, (byte*) str, (int) length));
} }
int sql_ex_info::write_data(IO_CACHE* file) int sql_ex_info::write_data(IO_CACHE* file)
...@@ -911,7 +923,7 @@ int sql_ex_info::write_data(IO_CACHE* file) ...@@ -911,7 +923,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
write_str(file, line_term, line_term_len) || write_str(file, line_term, line_term_len) ||
write_str(file, line_start, line_start_len) || write_str(file, line_start, line_start_len) ||
write_str(file, escaped, escaped_len) || write_str(file, escaped, escaped_len) ||
my_b_write(file,(byte*) &opt_flags,1)); my_b_safe_write(file,(byte*) &opt_flags,1));
} }
else else
{ {
...@@ -923,7 +935,7 @@ int sql_ex_info::write_data(IO_CACHE* file) ...@@ -923,7 +935,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
old_ex.escaped= *escaped; old_ex.escaped= *escaped;
old_ex.opt_flags= opt_flags; old_ex.opt_flags= opt_flags;
old_ex.empty_flags=empty_flags; old_ex.empty_flags=empty_flags;
return my_b_write(file, (byte*) &old_ex, sizeof(old_ex)); return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
} }
} }
...@@ -1280,7 +1292,7 @@ int Slave_log_event::write_data(IO_CACHE* file) ...@@ -1280,7 +1292,7 @@ int Slave_log_event::write_data(IO_CACHE* file)
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
// log and host are already there // log and host are already there
return my_b_write(file, (byte*)mem_pool, get_data_size()); return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
} }
void Slave_log_event::init_from_mem_pool(int data_size) void Slave_log_event::init_from_mem_pool(int data_size)
...@@ -1330,8 +1342,8 @@ int Create_file_log_event::write_data_body(IO_CACHE* file) ...@@ -1330,8 +1342,8 @@ int Create_file_log_event::write_data_body(IO_CACHE* file)
int res; int res;
if ((res = Load_log_event::write_data_body(file)) || fake_base) if ((res = Load_log_event::write_data_body(file)) || fake_base)
return res; return res;
return (my_b_write(file, (byte*) "", 1) || return (my_b_safe_write(file, (byte*) "", 1) ||
my_b_write(file, (byte*) block, block_len)); my_b_safe_write(file, (byte*) block, block_len));
} }
int Create_file_log_event::write_data_header(IO_CACHE* file) int Create_file_log_event::write_data_header(IO_CACHE* file)
...@@ -1341,7 +1353,7 @@ int Create_file_log_event::write_data_header(IO_CACHE* file) ...@@ -1341,7 +1353,7 @@ int Create_file_log_event::write_data_header(IO_CACHE* file)
return res; return res;
byte buf[CREATE_FILE_HEADER_LEN]; byte buf[CREATE_FILE_HEADER_LEN];
int4store(buf + CF_FILE_ID_OFFSET, file_id); int4store(buf + CF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, CREATE_FILE_HEADER_LEN); return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
} }
int Create_file_log_event::write_base(IO_CACHE* file) int Create_file_log_event::write_base(IO_CACHE* file)
...@@ -1423,8 +1435,8 @@ int Append_block_log_event::write_data(IO_CACHE* file) ...@@ -1423,8 +1435,8 @@ int Append_block_log_event::write_data(IO_CACHE* file)
{ {
byte buf[APPEND_BLOCK_HEADER_LEN]; byte buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id); int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) || return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
my_b_write(file, (byte*) block, block_len)); my_b_safe_write(file, (byte*) block, block_len));
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -1473,7 +1485,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file) ...@@ -1473,7 +1485,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file)
{ {
byte buf[DELETE_FILE_HEADER_LEN]; byte buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id); int4store(buf + DF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, DELETE_FILE_HEADER_LEN); return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -1520,7 +1532,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file) ...@@ -1520,7 +1532,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file)
{ {
byte buf[EXEC_LOAD_HEADER_LEN]; byte buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id); int4store(buf + EL_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN); return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
......
...@@ -54,6 +54,9 @@ static int stuck_count = 0; ...@@ -54,6 +54,9 @@ static int stuck_count = 0;
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi); static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
...@@ -1918,34 +1921,86 @@ the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \ ...@@ -1918,34 +1921,86 @@ the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here DBUG_RETURN(0); // Can't return anything here
} }
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
if (!rev->is_valid())
return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident,
rev->ident_len);
mi->master_log_name[rev->ident_len] = 0;
mi->master_log_pos = rev->pos;
#ifndef DBUG_OFF
/* if we do not do this, we will be getting the first
rotate event forever, so
we need to not disconnect after one
*/
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
return 0;
}
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len)
{
const char* errmsg = 0;
bool inc_pos = 1;
Log_event* ev = Log_event::read_log_event(buf,event_len, &errmsg,
1/*old format*/);
if (!ev)
{
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
return 1;
}
ev->log_pos = mi->master_log_pos;
switch (ev->get_type_code())
{
case ROTATE_EVENT:
if (process_io_rotate(mi,(Rotate_log_event*)ev))
{
delete ev;
return 1;
}
inc_pos = 0;
break;
case LOAD_EVENT:
// TODO: actually process it
mi->master_log_pos += event_len;
return 0;
break;
default:
break;
}
if (mi->rli.relay_log.append(ev))
{
delete ev;
return 1;
}
delete ev;
if (inc_pos)
mi->master_log_pos += event_len;
return 0;
}
int queue_event(MASTER_INFO* mi,const char* buf,uint event_len) int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
{ {
int error; int error;
bool inc_pos = 1; bool inc_pos = 1;
if (mi->old_format) if (mi->old_format)
return 1; // TODO: deal with old format return queue_old_event(mi,buf,event_len);
// TODO: figure out if other events in addition to Rotate
// require special processing
switch (buf[EVENT_TYPE_OFFSET]) switch (buf[EVENT_TYPE_OFFSET])
{ {
case ROTATE_EVENT: case ROTATE_EVENT:
{ {
Rotate_log_event rev(buf,event_len,0); Rotate_log_event rev(buf,event_len,0);
if (!rev.is_valid()) if (process_io_rotate(mi,&rev))
return 1; return 1;
DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name)); inc_pos=0;
memcpy(mi->master_log_name,rev.new_log_ident,
rev.ident_len);
mi->master_log_name[rev.ident_len] = 0;
mi->master_log_pos = rev.pos;
inc_pos = 0;
#ifndef DBUG_OFF
/* if we do not do this, we will be getting the first
rotate event forever, so
we need to not disconnect after one
*/
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
break; break;
} }
default: default:
......
...@@ -108,6 +108,7 @@ public: ...@@ -108,6 +108,7 @@ public:
//v stands for vector //v stands for vector
//invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) //invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
bool appendv(const char* buf,uint len,...); bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment