Bug #17415 special character tables are not handled correctly in ndb binlog/schema dist

parent cd8f8449
...@@ -477,8 +477,7 @@ ha_ndbcluster::invalidate_dictionary_cache(TABLE_SHARE *share, Ndb *ndb, ...@@ -477,8 +477,7 @@ ha_ndbcluster::invalidate_dictionary_cache(TABLE_SHARE *share, Ndb *ndb,
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
char key[FN_REFLEN]; char key[FN_REFLEN];
strxnmov(key, FN_LEN-1, mysql_data_home, "/", build_table_filename(key, sizeof(key), dbname, tabname, "");
dbname, "/", tabname, NullS);
DBUG_PRINT("info", ("Getting ndbcluster mutex")); DBUG_PRINT("info", ("Getting ndbcluster mutex"));
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
NDB_SHARE *ndb_share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, NDB_SHARE *ndb_share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
...@@ -4191,16 +4190,14 @@ int ha_ndbcluster::create(const char *name, ...@@ -4191,16 +4190,14 @@ int ha_ndbcluster::create(const char *name,
NDBCOL col; NDBCOL col;
uint pack_length, length, i, pk_length= 0; uint pack_length, length, i, pk_length= 0;
const void *data, *pack_data; const void *data, *pack_data;
char name2[FN_HEADLEN];
bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE); bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
DBUG_ENTER("ha_ndbcluster::create"); DBUG_ENTER("ha_ndbcluster::create");
DBUG_PRINT("enter", ("name: %s", name)); DBUG_PRINT("enter", ("name: %s", name));
strcpy(name2, name); DBUG_ASSERT(*fn_rext((char*)name) == 0);
DBUG_ASSERT(*fn_rext((char*)name2) == 0); set_dbname(name);
set_dbname(name2); set_tabname(name);
set_tabname(name2);
table= form; table= form;
if (create_from_engine) if (create_from_engine)
...@@ -4213,7 +4210,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -4213,7 +4210,7 @@ int ha_ndbcluster::create(const char *name,
if ((my_errno= write_ndb_file(name))) if ((my_errno= write_ndb_file(name)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2), ndbcluster_create_binlog_setup(get_ndb(), name, strlen(name),
m_dbname, m_tabname, FALSE); m_dbname, m_tabname, FALSE);
#endif /* HAVE_NDB_BINLOG */ #endif /* HAVE_NDB_BINLOG */
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
...@@ -4361,18 +4358,18 @@ int ha_ndbcluster::create(const char *name, ...@@ -4361,18 +4358,18 @@ int ha_ndbcluster::create(const char *name,
First make sure we get a "fresh" share here, not an old trailing one... First make sure we get a "fresh" share here, not an old trailing one...
*/ */
{ {
const char *key= name2; uint length= (uint) strlen(name);
uint length= (uint) strlen(key);
if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) key, length))) (byte*) name, length)))
handle_trailing_share(share); handle_trailing_share(share);
} }
/* /*
get a new share get a new share
*/ */
if (!(share= get_share(name2, form, true, true)))
if (!(share= get_share(name, form, true, true)))
{ {
sql_print_error("NDB: allocating table share for %s failed", name2); sql_print_error("NDB: allocating table share for %s failed", name);
/* my_errno is set */ /* my_errno is set */
} }
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
...@@ -4413,7 +4410,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -4413,7 +4410,7 @@ int ha_ndbcluster::create(const char *name,
ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0) ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
{ {
sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations." sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
" Event: %s", name2); " Event: %s", name);
/* a warning has been issued to the client */ /* a warning has been issued to the client */
} }
if (share && !do_event_op) if (share && !do_event_op)
...@@ -5285,7 +5282,7 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, ...@@ -5285,7 +5282,7 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
NDBDICT* dict= ndb->getDictionary(); NDBDICT* dict= ndb->getDictionary();
dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
dict->invalidateTable(name); dict->invalidateTable(name);
strxnmov(key, FN_LEN-1, mysql_data_home, "/", db, "/", name, NullS); build_table_filename(key, sizeof(key), db, name, "");
NDB_SHARE *share= get_share(key, 0, false); NDB_SHARE *share= get_share(key, 0, false);
if (share && get_ndb_share_state(share) == NSS_ALTERED) if (share && get_ndb_share_state(share) == NSS_ALTERED)
{ {
...@@ -5419,13 +5416,14 @@ int ndbcluster_drop_database_impl(const char *path) ...@@ -5419,13 +5416,14 @@ int ndbcluster_drop_database_impl(const char *path)
} }
// Drop any tables belonging to database // Drop any tables belonging to database
char full_path[FN_REFLEN]; char full_path[FN_REFLEN];
char *tmp= strxnmov(full_path, FN_REFLEN-1, share_prefix, dbname, "/", char *tmp= full_path +
NullS); build_table_filename(full_path, sizeof(full_path), dbname, "", "");
ndb->setDatabaseName(dbname); ndb->setDatabaseName(dbname);
List_iterator_fast<char> it(drop_list); List_iterator_fast<char> it(drop_list);
while ((tabname=it++)) while ((tabname=it++))
{ {
strxnmov(tmp, FN_REFLEN - (tmp - full_path)-1, tabname, NullS); tablename_to_filename(tabname, tmp, FN_REFLEN - (tmp - full_path)-1);
if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname)) if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
{ {
const NdbError err= dict->getNdbError(); const NdbError err= dict->getNdbError();
...@@ -5518,14 +5516,16 @@ int ndbcluster_find_all_files(THD *thd) ...@@ -5518,14 +5516,16 @@ int ndbcluster_find_all_files(THD *thd)
continue; continue;
/* check if database exists */ /* check if database exists */
char *end= strxnmov(key, FN_LEN-1, mysql_data_home, "/", char *end= key +
elmt.database, NullS); build_table_filename(key, sizeof(key), elmt.database, "", "");
if (my_access(key, F_OK)) if (my_access(key, F_OK))
{ {
/* no such database defined, skip table */ /* no such database defined, skip table */
continue; continue;
} }
end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS); /* finalize construction of path */
end+= tablename_to_filename(elmt.name, end,
sizeof(key)-(end-key));
const void *data= 0, *pack_data= 0; const void *data= 0, *pack_data= 0;
uint length, pack_length; uint length, pack_length;
int discover= 0; int discover= 0;
...@@ -5660,10 +5660,9 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -5660,10 +5660,9 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
} }
// File is not in NDB, check for .ndb file with this name // File is not in NDB, check for .ndb file with this name
(void)strxnmov(name, FN_REFLEN-1, build_table_filename(name, sizeof(name), db, file_name, ha_ndb_ext);
mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
DBUG_PRINT("info", ("Check access for %s", name)); DBUG_PRINT("info", ("Check access for %s", name));
if (access(name, F_OK)) if (my_access(name, F_OK))
{ {
DBUG_PRINT("info", ("%s did not exist on disk", name)); DBUG_PRINT("info", ("%s did not exist on disk", name));
// .ndb file did not exist on disk, another table type // .ndb file did not exist on disk, another table type
...@@ -5685,12 +5684,13 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -5685,12 +5684,13 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* setup logging to binlog for all discovered tables */ /* setup logging to binlog for all discovered tables */
{ {
char *end, *end1= char *end, *end1= name +
strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS); build_table_filename(name, sizeof(name), db, "", "");
for (i= 0; i < ok_tables.records; i++) for (i= 0; i < ok_tables.records; i++)
{ {
file_name= (char*)hash_element(&ok_tables, i); file_name= (char*)hash_element(&ok_tables, i);
end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS); end= end1 +
tablename_to_filename(file_name, end1, sizeof(name) - (end1 - name));
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
ndbcluster_create_binlog_setup(ndb, name, end-name, ndbcluster_create_binlog_setup(ndb, name, end-name,
db, file_name, TRUE); db, file_name, TRUE);
...@@ -5707,9 +5707,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, ...@@ -5707,9 +5707,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
file_name= hash_element(&ndb_tables, i); file_name= hash_element(&ndb_tables, i);
if (!hash_search(&ok_tables, file_name, strlen(file_name))) if (!hash_search(&ok_tables, file_name, strlen(file_name)))
{ {
strxnmov(name, sizeof(name)-1, build_table_filename(name, sizeof(name), db, file_name, reg_ext);
mysql_data_home, "/", db, "/", file_name, reg_ext, NullS); if (my_access(name, F_OK))
if (access(name, F_OK))
{ {
DBUG_PRINT("info", ("%s must be discovered", file_name)); DBUG_PRINT("info", ("%s must be discovered", file_name));
// File is in list of ndb tables and not in ok_tables // File is in list of ndb tables and not in ok_tables
...@@ -6243,7 +6242,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, ...@@ -6243,7 +6242,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
NDB_SHARE *share; NDB_SHARE *share;
DBUG_ENTER("ndb_get_commitcount"); DBUG_ENTER("ndb_get_commitcount");
(void)strxnmov(name, FN_REFLEN-1, share_prefix, dbname, "/", tabname, NullS); build_table_filename(name, sizeof(name), dbname, tabname, "");
DBUG_PRINT("enter", ("name: %s", name)); DBUG_PRINT("enter", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
......
...@@ -659,11 +659,8 @@ static int ndbcluster_create_apply_status_table(THD *thd) ...@@ -659,11 +659,8 @@ static int ndbcluster_create_apply_status_table(THD *thd)
if so, remove it since there is none in Ndb if so, remove it since there is none in Ndb
*/ */
{ {
strxnmov(buf, sizeof(buf), build_table_filename(buf, sizeof(buf),
mysql_data_home, NDB_REP_DB, NDB_APPLY_TABLE, reg_ext);
"/" NDB_REP_DB "/" NDB_APPLY_TABLE,
reg_ext, NullS);
unpack_filename(buf,buf);
my_delete(buf, MYF(0)); my_delete(buf, MYF(0));
} }
...@@ -711,11 +708,8 @@ static int ndbcluster_create_schema_table(THD *thd) ...@@ -711,11 +708,8 @@ static int ndbcluster_create_schema_table(THD *thd)
if so, remove it since there is none in Ndb if so, remove it since there is none in Ndb
*/ */
{ {
strxnmov(buf, sizeof(buf), build_table_filename(buf, sizeof(buf),
mysql_data_home, NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext);
"/" NDB_REP_DB "/" NDB_SCHEMA_TABLE,
reg_ext, NullS);
unpack_filename(buf,buf);
my_delete(buf, MYF(0)); my_delete(buf, MYF(0));
} }
...@@ -940,8 +934,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -940,8 +934,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
if (get_a_share) if (get_a_share)
{ {
char key[FN_REFLEN]; char key[FN_REFLEN];
(void)strxnmov(key, FN_REFLEN, share_prefix, db, build_table_filename(key, sizeof(key), db, table_name, "");
"/", table_name, NullS);
share= get_share(key, 0, false, false); share= get_share(key, 0, false, false);
} }
...@@ -1434,8 +1427,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1434,8 +1427,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
case SOT_CLEAR_SLOCK: case SOT_CLEAR_SLOCK:
{ {
char key[FN_REFLEN]; char key[FN_REFLEN];
(void)strxnmov(key, FN_REFLEN, share_prefix, schema->db, build_table_filename(key, sizeof(key),
"/", schema->name, NullS); schema->db, schema->name, "");
NDB_SHARE *share= get_share(key, 0, false, false); NDB_SHARE *share= get_share(key, 0, false, false);
if (share) if (share)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment