Commit 2735f0b9 authored by Venkatesh Duggirala's avatar Venkatesh Duggirala

Bug#21205695 DROP TABLE MAY CAUSE SLAVES TO BREAK

    Problem:
    ========
    1) Drop table queries are re-generated by server
    before writing the events(queries) into binlog
    for various reasons. If table name/db name contains
    a non regular characters (like latin characters),
    the generated query is wrong. Hence it breaks the
    replication.
    2) In the edge case, when table name/db name contains
    64 characters, server is throwing an assert
    assert(M_TBLLEN < 128)
    3) In the edge case, when db name contains 64 latin
    characters, binlog content is interpreted badly
    which is leading replication failure.

    Analysis & Fix :
    ================
    1) Parser reads the table name from the query and converts
    it to standard charset(utf8) and stores it in table_name variable.
    When drop table query is regenerated with the same table_name
    variable, it should be converted back to the original charset
    from standard charset(utf8).

    2) Latin character takes two bytes for each character. Limit
    of the identifier is 64. SYSTEM_CHARSET_MBMAXLEN is set to '3'.
    So there is a possiblity that tablename/dbname contains 3 * 64.
    Hence assert is changed to
    (M_TBLLEN <= NAME_CHAR_LEN*SYSTEM_CHARSET_MBMAXLEN)

    3) db_len in the binlog event header is taking 1 byte.
       db_len is ranged from 0 to 192 bytes (3 * 64).
       While reading the db_len from the event, server
       is casting to uint instead of uchar which is leading
       to bad db_len. This problem is fixed by changing the
       cast type to uchar.
parent 08e92938
include/master-slave.inc
[connection master]
Test case 1:- table name with one character latin name.
SET @s:=CONCAT("CREATE TABLE `",REPEAT(CHAR(131),1),"` (a INT)");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("INSERT INTO `",REPEAT(CHAR(131),1),"` VALUES (1)");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("DROP TABLE `",REPEAT(CHAR(131),1), "`");
PREPARE STMT FROM @s;
EXECUTE stmt;
Test case 2:- table name and database names with one character latin name.
SET @s:=CONCAT("CREATE DATABASE `",REPEAT(CHAR(131),1),"`");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("CREATE TABLE `",REPEAT(CHAR(131),1),"`.`",REPEAT(CHAR(131),1),"` (a INT)");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("INSERT INTO `",REPEAT(CHAR(131),1),"`.`",REPEAT(CHAR(131),1),"` VALUES (1)");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("DROP TABLE `",REPEAT(CHAR(131),1),"`.`",REPEAT(CHAR(131),1), "`");
PREPARE STMT FROM @s;
EXECUTE stmt;
SET @s:=CONCAT("DROP DATABASE `",REPEAT(CHAR(131),1),"`");
PREPARE STMT FROM @s;
EXECUTE stmt;
include/rpl_end.inc
###############################################################################
# Bug#21205695 DROP TABLE MAY CAUSE SLAVES TO BREAK
#
# Problem:
# ========
# 1) Drop table queries are re-generated by server
# before writing the events(queries) into binlog
# for various reasons. If table name/db name contains
# a non regular characters (like latin characters),
# the generated query is wrong. Hence it breaks the
# replication.
# 2) In the edge case, when table name contains
# 64 latin characters (latin takes 2 bytes), server is
# throwing an assert (M_TBLLEN < 128)
#
# 3) In the edge case, when db name contains 64 latin
# characters, binlog contents are interpreted wrongly
# which is leading to replication issues.
#
###############################################################################
--source include/not_windows.inc
--source include/master-slave.inc
--let iter=1
# Change iteration to 4 after fixing Bug #22280214
while ($iter <= 2)
{
--connection master
if ($iter == 1)
{
--echo Test case 1:- table name with one character latin name.
--let $tblname= REPEAT(CHAR(131),1)
}
if ($iter == 2)
{
--echo Test case 2:- table name and database names with one character latin name.
--let $tblname= REPEAT(CHAR(131),1),"`.`",REPEAT(CHAR(131),1)
--eval SET @s:=CONCAT("CREATE DATABASE `",REPEAT(CHAR(131),1),"`")
PREPARE STMT FROM @s; EXECUTE stmt;
}
# After fixing Bug #22280214 DATADIR LOCATION IS LIMITING
# IDENTIFIER MAX LENGTH, the following two tests (iter 3 and 4) can be
# uncommented.
#if ($iter == 3)
#{
# --echo Test case 3:- table name and database names with 64 latin characters name.
# --let $tblname= REPEAT(CHAR(131),64),"`.`", REPEAT(CHAR(131),64)
# --eval SET @s:=CONCAT("CREATE DATABASE `",REPEAT(CHAR(131),64),"`")
# PREPARE STMT FROM @s; EXECUTE stmt;
#}
#if ($iter == 4)
#{
# --echo Test case 4:- table name and database names with 64 Euro(€) characters.
# --let $tblname= REPEAT(CHAR(226,130,172),64),"`.`", REPEAT(CHAR(226,130,172),64)
# --eval SET @s:=CONCAT("CREATE DATABASE `",REPEAT(CHAR(226,130,172),64),"`")
# PREPARE STMT FROM @s; EXECUTE stmt;
#}
--eval SET @s:=CONCAT("CREATE TABLE `",$tblname,"` (a INT)")
PREPARE STMT FROM @s; EXECUTE stmt;
--eval SET @s:=CONCAT("INSERT INTO `",$tblname,"` VALUES (1)")
PREPARE STMT FROM @s; EXECUTE stmt;
--eval SET @s:=CONCAT("DROP TABLE `",$tblname, "`")
PREPARE STMT FROM @s; EXECUTE stmt;
if ($iter == 2)
{
--eval SET @s:=CONCAT("DROP DATABASE `",REPEAT(CHAR(131),1),"`")
PREPARE STMT FROM @s; EXECUTE stmt;
}
# After fixing Bug #22280214 DATADIR LOCATION IS LIMITING
# IDENTIFIER MAX LENGTH, the following two tests (iter 3 and 4) can be
# uncommented.
#if ($iter == 3)
#{
# --eval SET @s:=CONCAT("DROP DATABASE `",REPEAT(CHAR(131),64),"`")
# PREPARE STMT FROM @s; EXECUTE stmt;
#}
#if ($iter == 4)
#{
# --eval SET @s:=CONCAT("DROP DATABASE `",REPEAT(CHAR(226,130,172),64),"`")
# PREPARE STMT FROM @s; EXECUTE stmt;
#}
--sync_slave_with_master
--inc $iter
}
--source include/rpl_end.inc
/* /*
Copyright (c) 2000, 2014, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -2792,7 +2792,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, ...@@ -2792,7 +2792,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET); slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET); exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars db_len = (uchar)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET); error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
/* /*
...@@ -8960,9 +8960,9 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file) ...@@ -8960,9 +8960,9 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
{ {
DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL); DBUG_ASSERT(m_tblnam != NULL);
/* We use only one byte per length for storage in event: */
DBUG_ASSERT(m_dblen < 128); DBUG_ASSERT(m_dblen <= NAME_LEN);
DBUG_ASSERT(m_tbllen < 128); DBUG_ASSERT(m_tbllen <= NAME_LEN);
uchar const dbuf[]= { (uchar) m_dblen }; uchar const dbuf[]= { (uchar) m_dblen };
uchar const tbuf[]= { (uchar) m_tbllen }; uchar const tbuf[]= { (uchar) m_tbllen };
......
...@@ -949,29 +949,44 @@ static const char *require_quotes(const char *name, uint name_length) ...@@ -949,29 +949,44 @@ static const char *require_quotes(const char *name, uint name_length)
} }
/* /**
Quote the given identifier if needed and append it to the target string. Convert and quote the given identifier if needed and append it to the
If the given identifier is empty, it will be quoted. target string. If the given identifier is empty, it will be quoted.
@thd thread handler
SYNOPSIS @packet target string
append_identifier() @name the identifier to be appended
thd thread handler @length length of the appending identifier
packet target string @param from_cs Charset information about the input string
name the identifier to be appended @param to_cs Charset information about the target string
name_length length of the appending identifier
*/ */
void void
append_identifier(THD *thd, String *packet, const char *name, uint length) append_identifier(THD *thd, String *packet, const char *name, uint length,
CHARSET_INFO *from_cs, CHARSET_INFO *to_cs)
{ {
const char *name_end; const char *name_end;
char quote_char; char quote_char;
int q; int q;
q= thd ? get_quote_char_for_identifier(thd, name, length) : '`';
CHARSET_INFO *cs_info= system_charset_info;
const char *to_name= name;
size_t to_length= length;
String to_string(name,length, from_cs);
if (from_cs != NULL && to_cs != NULL && from_cs != to_cs)
thd->convert_string(&to_string, from_cs, to_cs);
if (to_cs != NULL)
{
to_name= to_string.c_ptr();
to_length= to_string.length();
cs_info= to_cs;
}
q= thd ? get_quote_char_for_identifier(thd, to_name, to_length) : '`';
if (q == EOF) if (q == EOF)
{ {
packet->append(name, length, packet->charset()); packet->append(to_name, to_length, packet->charset());
return; return;
} }
...@@ -980,14 +995,14 @@ append_identifier(THD *thd, String *packet, const char *name, uint length) ...@@ -980,14 +995,14 @@ append_identifier(THD *thd, String *packet, const char *name, uint length)
it's a keyword it's a keyword
*/ */
(void) packet->reserve(length*2 + 2); (void) packet->reserve(to_length*2 + 2);
quote_char= (char) q; quote_char= (char) q;
packet->append(&quote_char, 1, system_charset_info); packet->append(&quote_char, 1, system_charset_info);
for (name_end= name+length ; name < name_end ; name+= length) for (name_end= to_name+to_length ; to_name < name_end ; to_name+= to_length)
{ {
uchar chr= (uchar) *name; uchar chr= (uchar) *to_name;
length= my_mbcharlen(system_charset_info, chr); to_length= my_mbcharlen(cs_info, chr);
/* /*
my_mbcharlen can return 0 on a wrong multibyte my_mbcharlen can return 0 on a wrong multibyte
sequence. It is possible when upgrading from 4.0, sequence. It is possible when upgrading from 4.0,
...@@ -995,11 +1010,11 @@ append_identifier(THD *thd, String *packet, const char *name, uint length) ...@@ -995,11 +1010,11 @@ append_identifier(THD *thd, String *packet, const char *name, uint length)
The manual says it does not work. So we'll just The manual says it does not work. So we'll just
change length to 1 not to hang in the endless loop. change length to 1 not to hang in the endless loop.
*/ */
if (!length) if (!to_length)
length= 1; to_length= 1;
if (length == 1 && chr == (uchar) quote_char) if (to_length == 1 && chr == (uchar) quote_char)
packet->append(&quote_char, 1, system_charset_info); packet->append(&quote_char, 1, system_charset_info);
packet->append(name, length, system_charset_info); packet->append(to_name, to_length, system_charset_info);
} }
packet->append(&quote_char, 1, system_charset_info); packet->append(&quote_char, 1, system_charset_info);
} }
......
...@@ -92,8 +92,14 @@ int view_store_create_info(THD *thd, TABLE_LIST *table, String *buff); ...@@ -92,8 +92,14 @@ int view_store_create_info(THD *thd, TABLE_LIST *table, String *buff);
int copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table); int copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table);
int get_quote_char_for_identifier(THD *thd, const char *name, uint length); int get_quote_char_for_identifier(THD *thd, const char *name, uint length);
void append_identifier(THD *thd, String *packet, const char *name, void append_identifier(THD *thd, String *packet, const char *name, uint length,
uint length); CHARSET_INFO *from_cs, CHARSET_INFO *to_cs);
inline void append_identifier(THD *thd, String *packet, const char *name,
uint length)
{
append_identifier(thd, packet, name, length, NULL, NULL);
}
void mysqld_list_fields(THD *thd,TABLE_LIST *table, const char *wild); void mysqld_list_fields(THD *thd,TABLE_LIST *table, const char *wild);
int mysqld_dump_create_info(THD *thd, TABLE_LIST *table_list, int fd); int mysqld_dump_create_info(THD *thd, TABLE_LIST *table_list, int fd);
bool mysqld_show_create(THD *thd, TABLE_LIST *table_list); bool mysqld_show_create(THD *thd, TABLE_LIST *table_list);
......
...@@ -2138,11 +2138,13 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists, ...@@ -2138,11 +2138,13 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
if (thd->db == NULL || strcmp(db,thd->db) != 0 if (thd->db == NULL || strcmp(db,thd->db) != 0
|| is_drop_tmp_if_exists_added ) || is_drop_tmp_if_exists_added )
{ {
append_identifier(thd, built_ptr_query, db, db_len); append_identifier(thd, built_ptr_query, db, db_len,
system_charset_info, thd->charset());
built_ptr_query->append("."); built_ptr_query->append(".");
} }
append_identifier(thd, built_ptr_query, table->table_name, append_identifier(thd, built_ptr_query, table->table_name,
strlen(table->table_name)); strlen(table->table_name), system_charset_info,
thd->charset());
built_ptr_query->append(","); built_ptr_query->append(",");
} }
/* /*
...@@ -2204,12 +2206,14 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists, ...@@ -2204,12 +2206,14 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
*/ */
if (thd->db == NULL || strcmp(db,thd->db) != 0) if (thd->db == NULL || strcmp(db,thd->db) != 0)
{ {
append_identifier(thd, &built_query, db, db_len); append_identifier(thd, &built_query, db, db_len,
system_charset_info, thd->charset());
built_query.append("."); built_query.append(".");
} }
append_identifier(thd, &built_query, table->table_name, append_identifier(thd, &built_query, table->table_name,
strlen(table->table_name)); strlen(table->table_name), system_charset_info,
thd->charset());
built_query.append(","); built_query.append(",");
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment