make LOAD DATA INFILE replication work with multi-character

delimiters/starters/terminators

started work on server management daemon
parent 440bff25
...@@ -6,8 +6,8 @@ master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) ...@@ -6,8 +6,8 @@ master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
master-bin.001 554 Exec_load 1 8 ;file_id=1 master-bin.001 556 Exec_load 1 8 ;file_id=1
master-bin.001 577 Query 1 9 use test; drop table t1 master-bin.001 579 Query 1 9 use test; drop table t1
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
...@@ -23,10 +23,10 @@ master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) ...@@ -23,10 +23,10 @@ master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
master-bin.001 554 Exec_load 1 8 ;file_id=1 master-bin.001 556 Exec_load 1 8 ;file_id=1
master-bin.001 577 Query 1 9 use test; drop table t1 master-bin.001 579 Query 1 9 use test; drop table t1
master-bin.001 625 Rotate 1 10 master-bin.002;pos=4 master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 666 Stop 1 11 master-bin.001 668 Stop 1 11
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.002 79 Query 1 2 use test; create table t1 (n int) master-bin.002 79 Query 1 2 use test; create table t1 (n int)
...@@ -47,10 +47,10 @@ slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL) ...@@ -47,10 +47,10 @@ slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL)
slave-bin.001 316 Query 1 5 use test; drop table t1 slave-bin.001 316 Query 1 5 use test; drop table t1
slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null) slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null)
slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81
slave-bin.001 616 Exec_load 1 8 ;file_id=1 slave-bin.001 618 Exec_load 1 8 ;file_id=1
slave-bin.001 639 Query 1 9 use test; drop table t1 slave-bin.001 641 Query 1 9 use test; drop table t1
slave-bin.001 687 Rotate 1 4 slave-bin.002;pos=4; forced by master slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 727 Stop 2 5 slave-bin.001 729 Stop 2 5
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
......
...@@ -24,20 +24,24 @@ ...@@ -24,20 +24,24 @@
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
static void pretty_print_char(FILE* file, int c) static void pretty_print_str(FILE* file, char* str, int len)
{ {
char* end = str + len;
fputc('\'', file); fputc('\'', file);
switch(c) { while (str < end)
case '\n': fprintf(file, "\\n"); break; {
case '\r': fprintf(file, "\\r"); break; switch ((c=*str++)) {
case '\\': fprintf(file, "\\\\"); break; case '\n': fprintf(file, "\\n"); break;
case '\b': fprintf(file, "\\b"); break; case '\r': fprintf(file, "\\r"); break;
case '\t': fprintf(file, "\\t"); break; case '\\': fprintf(file, "\\\\"); break;
case '\'': fprintf(file, "\\'"); break; case '\b': fprintf(file, "\\b"); break;
case 0 : fprintf(file, "\\0"); break; case '\t': fprintf(file, "\\t"); break;
default: case '\'': fprintf(file, "\\'"); break;
fputc(c, file); case 0 : fprintf(file, "\\0"); break;
break; default:
fputc(c, file);
break;
}
} }
fputc('\'', file); fputc('\'', file);
} }
...@@ -46,20 +50,25 @@ static void pretty_print_char(FILE* file, int c) ...@@ -46,20 +50,25 @@ static void pretty_print_char(FILE* file, int c)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
static void pretty_print_char(String* packet, int c) static void pretty_print_str(String* packet, char* str, int len)
{ {
char* end = str + len;
packet->append('\''); packet->append('\'');
switch(c) { while (str < end)
case '\n': packet->append( "\\n"); break; {
case '\r': packet->append( "\\r"); break; char c;
case '\\': packet->append( "\\\\"); break; switch((c=*str++)) {
case '\b': packet->append( "\\b"); break; case '\n': packet->append( "\\n"); break;
case '\t': packet->append( "\\t"); break; case '\r': packet->append( "\\r"); break;
case '\'': packet->append( "\\'"); break; case '\\': packet->append( "\\\\"); break;
case 0 : packet->append( "\\0"); break; case '\b': packet->append( "\\b"); break;
default: case '\t': packet->append( "\\t"); break;
packet->append((char)c); case '\'': packet->append( "\\'"); break;
break; case 0 : packet->append( "\\0"); break;
default:
packet->append((char)c);
break;
}
} }
packet->append('\''); packet->append('\'');
} }
...@@ -88,6 +97,7 @@ const char* Log_event::get_type_str() ...@@ -88,6 +97,7 @@ const char* Log_event::get_type_str()
case ROTATE_EVENT: return "Rotate"; case ROTATE_EVENT: return "Rotate";
case INTVAR_EVENT: return "Intvar"; case INTVAR_EVENT: return "Intvar";
case LOAD_EVENT: return "Load"; case LOAD_EVENT: return "Load";
case NEW_LOAD_EVENT: return "New_load";
case SLAVE_EVENT: return "Slave"; case SLAVE_EVENT: return "Slave";
case CREATE_FILE_EVENT: return "Create_file"; case CREATE_FILE_EVENT: return "Create_file";
case APPEND_BLOCK_EVENT: return "Append_block"; case APPEND_BLOCK_EVENT: return "Append_block";
...@@ -201,36 +211,36 @@ void Load_log_event::pack_info(String* packet) ...@@ -201,36 +211,36 @@ void Load_log_event::pack_info(String* packet)
tmp.append("INTO TABLE "); tmp.append("INTO TABLE ");
tmp.append(table_name); tmp.append(table_name);
if (!(sql_ex.empty_flags & FIELD_TERM_EMPTY)) if (sql_ex.field_term_len)
{ {
tmp.append(" FIELDS TERMINATED BY "); tmp.append(" FIELDS TERMINATED BY ");
pretty_print_char(&tmp, sql_ex.field_term); pretty_print_str(&tmp, sql_ex.field_term, sql_ex.field_term_len);
} }
if (!(sql_ex.empty_flags & ENCLOSED_EMPTY)) if (sql_ex.enclosed_len)
{ {
if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
tmp.append(" OPTIONALLY "); tmp.append(" OPTIONALLY ");
tmp.append( " ENCLOSED BY "); tmp.append( " ENCLOSED BY ");
pretty_print_char(&tmp, sql_ex.enclosed); pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len);
} }
if (!(sql_ex.empty_flags & ESCAPED_EMPTY)) if (sql_ex.escaped_len)
{ {
tmp.append( " ESCAPED BY "); tmp.append( " ESCAPED BY ");
pretty_print_char(&tmp, sql_ex.escaped); pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len);
} }
if (!(sql_ex.empty_flags & LINE_TERM_EMPTY)) if (sql_ex.line_term_len)
{ {
tmp.append(" LINES TERMINATED BY "); tmp.append(" LINES TERMINATED BY ");
pretty_print_char(&tmp, sql_ex.line_term); pretty_print_str(&tmp, sql_ex.line_term, sql_ex.line_term_len);
} }
if (!(sql_ex.empty_flags & LINE_START_EMPTY)) if (sql_ex.line_start_len)
{ {
tmp.append(" LINES STARTING BY "); tmp.append(" LINES STARTING BY ");
pretty_print_char(&tmp, sql_ex.line_start); pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len);
} }
if ((int)skip_lines > 0) if ((int)skip_lines > 0)
...@@ -443,13 +453,14 @@ Log_event* Log_event::read_log_event(IO_CACHE* file) ...@@ -443,13 +453,14 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error = "Event too small"; error = "Event too small";
goto err; goto err;
} }
if (!(buf = my_malloc(data_len, MYF(MY_WME)))) // some events use the extra byte to null-terminate strings
if (!(buf = my_malloc(data_len+1, MYF(MY_WME))))
{ {
error = "Out of memory"; error = "Out of memory";
goto err; goto err;
} }
buf[data_len] = 0;
memcpy(buf, head, LOG_EVENT_HEADER_LEN); memcpy(buf, head, LOG_EVENT_HEADER_LEN);
if(my_b_read(file, (byte*) buf + LOG_EVENT_HEADER_LEN, if(my_b_read(file, (byte*) buf + LOG_EVENT_HEADER_LEN,
data_len - LOG_EVENT_HEADER_LEN)) data_len - LOG_EVENT_HEADER_LEN))
...@@ -483,6 +494,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) ...@@ -483,6 +494,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len)
ev = new Query_log_event(buf, event_len); ev = new Query_log_event(buf, event_len);
break; break;
case LOAD_EVENT: case LOAD_EVENT:
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len); ev = new Load_log_event(buf, event_len);
break; break;
case ROTATE_EVENT: case ROTATE_EVENT:
...@@ -784,12 +796,12 @@ int Load_log_event::write_data_header(IO_CACHE* file) ...@@ -784,12 +796,12 @@ int Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len; buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
memcpy(buf + L_SQL_EX_OFFSET, &sql_ex, sizeof(sql_ex));
return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN); return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN);
} }
int Load_log_event::write_data_body(IO_CACHE* file) int Load_log_event::write_data_body(IO_CACHE* file)
{ {
if (sql_ex.write_data(file)) return 1;
if (num_fields && fields && field_lens) if (num_fields && fields && field_lens)
{ {
if(my_b_write(file, (byte*)field_lens, num_fields) || if(my_b_write(file, (byte*)field_lens, num_fields) ||
...@@ -801,6 +813,76 @@ int Load_log_event::write_data_body(IO_CACHE* file) ...@@ -801,6 +813,76 @@ int Load_log_event::write_data_body(IO_CACHE* file)
my_b_write(file, (byte*)fname, fname_len); my_b_write(file, (byte*)fname, fname_len);
} }
#define WRITE_STR(name) my_b_write(file,(byte*)&name ## _len, 1) || \
my_b_write(file,(byte*)name,name ## _len)
#define OLD_EX_INIT(name) old_ex.##name = *name
int sql_ex_info::write_data(IO_CACHE* file)
{
if (new_format())
{
return WRITE_STR(field_term) || WRITE_STR(enclosed) ||
WRITE_STR(line_term) || WRITE_STR(line_start) ||
WRITE_STR(escaped) || my_b_write(file,(byte*)&opt_flags,1);
}
else
{
old_sql_ex old_ex;
OLD_EX_INIT(field_term);
OLD_EX_INIT(enclosed);
OLD_EX_INIT(line_term);
OLD_EX_INIT(line_start);
OLD_EX_INIT(escaped);
old_ex.opt_flags = opt_flags;
old_ex.empty_flags = empty_flags;
return my_b_write(file,(byte*)&old_ex,sizeof(old_ex));
}
}
#define READ_STR(name) name ## _len = *buf++;\
if (buf >= buf_end) return 0;\
name = buf; \
buf += name ## _len; \
if (buf >= buf_end) return 0;
#define READ_OLD_STR(name) name ## _len = 1; \
name = buf++; \
if (buf >= buf_end) return 0;
#define FIX_OLD_LEN(name,NAME) if (empty_flags & NAME ## _EMPTY) \
name ## _len = 0
char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
{
cached_new_format = use_new_format;
if (use_new_format)
{
READ_STR(field_term);
READ_STR(enclosed);
READ_STR(line_term);
READ_STR(line_start);
READ_STR(escaped);
opt_flags = *buf++;
}
else
{
READ_OLD_STR(field_term);
READ_OLD_STR(enclosed);
READ_OLD_STR(line_term);
READ_OLD_STR(line_start);
READ_OLD_STR(escaped);
opt_flags = *buf++;
empty_flags = *buf++;
FIX_OLD_LEN(field_term,FIELD_TERM);
FIX_OLD_LEN(enclosed,ENCLOSED);
FIX_OLD_LEN(line_term,LINE_TERM);
FIX_OLD_LEN(line_start,LINE_START);
FIX_OLD_LEN(escaped,ESCAPED);
}
return buf;
}
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
const char* db_arg, const char* table_name_arg, const char* db_arg, const char* table_name_arg,
...@@ -809,7 +891,7 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, ...@@ -809,7 +891,7 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
num_fields(0),fields(0),field_lens(0),field_block_len(0), num_fields(0),fields(0),field_lens(0),field_block_len(0),
table_name(table_name_arg), table_name(table_name_arg),
db(db_arg), db(db_arg),
fname(ex->file_name),fname_null_term(1) fname(ex->file_name)
{ {
time_t end_time; time_t end_time;
time(&end_time); time(&end_time);
...@@ -817,12 +899,19 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, ...@@ -817,12 +899,19 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
db_len = (db) ? (uint32) strlen(db) : 0; db_len = (db) ? (uint32) strlen(db) : 0;
table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
fname_len = (fname) ? (uint) strlen(fname) : 0; fname_len = (fname) ? (uint) strlen(fname) : 0;
sql_ex.field_term = (*ex->field_term)[0]; sql_ex.field_term = (char*)ex->field_term->ptr();
sql_ex.enclosed = (*ex->enclosed)[0]; sql_ex.field_term_len = ex->field_term->length();
sql_ex.line_term = (*ex->line_term)[0]; sql_ex.enclosed = (char*)ex->enclosed->ptr();
sql_ex.line_start = (*ex->line_start)[0]; sql_ex.enclosed_len = ex->enclosed->length();
sql_ex.escaped = (*ex->escaped)[0]; sql_ex.line_term = (char*)ex->line_term->ptr();
sql_ex.line_term_len = ex->line_term->length();
sql_ex.line_start = (char*)ex->line_start->ptr();
sql_ex.line_start_len = ex->line_start->length();
sql_ex.escaped = (char*)ex->escaped->ptr();
sql_ex.escaped_len = ex->escaped->length();
sql_ex.opt_flags = 0; sql_ex.opt_flags = 0;
sql_ex.cached_new_format = -1;
if(ex->dumpfile) if(ex->dumpfile)
sql_ex.opt_flags |= DUMPFILE_FLAG; sql_ex.opt_flags |= DUMPFILE_FLAG;
if(ex->opt_enclosed) if(ex->opt_enclosed)
...@@ -837,6 +926,7 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, ...@@ -837,6 +926,7 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
case DUP_ERROR: break; case DUP_ERROR: break;
} }
if(!ex->field_term->length()) if(!ex->field_term->length())
sql_ex.empty_flags |= FIELD_TERM_EMPTY; sql_ex.empty_flags |= FIELD_TERM_EMPTY;
if(!ex->enclosed->length()) if(!ex->enclosed->length())
...@@ -869,10 +959,12 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, ...@@ -869,10 +959,12 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
#endif #endif
// the caller must do buf[event_len] = 0 before he starts using the
// constructed event
Load_log_event::Load_log_event(const char* buf, int event_len): Load_log_event::Load_log_event(const char* buf, int event_len):
Log_event(buf),num_fields(0),fields(0), Log_event(buf),num_fields(0),fields(0),
field_lens(0),field_block_len(0), field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0),fname_null_term(0) table_name(0),db(0),fname(0)
{ {
if (!event_len) // derived class, will call copy_log_event() itself if (!event_len) // derived class, will call copy_log_event() itself
return; return;
...@@ -882,12 +974,7 @@ Load_log_event::Load_log_event(const char* buf, int event_len): ...@@ -882,12 +974,7 @@ Load_log_event::Load_log_event(const char* buf, int event_len):
int Load_log_event::copy_log_event(const char *buf, ulong event_len) int Load_log_event::copy_log_event(const char *buf, ulong event_len)
{ {
uint data_len; uint data_len;
int body_offset = get_data_body_offset(); char* buf_end = (char*)buf + event_len;
if((int)event_len < body_offset)
return 1;
memcpy(&sql_ex, buf + L_SQL_EX_OFFSET + LOG_EVENT_HEADER_LEN,
sizeof(sql_ex));
data_len = event_len - body_offset;
thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN); thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN);
exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN); exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN);
skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN); skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN);
...@@ -895,9 +982,19 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len) ...@@ -895,9 +982,19 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len)
db_len = (uint)buf[L_DB_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; db_len = (uint)buf[L_DB_LEN_OFFSET + LOG_EVENT_HEADER_LEN];
num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN); num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN);
int body_offset = get_data_body_offset();
if ((int)event_len < body_offset)
return 1;
//sql_ex.init() on success returns the pointer to the first byte after
//the sql_ex structure, which is the start of field lengths array
if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset,
buf_end,
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
return 1;
data_len = event_len - body_offset;
if (num_fields > data_len) // simple sanity check against corruption if (num_fields > data_len) // simple sanity check against corruption
return 1; return 1;
field_lens = (uchar*)buf + body_offset;
uint i; uint i;
for (i = 0; i < num_fields; i++) for (i = 0; i < num_fields; i++)
{ {
...@@ -907,9 +1004,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len) ...@@ -907,9 +1004,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len)
table_name = fields + field_block_len; table_name = fields + field_block_len;
db = table_name + table_name_len + 1; db = table_name + table_name_len + 1;
fname = db + db_len + 1; fname = db + db_len + 1;
fname_len = (get_type_code() == LOAD_EVENT) ? int type_code = get_type_code();
data_len - 2 - db_len - table_name_len - num_fields - field_block_len : fname_len = strlen(fname);
strlen(fname); // null termination is accomplished by the caller doing buf[event_len]=0
return 0; return 0;
} }
...@@ -943,36 +1040,36 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -943,36 +1040,36 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
fprintf(file," IGNORE "); fprintf(file," IGNORE ");
fprintf(file, "INTO TABLE %s ", table_name); fprintf(file, "INTO TABLE %s ", table_name);
if(!(sql_ex.empty_flags & FIELD_TERM_EMPTY)) if(sql_ex.field_term)
{ {
fprintf(file, " FIELDS TERMINATED BY "); fprintf(file, " FIELDS TERMINATED BY ");
pretty_print_char(file, sql_ex.field_term); pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
} }
if(!(sql_ex.empty_flags & ENCLOSED_EMPTY)) if(sql_ex.enclosed)
{ {
if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
fprintf(file," OPTIONALLY "); fprintf(file," OPTIONALLY ");
fprintf(file, " ENCLOSED BY "); fprintf(file, " ENCLOSED BY ");
pretty_print_char(file, sql_ex.enclosed); pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
} }
if(!(sql_ex.empty_flags & ESCAPED_EMPTY)) if (sql_ex.escaped)
{ {
fprintf(file, " ESCAPED BY "); fprintf(file, " ESCAPED BY ");
pretty_print_char(file, sql_ex.escaped); pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
} }
if(!(sql_ex.empty_flags & LINE_TERM_EMPTY)) if (sql_ex.line_term)
{ {
fprintf(file," LINES TERMINATED BY "); fprintf(file," LINES TERMINATED BY ");
pretty_print_char(file, sql_ex.line_term); pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len);
} }
if(!(sql_ex.empty_flags & LINE_START_EMPTY)) if (sql_ex.line_start)
{ {
fprintf(file," LINES STARTING BY "); fprintf(file," LINES STARTING BY ");
pretty_print_char(file, sql_ex.line_start); pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
} }
if((int)skip_lines > 0) if((int)skip_lines > 0)
...@@ -1119,6 +1216,7 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, ...@@ -1119,6 +1216,7 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex,
fake_base(0),block(block_arg),block_len(block_len_arg), fake_base(0),block(block_arg),block_len(block_len_arg),
file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
{ {
sql_ex.force_new_format();
} }
#endif #endif
...@@ -1155,12 +1253,11 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len): ...@@ -1155,12 +1253,11 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len):
int block_offset; int block_offset;
if (copy_log_event(buf,len)) if (copy_log_event(buf,len))
return; return;
fname_null_term = 1;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
if(len < block_offset) if (len < block_offset)
return; return;
block = (char*)buf + block_offset; block = (char*)buf + block_offset;
block_len = len - block_offset; block_len = len - block_offset;
...@@ -1443,46 +1540,20 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) ...@@ -1443,46 +1540,20 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{ {
char llbuff[22]; char llbuff[22];
enum enum_duplicates handle_dup = DUP_IGNORE; enum enum_duplicates handle_dup = DUP_IGNORE;
char fname_buf[FN_REFLEN+1], *fname_p;
if (fname_null_term)
fname_p = (char*)fname;
else
{
int len = min(FN_REFLEN,fname_len);
memcpy(fname_buf,fname,len);
fname_buf[len] = 0;
fname_p = fname_buf;
}
if(sql_ex.opt_flags && REPLACE_FLAG) if(sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE; handle_dup = DUP_REPLACE;
sql_exchange ex(fname_p, sql_ex.opt_flags && sql_exchange ex((char*)fname, sql_ex.opt_flags &&
DUMPFILE_FLAG ); DUMPFILE_FLAG );
String field_term(&sql_ex.field_term, 1),
enclosed(&sql_ex.enclosed, 1), #define SET_EX(name) String name(sql_ex.name,sql_ex.name ## _len);\
line_term(&sql_ex.line_term,1), ex.name = &name;
escaped(&sql_ex.escaped, 1),
line_start(&sql_ex.line_start, 1); SET_EX(field_term);
SET_EX(enclosed);
ex.field_term = &field_term; SET_EX(line_term);
if(sql_ex.empty_flags & FIELD_TERM_EMPTY) SET_EX(line_start);
ex.field_term->length(0); SET_EX(escaped);
ex.enclosed = &enclosed;
if(sql_ex.empty_flags & ENCLOSED_EMPTY)
ex.enclosed->length(0);
ex.line_term = &line_term;
if(sql_ex.empty_flags & LINE_TERM_EMPTY)
ex.line_term->length(0);
ex.line_start = &line_start;
if(sql_ex.empty_flags & LINE_START_EMPTY)
ex.line_start->length(0);
ex.escaped = &escaped;
if(sql_ex.empty_flags & ESCAPED_EMPTY)
ex.escaped->length(0);
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
if(sql_ex.empty_flags & FIELD_TERM_EMPTY) if(sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0); ex.field_term->length(0);
...@@ -1528,7 +1599,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) ...@@ -1528,7 +1599,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
sql_error = ER_UNKNOWN_ERROR; sql_error = ER_UNKNOWN_ERROR;
slave_print_error(sql_error, "Slave: Error '%s' running load data infile ", slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
ER(sql_error)); ER_SAFE(sql_error));
free_root(&thd->mem_root,0); free_root(&thd->mem_root,0);
return 1; return 1;
} }
...@@ -1745,7 +1816,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) ...@@ -1745,7 +1816,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
goto err; goto err;
} }
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0)) if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0))
|| lev->get_type_code() != LOAD_EVENT) || lev->get_type_code() != NEW_LOAD_EVENT)
{ {
slave_print_error(0, "File '%s' appears corrupted", fname); slave_print_error(0, "File '%s' appears corrupted", fname);
goto err; goto err;
......
...@@ -53,16 +53,46 @@ ...@@ -53,16 +53,46 @@
#define LINE_START_EMPTY 0x8 #define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10 #define ESCAPED_EMPTY 0x10
struct old_sql_ex
struct sql_ex_info
{ {
char field_term; char field_term;
char enclosed; char enclosed;
char line_term; char line_term;
char line_start; char line_start;
char escaped; char escaped;
char opt_flags; // flags for the options char opt_flags;
char empty_flags; // flags to indicate which of the terminating charact char empty_flags;
};
struct sql_ex_info
{
char* field_term;
char* enclosed;
char* line_term;
char* line_start;
char* escaped;
uchar field_term_len,enclosed_len,line_term_len,line_start_len,
escaped_len;
char opt_flags;
char empty_flags;
int cached_new_format;
// store in new format even if old is possible
void force_new_format() { cached_new_format = 1;}
int data_size() { return new_format() ?
field_term_len + enclosed_len + line_term_len +
line_start_len + escaped_len + 6 : 7;}
int write_data(IO_CACHE* file);
char* init(char* buf,char* buf_end,bool use_new_format);
bool new_format()
{
return (cached_new_format != -1) ? cached_new_format :
(cached_new_format=(field_term_len > 1 ||
enclosed_len > 1 ||
line_term_len > 1 || line_start_len > 1 ||
escaped_len > 1));
}
} ; } ;
/* Binary log consists of events. Each event has a fixed length header, /* Binary log consists of events. Each event has a fixed length header,
...@@ -76,7 +106,7 @@ struct sql_ex_info ...@@ -76,7 +106,7 @@ struct sql_ex_info
/* event-specific post-header sizes */ /* event-specific post-header sizes */
#define LOG_EVENT_HEADER_LEN 19 #define LOG_EVENT_HEADER_LEN 19
#define QUERY_HEADER_LEN (4 + 4 + 1 + 2) #define QUERY_HEADER_LEN (4 + 4 + 1 + 2)
#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4+sizeof(struct sql_ex_info)) #define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4)
#define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4) #define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4)
#define ROTATE_HEADER_LEN 8 #define ROTATE_HEADER_LEN 8
#define CREATE_FILE_HEADER_LEN 4 #define CREATE_FILE_HEADER_LEN 4
...@@ -162,7 +192,8 @@ struct sql_ex_info ...@@ -162,7 +192,8 @@ struct sql_ex_info
enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2,
STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5,
LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8,
APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11,
NEW_LOAD_EVENT=12};
enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2
}; };
...@@ -359,7 +390,6 @@ public: ...@@ -359,7 +390,6 @@ public:
const char* table_name; const char* table_name;
const char* db; const char* db;
const char* fname; const char* fname;
bool fname_null_term;
uint32 skip_lines; uint32 skip_lines;
sql_ex_info sql_ex; sql_ex_info sql_ex;
...@@ -384,7 +414,8 @@ public: ...@@ -384,7 +414,8 @@ public:
~Load_log_event() ~Load_log_event()
{ {
} }
Log_event_type get_type_code() { return LOAD_EVENT; } Log_event_type get_type_code() { return sql_ex.new_format() ?
NEW_LOAD_EVENT: LOAD_EVENT; }
int write_data_header(IO_CACHE* file); int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file); int write_data_body(IO_CACHE* file);
bool is_valid() { return table_name != 0; } bool is_valid() { return table_name != 0; }
...@@ -395,7 +426,7 @@ public: ...@@ -395,7 +426,7 @@ public:
+ 4 // exec_time + 4 // exec_time
+ 4 // skip_lines + 4 // skip_lines
+ 4 // field block len + 4 // field block len
+ sizeof(sql_ex) + field_block_len + num_fields; + sql_ex.data_size() + field_block_len + num_fields;
; ;
} }
int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; } int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; }
...@@ -545,8 +576,10 @@ public: ...@@ -545,8 +576,10 @@ public:
~Create_file_log_event() ~Create_file_log_event()
{ {
} }
Log_event_type get_type_code() { return fake_base ? LOAD_EVENT : Log_event_type get_type_code()
CREATE_FILE_EVENT;} {
return fake_base ? Load_log_event::get_type_code() : CREATE_FILE_EVENT;
}
int get_data_size() { return fake_base ? Load_log_event::get_data_size() : int get_data_size() { return fake_base ? Load_log_event::get_data_size() :
Load_log_event::get_data_size() + Load_log_event::get_data_size() +
4 + 1 + block_len;} 4 + 1 + block_len;}
......
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* MySQL server management daemon
*
* Written by:
* Sasha Pachev <sasha@mysql.com>
**/
#define VERSION "1.0"
#include <global.h>
#include <my_sys.h>
#include <m_string.h>
#include <mysql.h>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment