Commit fc241de3 authored by antony@ppcg5.local's avatar antony@ppcg5.local

Bug#25513

  "Federared Transactions Failure"
  Bug occurs when the user performs an operation which inserts more than 
  one row into the federated table and the federated table references a 
  remote table stored within a transactional storage engine. When the
  insert operation for any one row in the statement fails due to 
  constraint violation, the federated engine is unable to perform 
  statement rollback and so the remote table contains a partial commit. 
  The user would expect a statement to perform the same so a statement 
  rollback is expected.
  This bug was fixed by implementing  bulk-insert handling into the
  federated storage engine. This will relieve the bug for most common
  situations by enabling the generation of a multi-row insert into the
  remote table and thus permitting the remote table to perform 
  statement rollback when neccessary.
  The multi-row insert is limited to the maximum packet size between 
  servers and should the size overflow, more than one insert statement 
  will be sent and this bug will reappear. Multi-row insert is disabled
  when an "INSERT...ON DUPLICATE KEY UPDATE" is being performed.
  The bulk-insert handling will offer a significant performance boost 
  when inserting a large number of small rows.
This patch builds on Bug29019 and Bug25511
parent b0b0b0fb
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
DROP DATABASE IF EXISTS federated;
CREATE DATABASE federated;
create table federated.t1 (a int primary key, b varchar(64))
engine=myisam;
create table federated.t1 (a int primary key, b varchar(64))
engine=federated
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
1 Larry
2 Curly
truncate federated.t1;
alter table federated.t1 engine=innodb;
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
source include/federated.inc;
source include/have_innodb.inc;
#
# Bug#25513 Federated transaction failures
#
connection slave;
create table federated.t1 (a int primary key, b varchar(64))
engine=myisam;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b varchar(64))
engine=federated
connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';
--error ER_DUP_KEY
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
connection slave;
truncate federated.t1;
alter table federated.t1 engine=innodb;
connection master;
--error ER_DUP_KEY
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe");
select * from federated.t1;
drop table federated.t1;
connection slave;
drop table federated.t1;
source include/federated_cleanup.inc;
...@@ -352,6 +352,7 @@ static char ident_quote_char= '`'; // Character for quoting ...@@ -352,6 +352,7 @@ static char ident_quote_char= '`'; // Character for quoting
// identifiers // identifiers
static char value_quote_char= '\''; // Character for quoting static char value_quote_char= '\''; // Character for quoting
// literals // literals
static const int bulk_padding= 64; // bytes "overhead" in packet
/* Federated storage engine handlerton */ /* Federated storage engine handlerton */
...@@ -773,7 +774,9 @@ error: ...@@ -773,7 +774,9 @@ error:
ha_federated::ha_federated(TABLE *table_arg) ha_federated::ha_federated(TABLE *table_arg)
:handler(&federated_hton, table_arg), :handler(&federated_hton, table_arg),
mysql(0), stored_result(0) mysql(0), stored_result(0)
{} {
bzero(&bulk_insert, sizeof(bulk_insert));
}
/* /*
...@@ -1584,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table, ...@@ -1584,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
@brief Construct the INSERT statement.
@details This method will construct the INSERT statement and appends it to
the supplied query string buffer.
@return
@retval FALSE No error
@retval TRUE Failure
*/
bool ha_federated::append_stmt_insert(String *query)
{
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
Field **field;
uint tmp_length;
/* The main insert query string */
String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
DBUG_ENTER("ha_federated::append_stmt_insert");
insert_string.length(0);
if (replace_duplicates)
insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
else if (ignore_duplicates && !insert_dup_update)
insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
else
insert_string.append(STRING_WITH_LEN("INSERT INTO "));
append_ident(&insert_string, share->table_name, share->table_name_length,
ident_quote_char);
insert_string.append(FEDERATED_OPENPAREN);
tmp_length= insert_string.length() - strlen(FEDERATED_COMMA);
/*
loop through the field pointer array, add any fields to both the values
list and the fields list that match the current query id
*/
for (field= table->field; *field; field++)
{
/* append the field name */
append_ident(&insert_string, (*field)->field_name,
strlen((*field)->field_name), ident_quote_char);
/* append commas between both fields and fieldnames */
/*
unfortunately, we can't use the logic
if *(fields + 1) to make the following
appends conditional because we may not append
if the next field doesn't match the condition:
(((*field)->query_id && (*field)->query_id == current_query_id)
*/
insert_string.append(FEDERATED_COMMA);
}
/*
remove trailing comma
*/
insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
/*
if there were no fields, we don't want to add a closing paren
AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();"
*/
if (insert_string.length() > tmp_length)
{
insert_string.append(FEDERATED_CLOSEPAREN);
}
insert_string.append(FEDERATED_VALUES);
DBUG_RETURN(query->append(insert_string));
}
/* /*
write_row() inserts a row. No extra() hint is given currently if a bulk load write_row() inserts a row. No extra() hint is given currently if a bulk load
is happeneding. buf() is a byte array of data. You can use the field is happeneding. buf() is a byte array of data. You can use the field
...@@ -1600,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table, ...@@ -1600,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table,
int ha_federated::write_row(byte *buf) int ha_federated::write_row(byte *buf)
{ {
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
Field **field; Field **field;
uint tmp_length;
int error= 0;
bool use_bulk_insert;
bool auto_increment_update_required= table->next_number_field;
/* The main insert query string */
String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
/* The string containing the values to be added to the insert */ /* The string containing the values to be added to the insert */
String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin);
/* The actual value of the field, to be added to the values_string */ /* The actual value of the field, to be added to the values_string */
...@@ -1614,7 +1695,6 @@ int ha_federated::write_row(byte *buf) ...@@ -1614,7 +1695,6 @@ int ha_federated::write_row(byte *buf)
sizeof(insert_field_value_buffer), sizeof(insert_field_value_buffer),
&my_charset_bin); &my_charset_bin);
values_string.length(0); values_string.length(0);
insert_string.length(0);
insert_field_value_string.length(0); insert_field_value_string.length(0);
DBUG_ENTER("ha_federated::write_row"); DBUG_ENTER("ha_federated::write_row");
...@@ -1624,19 +1704,19 @@ int ha_federated::write_row(byte *buf) ...@@ -1624,19 +1704,19 @@ int ha_federated::write_row(byte *buf)
/* /*
start both our field and field values strings start both our field and field values strings
We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE"
Ignore duplicates is always true when insert_dup_update is true.
When replace_duplicates == TRUE, we can safely enable multi-row insert.
When performing multi-row insert, we only collect the columns values for
the row. The start of the statement is only created when the first
row is copied in to the bulk_insert string.
*/ */
if (replace_duplicates) if (!(use_bulk_insert= bulk_insert.str &&
insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); (!insert_dup_update || replace_duplicates)))
else if (ignore_duplicates && !insert_dup_update) append_stmt_insert(&values_string);
insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
else
insert_string.append(STRING_WITH_LEN("INSERT INTO "));
append_ident(&insert_string, share->table_name,
share->table_name_length, ident_quote_char);
insert_string.append(FEDERATED_OPENPAREN);
values_string.append(FEDERATED_VALUES);
values_string.append(FEDERATED_OPENPAREN); values_string.append(FEDERATED_OPENPAREN);
tmp_length= values_string.length();
/* /*
loop through the field pointer array, add any fields to both the values loop through the field pointer array, add any fields to both the values
...@@ -1655,9 +1735,6 @@ int ha_federated::write_row(byte *buf) ...@@ -1655,9 +1735,6 @@ int ha_federated::write_row(byte *buf)
insert_field_value_string.length(0); insert_field_value_string.length(0);
} }
/* append the field name */
append_ident(&insert_string, (*field)->field_name,
strlen((*field)->field_name), ident_quote_char);
/* append the value */ /* append the value */
values_string.append(insert_field_value_string); values_string.append(insert_field_value_string);
...@@ -1671,32 +1748,61 @@ int ha_federated::write_row(byte *buf) ...@@ -1671,32 +1748,61 @@ int ha_federated::write_row(byte *buf)
if the next field doesn't match the condition: if the next field doesn't match the condition:
(((*field)->query_id && (*field)->query_id == current_query_id) (((*field)->query_id && (*field)->query_id == current_query_id)
*/ */
insert_string.append(FEDERATED_COMMA);
values_string.append(FEDERATED_COMMA); values_string.append(FEDERATED_COMMA);
} }
/*
remove trailing comma
*/
insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA));
/* /*
if there were no fields, we don't want to add a closing paren if there were no fields, we don't want to add a closing paren
AND, we don't want to chop off the last char '(' AND, we don't want to chop off the last char '('
insert will be "INSERT INTO t1 VALUES ();" insert will be "INSERT INTO t1 VALUES ();"
*/ */
if (table->s->fields) if (values_string.length() > tmp_length)
{ {
/* chops off leading commas */ /* chops off leading commas */
values_string.length(values_string.length() - strlen(FEDERATED_COMMA)); values_string.length(values_string.length() - strlen(FEDERATED_COMMA));
insert_string.append(FEDERATED_CLOSEPAREN);
} }
/* we always want to append this, even if there aren't any fields */ /* we always want to append this, even if there aren't any fields */
values_string.append(FEDERATED_CLOSEPAREN); values_string.append(FEDERATED_CLOSEPAREN);
/* add the values */ if (use_bulk_insert)
insert_string.append(values_string); {
/*
Send the current bulk insert out if appending the current row would
cause the statement to overflow the packet size, otherwise set
auto_increment_update_required to FALSE as no query was executed.
*/
if (bulk_insert.length + values_string.length() + bulk_padding >
mysql->net.max_packet_size && bulk_insert.length)
{
error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length);
bulk_insert.length= 0;
}
else
auto_increment_update_required= FALSE;
if (bulk_insert.length == 0)
{
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE];
String insert_string(insert_buffer, sizeof(insert_buffer),
&my_charset_bin);
insert_string.length(0);
append_stmt_insert(&insert_string);
dynstr_append_mem(&bulk_insert, insert_string.ptr(),
insert_string.length());
}
else
dynstr_append_mem(&bulk_insert, ",", 1);
dynstr_append_mem(&bulk_insert, values_string.ptr(),
values_string.length());
}
else
{
error= mysql_real_query(mysql, values_string.ptr(),
values_string.length());
}
if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) if (error)
{ {
DBUG_RETURN(stash_remote_error()); DBUG_RETURN(stash_remote_error());
} }
...@@ -1704,12 +1810,79 @@ int ha_federated::write_row(byte *buf) ...@@ -1704,12 +1810,79 @@ int ha_federated::write_row(byte *buf)
If the table we've just written a record to contains an auto_increment If the table we've just written a record to contains an auto_increment
field, then store the last_insert_id() value from the foreign server field, then store the last_insert_id() value from the foreign server
*/ */
if (table->next_number_field) if (auto_increment_update_required)
update_auto_increment(); update_auto_increment();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
@brief Prepares the storage engine for bulk inserts.
@param[in] rows estimated number of rows in bulk insert
or 0 if unknown.
@details Initializes memory structures required for bulk insert.
*/
void ha_federated::start_bulk_insert(ha_rows rows)
{
uint page_size;
DBUG_ENTER("ha_federated::start_bulk_insert");
dynstr_free(&bulk_insert);
/**
We don't bother with bulk-insert semantics when the estimated rows == 1
The rows value will be 0 if the server does not know how many rows
would be inserted. This can occur when performing INSERT...SELECT
*/
if (rows == 1)
DBUG_VOID_RETURN;
page_size= (uint) my_getpagesize();
if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size))
DBUG_VOID_RETURN;
bulk_insert.length= 0;
DBUG_VOID_RETURN;
}
/**
@brief End bulk insert.
@details This method will send any remaining rows to the remote server.
Finally, it will deinitialize the bulk insert data structure.
@return Operation status
@retval 0 No error
@retval != 0 Error occured at remote server. Also sets my_errno.
*/
int ha_federated::end_bulk_insert()
{
int error= 0;
DBUG_ENTER("ha_federated::end_bulk_insert");
if (bulk_insert.str && bulk_insert.length)
{
if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length))
error= stash_remote_error();
else
if (table->next_number_field)
update_auto_increment();
}
dynstr_free(&bulk_insert);
DBUG_RETURN(my_errno= error);
}
/* /*
ha_federated::update_auto_increment ha_federated::update_auto_increment
...@@ -2451,7 +2624,6 @@ int ha_federated::info(uint flag) ...@@ -2451,7 +2624,6 @@ int ha_federated::info(uint flag)
{ {
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; char error_buffer[FEDERATED_QUERY_BUFFER_SIZE];
char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; char status_buf[FEDERATED_QUERY_BUFFER_SIZE];
char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE];
int error; int error;
uint error_code; uint error_code;
MYSQL_RES *result= 0; MYSQL_RES *result= 0;
......
...@@ -159,6 +159,7 @@ class ha_federated: public handler ...@@ -159,6 +159,7 @@ class ha_federated: public handler
char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates; bool ignore_duplicates, replace_duplicates;
bool insert_dup_update; bool insert_dup_update;
DYNAMIC_STRING bulk_insert;
private: private:
/* /*
...@@ -173,6 +174,14 @@ private: ...@@ -173,6 +174,14 @@ private:
bool records_in_range); bool records_in_range);
int stash_remote_error(); int stash_remote_error();
bool append_stmt_insert(String *query);
int read_next(byte *buf, MYSQL_RES *result);
int index_read_idx_with_result_set(byte *buf, uint index,
const byte *key,
uint key_len,
ha_rkey_function find_flag,
MYSQL_RES **result);
public: public:
ha_federated(TABLE *table_arg); ha_federated(TABLE *table_arg);
~ha_federated() ~ha_federated()
...@@ -258,6 +267,8 @@ public: ...@@ -258,6 +267,8 @@ public:
int open(const char *name, int mode, uint test_if_locked); // required int open(const char *name, int mode, uint test_if_locked); // required
int close(void); // required int close(void); // required
void start_bulk_insert(ha_rows rows);
int end_bulk_insert();
int write_row(byte *buf); int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data); int update_row(const byte *old_data, byte *new_data);
int delete_row(const byte *buf); int delete_row(const byte *buf);
...@@ -301,14 +312,7 @@ public: ...@@ -301,14 +312,7 @@ public:
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); //required enum thr_lock_type lock_type); //required
virtual bool get_error_message(int error, String *buf); bool get_error_message(int error, String *buf);
int read_next(byte *buf, MYSQL_RES *result);
int index_read_idx_with_result_set(byte *buf, uint index,
const byte *key,
uint key_len,
ha_rkey_function find_flag,
MYSQL_RES **result);
}; };
bool federated_db_init(void); bool federated_db_init(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment