Commit d08b893b authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6775: Wrong binlog order in parallel replication: Intermediate commit

The code in binlog group commit around wait_for_commit that controls commit
order, did the wakeup of subsequent commits early, as soon as a following
transaction is put into the group commit queue, but before any such commit has
actually taken place. This causes problems with too early wakeup of
transactions that need to wait for prior to commit, but do not take part in
the binlog group commit for one reason or the other.

This patch solves the problem, by moving the wakeup to happen only after the
binlog group commit is completed.

This requires a new solution to ensure that transactions that arrive later
than the leader are still able to participate in group commit. This patch
introduces a flag wait_for_commit::commit_started. When this is set, a waiter
can queue up itself in the group commit queue.

This way, effectively the wait_for_prior_commit() is skipped only for
transactions that participate in group commit, so that skipping the wait is
safe. Other transactions still wait as needed for correctness.
parent eec04fb4
......@@ -923,6 +923,55 @@ a
32
33
34
*** MDEV-6775: Wrong binlog order in parallel replication ***
DELETE FROM t4;
INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6);
include/save_master_gtid.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log";
SET @old_format=@@GLOBAL.binlog_format;
SET GLOBAL binlog_format=ROW;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET @old_format= @@binlog_format;
SET binlog_format= statement;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
UPDATE t4 SET b=NULL WHERE a=6;
SET debug_sync='now WAIT_FOR master_queued1';
SET @old_format= @@binlog_format;
SET binlog_format= statement;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
DELETE FROM t4 WHERE b <= 3;
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
SET binlog_format= @old_format;
SET binlog_format= @old_format;
SET debug_sync='RESET';
SELECT * FROM t4 ORDER BY a;
a b
1 NULL
3 NULL
4 4
5 NULL
6 NULL
include/start_slave.inc
SET debug_sync= 'now WAIT_FOR waiting';
SELECT * FROM t4 ORDER BY a;
a b
1 NULL
3 NULL
4 4
5 NULL
6 NULL
SET debug_sync= 'now SIGNAL cont';
include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL binlog_format= @old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
......
......@@ -1466,6 +1466,75 @@ SET sql_slave_skip_counter= 1;
SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
--echo *** MDEV-6775: Wrong binlog order in parallel replication ***
--connection server_1
# A bit tricky bug to reproduce. On the master, we binlog in statement-mode
# two transactions, an UPDATE followed by a DELETE. On the slave, we replicate
# with binlog-mode set to ROW, which means the DELETE, which modifies no rows,
# is not binlogged. Then we inject a wait in the group commit code on the
# slave, shortly before the actual commit of the UPDATE. The bug was that the
# DELETE could wake up from wait_for_prior_commit() before the commit of the
# UPDATE. So the test could see the slave position updated to after DELETE,
# while the UPDATE was still not visible.
DELETE FROM t4;
INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6);
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log";
SET @old_format=@@GLOBAL.binlog_format;
SET GLOBAL binlog_format=ROW;
# Re-spawn the worker threads to be sure they pick up the new binlog format
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--connection con1
SET @old_format= @@binlog_format;
SET binlog_format= statement;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
send UPDATE t4 SET b=NULL WHERE a=6;
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connection con2
SET @old_format= @@binlog_format;
SET binlog_format= statement;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
send DELETE FROM t4 WHERE b <= 3;
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
--connection con1
REAP;
SET binlog_format= @old_format;
--connection con2
REAP;
SET binlog_format= @old_format;
SET debug_sync='RESET';
--save_master_pos
SELECT * FROM t4 ORDER BY a;
--connection server_2
--source include/start_slave.inc
SET debug_sync= 'now WAIT_FOR waiting';
--sync_with_master
SELECT * FROM t4 ORDER BY a;
SET debug_sync= 'now SIGNAL cont';
# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC.
--source include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL binlog_format= @old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
......@@ -6771,6 +6771,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all
waiters.
And if a transaction is marked to wait for a prior transaction, but that
prior transaction is already queued for group commit, then we can queue the
new transaction directly to participate in the group commit.
@retval < 0 Error
@retval > 0 If queued as the first entry in the queue (meaning this
is the leader)
......@@ -6780,8 +6784,8 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
group_commit_entry *entry, *orig_queue;
wait_for_commit *cur, *last;
group_commit_entry *entry, *orig_queue, *last;
wait_for_commit *cur;
wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
......@@ -6798,8 +6802,17 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
if (wfc && wfc->waitee)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
if (wfc->waitee)
/*
Do an extra check here, this time safely under lock.
If waitee->commit_started is set, it means that the transaction we need
to wait for has already queued up for group commit. In this case it is
safe for us to queue up immediately as well, increasing the opprtunities
for group commit. Because waitee has taken the LOCK_prepare_ordered
before setting the flag, so there is no risk that we can queue ahead of
it.
*/
if (wfc->waitee && !wfc->waitee->commit_started)
{
PSI_stage_info old_stage;
wait_for_commit *loc_waitee;
......@@ -6812,6 +6825,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
This other transaction may then take over the commit process for us to
get us included in its own group commit. If this happens, the
queued_by_other flag is set.
Setting this flag may or may not be seen by the other thread, but we
are safe in any case: The other thread will set queued_by_other under
its LOCK_wait_commit, and we will not check queued_by_other only after
we have been woken up.
*/
wfc->opaque_pointer= orig_entry;
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
......@@ -6884,41 +6902,41 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/*
Iteratively process everything added to the queue, looking for waiters,
and their waiters, and so on. If a waiter is ready to commit, we
immediately add it to the queue; if not we just wake it up.
immediately add it to the queue, and mark it as queued_by_other.
This would be natural to do with recursion, but we want to avoid
potentially unbounded recursion blowing the C stack, so we use the list
approach instead.
We keep a list of all the waiters that need to be processed in `list',
linked through the next_subsequent_commit pointer. Initially this list
contains only the entry passed into this function.
We keep a list of the group_commit_entry of all the waiters that need to
be processed. Initially this list contains only the entry passed into this
function.
We process entries in the list one by one. The element currently being
processed is pointed to by `cur`, and the element at the end of the list
processed is pointed to by `entry`, and the element at the end of the list
is pointed to by `last` (we do not use NULL to terminate the list).
As we process an element, it is first added to the group_commit_queue.
Then any waiters for that element are added at the end of the list, to
be processed in subsequent iterations. This continues until the list
is exhausted, with all elements ever added eventually processed.
As we process an entry, any waiters for that entry are added at the end of
the list, to be processed in subsequent iterations. The the entry is added
to the group_commit_queue. This continues until the list is exhausted,
with all entries ever added eventually processed.
The end result is a breath-first traversal of the tree of waiters,
re-using the next_subsequent_commit pointers in place of extra stack
space in a recursive traversal.
re-using the `next' pointers of the group_commit_entry objects in place of
extra stack space in a recursive traversal.
The temporary list created in next_subsequent_commit is not
used by the caller or any other function.
The temporary list linked through these `next' pointers is not used by the
caller or any other function; it only exists while doing the iterative
tree traversal. After, all the processed entries are linked into the
group_commit_queue.
*/
cur= wfc;
last= wfc;
last= orig_entry;
entry= orig_entry;
for (;;)
{
/* Add the entry to the group commit queue. */
entry->next= group_commit_queue;
group_commit_queue= entry;
group_commit_entry *next_entry;
if (entry->cache_mngr->using_xa)
{
......@@ -6927,20 +6945,29 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
}
if (!cur)
break; // Can happen if initial entry has no wait_for_commit
if (cur)
{
/*
Now that we have taken LOCK_prepare_ordered and will queue up in the
group commit queue, it is safe for following transactions to queue
themselves. We will grab here any transaction that is now ready to
queue up, but after that, more transactions may become ready while the
leader is waiting to start the group commit. So set the flag
`commit_started', so that later transactions can still participate in
the group commit..
*/
cur->commit_started= true;
/*
Check if this transaction has other transaction waiting for it to commit.
Check if this transaction has other transaction waiting for it to
commit.
If so, process the waiting transactions, and their waiters and so on,
transitively.
*/
if (cur->subsequent_commits_list)
{
wait_for_commit *waiter;
wait_for_commit *wakeup_list= NULL;
wait_for_commit **wakeup_next_ptr= &wakeup_list;
wait_for_commit *waiter, **waiter_ptr;
mysql_mutex_lock(&cur->LOCK_wait_commit);
/*
......@@ -6948,10 +6975,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
non-empty.
*/
waiter= cur->subsequent_commits_list;
cur->subsequent_commits_list= NULL;
waiter_ptr= &cur->subsequent_commits_list;
while (waiter)
{
wait_for_commit *next= waiter->next_subsequent_commit;
wait_for_commit *next_waiter= waiter->next_subsequent_commit;
group_commit_entry *entry2=
(group_commit_entry *)waiter->opaque_pointer;
if (entry2)
......@@ -6963,99 +6990,50 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
so that the other thread will know when it wakes up that it was
already processed.
So put it at the end of the list to be processed in a subsequent
iteration of the outer loop.
So remove it from the list of our waiters, and instead put it at
the end of the list to be processed in a subsequent iteration of
the outer loop.
*/
*waiter_ptr= next_waiter;
entry2->queued_by_other= true;
last->next_subsequent_commit= waiter;
last= waiter;
last->next= entry2;
last= entry2;
/*
As a small optimisation, we do not actually need to set
waiter->next_subsequent_commit to NULL, as we can use the
pointer `last' to check for end-of-list.
entry2->next to NULL, as we can use the pointer `last' to check
for end-of-list.
*/
}
else
{
/*
Wake up the waiting transaction.
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
So we need to put these on a list and delay the wakeup until we
have released the lock.
This transaction is not ready to participate in the group commit
yet, so leave it in the waiter list. It might join the group
commit later, if it completes soon enough to do so (it will see
our wfc->commit_started flag set), or it might commit later in a
later group commit.
*/
*wakeup_next_ptr= waiter;
wakeup_next_ptr= &waiter->next_subsequent_commit;
waiter_ptr= &waiter->next_subsequent_commit;
}
waiter= next;
waiter= next_waiter;
}
if (wakeup_list)
{
/* Now release our lock and do the wakeups that were delayed above. */
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
for (;;)
{
wait_for_commit *next;
/*
ToDo: We wakeup the waiter here, so that it can have the chance to
reach its own commit state and queue up for this same group commit,
if it is still pending.
One problem with this is that if the waiter does not reach its own
commit state before this group commit starts, and then the group
commit fails (binlog write failure), we do not get to propagate
the error to the waiter.
A solution for this could be to delay the wakeup until commit is
successful. But then we need to set a flag in the waitee that it is
already queued for group commit, so that the waiter can check this
flag and queue itself if it _does_ reach the commit state in time.
(But error handling in case of binlog write failure is currently
broken in other ways, as well).
*/
if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
{
/* The last one in the list. */
wakeup_list->wakeup(0);
break;
}
/*
Important: don't access wakeup_list->next after the wakeup() call,
it may be invalidated by the other thread.
*/
next= wakeup_list->next_subsequent_commit;
wakeup_list->wakeup(0);
wakeup_list= next;
}
/*
We need a full memory barrier between walking the list and clearing
the flag wakeup_subsequent_commits_running. This barrier is needed
to ensure that no other thread will start to modify the list
pointers before we are done traversing the list.
But wait_for_commit::wakeup(), which was called above, does a full
memory barrier already (it locks a mutex).
*/
cur->wakeup_subsequent_commits_running= false;
}
else
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
/* Add the entry to the group commit queue. */
next_entry= entry->next;
entry->next= group_commit_queue;
group_commit_queue= entry;
if (entry == last)
break;
/*
Move to the next entry in the flattened list of waiting transactions
that still need to be processed transitively.
*/
cur= cur->next_subsequent_commit;
entry= (group_commit_entry *)cur->opaque_pointer;
entry= next_entry;
DBUG_ASSERT(entry != NULL);
cur= entry->thd->wait_for_commit_ptr;
}
if (opt_binlog_commit_wait_count > 0)
......@@ -7111,6 +7089,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
}
mysql_mutex_unlock(&LOCK_commit_ordered);
entry->thd->wakeup_subsequent_commits(entry->error);
if (next)
{
......@@ -7144,7 +7123,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
}
if (likely(!entry->error))
return 0;
return entry->thd->wait_for_prior_commit();
switch (entry->error)
{
......@@ -7202,10 +7181,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
LINT_INIT(binlog_id);
{
DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN
("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1")));
);
/*
Lock the LOCK_log(), and once we get it, collect any additional writes
that queued up while we were waiting.
*/
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log");
mysql_mutex_lock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
......@@ -7412,6 +7396,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
if (current->cache_mngr->using_xa && !current->error &&
DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
run_commit_ordered(current->thd, current->all);
current->thd->wakeup_subsequent_commits(current->error);
/*
Careful not to access current->next after waking up the other thread! As
......
......@@ -6342,6 +6342,7 @@ wait_for_commit::reinit()
opaque_pointer= NULL;
wakeup_error= 0;
wakeup_subsequent_commits_running= false;
commit_started= false;
#ifdef SAFE_MUTEX
/*
When using SAFE_MUTEX, the ordering between taking the LOCK_wait_commit
......
......@@ -1713,6 +1713,16 @@ struct wait_for_commit
on that function for details.
*/
bool wakeup_subsequent_commits_running;
/*
This flag can be set when a commit starts, but has not completed yet.
It is used by binlog group commit to allow a waiting transaction T2 to
join the group commit of an earlier transaction T1. When T1 has queued
itself for group commit, it will set the commit_started flag. Then when
T2 becomes ready to commit and needs to wait for T1 to commit first, T2
can queue itself before waiting, and thereby participate in the same
group commit as T1.
*/
bool commit_started;
void register_wait_for_prior_commit(wait_for_commit *waitee);
int wait_for_prior_commit(THD *thd)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment