Commit aa23fe7d authored by unknown's avatar unknown

MWL#116: Replace atomic queue with simple mutex locking for non-transactional binlog writes.

Also add missing destroy of pthread_cond.
parent f0707b38
......@@ -158,15 +158,15 @@ public:
before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0), commit_bin_log_file_pos(0)
{
trans_log.end_of_file= max_binlog_cache_size;
(void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
"LOCK_group_commit", MYF(0));
(void) my_pthread_mutex_init(&LOCK_binlog_participant, MY_MUTEX_INIT_SLOW,
"LOCK_binlog_participant", MYF(0));
(void) pthread_cond_init(&COND_group_commit, 0);
}
~binlog_trx_data()
{
DBUG_ASSERT(pending() == NULL);
(void) pthread_mutex_destroy(&LOCK_group_commit);
(void) pthread_mutex_destroy(&LOCK_binlog_participant);
close_cached_file(&trans_log);
}
......@@ -274,13 +274,13 @@ public:
/*
Flag set true when group commit for this transaction is finished; used
with pthread_cond_wait() to wait until commit is done.
This flag is protected by LOCK_group_commit.
This flag is protected by LOCK_binlog_participant.
*/
bool done;
/*
Flag set if this transaction is the group commit leader that will handle
the actual writing to the binlog.
This flag is protected by LOCK_group_commit.
This flag is protected by LOCK_binlog_participant.
*/
bool group_commit_leader;
/*
......@@ -297,7 +297,7 @@ public:
Log_event *end_event;
Log_event *incident_event;
/* Mutex and condition for wakeup after group commit. */
pthread_mutex_t LOCK_group_commit;
pthread_mutex_t LOCK_binlog_participant;
pthread_cond_t COND_group_commit;
/*
Binlog position after current commit, available to storage engines during
......@@ -4994,8 +4994,13 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
the commit and wake them up.
*/
pthread_mutex_lock(&trx_data->LOCK_group_commit);
const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data);
pthread_mutex_lock(&trx_data->LOCK_binlog_participant);
pthread_mutex_lock(&LOCK_queue);
binlog_trx_data *orig_queue= group_commit_queue;
trx_data->next= orig_queue;
group_commit_queue= trx_data;
pthread_mutex_unlock(&LOCK_queue);
if (orig_queue != NULL)
{
......@@ -5006,7 +5011,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
else
{
trx_data->group_commit_leader= TRUE;
pthread_mutex_unlock(&trx_data->LOCK_group_commit);
pthread_mutex_unlock(&trx_data->LOCK_binlog_participant);
trx_group_commit_leader(NULL);
}
......@@ -5020,19 +5025,19 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
this thread in the group commit once the log is obtained. So here we put
ourself in the queue and wait to be signalled that the group commit is done.
Note that this function must be called with the trs_data->LOCK_group_commit
Note that this function must be called with trx_data->LOCK_binlog_participant
locked; the mutex will be released before return.
*/
void
MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data)
{
safe_mutex_assert_owner(&trx_data->LOCK_group_commit);
safe_mutex_assert_owner(&trx_data->LOCK_binlog_participant);
/* Wait until trx_data.done == true and woken up by the leader. */
while (!trx_data->done)
pthread_cond_wait(&trx_data->COND_group_commit,
&trx_data->LOCK_group_commit);
pthread_mutex_unlock(&trx_data->LOCK_group_commit);
&trx_data->LOCK_binlog_participant);
pthread_mutex_unlock(&trx_data->LOCK_binlog_participant);
}
bool
......@@ -5131,7 +5136,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
it to the existing one. Note that there is no ordering defined between
transactional and non-transactional commits.
*/
binlog_trx_data *current= atomic_grab_trx_queue();
pthread_mutex_lock(&LOCK_queue);
binlog_trx_data *current= group_commit_queue;
group_commit_queue= NULL;
pthread_mutex_unlock(&LOCK_queue);
binlog_trx_data *xtra_queue= NULL;
while (current)
{
......@@ -5230,18 +5238,19 @@ MYSQL_BIN_LOG::trx_group_commit_leader(TC_group_commit_entry *first)
for (current= xtra_queue; current != NULL; current= current->next)
{
/*
Note that we need to take LOCK_group_commit even in the case of a leader!
Note that we need to take LOCK_binlog_participant even in the case of a
leader!
Otherwise there is a race between setting and testing the
group_commit_leader flag.
*/
pthread_mutex_lock(&current->LOCK_group_commit);
pthread_mutex_lock(&current->LOCK_binlog_participant);
if (!current->group_commit_leader)
{
current->done= true;
pthread_cond_signal(&current->COND_group_commit);
}
pthread_mutex_unlock(&current->LOCK_group_commit);
pthread_mutex_unlock(&current->LOCK_binlog_participant);
}
}
......@@ -5291,32 +5300,6 @@ MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
return 0;
}
binlog_trx_data *
MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data)
{
my_atomic_rwlock_wrlock(&LOCK_queue);
trx_data->next= group_commit_queue;
while (!my_atomic_casptr((void **)(&group_commit_queue),
(void **)(&trx_data->next),
trx_data))
;
my_atomic_rwlock_wrunlock(&LOCK_queue);
return trx_data->next;
}
binlog_trx_data *
MYSQL_BIN_LOG::atomic_grab_trx_queue()
{
my_atomic_rwlock_wrlock(&LOCK_queue);
binlog_trx_data *queue= group_commit_queue;
while (!my_atomic_casptr((void **)(&group_commit_queue),
(void **)(&queue),
NULL))
;
my_atomic_rwlock_wrunlock(&LOCK_queue);
return queue;
}
/**
Wait until we get a signal that the binary log has been updated.
......
......@@ -404,8 +404,8 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG
pthread_mutex_t LOCK_index;
pthread_mutex_t LOCK_prep_xids;
/*
Mutex to protect the queue of transactions waiting to participate in group
commit. (Only used on platforms without native atomic operations).
Mutex to protect the queue of non-transactional binlog writes waiting to
participate in group commit.
*/
pthread_mutex_t LOCK_queue;
......@@ -462,8 +462,6 @@ class MYSQL_BIN_LOG: public TC_LOG_group_commit, private MYSQL_LOG
bool write_transaction_to_binlog_events(binlog_trx_data *trx_data);
void trx_group_commit_participant(binlog_trx_data *trx_data);
void trx_group_commit_leader(TC_group_commit_entry *first);
binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data);
binlog_trx_data *atomic_grab_trx_queue();
void mark_xid_done();
void mark_xids_active(uint xid_count);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment