diff --git a/include/thr_lock.h b/include/thr_lock.h index c7754ada299487923afd8215416dd366b3318c6b..e409df30972b27e6264e0b706f856fcfe0ab94b5 100644 --- a/include/thr_lock.h +++ b/include/thr_lock.h @@ -159,7 +159,8 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count); void thr_abort_locks(THR_LOCK *lock, my_bool upgrade_lock); my_bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread); void thr_print_locks(void); /* For debugging */ -my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data); +my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data, + enum thr_lock_type new_lock_type); void thr_downgrade_write_lock(THR_LOCK_DATA *data, enum thr_lock_type new_lock_type); my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data); diff --git a/mysql-test/r/delayed.result b/mysql-test/r/delayed.result index bcda6ddb6ab52bf479fb5ae2e4cd8bc2235fc110..4d5d656f3ce7bb7ae5fbea485e5d37050bf87d47 100644 --- a/mysql-test/r/delayed.result +++ b/mysql-test/r/delayed.result @@ -284,4 +284,30 @@ ERROR 22007: Incorrect date value: '0000-00-00' for column 'f1' at row 1 INSERT DELAYED INTO t2 VALUES (0,'2007-00-00'); ERROR 22007: Incorrect date value: '2007-00-00' for column 'f1' at row 1 DROP TABLE t1,t2; +set @old_delayed_updates = @@global.low_priority_updates; +set global low_priority_updates = 1; +select @@global.low_priority_updates; +@@global.low_priority_updates +1 +drop table if exists t1; +create table t1 (a int, b int); +insert into t1 values (1,1); +lock table t1 read; +connection: update +insert delayed into t1 values (2,2);; +connection: select +select * from t1; +a b +1 1 +connection: default +select * from t1; +a b +1 1 +unlock tables; +select * from t1; +a b +1 1 +2 2 +drop table t1; +set global low_priority_updates = @old_delayed_updates; End of 5.1 tests diff --git a/mysql-test/suite/rpl_ndb/t/disabled.def b/mysql-test/suite/rpl_ndb/t/disabled.def index 694f7098980dd9eeac0e90ea857f443a770825fa..6908269d014ebac47bc7196caa424feca4e60bc9 100644 --- a/mysql-test/suite/rpl_ndb/t/disabled.def +++ b/mysql-test/suite/rpl_ndb/t/disabled.def @@ -10,7 +10,4 @@ # ############################################################################## -rpl_ndb_circular : Bug#41183 rpl_ndb_circular, rpl_ndb_circular_simplex need maintenance, crash -rpl_ndb_circular_simplex : Bug#41183 rpl_ndb_circular, rpl_ndb_circular_simplex need maintenance, crash - # the below testcase have been reworked to avoid the bug, test contains comment, keep bug open diff --git a/mysql-test/t/delayed.test b/mysql-test/t/delayed.test index ce57645bd4ba397731e6133c7b3d6e7c1f26a293..94ad22b80d03289c0de38792d0efdd8439615e7c 100644 --- a/mysql-test/t/delayed.test +++ b/mysql-test/t/delayed.test @@ -285,4 +285,47 @@ INSERT DELAYED INTO t2 VALUES (0,'0000-00-00'); INSERT DELAYED INTO t2 VALUES (0,'2007-00-00'); DROP TABLE t1,t2; +# +# Bug#40536: SELECT is blocked by INSERT DELAYED waiting on upgrading lock, +# even with low_priority_updates +# + +set @old_delayed_updates = @@global.low_priority_updates; +set global low_priority_updates = 1; +select @@global.low_priority_updates; + +--disable_warnings +drop table if exists t1; +--enable_warnings +create table t1 (a int, b int); +insert into t1 values (1,1); +lock table t1 read; +connect (update,localhost,root,,); +connection update; +--echo connection: update +--send insert delayed into t1 values (2,2); +connection default; +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = "Delayed insert" and state = "upgrading lock"; +--source include/wait_condition.inc +connect (select,localhost,root,,); +--echo connection: select +select * from t1; +connection default; +--echo connection: default +select * from t1; +connection default; +disconnect update; +disconnect select; +unlock tables; +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = "Delayed insert" and state = "Waiting for INSERT"; +--source include/wait_condition.inc +select * from t1; +drop table t1; + +set global low_priority_updates = @old_delayed_updates; + --echo End of 5.1 tests diff --git a/mysys/thr_lock.c b/mysys/thr_lock.c index b13e8411771a00278421c1cdf459921756d9e904..31638ecee9afc4f03332e418cb6a8cd36e489765 100644 --- a/mysys/thr_lock.c +++ b/mysys/thr_lock.c @@ -1359,7 +1359,8 @@ void thr_downgrade_write_lock(THR_LOCK_DATA *in_data, /* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */ -my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) +my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data, + enum thr_lock_type new_lock_type) { THR_LOCK *lock=data->lock; DBUG_ENTER("thr_upgrade_write_delay_lock"); @@ -1372,7 +1373,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) } check_locks(lock,"before upgrading lock",0); /* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */ - data->type=TL_WRITE; /* Upgrade lock */ + data->type= new_lock_type; /* Upgrade lock */ /* Check if someone has given us the lock */ if (!data->cond) @@ -1411,6 +1412,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data) my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) { THR_LOCK *lock=data->lock; + enum thr_lock_type write_lock_type; DBUG_ENTER("thr_reschedule_write_lock"); pthread_mutex_lock(&lock->mutex); @@ -1420,6 +1422,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) DBUG_RETURN(0); } + write_lock_type= data->type; data->type=TL_WRITE_DELAYED; if (lock->update_status) (*lock->update_status)(data->status_param); @@ -1438,7 +1441,7 @@ my_bool thr_reschedule_write_lock(THR_LOCK_DATA *data) free_all_read_locks(lock,0); pthread_mutex_unlock(&lock->mutex); - DBUG_RETURN(thr_upgrade_write_delay_lock(data)); + DBUG_RETURN(thr_upgrade_write_delay_lock(data, write_lock_type)); } diff --git a/sql/log_event.cc b/sql/log_event.cc index 0f980f1808df943295b4c033846debb6d7f6b620..0e400ac270504bce3712225b2d6312ecb9024dbd 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -53,6 +53,8 @@ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd); + static const char *HA_ERR(int i) { switch (i) { @@ -2894,7 +2896,37 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); - const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock) + { + /* + Cleaning-up the last statement context: + the terminal event of the current statement flagged with + STMT_END_F got filtered out in ndb circular replication. + */ + int error; + char llbuff[22]; + if ((error= rows_event_stmt_cleanup(const_cast<Relay_log_info*>(rli), thd))) + { + const_cast<Relay_log_info*>(rli)->report(ERROR_LEVEL, error, + "Error in cleaning up after an event preceeding the commit; " + "the group log file/position: %s %s", + const_cast<Relay_log_info*>(rli)->group_master_log_name, + llstr(const_cast<Relay_log_info*>(rli)->group_master_log_pos, + llbuff)); + } + /* + Executing a part of rli->stmt_done() logics that does not deal + with group position change. The part is redundant now but is + future-change-proof addon, e.g if COMMIT handling will start checking + invariants like IN_STMT flag must be off at committing the transaction. + */ + const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos(); + const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT); + } + else + { + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + } /* Note: We do not need to execute reset_one_shot_variables() if this @@ -7403,16 +7435,20 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli) return Log_event::do_shall_skip(rli); } -int -Rows_log_event::do_update_pos(Relay_log_info *rli) -{ - DBUG_ENTER("Rows_log_event::do_update_pos"); - int error= 0; - - DBUG_PRINT("info", ("flags: %s", - get_flags(STMT_END_F) ? "STMT_END_F " : "")); +/** + The function is called at Rows_log_event statement commit time, + normally from Rows_log_event::do_update_pos() and possibly from + Query_log_event::do_apply_event() of the COMMIT. + The function commits the last statement for engines, binlog and + releases resources have been allocated for the statement. + + @retval 0 Ok. + @retval non-zero Error at the commit. + */ - if (get_flags(STMT_END_F)) +static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) +{ + int error; { /* This is the end of a statement or transaction, so close (and @@ -7454,14 +7490,39 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) thd->reset_current_stmt_binlog_row_based(); - rli->cleanup_context(thd, 0); - if (error == 0) + const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); + } + return error; +} + +/** + The method either increments the relay log position or + commits the current statement and increments the master group + possition if the event is STMT_END_F flagged and + the statement corresponds to the autocommit query (i.e replicated + without wrapping in BEGIN/COMMIT) + + @retval 0 Success + @retval non-zero Error in the statement commit + */ +int +Rows_log_event::do_update_pos(Relay_log_info *rli) +{ + DBUG_ENTER("Rows_log_event::do_update_pos"); + int error= 0; + + DBUG_PRINT("info", ("flags: %s", + get_flags(STMT_END_F) ? "STMT_END_F " : "")); + + if (get_flags(STMT_END_F)) + { + if ((error= rows_event_stmt_cleanup(rli, thd)) == 0) { /* Indicate that a statement is finished. Step the group log position if we are not in a transaction, otherwise increase the event log position. - */ + */ rli->stmt_done(log_pos, when); /* @@ -7475,11 +7536,13 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) thd->clear_error(); } else + { rli->report(ERROR_LEVEL, error, "Error in %s event: commit of row events failed, " "table `%s`.`%s`", get_type_str(), m_table->s->db.str, m_table->s->table_name.str); + } } else { diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index fb437bed3fa07ae31b62331a4de23ee1168f7f6d..fcf86edeaa9451d93f87a8e7efaaf589f189a5c1 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1690,6 +1690,7 @@ public: class Delayed_insert :public ilink { uint locks_in_memory; + thr_lock_type delayed_lock; public: THD thd; TABLE *table; @@ -1731,6 +1732,8 @@ public: pthread_cond_init(&cond_client,NULL); VOID(pthread_mutex_lock(&LOCK_thread_count)); delayed_insert_threads++; + delayed_lock= global_system_variables.low_priority_updates ? + TL_WRITE_LOW_PRIORITY : TL_WRITE; VOID(pthread_mutex_unlock(&LOCK_thread_count)); } ~Delayed_insert() @@ -2540,7 +2543,7 @@ bool Delayed_insert::handle_inserts(void) table->use_all_columns(); thd_proc_info(&thd, "upgrading lock"); - if (thr_upgrade_write_delay_lock(*thd.lock->locks)) + if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock)) { /* This can happen if thread is killed either by a shutdown