Commit e5a323e1 authored by Sergei Golubchik's avatar Sergei Golubchik

single table discovery: handlerton::discover_table() method.

fixes for need_full_discover_for_existence mode
parent f532653c
......@@ -29,7 +29,9 @@ ALTER TABLE t1 ANALYZE PARTITION ALL;
ALTER TABLE t1 REBUILD PARTITION ALL;
ALTER TABLE t1 ENGINE Memory;
ALTER TABLE t1 ADD (new INT);
--disable_warnings
DROP TABLE t1;
--enable_warnings
--error ER_OPTION_PREVENTS_STATEMENT
CREATE TABLE t1 (
......
......@@ -429,7 +429,9 @@ system echo "this is a junk file for test" >> $MYSQLD_DATADIR/test/t1.frm ;
SHOW TABLE STATUS like 't1';
--error ER_NOT_FORM_FILE
show create table t1;
--disable_warnings
drop table if exists t1;
--enable_warnings
--error 1,0
--remove_file $MYSQLD_DATADIR/test/t1.frm
......
......@@ -491,11 +491,11 @@ int ha_initialize_handlerton(st_plugin_int *plugin)
// if the enfine can discover a single table and it is file-based
// then it can use a default file-based table names discovery
if (!hton->discover_table_names &&
hton->discover && hton->tablefile_extensions[0])
hton->discover_table && hton->tablefile_extensions[0])
hton->discover_table_names= hton_ext_based_table_discovery;
// default discover_table_existence implementation
if (!hton->discover_table_existence && hton->discover)
if (!hton->discover_table_existence && hton->discover_table)
{
if (hton->tablefile_extensions[0])
hton->discover_table_existence= ext_based_existence;
......@@ -4279,56 +4279,44 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
}
/**
Try to discover one table from handler(s).
@retval
-1 Table did not exists
@retval
0 OK. In this case *frmblob and *frmlen are set
@retval
>0 error. frmblob and frmlen may not be set
*/
struct st_discover_args
{
const char *db;
const char *name;
uchar **frmblob;
size_t *frmlen;
};
static my_bool discover_handlerton(THD *thd, plugin_ref plugin,
void *arg)
{
st_discover_args *vargs= (st_discover_args *)arg;
TABLE_SHARE *share= (TABLE_SHARE *)arg;
handlerton *hton= plugin_data(plugin, handlerton *);
if (hton->state == SHOW_OPTION_YES && hton->discover &&
(!(hton->discover(hton, thd, vargs->db, vargs->name,
vargs->frmblob,
vargs->frmlen))))
return TRUE;
if (hton->state == SHOW_OPTION_YES && hton->discover_table)
{
int error= hton->discover_table(hton, thd, share);
if (error != HA_ERR_NO_SUCH_TABLE)
{
if (error)
{
DBUG_ASSERT(share->error); // MUST be always set for get_cached_table_share to work
my_error(ER_GET_ERRNO, MYF(0), error);
}
else
share->error= OPEN_FRM_OK;
return FALSE;
status_var_increment(thd->status_var.ha_discover_count);
return TRUE; // abort the search
}
}
DBUG_ASSERT(share->error == OPEN_FRM_OPEN_ERROR);
return FALSE; // continue with the next engine
}
int ha_discover(THD *thd, const char *db, const char *name,
uchar **frmblob, size_t *frmlen)
int ha_discover_table(THD *thd, TABLE_SHARE *share)
{
int error= -1; // Table does not exist in any handler
DBUG_ENTER("ha_discover");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
st_discover_args args= {db, name, frmblob, frmlen};
DBUG_ENTER("ha_discover_table");
if (is_prefix(name,tmp_file_prefix)) /* skip temporary tables */
DBUG_RETURN(error);
DBUG_ASSERT(share->error == OPEN_FRM_OPEN_ERROR); // share is not OK yet
if (plugin_foreach(thd, discover_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &args))
error= 0;
if (!plugin_foreach(thd, discover_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, share))
open_table_error(share, OPEN_FRM_OPEN_ERROR, ENOENT); // not found
if (!error)
status_var_increment(thd->status_var.ha_discover_count);
DBUG_RETURN(error);
DBUG_RETURN(share->error != OPEN_FRM_OK);
}
/**
......@@ -4363,6 +4351,45 @@ static my_bool discover_existence(THD *thd, plugin_ref plugin,
return ht->discover_table_existence(ht, args->db, args->table_name);
}
class Table_exists_error_handler : public Internal_error_handler
{
public:
Table_exists_error_handler()
: m_handled_errors(0), m_unhandled_errors(0)
{}
bool handle_condition(THD *thd,
uint sql_errno,
const char* sqlstate,
MYSQL_ERROR::enum_warning_level level,
const char* msg,
MYSQL_ERROR ** cond_hdl)
{
*cond_hdl= NULL;
if (sql_errno == ER_NO_SUCH_TABLE ||
sql_errno == ER_NO_SUCH_TABLE_IN_ENGINE ||
sql_errno == ER_WRONG_OBJECT ||
sql_errno == ER_OPTION_PREVENTS_STATEMENT) // partition_disabled.test
{
m_handled_errors++;
return TRUE;
}
if (level == MYSQL_ERROR::WARN_LEVEL_ERROR)
m_unhandled_errors++;
return FALSE;
}
bool safely_trapped_errors()
{
return ((m_handled_errors > 0) && (m_unhandled_errors == 0));
}
private:
int m_handled_errors;
int m_unhandled_errors;
};
bool ha_table_exists(THD *thd, const char *db, const char *table_name)
{
DBUG_ENTER("ha_discover_table_existence");
......@@ -4371,10 +4398,13 @@ bool ha_table_exists(THD *thd, const char *db, const char *table_name)
{
TABLE_LIST table;
DBUG_ASSERT(0);
TABLE_SHARE *share= get_table_share(thd, db, table_name,
GTS_TABLE | GTS_NOLOCK);
DBUG_RETURN(share != 0);
Table_exists_error_handler no_such_table_handler;
thd->push_internal_handler(&no_such_table_handler);
get_table_share(thd, db, table_name, GTS_TABLE | GTS_VIEW | GTS_NOLOCK);
thd->pop_internal_handler();
// the table doesn't exist if we've caught ER_NO_SUCH_TABLE and nothing else
DBUG_RETURN(!no_such_table_handler.safely_trapped_errors());
}
mysql_mutex_lock(&LOCK_open);
......
......@@ -1075,10 +1075,6 @@ struct handlerton
enum handler_create_iterator_result
(*create_iterator)(handlerton *hton, enum handler_iterator_type type,
struct handler_iterator *fill_this_in);
int (*discover)(handlerton *hton, THD* thd, const char *db,
const char *name,
uchar **frmblob,
size_t *frmlen);
/*
Optional clauses in the CREATE/ALTER TABLE
*/
......@@ -1110,6 +1106,20 @@ struct handlerton
engine, without user issuing an explicit CREATE TABLE statement.
**********************************************************************/
/*
This method is required for any engine that supports automatic table
discovery, there is no default implementation.
Given a TABLE_SHARE discover_table() fills it in with a correct table
structure using one of the TABLE_SHARE::init_from_* methods.
Returns HA_ERR_NO_SUCH_TABLE if the table did not exist in the engine,
zero if the table was discovered successfully, or any other
HA_ERR_* error code as appropriate if the table existed, but the
discovery failed.
*/
int (*discover_table)(handlerton *hton, THD* thd, TABLE_SHARE *share);
/*
The discover_table_names method tells the server
about all tables in the specified database that the engine
......@@ -1157,6 +1167,7 @@ struct handlerton
*/
int (*discover_table_existence)(handlerton *hton, const char *db,
const char *table_name);
};
......@@ -3088,8 +3099,7 @@ int ha_delete_table(THD *thd, handlerton *db_type, const char *path,
bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat);
/* discovery */
int ha_discover(THD* thd, const char* dbname, const char* name,
uchar** frmblob, size_t* frmlen);
int ha_discover_table(THD *thd, TABLE_SHARE *share);
int ha_discover_table_names(THD *thd, LEX_STRING *db, MY_DIR *dirp,
handlerton::discovered_list *result);
bool ha_table_exists(THD *thd, const char *db, const char *table_name);
......
......@@ -585,24 +585,15 @@ TABLE_SHARE *get_table_share(THD *thd, const char *db, const char *table_name,
char *key, uint key_length, uint flags,
my_hash_value_type hash_value)
{
bool open_failed;
TABLE_SHARE *share;
DBUG_ENTER("get_table_share");
DBUG_ASSERT(!(flags & GTS_FORCE_DISCOVERY)); // FIXME not implemented
mysql_mutex_lock(&LOCK_open);
/*
To be able perform any operation on table we should own
some kind of metadata lock on it.
*/
DBUG_ASSERT(thd->mdl_context.is_lock_owner(MDL_key::TABLE, db, table_name,
MDL_SHARED));
/* Read table definition from cache */
share= (TABLE_SHARE*) my_hash_search_using_hash_value(&table_def_cache,
hash_value, (uchar*) key, key_length);
if (!share)
{
if (!(share= alloc_table_share(db, table_name, key, key_length)))
......@@ -633,12 +624,15 @@ TABLE_SHARE *get_table_share(THD *thd, const char *db, const char *table_name,
mysql_mutex_lock(&share->LOCK_ha_data);
mysql_mutex_unlock(&LOCK_open);
open_failed= open_table_def(thd, share, flags);
if (flags & GTS_FORCE_DISCOVERY)
ha_discover_table(thd, share); // don't read the frm at all
else
open_table_def(thd, share, flags | GTS_FORCE_DISCOVERY); // frm or discover
mysql_mutex_unlock(&share->LOCK_ha_data);
mysql_mutex_lock(&LOCK_open);
if (open_failed)
if (share->error)
{
share->ref_count--;
(void) my_hash_delete(&table_def_cache, (uchar*) share);
......@@ -650,6 +644,9 @@ TABLE_SHARE *get_table_share(THD *thd, const char *db, const char *table_name,
goto end;
}
/* cannot force discovery of a cached share */
DBUG_ASSERT(!(flags & GTS_FORCE_DISCOVERY));
/* make sure that open_table_def() for this share is not running */
mysql_mutex_lock(&share->LOCK_ha_data);
mysql_mutex_unlock(&share->LOCK_ha_data);
......@@ -706,7 +703,7 @@ err:
end:
if (flags & GTS_NOLOCK)
{
share->ref_count--;
release_table_share(share);
/*
if GTS_NOLOCK is requested, the returned share pointer cannot be used,
the share it points to may go away any moment.
......@@ -716,6 +713,15 @@ end:
*/
share= (TABLE_SHARE*) 1;
}
else
{
/*
To be able perform any operation on table we should own
some kind of metadata lock on it.
*/
DBUG_ASSERT(thd->mdl_context.is_lock_owner(MDL_key::TABLE, db, table_name,
MDL_SHARED));
}
mysql_mutex_unlock(&LOCK_open);
DBUG_RETURN(share);
......
......@@ -832,11 +832,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
mysql_ha_rm_tables(thd, tables);
for (table= tables; table; table= table->next_local)
{
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, table->db, table->table_name,
false);
deleted_tables++;
}
thd->push_internal_handler(&err_handler);
if (!thd->killed &&
......
......@@ -1928,10 +1928,6 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
if (lock_table_names(thd, tables, NULL, thd->variables.lock_wait_timeout,
MYSQL_OPEN_SKIP_TEMPORARY))
DBUG_RETURN(true);
for (table= tables; table; table= table->next_local)
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, table->db, table->table_name,
false);
}
else
{
......@@ -2222,23 +2218,9 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
{
non_temp_tables_count++;
if (thd->locked_tables_mode)
{
if (wait_while_table_is_used(thd, table->table, HA_EXTRA_NOT_USED,
TDC_RT_REMOVE_NOT_OWN_AND_MARK_NOT_USABLE))
{
error= -1;
goto err;
}
close_all_tables_for_name(thd, table->table->s,
HA_EXTRA_PREPARE_FOR_DROP);
table->table= 0;
}
/* Check that we have an exclusive lock on the table to be dropped. */
DBUG_ASSERT(thd->mdl_context.is_lock_owner(MDL_key::TABLE, table->db,
table->table_name,
MDL_EXCLUSIVE));
MDL_SHARED));
alias= (lower_case_table_names == 2) ? table->alias : table->table_name;
/* remove .frm file and engine files */
......@@ -2307,6 +2289,28 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
else
{
char *end;
if (thd->locked_tables_mode)
{
if (wait_while_table_is_used(thd, table->table, HA_EXTRA_NOT_USED,
TDC_RT_REMOVE_NOT_OWN_AND_MARK_NOT_USABLE))
{
error= -1;
goto err;
}
close_all_tables_for_name(thd, table->table->s,
HA_EXTRA_PREPARE_FOR_DROP);
table->table= 0;
}
else
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, table->db, table->table_name,
false);
/* Check that we have an exclusive lock on the table to be dropped. */
DBUG_ASSERT(thd->mdl_context.is_lock_owner(MDL_key::TABLE, table->db,
table->table_name,
MDL_EXCLUSIVE));
/*
Cannot use the db_type from the table, since that might have changed
while waiting for the exclusive name lock.
......@@ -2372,6 +2376,7 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
err:
if (wrong_tables.length())
{
thd->clear_error();
if (!foreign_key_error)
my_printf_error(ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), MYF(0),
wrong_tables.c_ptr_safe());
......
......@@ -316,6 +316,7 @@ TABLE_SHARE *alloc_table_share(const char *db, const char *table_name,
share->normalized_path.length= path_length;
share->table_category= get_table_category(& share->db, & share->table_name);
share->set_refresh_version();
share->open_errno= ENOENT;
/*
Since alloc_table_share() can be called without any locking (for
......@@ -570,24 +571,21 @@ inline bool is_system_table_name(const char *name, uint length)
}
/**
Check if a string contains path elements
*/
/*
We don't try to open 5.0 unencoded name, if
- non-encoded name contains '@' signs,
because '@' can be misinterpreted.
It is not clear if '@' is escape character in 5.1,
or a normal character in 5.0.
- non-encoded db or table name contain "#mysql50#" prefix.
This kind of tables must have been opened only by the
mysql_file_open() above.
*/
static bool has_disabled_path_chars(const char *str)
{
for (; *str; str++)
{
switch (*str) {
case FN_EXTCHAR:
case '/':
case '\\':
case '~':
case '@':
return TRUE;
}
}
return FALSE;
return strpbrk(str, "/\\~@.") != 0 ||
strncmp(str, STRING_WITH_LEN(MYSQL50_TABLE_NAME_PREFIX)) == 0;
}
......@@ -626,25 +624,9 @@ enum open_frm_error open_table_def(THD *thd, TABLE_SHARE *share, uint flags)
if ((file= mysql_file_open(key_file_frm,
path, O_RDONLY | O_SHARE, MYF(0))) < 0)
{
/*
We don't try to open 5.0 unencoded name, if
- non-encoded name contains '@' signs,
because '@' can be misinterpreted.
It is not clear if '@' is escape character in 5.1,
or a normal character in 5.0.
- non-encoded db or table name contain "#mysql50#" prefix.
This kind of tables must have been opened only by the
mysql_file_open() above.
*/
if (has_disabled_path_chars(share->table_name.str) ||
has_disabled_path_chars(share->db.str) ||
!strncmp(share->db.str, MYSQL50_TABLE_NAME_PREFIX,
MYSQL50_TABLE_NAME_PREFIX_LENGTH) ||
!strncmp(share->table_name.str, MYSQL50_TABLE_NAME_PREFIX,
MYSQL50_TABLE_NAME_PREFIX_LENGTH))
goto err_not_open;
if (!has_disabled_path_chars(share->table_name.str) &&
!has_disabled_path_chars(share->db.str))
{
/* Try unencoded 5.0 name */
uint length;
strxnmov(path, sizeof(path)-1,
......@@ -661,16 +643,27 @@ enum open_frm_error open_table_def(THD *thd, TABLE_SHARE *share, uint flags)
then table name does not have tricky characters,
so no need to check the old file name.
*/
if (length == share->normalized_path.length ||
((file= mysql_file_open(key_file_frm,
path, O_RDONLY | O_SHARE, MYF(0))) < 0))
goto err_not_open;
if (length != share->normalized_path.length &&
(file= mysql_file_open(key_file_frm,
path, O_RDONLY | O_SHARE, MYF(0))) >= 0)
{
/* Unencoded 5.0 table name found */
path[length]= '\0'; // Remove .frm extension
strmov(share->normalized_path.str, path);
share->normalized_path.length= length;
}
}
/* still no luck? try to discover the table */
if (file < 0)
{
if (flags & GTS_TABLE && flags & GTS_FORCE_DISCOVERY)
{
ha_discover_table(thd, share);
error_given= true;
}
goto err_not_open;
}
}
if (mysql_file_read(file, head, sizeof(head), MYF(MY_NABP)))
{
......
......@@ -2450,8 +2450,8 @@ static inline void dbug_tmp_restore_column_maps(MY_BITMAP *read_set,
enum get_table_share_flags {
GTS_TABLE = 1,
GTS_VIEW = 2,
GTS_NOLOCK = 4, // don't increase share->ref_count
GTS_FORCE_DISCOVERY = 8 // don't use the .frm file
GTS_NOLOCK = 4,
GTS_FORCE_DISCOVERY = 8
};
size_t max_row_length(TABLE *table, const uchar *data);
......
......@@ -855,7 +855,7 @@ int azclose (azio_stream *s)
Though this was added to support MySQL's FRM file, anything can be
stored in this location.
*/
int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
int azwrite_frm(azio_stream *s, uchar *blob, unsigned int length)
{
if (s->mode == 'r')
return 1;
......@@ -867,7 +867,7 @@ int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
s->frm_length= length;
s->start+= length;
if (my_pwrite(s->file, (uchar*) blob, s->frm_length,
if (my_pwrite(s->file, blob, s->frm_length,
s->frm_start_pos, MYF(MY_NABP)) ||
write_header(s) ||
(my_seek(s->file, 0, MY_SEEK_END, MYF(0)) == MY_FILEPOS_ERROR))
......@@ -876,9 +876,9 @@ int azwrite_frm(azio_stream *s, char *blob, unsigned int length)
return 0;
}
int azread_frm(azio_stream *s, char *blob)
int azread_frm(azio_stream *s, uchar *blob)
{
return my_pread(s->file, (uchar*) blob, s->frm_length,
return my_pread(s->file, blob, s->frm_length,
s->frm_start_pos, MYF(MY_NABP)) ? 1 : 0;
}
......
......@@ -331,8 +331,8 @@ extern int azclose(azio_stream *file);
error number (see function gzerror below).
*/
extern int azwrite_frm (azio_stream *s, char *blob, unsigned int length);
extern int azread_frm (azio_stream *s, char *blob);
extern int azwrite_frm (azio_stream *s, uchar *blob, unsigned int length);
extern int azread_frm (azio_stream *s, uchar *blob);
extern int azwrite_comment (azio_stream *s, char *blob, unsigned int length);
extern int azread_comment (azio_stream *s, char *blob);
......
......@@ -26,6 +26,7 @@
#include <myisam.h> // T_EXTEND
#include "ha_archive.h"
#include "discover.h"
#include <my_dir.h>
#include <mysql/plugin.h>
......@@ -120,10 +121,7 @@ extern "C" PSI_file_key arch_key_file_data;
static handler *archive_create_handler(handlerton *hton,
TABLE_SHARE *table,
MEM_ROOT *mem_root);
int archive_discover(handlerton *hton, THD* thd, const char *db,
const char *name,
uchar **frmblob,
size_t *frmlen);
int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share);
/*
Number of rows that will force a bulk insert.
......@@ -220,7 +218,7 @@ int archive_db_init(void *p)
archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
archive_hton->create= archive_create_handler;
archive_hton->flags= HTON_NO_FLAGS;
archive_hton->discover= archive_discover;
archive_hton->discover_table= archive_discover;
archive_hton->tablefile_extensions= ha_archive_exts;
if (mysql_mutex_init(az_key_mutex_archive_mutex,
......@@ -270,19 +268,17 @@ ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
archive_reader_open= FALSE;
}
int archive_discover(handlerton *hton, THD* thd, const char *db,
const char *name,
uchar **frmblob,
size_t *frmlen)
int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
{
DBUG_ENTER("archive_discover");
DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
DBUG_PRINT("archive_discover", ("db: '%s' name: '%s'", share->db.str,
share->table_name.str));
azio_stream frm_stream;
char az_file[FN_REFLEN];
char *frm_ptr;
uchar *frm_ptr;
MY_STAT file_stat;
build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
strxmov(az_file, share->normalized_path.str, ARZ, NullS);
if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0))))
goto err;
......@@ -295,19 +291,23 @@ int archive_discover(handlerton *hton, THD* thd, const char *db,
}
if (frm_stream.frm_length == 0)
goto err;
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
frm_ptr= (uchar *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
azread_frm(&frm_stream, frm_ptr);
azclose(&frm_stream);
*frmlen= frm_stream.frm_length;
*frmblob= (uchar*) frm_ptr;
// don't go through the discovery again
if (writefrm(share->normalized_path.str, frm_ptr, frm_stream.frm_length))
DBUG_RETURN(my_errno);
share->init_from_binary_frm_image(thd, frm_ptr);
my_free(frm_ptr);
DBUG_RETURN(0);
err:
my_errno= 0;
DBUG_RETURN(1);
DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
}
/*
......@@ -650,9 +650,9 @@ int ha_archive::close(void)
int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
{
int rc= 0;
char *frm_ptr;
uchar *frm_ptr;
if (!(frm_ptr= (char *) my_malloc(src->frm_length, MYF(0))))
if (!(frm_ptr= (uchar *) my_malloc(src->frm_length, MYF(0))))
return HA_ERR_OUT_OF_MEM;
/* Write file offset is set to the end of the file. */
......@@ -758,7 +758,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if (frm_ptr)
{
mysql_file_read(frm_file, frm_ptr, (size_t)file_stat.st_size, MYF(0));
azwrite_frm(&create_stream, (char *)frm_ptr, (size_t)file_stat.st_size);
azwrite_frm(&create_stream, frm_ptr, (size_t)file_stat.st_size);
my_free(frm_ptr);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment