Commit ada15c7a authored by unknown's avatar unknown

Fix various places where code would work incorrectly if the common_header_len...

Fix various places where code would work incorrectly if the common_header_len of events is different on master and slave

Patch developed with the help of Pavel Ivanov.

Also fix an uninitialised variable in queue_event().
parent 378bd044
......@@ -4806,12 +4806,23 @@ end:
}
bool MYSQL_BIN_LOG::append(Log_event* ev)
bool
MYSQL_BIN_LOG::append(Log_event *ev)
{
bool error = 0;
bool res;
mysql_mutex_lock(&LOCK_log);
res= append_no_lock(ev);
mysql_mutex_unlock(&LOCK_log);
return res;
}
bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
{
bool error = 0;
DBUG_ENTER("MYSQL_BIN_LOG::append");
mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Log_event::write() is smart enough to use my_b_write() or
......@@ -4829,7 +4840,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
mysql_mutex_unlock(&LOCK_log);
signal_update(); // Safe as we don't call close
DBUG_RETURN(error);
}
......
......@@ -712,6 +712,7 @@ public:
*/
bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
bool append_no_lock(Log_event* ev);
void mark_xids_active(ulong cookie, uint xid_count);
void mark_xid_done(ulong cookie, bool write_checkpoint);
......
......@@ -4754,16 +4754,15 @@ bool Format_description_log_event::write(IO_CACHE* file)
We don't call Start_log_event_v3::write() because this would make 2
my_b_safe_write().
*/
uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
size_t rec_size= sizeof(buff);
uchar buff[START_V3_HEADER_LEN+1];
size_t rec_size= sizeof(buff) + BINLOG_CHECKSUM_ALG_DESC_LEN +
number_of_event_types;
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
if (!dont_set_created)
created= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
LOG_EVENT_TYPES);
/*
if checksum is requested
record the checksum-algorithm descriptor next to
......@@ -4776,7 +4775,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#ifndef DBUG_OFF
data_written= 0; // to prepare for need_checksum assert
#endif
buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
uchar checksum_byte= need_checksum() ?
checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
/*
FD of checksum-aware server is always checksum-equipped, (V) is in,
......@@ -4796,7 +4795,10 @@ bool Format_description_log_event::write(IO_CACHE* file)
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
ret= (write_header(file, rec_size) ||
wrapper_my_b_safe_write(file, buff, rec_size) ||
wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
wrapper_my_b_safe_write(file, (uchar*)post_header_len,
number_of_event_types) ||
wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) ||
write_footer(file));
if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
......@@ -6125,7 +6127,7 @@ bool
Gtid_log_event::peek(const char *event_start, size_t event_len,
uint8 checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2)
uchar *flags2, const Format_description_log_event *fdev)
{
const char *p;
......@@ -6140,10 +6142,10 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
return true;
*server_id= uint4korr(event_start + SERVER_ID_OFFSET);
p= event_start + LOG_EVENT_HEADER_LEN;
p= event_start + fdev->common_header_len;
*seq_no= uint8korr(p);
p+= 8;
*domain_id= uint4korr(p);
......@@ -6581,7 +6583,8 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len)
rpl_gtid **out_gtid_list, uint32 *out_list_len,
const Format_description_log_event *fdev)
{
const char *p;
uint32 count_field, count;
......@@ -6598,13 +6601,13 @@ Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN)
return true;
p= event_start + LOG_EVENT_HEADER_LEN;
p= event_start + fdev->common_header_len;
count_field= uint4korr(p);
p+= 4;
count= count_field & ((1<<28)-1);
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN +
16 * count)
return true;
if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0),
......
......@@ -3118,7 +3118,7 @@ public:
static bool peek(const char *event_start, size_t event_len,
uint8 checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2);
uchar *flags2, const Format_description_log_event *fdev);
#endif
};
......@@ -3232,7 +3232,8 @@ public:
#endif
static bool peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len);
rpl_gtid **out_gtid_list, uint32 *out_list_len,
const Format_description_log_event *fdev);
};
......
......@@ -4925,8 +4925,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
LINT_INIT(inc_pos);
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
(uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
......@@ -5182,7 +5180,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &dummy_flag))
&event_gtid.seq_no, &dummy_flag,
rli->relay_log.description_event_for_queue))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
......@@ -5240,15 +5239,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
}
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
&mi->last_queued_gtid.domain_id,
&mi->last_queued_gtid.server_id,
&mi->last_queued_gtid.seq_no, &dummy_flag))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
mi->last_queued_gtid= event_gtid;
++mi->events_queued_since_last_gtid;
inc_pos= event_len;
}
break;
......@@ -5308,6 +5301,26 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (unlikely(gtid_skip_enqueue))
{
mi->master_log_pos+= inc_pos;
if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT &&
s_id == mi->master_id)
{
/*
If we write this master's description event in the middle of an event
group due to GTID reconnect, SQL thread will think that master crashed
in the middle of the group and roll back the first half, so we must not.
But we still have to write an artificial copy of the masters description
event, to override the initial slave-version description event so that
SQL thread has the right information for parsing the events it reads.
*/
rli->relay_log.description_event_for_queue->created= 0;
rli->relay_log.description_event_for_queue->set_artificial_event();
if (rli->relay_log.append_no_lock
(rli->relay_log.description_event_for_queue))
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
else
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
}
else
if ((s_id == global_system_variables.server_id &&
......
......@@ -1269,6 +1269,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int err;
String packet;
Format_description_log_event *fdev= NULL;
if (gtid_state->load((const rpl_gtid *)NULL, 0))
{
......@@ -1280,6 +1281,13 @@ gtid_state_from_pos(const char *name, uint32 offset,
if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
return errormsg;
if (!(fdev= new Format_description_log_event(3)))
{
errormsg= "Out of memory initializing format_description event "
"while scanning binlog to find start position";
goto end;
}
/*
First we need to find the initial GTID_LIST_EVENT. We need this even
if the offset is at the very start of the binlog file.
......@@ -1315,6 +1323,8 @@ gtid_state_from_pos(const char *name, uint32 offset,
typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
if (typ == FORMAT_DESCRIPTION_EVENT)
{
Format_description_log_event *tmp;
if (found_format_description_event)
{
errormsg= "Duplicate format description log event found while "
......@@ -1324,6 +1334,15 @@ gtid_state_from_pos(const char *name, uint32 offset,
current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
found_format_description_event= true;
if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(),
fdev)))
{
errormsg= "Corrupt Format_description event found or out-of-memory "
"while searching for old-style position in binlog";
goto end;
}
delete fdev;
fdev= tmp;
}
else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
{
......@@ -1348,7 +1367,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
}
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
current_checksum_alg,
&gtid_list, &list_len);
&gtid_list, &list_len, fdev);
if (status)
{
errormsg= "Error reading Gtid_list_log_event while searching "
......@@ -1376,7 +1395,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
uchar flags2;
if (Gtid_log_event::peek(packet.ptr(), packet.length(),
current_checksum_alg, &gtid.domain_id,
&gtid.server_id, &gtid.seq_no, &flags2))
&gtid.server_id, &gtid.seq_no, &flags2, fdev))
{
errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
"initial slave position";
......@@ -1399,6 +1418,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
}
end:
delete fdev;
end_io_cache(&cache);
mysql_file_close(file, MYF(MY_WME));
......@@ -1502,7 +1522,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
enum_gtid_until_state *gtid_until_group,
rpl_binlog_state *until_binlog_state,
bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
bool *send_fake_gtid_list)
bool *send_fake_gtid_list,
Format_description_log_event *fdev)
{
my_off_t pos;
size_t len= packet->length();
......@@ -1516,7 +1537,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (ev_offset > len ||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&gtid_list, &list_len))
&gtid_list, &list_len, fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
......@@ -1545,7 +1566,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &flags2))
&event_gtid.seq_no, &flags2, fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
......@@ -1881,6 +1902,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
Format_description_log_event *fdev= NULL;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
......@@ -1956,6 +1978,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
if (!(fdev= new Format_description_log_event(3)))
{
errmsg= "Out of memory initializing format_description event";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
if (!mysql_bin_log.is_open())
{
errmsg = "Binary log is not open";
......@@ -2119,6 +2148,8 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
Format_description_log_event *tmp;
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
......@@ -2136,6 +2167,18 @@ impossible position";
"slaves that cannot process them");
goto err;
}
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
delete fdev;
fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
......@@ -2253,6 +2296,8 @@ impossible position";
#endif
if (event_type == FORMAT_DESCRIPTION_EVENT)
{
Format_description_log_event *tmp;
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
......@@ -2271,6 +2316,17 @@ impossible position";
goto err;
}
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
delete fdev;
fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
......@@ -2295,7 +2351,7 @@ impossible position";
until_gtid_state, &gtid_until_group,
&until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
&send_fake_gtid_list)))
&send_fake_gtid_list, fdev)))
{
errmsg= tmp_msg;
goto err;
......@@ -2501,7 +2557,7 @@ impossible position";
&gtid_skip_group, until_gtid_state,
&gtid_until_group, &until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
&send_fake_gtid_list)))
&send_fake_gtid_list, fdev)))
{
errmsg= tmp_msg;
goto err;
......@@ -2599,6 +2655,7 @@ end:
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete fdev;
DBUG_VOID_RETURN;
err:
......@@ -2674,6 +2731,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment