Commit 2842f6b5 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: error handling.

Add an error code to the wait_for_commit facility.

Now, when a transaction fails, it can signal the error to
any subsequent transaction that is waiting for it to commit.
The waiting transactions then receive the error code back from
wait_for_prior_commit() and can handle the error appropriately.

Also fix one race that could cause crash if @@slave_parallel_threads
were changed several times quickly in succession.
parent 2e100cc5
......@@ -716,7 +716,7 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
thd_wakeup_subsequent_commits() is only needed when no transaction
coordinator is used, meaning a single storage engine and no binary log.
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd);
void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);
#ifdef __cplusplus
}
......
......@@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
struct mysql_event_general
{
unsigned int event_subclass;
......
......@@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{
......
......@@ -189,7 +189,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,
......
......@@ -1458,10 +1458,11 @@ int ha_commit_one_phase(THD *thd, bool all)
transaction.all.ha_list, see why in trans_register_ha()).
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
int res;
DBUG_ENTER("ha_commit_one_phase");
if (is_real_trans)
thd->wait_for_prior_commit();
int res= commit_one_phase_2(thd, all, trans, is_real_trans);
if (is_real_trans && (res= thd->wait_for_prior_commit()))
DBUG_RETURN(res);
res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
......@@ -1501,7 +1502,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
{
thd->wakeup_subsequent_commits();
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
......@@ -1579,7 +1580,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
thd->wakeup_subsequent_commits();
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
if (all)
......
......@@ -6743,7 +6743,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
waiter->wakeup();
waiter->wakeup(0);
}
waiter= next;
}
......@@ -6849,7 +6849,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
field.
*/
if (next->queued_by_other)
next->thd->wait_for_commit_ptr->wakeup();
next->thd->wait_for_commit_ptr->wakeup(entry->error);
else
next->thd->signal_wakeup_ready();
}
......@@ -7145,7 +7145,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
if (current != leader) // Don't wake up ourself
{
if (current->queued_by_other)
current->thd->wait_for_commit_ptr->wakeup();
current->thd->wait_for_commit_ptr->wakeup(current->error);
else
current->thd->signal_wakeup_ready();
}
......@@ -7844,7 +7844,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
mysql_mutex_unlock(&LOCK_prepare_ordered);
}
thd->wait_for_prior_commit();
if (thd->wait_for_prior_commit())
return 0;
cookie= 0;
if (xid)
......
......@@ -52,7 +52,7 @@
struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void
static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
......@@ -70,6 +70,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
/* ToDo: error handling. */
return err;
}
......@@ -104,6 +105,7 @@ handle_rpl_parallel_thread(void *arg)
bool group_standalone= true;
bool in_event_group= false;
uint64 event_gtid_sub_id= 0;
int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
......@@ -139,6 +141,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
while (!rpt->stop && !thd->killed)
{
......@@ -163,6 +166,7 @@ handle_rpl_parallel_thread(void *arg)
uint64 wait_start_sub_id;
bool end_of_group;
err= 0;
/* Handle a new event group, which will be initiated by a GTID event. */
if (event_type == GTID_EVENT)
{
......@@ -221,9 +225,9 @@ handle_rpl_parallel_thread(void *arg)
everything is stopped and cleaned up correctly.
*/
if (!sql_worker_killed(thd, rgi, in_event_group))
rpt_handle_event(events, rpt);
err= rpt_handle_event(events, rpt);
else
thd->wait_for_prior_commit();
err= thd->wait_for_prior_commit();
end_of_group=
in_event_group &&
......@@ -272,7 +276,7 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
rgi->commit_orderer.wakeup_subsequent_commits();
rgi->commit_orderer.wakeup_subsequent_commits(err);
delete rgi;
}
......@@ -431,6 +435,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
while (!pool->threads[i]->running)
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
&pool->threads[i]->LOCK_rpl_thread);
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
......
......@@ -6557,3 +6557,5 @@ ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a stored function or trigger"
ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE
eng "Cannot change @@slave_parallel_threads while another change is in progress"
ER_PRIOR_COMMIT_FAILED
eng "Commit failed due to failure of an earlier commit on which this one depends"
......@@ -610,9 +610,9 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
@see thd_wakeup_subsequent_commits() definition in plugin.h
*/
extern "C"
void thd_wakeup_subsequent_commits(THD *thd)
void thd_wakeup_subsequent_commits(THD *thd, int wakeup_error)
{
thd->wakeup_subsequent_commits();
thd->wakeup_subsequent_commits(wakeup_error);
}
......@@ -5618,7 +5618,8 @@ bool THD::rgi_have_temporary_tables()
wait_for_commit::wait_for_commit()
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
opaque_pointer(0),
waiting_for_commit(false), wakeup_subsequent_commits_running(false)
waiting_for_commit(false), wakeup_error(0),
wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
......@@ -5633,7 +5634,7 @@ wait_for_commit::~wait_for_commit()
void
wait_for_commit::wakeup()
wait_for_commit::wakeup(int wakeup_error)
{
/*
We signal each waiter on their own condition and mutex (rather than using
......@@ -5649,6 +5650,7 @@ wait_for_commit::wakeup()
*/
mysql_mutex_lock(&LOCK_wait_commit);
waiting_for_commit= false;
this->wakeup_error= wakeup_error;
mysql_mutex_unlock(&LOCK_wait_commit);
mysql_cond_signal(&COND_wait_commit);
}
......@@ -5675,6 +5677,7 @@ void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
waiting_for_commit= true;
wakeup_error= 0;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
this->waitee= waitee;
......@@ -5704,7 +5707,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
with register_wait_for_prior_commit(). If the commit already completed,
returns immediately.
*/
void
int
wait_for_commit::wait_for_prior_commit2()
{
mysql_mutex_lock(&LOCK_wait_commit);
......@@ -5712,6 +5715,7 @@ wait_for_commit::wait_for_prior_commit2()
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit);
waitee= NULL;
return wakeup_error;
}
......@@ -5755,7 +5759,7 @@ wait_for_commit::wait_for_prior_commit2()
*/
void
wait_for_commit::wakeup_subsequent_commits2()
wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
{
wait_for_commit *waiter;
......@@ -5772,7 +5776,7 @@ wait_for_commit::wakeup_subsequent_commits2()
once the wakeup is done, the field could be invalidated at any time.
*/
wait_for_commit *next= waiter->next_subsequent_commit;
waiter->wakeup();
waiter->wakeup(wakeup_error);
waiter= next;
}
......
......@@ -1614,6 +1614,8 @@ struct wait_for_commit
cleared.
*/
bool waiting_for_commit;
/* The wakeup error code from the waitee. 0 means no error. */
int wakeup_error;
/*
Flag set when wakeup_subsequent_commits_running() is active, see comments
on that function for details.
......@@ -1621,16 +1623,18 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee);
void wait_for_prior_commit()
int wait_for_prior_commit()
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waiting_for_commit)
wait_for_prior_commit2();
return wait_for_prior_commit2();
else
return wakeup_error;
}
void wakeup_subsequent_commits()
void wakeup_subsequent_commits(int wakeup_error)
{
/*
Do the check inline, so only the wakeup case takes the cost of a function
......@@ -1645,7 +1649,7 @@ struct wait_for_commit
prevent a waiter from arriving just after releasing the lock.
*/
if (subsequent_commits_list)
wakeup_subsequent_commits2();
wakeup_subsequent_commits2(wakeup_error);
}
void unregister_wait_for_prior_commit()
{
......@@ -1653,10 +1657,10 @@ struct wait_for_commit
unregister_wait_for_prior_commit2();
}
void wakeup();
void wakeup(int wakeup_error);
void wait_for_prior_commit2();
void wakeup_subsequent_commits2();
int wait_for_prior_commit2();
void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();
wait_for_commit();
......@@ -3308,15 +3312,21 @@ public:
void signal_wakeup_ready();
wait_for_commit *wait_for_commit_ptr;
void wait_for_prior_commit()
int wait_for_prior_commit()
{
if (wait_for_commit_ptr)
wait_for_commit_ptr->wait_for_prior_commit();
{
int err= wait_for_commit_ptr->wait_for_prior_commit();
if (err)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return err;
}
return 0;
}
void wakeup_subsequent_commits()
void wakeup_subsequent_commits(int wakeup_error)
{
if (wait_for_commit_ptr)
wait_for_commit_ptr->wakeup_subsequent_commits();
wait_for_commit_ptr->wakeup_subsequent_commits(wakeup_error);
}
private:
......
......@@ -2927,7 +2927,7 @@ innobase_commit(
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
thd_wakeup_subsequent_commits(thd);
thd_wakeup_subsequent_commits(thd, 0);
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */
......
......@@ -3588,7 +3588,7 @@ innobase_commit(
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
thd_wakeup_subsequent_commits(thd);
thd_wakeup_subsequent_commits(thd, 0);
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment