WL 2826: Error handling of ALTER TABLE for partitioning

Various bug fixes (mostly mixed to and from in replace methods)
parent 0a053c3d
......@@ -4733,8 +4733,9 @@ static void release_part_info_log_entries(DDL_LOG_MEMORY_ENTRY *log_entry)
write_log_replace_delete_frm()
lpt Struct for parameters
next_entry Next reference to use in log record
path Name to rename from
rename_flag TRUE if rename, else delete
from_path Name to rename from
to_path Name to rename to
replace_flag TRUE if replace, else delete
RETURN VALUES
TRUE Error
FALSE Success
......@@ -4759,7 +4760,7 @@ static bool write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
else
ddl_log_entry.action_type= DDL_LOG_DELETE_ACTION;
ddl_log_entry.next_entry= next_entry;
ddl_log_entry.handler_name[0]= 0;
ddl_log_entry.handler_name= reg_ext;
ddl_log_entry.name= to_path;
if (replace_flag)
ddl_log_entry.from_name= from_path;
......@@ -5072,7 +5073,7 @@ static bool write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
if (write_log_replace_delete_frm(lpt, 0UL, shadow_path, path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
......@@ -5129,8 +5130,8 @@ static bool write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
goto error;
if (write_log_replace_delete_frm(lpt, next_entry, (const char*)path,
(const char*)tmp_path, TRUE))
if (write_log_replace_delete_frm(lpt, next_entry, (const char*)tmp_path,
(const char*)path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
......@@ -5245,7 +5246,7 @@ static bool write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
goto error;
if (write_log_changed_partitions(lpt, &next_entry, (const char*)path))
goto error;
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
if (write_log_replace_delete_frm(lpt, 0UL, shadow_path, path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
......
......@@ -474,6 +474,7 @@ bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
{
char *file_entry_buf= (char*)&global_ddl_log.file_entry_buf;
uint inx;
uchar single_char;
DBUG_ENTER("read_ddl_log_entry");
if (read_ddl_log_file_entry(read_entry))
......@@ -481,10 +482,10 @@ bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
DBUG_RETURN(TRUE);
}
ddl_log_entry->entry_pos= read_entry;
ddl_log_entry->entry_type=
(enum ddl_log_entry_code)file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
ddl_log_entry->action_type=
(enum ddl_log_action_code)file_entry_buf[DDL_LOG_ACTION_TYPE_POS];
single_char= file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
ddl_log_entry->entry_type= (enum ddl_log_entry_code)single_char;
single_char= file_entry_buf[DDL_LOG_ACTION_TYPE_POS];
ddl_log_entry->action_type= (enum ddl_log_action_code)single_char;
ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS];
ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]);
ddl_log_entry->name= &file_entry_buf[DDL_LOG_NAME_POS];
......@@ -553,10 +554,10 @@ static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
{
bool frm_action= FALSE;
LEX_STRING handler_name;
handler *file;
handler *file= NULL;
MEM_ROOT mem_root;
bool error= TRUE;
char path[FN_REFLEN];
char to_path[FN_REFLEN];
char from_path[FN_REFLEN];
char *par_ext= (char*)".par";
handlerton *hton;
......@@ -569,7 +570,7 @@ static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
handler_name.str= (char*)ddl_log_entry->handler_name;
handler_name.length= strlen(ddl_log_entry->handler_name);
init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0);
if (ddl_log_entry->handler_name[0] == 0)
if (!strcmp(ddl_log_entry->handler_name, reg_ext))
frm_action= TRUE;
else
{
......@@ -598,24 +599,29 @@ static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
{
if (frm_action)
{
strxmov(path, ddl_log_entry->name, reg_ext, NullS);
if (my_delete(path, MYF(MY_WME)))
break;
strxmov(to_path, ddl_log_entry->name, reg_ext, NullS);
if ((error= my_delete(to_path, MYF(MY_WME))))
{
if (error != ENOENT)
break;
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(path, ddl_log_entry->name, par_ext, NullS);
VOID(my_delete(path, MYF(MY_WME)));
strxmov(to_path, ddl_log_entry->name, par_ext, NullS);
VOID(my_delete(to_path, MYF(MY_WME)));
#endif
}
else
{
if (file->delete_table(ddl_log_entry->name))
break;
if ((error= file->delete_table(ddl_log_entry->name)))
{
if (error != ENOENT && error != HA_ERR_NO_SUCH_TABLE)
break;
}
}
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
{
VOID(sync_ddl_log());
error= FALSE;
}
break;
VOID(sync_ddl_log());
error= FALSE;
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
break;
}
......@@ -631,27 +637,26 @@ static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
error= TRUE;
if (frm_action)
{
strxmov(path, ddl_log_entry->name, reg_ext, NullS);
strxmov(to_path, ddl_log_entry->name, reg_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, reg_ext, NullS);
if (my_rename(path, from_path, MYF(MY_WME)))
if (my_rename(from_path, to_path, MYF(MY_WME)))
break;
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(path, ddl_log_entry->name, par_ext, NullS);
strxmov(to_path, ddl_log_entry->name, par_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, par_ext, NullS);
VOID(my_rename(path, from_path, MYF(MY_WME)));
VOID(my_rename(from_path, to_path, MYF(MY_WME)));
#endif
}
else
{
if (file->rename_table(ddl_log_entry->name,
ddl_log_entry->from_name))
if (file->rename_table(ddl_log_entry->from_name,
ddl_log_entry->name))
break;
}
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
{
VOID(sync_ddl_log());
error= FALSE;
}
break;
VOID(sync_ddl_log());
error= FALSE;
break;
}
default:
......@@ -744,9 +749,10 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
{
DBUG_RETURN(TRUE);
}
global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_ENTRY_CODE;
global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]=
(char)DDL_LOG_ENTRY_CODE;
global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS]=
ddl_log_entry->action_type;
(char)ddl_log_entry->action_type;
global_ddl_log.file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NEXT_ENTRY_POS],
ddl_log_entry->next_entry);
......@@ -836,10 +842,10 @@ bool write_execute_ddl_log_entry(uint first_entry,
entry to indicate it is done.
*/
VOID(sync_ddl_log());
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_EXECUTE_CODE;
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= (char)DDL_LOG_EXECUTE_CODE;
}
else
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= (char)DDL_IGNORE_LOG_ENTRY_CODE;
file_entry_buf[DDL_LOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry);
......@@ -1063,7 +1069,7 @@ void execute_ddl_log_recovery()
thd->store_globals();
num_entries= read_ddl_log_header();
for (i= 0; i < num_entries; i++)
for (i= 1; i < num_entries + 1; i++)
{
if (read_ddl_log_entry(i, &ddl_log_entry))
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment