Commit fd92d807 authored by mats@mysql.com's avatar mats@mysql.com

BUG#20821 (INSERT DELAYED fails to write some rows to binlog):

Reverting to old behaviour of writing the query before all rows
have been written.
parent 78a9adb6
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE SCHEMA IF NOT EXISTS mysqlslap;
USE mysqlslap;
CREATE TABLE t1 (id INT, name VARCHAR(64));
SELECT COUNT(*) FROM mysqlslap.t1;
COUNT(*)
20000
SELECT COUNT(*) FROM mysqlslap.t1;
COUNT(*)
20000
DROP SCHEMA IF EXISTS mysqlslap;
...@@ -17,8 +17,10 @@ Log_name Pos Event_type Server_id End_log_pos Info ...@@ -17,8 +17,10 @@ Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 4 Format_desc 1 102 Server ver: VERSION, Binlog ver: 4 master-bin.000001 4 Format_desc 1 102 Server ver: VERSION, Binlog ver: 4
master-bin.000001 102 Query 1 222 use `test`; create table t1(a int not null primary key) engine=myisam master-bin.000001 102 Query 1 222 use `test`; create table t1(a int not null primary key) engine=myisam
master-bin.000001 222 Table_map 1 261 table_id: # (test.t1) master-bin.000001 222 Table_map 1 261 table_id: # (test.t1)
master-bin.000001 261 Write_rows 1 305 table_id: # flags: STMT_END_F master-bin.000001 261 Write_rows 1 295 table_id: # flags: STMT_END_F
master-bin.000001 305 Query 1 380 use `test`; flush tables master-bin.000001 295 Table_map 1 334 table_id: # (test.t1)
master-bin.000001 334 Write_rows 1 373 table_id: # flags: STMT_END_F
master-bin.000001 373 Query 1 448 use `test`; flush tables
SELECT * FROM t1 ORDER BY a; SELECT * FROM t1 ORDER BY a;
a a
1 1
......
#
# Bug#20821: INSERT DELAYED fails to write some rows to binlog
#
--source include/master-slave.inc
--source include/not_embedded.inc
--source include/not_windows.inc
--disable_warnings
CREATE SCHEMA IF NOT EXISTS mysqlslap;
USE mysqlslap;
--enable_warnings
CREATE TABLE t1 (id INT, name VARCHAR(64));
let $query = "INSERT INTO t1 VALUES (1, 'Dr. No'), (2, 'From Russia With Love'), (3, 'Goldfinger'), (4, 'Thunderball'), (5, 'You Only Live Twice')";
--exec $MYSQL_SLAP --silent --concurrency=20 --iterations=200 --query=$query --delimiter=";"
SELECT COUNT(*) FROM mysqlslap.t1;
sync_slave_with_master;
SELECT COUNT(*) FROM mysqlslap.t1;
connection master;
DROP SCHEMA IF EXISTS mysqlslap;
sync_slave_with_master;
...@@ -2717,6 +2717,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, ...@@ -2717,6 +2717,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype,
bool is_trans, bool suppress_use) bool is_trans, bool suppress_use)
{ {
DBUG_ENTER("THD::binlog_query"); DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype=%d, query='%s'", qtype, query));
DBUG_ASSERT(query && mysql_bin_log.is_open()); DBUG_ASSERT(query && mysql_bin_log.is_open());
switch (qtype) { switch (qtype) {
......
...@@ -26,8 +26,8 @@ ...@@ -26,8 +26,8 @@
static int check_null_fields(THD *thd,TABLE *entry); static int check_null_fields(THD *thd,TABLE *entry);
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore, static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
char *query, uint query_length, bool log_on); LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd); static void end_delayed_insert(THD *thd);
pthread_handler_t handle_delayed_insert(void *arg); pthread_handler_t handle_delayed_insert(void *arg);
static void unlink_blobs(register TABLE *table); static void unlink_blobs(register TABLE *table);
...@@ -511,7 +511,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -511,7 +511,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED) if (lock_type == TL_WRITE_DELAYED)
{ {
error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on); LEX_STRING const st_query = { query, thd->query_length };
error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
query=0; query=0;
} }
else else
...@@ -1237,11 +1238,16 @@ public: ...@@ -1237,11 +1238,16 @@ public:
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query; bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
ulonglong last_insert_id; ulonglong last_insert_id;
timestamp_auto_set_type timestamp_field_type; timestamp_auto_set_type timestamp_field_type;
LEX_STRING query;
delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg) delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
:record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {} bool ignore_arg, bool log_query_arg)
: record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
query(query_arg)
{}
~delayed_row() ~delayed_row()
{ {
x_free(query.str);
x_free(record); x_free(record);
} }
}; };
...@@ -1249,9 +1255,6 @@ public: ...@@ -1249,9 +1255,6 @@ public:
class delayed_insert :public ilink { class delayed_insert :public ilink {
uint locks_in_memory; uint locks_in_memory;
char *query;
ulong query_length;
ulong query_allocated;
public: public:
THD thd; THD thd;
TABLE *table; TABLE *table;
...@@ -1265,7 +1268,7 @@ public: ...@@ -1265,7 +1268,7 @@ public:
TABLE_LIST table_list; // Argument TABLE_LIST table_list; // Argument
delayed_insert() delayed_insert()
:locks_in_memory(0), query(0), query_length(0), query_allocated(0), :locks_in_memory(0),
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
group_count(0) group_count(0)
{ {
...@@ -1291,7 +1294,6 @@ public: ...@@ -1291,7 +1294,6 @@ public:
} }
~delayed_insert() ~delayed_insert()
{ {
my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
/* The following is not really needed, but just for safety */ /* The following is not really needed, but just for safety */
delayed_row *row; delayed_row *row;
while ((row=rows.get())) while ((row=rows.get()))
...@@ -1311,25 +1313,6 @@ public: ...@@ -1311,25 +1313,6 @@ public:
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */ VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
} }
int set_query(char const *q, ulong qlen) {
if (q && qlen > 0)
{
if (query_allocated < qlen + 1)
{
ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
query= my_realloc(query, qlen + 1, MYF(flags));
if (query == 0)
return HA_ERR_OUT_OF_MEM;
query_allocated= qlen;
}
query_length= qlen;
memcpy(query, q, qlen + 1);
}
else
query_length= 0;
return 0;
}
/* The following is for checking when we can delete ourselves */ /* The following is for checking when we can delete ourselves */
inline void lock() inline void lock()
{ {
...@@ -1616,13 +1599,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) ...@@ -1616,13 +1599,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
/* Put a question in queue */ /* Put a question in queue */
static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, static int
bool ignore, char *query, uint query_length, write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
bool log_on) LEX_STRING query, bool ignore, bool log_on)
{ {
delayed_row *row=0; delayed_row *row;
delayed_insert *di=thd->di; delayed_insert *di=thd->di;
DBUG_ENTER("write_delayed"); DBUG_ENTER("write_delayed");
DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
thd->proc_info="waiting for handler insert"; thd->proc_info="waiting for handler insert";
pthread_mutex_lock(&di->mutex); pthread_mutex_lock(&di->mutex);
...@@ -1630,13 +1614,28 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, ...@@ -1630,13 +1614,28 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
pthread_cond_wait(&di->cond_client,&di->mutex); pthread_cond_wait(&di->cond_client,&di->mutex);
thd->proc_info="storing row into queue"; thd->proc_info="storing row into queue";
if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on))) if (thd->killed)
goto err;
/*
Take a copy of the query string, if there is any. The string will
be free'ed when the row is destroyed. If there is no query string,
we don't do anything special.
*/
if (query.str)
if (!(query.str= my_strndup(query.str, MYF(MY_WME), query.length)))
goto err;
row= new delayed_row(query, duplic, ignore, log_on);
if (row == NULL)
{
my_free(query.str, MYF(MY_WME));
goto err; goto err;
}
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME)))) if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
goto err; goto err;
memcpy(row->record, table->record[0], table->s->reclength); memcpy(row->record, table->record[0], table->s->reclength);
di->set_query(query, query_length);
row->start_time= thd->start_time; row->start_time= thd->start_time;
row->query_start_used= thd->query_start_used; row->query_start_used= thd->query_start_used;
row->last_insert_id_used= thd->last_insert_id_used; row->last_insert_id_used= thd->last_insert_id_used;
...@@ -1995,7 +1994,7 @@ bool delayed_insert::handle_inserts(void) ...@@ -1995,7 +1994,7 @@ bool delayed_insert::handle_inserts(void)
if (thd.killed || table->s->version != refresh_version) if (thd.killed || table->s->version != refresh_version)
{ {
thd.killed= THD::KILL_CONNECTION; thd.killed= THD::KILL_CONNECTION;
max_rows= ~(ulong)0; // Do as much as possible max_rows= ULONG_MAX; // Do as much as possible
} }
/* /*
...@@ -2042,11 +2041,18 @@ bool delayed_insert::handle_inserts(void) ...@@ -2042,11 +2041,18 @@ bool delayed_insert::handle_inserts(void)
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
row->log_query = 0; row->log_query = 0;
} }
if (using_ignore) if (using_ignore)
{ {
using_ignore=0; using_ignore=0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
} }
if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
thd.binlog_query(THD::ROW_QUERY_TYPE,
row->query.str, row->query.length,
FALSE, FALSE);
if (table->s->blob_fields) if (table->s->blob_fields)
free_delayed_insert_blobs(table); free_delayed_insert_blobs(table);
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status); thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
...@@ -2093,13 +2099,25 @@ bool delayed_insert::handle_inserts(void) ...@@ -2093,13 +2099,25 @@ bool delayed_insert::handle_inserts(void)
pthread_cond_broadcast(&cond_client); // If waiting clients pthread_cond_broadcast(&cond_client); // If waiting clients
} }
} }
thd.proc_info=0; thd.proc_info=0;
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
/* After releasing the mutex, to prevent deadlocks. */ #ifdef HAVE_ROW_BASED_REPLICATION
if (mysql_bin_log.is_open()) /*
thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE); We need to flush the pending event when using row-based
replication since the flushing normally done in binlog_query() is
not done last in the statement: for delayed inserts, the insert
statement is logged *before* all rows are inserted.
We can flush the pending event without checking the thd->lock
since the delayed insert *thread* is not inside a stored function
or trigger.
TODO: Move the logging to last in the sequence of rows.
*/
if (thd.current_stmt_binlog_row_based)
thd.binlog_flush_pending_rows_event(TRUE);
#endif /* HAVE_ROW_BASED_REPLICATION */
if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen { // This shouldn't happen
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment