Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
bd2a0a23
Commit
bd2a0a23
authored
Mar 04, 2014
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge MDEV-5754, MDEV-5769, and MDEV-5764 into 10.0-base
parents
1a536c8d
b5b82108
Changes
12
Show whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
292 additions
and
66 deletions
+292
-66
mysql-test/std_data/mariadb-5.5-binlog.000001
mysql-test/std_data/mariadb-5.5-binlog.000001
+0
-0
mysql-test/suite/rpl/r/rpl_mariadb_slave_capability.result
mysql-test/suite/rpl/r/rpl_mariadb_slave_capability.result
+27
-1
mysql-test/suite/rpl/r/rpl_old_master.result
mysql-test/suite/rpl/r/rpl_old_master.result
+27
-0
mysql-test/suite/rpl/t/rpl_mariadb_slave_capability.test
mysql-test/suite/rpl/t/rpl_mariadb_slave_capability.test
+49
-1
mysql-test/suite/rpl/t/rpl_old_master.test
mysql-test/suite/rpl/t/rpl_old_master.test
+49
-0
sql/log_event.cc
sql/log_event.cc
+28
-9
sql/log_event.h
sql/log_event.h
+6
-3
sql/rpl_parallel.cc
sql/rpl_parallel.cc
+77
-38
sql/rpl_parallel.h
sql/rpl_parallel.h
+3
-3
sql/rpl_rli.cc
sql/rpl_rli.cc
+2
-1
sql/rpl_rli.h
sql/rpl_rli.h
+1
-0
sql/slave.cc
sql/slave.cc
+23
-10
No files found.
mysql-test/std_data/mariadb-5.5-binlog.000001
0 → 100644
View file @
bd2a0a23
File added
mysql-test/suite/rpl/r/rpl_mariadb_slave_capability.result
View file @
bd2a0a23
...
@@ -62,6 +62,32 @@ slave-relay-bin.000007 # Query # # # Dummy ev
...
@@ -62,6 +62,32 @@ slave-relay-bin.000007 # Query # # # Dummy ev
slave-relay-bin.000007 # Table_map # # table_id: # (test.t1)
slave-relay-bin.000007 # Table_map # # table_id: # (test.t1)
slave-relay-bin.000007 # Write_rows # # table_id: # flags: STMT_END_F
slave-relay-bin.000007 # Write_rows # # table_id: # flags: STMT_END_F
slave-relay-bin.000007 # Query # # COMMIT
slave-relay-bin.000007 # Query # # COMMIT
*** MDEV-5754: MySQL 5.5 slaves cannot replicate from MariaDB 10.0 ***
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
INSERT INTO t2 VALUES (1);
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
INSERT INTO t2 VALUES (2);
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
SET debug_sync='RESET';
SET debug_sync='RESET';
SET debug_sync='RESET';
show binlog events in 'master-bin.000003' from <binlog_start> limit 0, 8;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000003 # Table_map # # table_id: # (test.t2)
master-bin.000003 # Write_rows # # table_id: # flags: STMT_END_F
master-bin.000003 # Xid # # COMMIT /* XID */
master-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000003 # Table_map # # table_id: # (test.t2)
master-bin.000003 # Write_rows # # table_id: # flags: STMT_END_F
master-bin.000003 # Xid # # COMMIT /* XID */
SELECT * FROM t2 ORDER BY a;
a
1
2
# Test that slave which cannot tolerate holes in binlog stream but
# Test that slave which cannot tolerate holes in binlog stream but
# knows the event does not get dummy event
# knows the event does not get dummy event
include/stop_slave.inc
include/stop_slave.inc
...
@@ -95,5 +121,5 @@ select @@global.replicate_annotate_row_events;
...
@@ -95,5 +121,5 @@ select @@global.replicate_annotate_row_events;
set @@global.debug_dbug= @old_slave_dbug;
set @@global.debug_dbug= @old_slave_dbug;
Clean up.
Clean up.
set @@global.binlog_checksum = @old_master_binlog_checksum;
set @@global.binlog_checksum = @old_master_binlog_checksum;
DROP TABLE t1;
DROP TABLE t1
, t2
;
include/rpl_end.inc
include/rpl_end.inc
mysql-test/suite/rpl/r/rpl_old_master.result
0 → 100644
View file @
bd2a0a23
include/master-slave.inc
[connection master]
include/stop_slave.inc
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1]
SET @old_parallel= @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_host='127.0.0.1', master_port=SERVER_MYPORT_1, master_user='root', master_log_file='master-bin.000001', master_log_pos=4;
include/start_slave.inc
CREATE TABLE t2 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t2 VALUES (1);
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 2
3 4
4 8
5 16
SELECT * FROM t2;
a
1
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel;
DROP TABLE t1;
include/start_slave.inc
DROP TABLE t2;
include/rpl_end.inc
mysql-test/suite/rpl/t/rpl_mariadb_slave_capability.test
View file @
bd2a0a23
--
source
include
/
master
-
slave
.
inc
--
source
include
/
master
-
slave
.
inc
--
source
include
/
have_debug
.
inc
--
source
include
/
have_debug
.
inc
--
source
include
/
have_debug_sync
.
inc
--
source
include
/
have_binlog_format_row
.
inc
--
source
include
/
have_binlog_format_row
.
inc
--
source
include
/
have_innodb
.
inc
connection
master
;
connection
master
;
...
@@ -71,6 +73,52 @@ let $binlog_start= 0;
...
@@ -71,6 +73,52 @@ let $binlog_start= 0;
let
$binlog_limit
=
7
,
5
;
let
$binlog_limit
=
7
,
5
;
--
source
include
/
show_relaylog_events
.
inc
--
source
include
/
show_relaylog_events
.
inc
--
echo
***
MDEV
-
5754
:
MySQL
5.5
slaves
cannot
replicate
from
MariaDB
10.0
***
# The problem was that for a group commit, we get commit id into the
# GTID event, and there was a bug in the code that replaces GTID with
# dummy that failed when commit id was present.
#
# So setup a group commit in InnoDB.
--
connection
master
CREATE
TABLE
t2
(
a
INT
PRIMARY
KEY
)
ENGINE
=
InnoDB
;
let
$binlog_file
=
query_get_value
(
SHOW
MASTER
STATUS
,
File
,
1
);
let
$binlog_start
=
query_get_value
(
SHOW
MASTER
STATUS
,
Position
,
1
);
--
connect
(
con1
,
127.0
.
0.1
,
root
,,
test
,
$SERVER_MYPORT_1
,)
SET
debug_sync
=
'commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'
;
send
INSERT
INTO
t2
VALUES
(
1
);
--
connection
master
SET
debug_sync
=
'now WAIT_FOR master_queued1'
;
--
connect
(
con2
,
127.0
.
0.1
,
root
,,
test
,
$SERVER_MYPORT_1
,)
SET
debug_sync
=
'commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'
;
send
INSERT
INTO
t2
VALUES
(
2
);
--
connection
master
SET
debug_sync
=
'now WAIT_FOR master_queued2'
;
SET
debug_sync
=
'now SIGNAL master_cont1'
;
--
connection
con1
REAP
;
SET
debug_sync
=
'RESET'
;
--
connection
con2
REAP
;
SET
debug_sync
=
'RESET'
;
--
connection
master
SET
debug_sync
=
'RESET'
;
let
$binlog_limit
=
0
,
8
;
--
source
include
/
show_binlog_events
.
inc
--
save_master_pos
--
connection
slave
--
sync_with_master
SELECT
*
FROM
t2
ORDER
BY
a
;
--
echo
# Test that slave which cannot tolerate holes in binlog stream but
--
echo
# Test that slave which cannot tolerate holes in binlog stream but
--
echo
# knows the event does not get dummy event
--
echo
# knows the event does not get dummy event
...
@@ -106,6 +154,6 @@ set @@global.debug_dbug= @old_slave_dbug;
...
@@ -106,6 +154,6 @@ set @@global.debug_dbug= @old_slave_dbug;
--
echo
Clean
up
.
--
echo
Clean
up
.
connection
master
;
connection
master
;
set
@@
global
.
binlog_checksum
=
@
old_master_binlog_checksum
;
set
@@
global
.
binlog_checksum
=
@
old_master_binlog_checksum
;
DROP
TABLE
t1
;
DROP
TABLE
t1
,
t2
;
sync_slave_with_master
;
sync_slave_with_master
;
--
source
include
/
rpl_end
.
inc
--
source
include
/
rpl_end
.
inc
mysql-test/suite/rpl/t/rpl_old_master.test
0 → 100644
View file @
bd2a0a23
# Test replicating off old master.
# We simulate old master by copying in pre-generated binlog files from earlier
# server versions.
--
source
include
/
have_innodb
.
inc
--
source
include
/
master
-
slave
.
inc
--
connection
slave
--
source
include
/
stop_slave
.
inc
--
connection
master
--
let
$datadir
=
`SELECT @@datadir`
--
let
$rpl_server_number
=
1
--
source
include
/
rpl_stop_server
.
inc
--
remove_file
$datadir
/
master
-
bin
.
000001
--
copy_file
$MYSQL_TEST_DIR
/
std_data
/
mariadb
-
5.5
-
binlog
.
000001
$datadir
/
master
-
bin
.
000001
--
let
$rpl_server_number
=
1
--
source
include
/
rpl_start_server
.
inc
--
source
include
/
wait_until_connected_again
.
inc
--
connection
slave
SET
@
old_parallel
=
@@
GLOBAL
.
slave_parallel_threads
;
SET
GLOBAL
slave_parallel_threads
=
10
;
--
replace_result
$SERVER_MYPORT_1
SERVER_MYPORT_1
eval
CHANGE
MASTER
TO
master_host
=
'127.0.0.1'
,
master_port
=
$SERVER_MYPORT_1
,
master_user
=
'root'
,
master_log_file
=
'master-bin.000001'
,
master_log_pos
=
4
;
--
source
include
/
start_slave
.
inc
--
connection
master
CREATE
TABLE
t2
(
a
INT
PRIMARY
KEY
)
ENGINE
=
InnoDB
;
INSERT
INTO
t2
VALUES
(
1
);
--
save_master_pos
--
connection
slave
--
sync_with_master
SELECT
*
FROM
t1
ORDER
BY
a
;
SELECT
*
FROM
t2
;
--
source
include
/
stop_slave
.
inc
SET
GLOBAL
slave_parallel_threads
=@
old_parallel
;
DROP
TABLE
t1
;
--
source
include
/
start_slave
.
inc
--
connection
master
DROP
TABLE
t2
;
--
source
include
/
rpl_end
.
inc
sql/log_event.cc
View file @
bd2a0a23
...
@@ -3648,9 +3648,14 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
...
@@ -3648,9 +3648,14 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
DBUG_ASSERT
(
checksum_alg
==
BINLOG_CHECKSUM_ALG_UNDEF
||
DBUG_ASSERT
(
checksum_alg
==
BINLOG_CHECKSUM_ALG_UNDEF
||
checksum_alg
==
BINLOG_CHECKSUM_ALG_OFF
);
checksum_alg
==
BINLOG_CHECKSUM_ALG_OFF
);
/* Currently we only need to replace GTID event. */
/*
DBUG_ASSERT
(
data_len
==
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
);
Currently we only need to replace GTID event.
if
(
data_len
!=
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
)
The length of GTID differs depending on whether it contains commit id.
*/
DBUG_ASSERT
(
data_len
==
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
||
data_len
==
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
+
2
);
if
(
data_len
!=
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
&&
data_len
!=
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
+
2
)
return
1
;
return
1
;
flags
=
uint2korr
(
p
+
FLAGS_OFFSET
);
flags
=
uint2korr
(
p
+
FLAGS_OFFSET
);
...
@@ -3663,9 +3668,22 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
...
@@ -3663,9 +3668,22 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
int4store
(
q
+
Q_EXEC_TIME_OFFSET
,
0
);
int4store
(
q
+
Q_EXEC_TIME_OFFSET
,
0
);
q
[
Q_DB_LEN_OFFSET
]
=
0
;
q
[
Q_DB_LEN_OFFSET
]
=
0
;
int2store
(
q
+
Q_ERR_CODE_OFFSET
,
0
);
int2store
(
q
+
Q_ERR_CODE_OFFSET
,
0
);
if
(
data_len
==
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
)
{
int2store
(
q
+
Q_STATUS_VARS_LEN_OFFSET
,
0
);
int2store
(
q
+
Q_STATUS_VARS_LEN_OFFSET
,
0
);
q
[
Q_DATA_OFFSET
]
=
0
;
/* Zero terminator for empty db */
q
[
Q_DATA_OFFSET
]
=
0
;
/* Zero terminator for empty db */
q
+=
Q_DATA_OFFSET
+
1
;
q
+=
Q_DATA_OFFSET
+
1
;
}
else
{
DBUG_ASSERT
(
data_len
==
LOG_EVENT_HEADER_LEN
+
GTID_HEADER_LEN
+
2
);
/* Put in an empty time_zone_str to take up the extra 2 bytes. */
int2store
(
q
+
Q_STATUS_VARS_LEN_OFFSET
,
2
);
q
[
Q_DATA_OFFSET
]
=
Q_TIME_ZONE_CODE
;
q
[
Q_DATA_OFFSET
+
1
]
=
0
;
/* Zero length for empty time_zone_str */
q
[
Q_DATA_OFFSET
+
2
]
=
0
;
/* Zero terminator for empty db */
q
+=
Q_DATA_OFFSET
+
3
;
}
memcpy
(
q
,
"BEGIN"
,
5
);
memcpy
(
q
,
"BEGIN"
,
5
);
if
(
checksum_alg
==
BINLOG_CHECKSUM_ALG_CRC32
)
if
(
checksum_alg
==
BINLOG_CHECKSUM_ALG_CRC32
)
...
@@ -6669,7 +6687,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
...
@@ -6669,7 +6687,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
int
Gtid_list_log_event
::
do_apply_event
(
rpl_group_info
*
rgi
)
Gtid_list_log_event
::
do_apply_event
(
rpl_group_info
*
rgi
)
{
{
Relay_log_info
const
*
rli
=
rgi
->
rli
;
Relay_log_info
*
rli
=
const_cast
<
Relay_log_info
*>
(
rgi
->
rli
)
;
int
ret
;
int
ret
;
if
(
gl_flags
&
FLAG_IGN_GTIDS
)
if
(
gl_flags
&
FLAG_IGN_GTIDS
)
{
{
...
@@ -6689,10 +6707,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
...
@@ -6689,10 +6707,11 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
{
char
str_buf
[
128
];
char
str_buf
[
128
];
String
str
(
str_buf
,
sizeof
(
str_buf
),
system_charset_info
);
String
str
(
str_buf
,
sizeof
(
str_buf
),
system_charset_info
);
const_cast
<
Relay_log_info
*>
(
rli
)
->
until_gtid_pos
.
to_string
(
&
str
);
rli
->
until_gtid_pos
.
to_string
(
&
str
);
sql_print_information
(
"Slave SQL thread stops because it reached its"
sql_print_information
(
"Slave SQL thread stops because it reached its"
" UNTIL master_gtid_pos %s"
,
str
.
c_ptr_safe
());
" UNTIL master_gtid_pos %s"
,
str
.
c_ptr_safe
());
const_cast
<
Relay_log_info
*>
(
rli
)
->
abort_slave
=
true
;
rli
->
abort_slave
=
true
;
rli
->
stop_for_until
=
true
;
}
}
return
ret
;
return
ret
;
}
}
...
...
sql/log_event.h
View file @
bd2a0a23
...
@@ -3105,12 +3105,15 @@ public:
...
@@ -3105,12 +3105,15 @@ public:
<td>flags</td>
<td>flags</td>
<td>1 byte bitfield</td>
<td>1 byte bitfield</td>
<td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
<td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
<td>Bit 1 set indicates group commit, and that commit id exists</td>
</tr>
</tr>
<tr>
<tr>
<td>Reserved</td>
<td>Reserved (no group commit) / commit id (group commit) (see flags bit 1)</td>
<td>6 bytes</td>
<td>6 bytes / 8 bytes</td>
<td>Reserved bytes, set to 0. Maybe be used for future expansion.</td>
<td>Reserved bytes, set to 0. Maybe be used for future expansion (no
group commit). OR commit id, same for all GTIDs in the same group
commit (see flags bit 1).</td>
</tr>
</tr>
</table>
</table>
...
...
sql/rpl_parallel.cc
View file @
bd2a0a23
...
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
...
@@ -173,6 +173,7 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
rgi
->
is_error
=
true
;
rgi
->
is_error
=
true
;
rgi
->
cleanup_context
(
thd
,
true
);
rgi
->
cleanup_context
(
thd
,
true
);
rgi
->
rli
->
abort_slave
=
true
;
rgi
->
rli
->
abort_slave
=
true
;
rgi
->
rli
->
stop_for_until
=
false
;
mysql_mutex_lock
(
rgi
->
rli
->
relay_log
.
get_log_lock
());
mysql_mutex_lock
(
rgi
->
rli
->
relay_log
.
get_log_lock
());
mysql_mutex_unlock
(
rgi
->
rli
->
relay_log
.
get_log_lock
());
mysql_mutex_unlock
(
rgi
->
rli
->
relay_log
.
get_log_lock
());
rgi
->
rli
->
relay_log
.
signal_update
();
rgi
->
rli
->
relay_log
.
signal_update
();
...
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
...
@@ -1122,7 +1123,7 @@ rpl_parallel::find(uint32 domain_id)
void
void
rpl_parallel
::
wait_for_done
(
THD
*
thd
)
rpl_parallel
::
wait_for_done
(
THD
*
thd
,
Relay_log_info
*
rli
)
{
{
struct
rpl_parallel_entry
*
e
;
struct
rpl_parallel_entry
*
e
;
rpl_parallel_thread
*
rpt
;
rpl_parallel_thread
*
rpt
;
...
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
...
@@ -1152,9 +1153,13 @@ rpl_parallel::wait_for_done(THD *thd)
started executing yet. So we set e->stop_count here and use it to
started executing yet. So we set e->stop_count here and use it to
decide in the worker threads whether to continue executing an event
decide in the worker threads whether to continue executing an event
group or whether to skip it, when force_abort is set.
group or whether to skip it, when force_abort is set.
If we stop due to reaching the START SLAVE UNTIL condition, then we
need to continue executing any queued events up to that point.
*/
*/
e
->
force_abort
=
true
;
e
->
force_abort
=
true
;
e
->
stop_count
=
e
->
count_committing_event_groups
;
e
->
stop_count
=
rli
->
stop_for_until
?
e
->
count_queued_event_groups
:
e
->
count_committing_event_groups
;
mysql_mutex_unlock
(
&
e
->
LOCK_parallel_entry
);
mysql_mutex_unlock
(
&
e
->
LOCK_parallel_entry
);
for
(
j
=
0
;
j
<
e
->
rpl_thread_max
;
++
j
)
for
(
j
=
0
;
j
<
e
->
rpl_thread_max
;
++
j
)
{
{
...
@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
...
@@ -1190,6 +1195,30 @@ rpl_parallel::wait_for_done(THD *thd)
}
}
/*
This function handles the case where the SQL driver thread reached the
START SLAVE UNTIL position; we stop queueing more events but continue
processing remaining, already queued events; then use executes manual
STOP SLAVE; then this function signals to worker threads that they
should stop the processing of any remaining queued events.
*/
void
rpl_parallel
::
stop_during_until
()
{
struct
rpl_parallel_entry
*
e
;
uint32
i
;
for
(
i
=
0
;
i
<
domain_hash
.
records
;
++
i
)
{
e
=
(
struct
rpl_parallel_entry
*
)
my_hash_element
(
&
domain_hash
,
i
);
mysql_mutex_lock
(
&
e
->
LOCK_parallel_entry
);
if
(
e
->
force_abort
)
e
->
stop_count
=
e
->
count_committing_event_groups
;
mysql_mutex_unlock
(
&
e
->
LOCK_parallel_entry
);
}
}
bool
bool
rpl_parallel
::
workers_idle
()
rpl_parallel
::
workers_idle
()
{
{
...
@@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
...
@@ -1230,11 +1259,12 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
do_event() is executed by the sql_driver_thd thread.
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can execute the query.
It's main purpose is to find a thread that can execute the query.
@retval false ok, event was accepted
@retval 0 ok, event was accepted
@retval true error
@retval 1 error
@retval -1 event should be executed serially, in the sql driver thread
*/
*/
bool
int
rpl_parallel
::
do_event
(
rpl_group_info
*
serial_rgi
,
Log_event
*
ev
,
rpl_parallel
::
do_event
(
rpl_group_info
*
serial_rgi
,
Log_event
*
ev
,
ulonglong
event_size
)
ulonglong
event_size
)
{
{
...
@@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1248,6 +1278,32 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
bool
did_enter_cond
=
false
;
bool
did_enter_cond
=
false
;
const
char
*
old_msg
=
NULL
;
const
char
*
old_msg
=
NULL
;
/* Handle master log name change, seen in Rotate_log_event. */
typ
=
ev
->
get_type_code
();
if
(
unlikely
(
typ
==
ROTATE_EVENT
))
{
Rotate_log_event
*
rev
=
static_cast
<
Rotate_log_event
*>
(
ev
);
if
((
rev
->
server_id
!=
global_system_variables
.
server_id
||
rli
->
replicate_same_server_id
)
&&
!
rev
->
is_relay_log_event
()
&&
!
rli
->
is_in_group
())
{
memcpy
(
rli
->
future_event_master_log_name
,
rev
->
new_log_ident
,
rev
->
ident_len
+
1
);
}
}
/*
Execute queries non-parallel if slave_skip_counter is set, as it's is
easier to skip queries in single threaded mode.
*/
if
(
rli
->
slave_skip_counter
)
return
-
1
;
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
if
(
unlikely
(
!
current
)
&&
typ
!=
GTID_EVENT
)
return
-
1
;
/* ToDo: what to do with this lock?!? */
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock
(
&
rli
->
data_lock
);
mysql_mutex_unlock
(
&
rli
->
data_lock
);
...
@@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1259,21 +1315,20 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
been partially queued, but after that we will just ignore any further
been partially queued, but after that we will just ignore any further
events the SQL driver thread may try to queue, and eventually it will stop.
events the SQL driver thread may try to queue, and eventually it will stop.
*/
*/
if
(((
typ
=
ev
->
get_type_code
())
==
GTID_EVENT
||
is_group_event
=
Log_event
::
is_group_event
(
typ
);
!
(
is_group_event
=
Log_event
::
is_group_event
(
typ
)))
&&
if
((
typ
==
GTID_EVENT
||
!
is_group_event
)
&&
rli
->
abort_slave
)
rli
->
abort_slave
)
sql_thread_stopping
=
true
;
sql_thread_stopping
=
true
;
if
(
sql_thread_stopping
)
if
(
sql_thread_stopping
)
{
{
delete
ev
;
delete
ev
;
/*
/*
Return
false ("no error"); normal stop is not an error, and otherwise the
Return
"no error"; normal stop is not an error, and otherwise the error
error
has already been recorded.
has already been recorded.
*/
*/
return
false
;
return
0
;
}
}
if
(
typ
==
GTID_EVENT
||
unlikely
(
!
current
)
)
if
(
typ
==
GTID_EVENT
)
{
{
uint32
domain_id
;
uint32
domain_id
;
if
(
likely
(
typ
==
GTID_EVENT
))
if
(
likely
(
typ
==
GTID_EVENT
))
...
@@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1288,7 +1343,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
{
my_error
(
ER_OUT_OF_RESOURCES
,
MYF
(
MY_WME
));
my_error
(
ER_OUT_OF_RESOURCES
,
MYF
(
MY_WME
));
delete
ev
;
delete
ev
;
return
true
;
return
1
;
}
}
current
=
e
;
current
=
e
;
}
}
...
@@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1307,7 +1362,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
{
/* This means we were killed. The error is already signalled. */
/* This means we were killed. The error is already signalled. */
delete
ev
;
delete
ev
;
return
true
;
return
1
;
}
}
if
(
!
(
qev
=
cur_thread
->
get_qev
(
ev
,
event_size
,
rli
)))
if
(
!
(
qev
=
cur_thread
->
get_qev
(
ev
,
event_size
,
rli
)))
...
@@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1315,7 +1370,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
&
did_enter_cond
,
old_msg
);
&
did_enter_cond
,
old_msg
);
delete
ev
;
delete
ev
;
return
true
;
return
1
;
}
}
if
(
typ
==
GTID_EVENT
)
if
(
typ
==
GTID_EVENT
)
...
@@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1328,7 +1383,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
&
did_enter_cond
,
old_msg
);
&
did_enter_cond
,
old_msg
);
delete
ev
;
delete
ev
;
return
true
;
return
1
;
}
}
/*
/*
...
@@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1366,7 +1421,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
&
did_enter_cond
,
old_msg
);
&
did_enter_cond
,
old_msg
);
delete
ev
;
delete
ev
;
return
true
;
return
1
;
}
}
e
->
current_gco
=
rgi
->
gco
=
gco
;
e
->
current_gco
=
rgi
->
gco
=
gco
;
}
}
...
@@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1380,7 +1435,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
e
->
current_sub_id
=
rgi
->
gtid_sub_id
;
e
->
current_sub_id
=
rgi
->
gtid_sub_id
;
++
e
->
count_queued_event_groups
;
++
e
->
count_queued_event_groups
;
}
}
else
if
(
!
is_group_event
||
!
e
)
else
if
(
!
is_group_event
)
{
{
my_off_t
log_pos
;
my_off_t
log_pos
;
int
err
;
int
err
;
...
@@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1389,38 +1444,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
Same for events not preceeded by GTID (we should not see those normally,
but they might be from an old master).
but they might be from an old master).
The variable `e' is NULL for the case where the master did not
have GTID, like a MariaDB 5.5 or MySQL master.
*/
*/
qev
->
rgi
=
serial_rgi
;
qev
->
rgi
=
serial_rgi
;
/* Handle master log name change, seen in Rotate_log_event. */
if
(
typ
==
ROTATE_EVENT
)
{
Rotate_log_event
*
rev
=
static_cast
<
Rotate_log_event
*>
(
qev
->
ev
);
if
((
rev
->
server_id
!=
global_system_variables
.
server_id
||
rli
->
replicate_same_server_id
)
&&
!
rev
->
is_relay_log_event
()
&&
!
rli
->
is_in_group
())
{
memcpy
(
rli
->
future_event_master_log_name
,
rev
->
new_log_ident
,
rev
->
ident_len
+
1
);
}
}
tmp
=
serial_rgi
->
is_parallel_exec
;
tmp
=
serial_rgi
->
is_parallel_exec
;
serial_rgi
->
is_parallel_exec
=
true
;
serial_rgi
->
is_parallel_exec
=
true
;
err
=
rpt_handle_event
(
qev
,
NULL
);
err
=
rpt_handle_event
(
qev
,
NULL
);
serial_rgi
->
is_parallel_exec
=
tmp
;
serial_rgi
->
is_parallel_exec
=
tmp
;
log_pos
=
qev
->
ev
->
log_pos
;
log_pos
=
ev
->
log_pos
;
delete_or_keep_event_post_apply
(
serial_rgi
,
typ
,
qev
->
ev
);
delete_or_keep_event_post_apply
(
serial_rgi
,
typ
,
ev
);
if
(
err
)
if
(
err
)
{
{
cur_thread
->
free_qev
(
qev
);
cur_thread
->
free_qev
(
qev
);
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
abandon_worker_thread
(
rli
->
sql_driver_thd
,
cur_thread
,
&
did_enter_cond
,
old_msg
);
&
did_enter_cond
,
old_msg
);
return
true
;
return
1
;
}
}
/*
/*
Queue an empty event, so that the position will be updated in a
Queue an empty event, so that the position will be updated in a
...
@@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
...
@@ -1451,5 +1490,5 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
&
did_enter_cond
,
old_msg
);
&
did_enter_cond
,
old_msg
);
mysql_cond_signal
(
&
cur_thread
->
COND_rpl_thread
);
mysql_cond_signal
(
&
cur_thread
->
COND_rpl_thread
);
return
false
;
return
0
;
}
}
sql/rpl_parallel.h
View file @
bd2a0a23
...
@@ -222,10 +222,10 @@ struct rpl_parallel {
...
@@ -222,10 +222,10 @@ struct rpl_parallel {
~
rpl_parallel
();
~
rpl_parallel
();
void
reset
();
void
reset
();
rpl_parallel_entry
*
find
(
uint32
domain_id
);
rpl_parallel_entry
*
find
(
uint32
domain_id
);
void
wait_for_done
(
THD
*
thd
);
void
wait_for_done
(
THD
*
thd
,
Relay_log_info
*
rli
);
void
stop_during_until
();
bool
workers_idle
();
bool
workers_idle
();
bool
do_event
(
rpl_group_info
*
serial_rgi
,
Log_event
*
ev
,
int
do_event
(
rpl_group_info
*
serial_rgi
,
Log_event
*
ev
,
ulonglong
event_size
);
ulonglong
event_size
);
};
};
...
...
sql/rpl_rli.cc
View file @
bd2a0a23
...
@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
...
@@ -60,7 +60,8 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos
(
0
),
log_space_total
(
0
),
ignore_log_space_limit
(
0
),
group_master_log_pos
(
0
),
log_space_total
(
0
),
ignore_log_space_limit
(
0
),
last_master_timestamp
(
0
),
sql_thread_caught_up
(
true
),
slave_skip_counter
(
0
),
last_master_timestamp
(
0
),
sql_thread_caught_up
(
true
),
slave_skip_counter
(
0
),
abort_pos_wait
(
0
),
slave_run_id
(
0
),
sql_driver_thd
(),
abort_pos_wait
(
0
),
slave_run_id
(
0
),
sql_driver_thd
(),
inited
(
0
),
abort_slave
(
0
),
slave_running
(
0
),
until_condition
(
UNTIL_NONE
),
inited
(
0
),
abort_slave
(
0
),
stop_for_until
(
0
),
slave_running
(
0
),
until_condition
(
UNTIL_NONE
),
until_log_pos
(
0
),
retried_trans
(
0
),
executed_entries
(
0
),
until_log_pos
(
0
),
retried_trans
(
0
),
executed_entries
(
0
),
m_flags
(
0
)
m_flags
(
0
)
{
{
...
...
sql/rpl_rli.h
View file @
bd2a0a23
...
@@ -262,6 +262,7 @@ public:
...
@@ -262,6 +262,7 @@ public:
*/
*/
volatile
bool
inited
;
volatile
bool
inited
;
volatile
bool
abort_slave
;
volatile
bool
abort_slave
;
volatile
bool
stop_for_until
;
volatile
uint
slave_running
;
volatile
uint
slave_running
;
/*
/*
...
...
sql/slave.cc
View file @
bd2a0a23
...
@@ -615,6 +615,13 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
...
@@ -615,6 +615,13 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if
(
thread_mask
&
(
SLAVE_SQL
|
SLAVE_FORCE_ALL
))
if
(
thread_mask
&
(
SLAVE_SQL
|
SLAVE_FORCE_ALL
))
{
{
DBUG_PRINT
(
"info"
,(
"Terminating SQL thread"
));
DBUG_PRINT
(
"info"
,(
"Terminating SQL thread"
));
if
(
opt_slave_parallel_threads
>
0
&&
mi
->
rli
.
abort_slave
&&
mi
->
rli
.
stop_for_until
)
{
mi
->
rli
.
stop_for_until
=
false
;
mi
->
rli
.
parallel
.
stop_during_until
();
}
else
mi
->
rli
.
abort_slave
=
1
;
mi
->
rli
.
abort_slave
=
1
;
if
((
error
=
terminate_slave_thread
(
mi
->
rli
.
sql_driver_thd
,
sql_lock
,
if
((
error
=
terminate_slave_thread
(
mi
->
rli
.
sql_driver_thd
,
sql_lock
,
&
mi
->
rli
.
stop_cond
,
&
mi
->
rli
.
stop_cond
,
...
@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
...
@@ -3414,6 +3421,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
message about error in query execution to be printed.
message about error in query execution to be printed.
*/
*/
rli
->
abort_slave
=
1
;
rli
->
abort_slave
=
1
;
rli
->
stop_for_until
=
true
;
mysql_mutex_unlock
(
&
rli
->
data_lock
);
mysql_mutex_unlock
(
&
rli
->
data_lock
);
delete
ev
;
delete
ev
;
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
...
@@ -3441,13 +3449,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
...
@@ -3441,13 +3449,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_state_of_relay_log
(
rli
,
ev
);
update_state_of_relay_log
(
rli
,
ev
);
if
(
opt_slave_parallel_threads
>
0
)
{
int
res
=
rli
->
parallel
.
do_event
(
serial_rgi
,
ev
,
event_size
);
if
(
res
>=
0
)
DBUG_RETURN
(
res
);
/*
/*
Execute queries in parallel, except if slave_skip_counter is set,
Else we proceed to execute the event non-parallel.
as it's is easier to skip queries in single threaded mode.
This is the case for pre-10.0 events without GTID, and for handling
slave_skip_counter.
*/
*/
}
if
(
opt_slave_parallel_threads
>
0
&&
rli
->
slave_skip_counter
==
0
)
DBUG_RETURN
(
rli
->
parallel
.
do_event
(
serial_rgi
,
ev
,
event_size
));
/*
/*
For GTID, allocate a new sub_id for the given domain_id.
For GTID, allocate a new sub_id for the given domain_id.
...
@@ -4356,6 +4368,7 @@ pthread_handler_t handle_slave_sql(void *arg)
...
@@ -4356,6 +4368,7 @@ pthread_handler_t handle_slave_sql(void *arg)
Seconds_Behind_Master grows. No big deal.
Seconds_Behind_Master grows. No big deal.
*/
*/
rli
->
abort_slave
=
0
;
rli
->
abort_slave
=
0
;
rli
->
stop_for_until
=
false
;
mysql_mutex_unlock
(
&
rli
->
run_lock
);
mysql_mutex_unlock
(
&
rli
->
run_lock
);
mysql_cond_broadcast
(
&
rli
->
start_cond
);
mysql_cond_broadcast
(
&
rli
->
start_cond
);
...
@@ -4526,7 +4539,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
...
@@ -4526,7 +4539,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
}
if
(
opt_slave_parallel_threads
>
0
)
if
(
opt_slave_parallel_threads
>
0
)
rli
->
parallel
.
wait_for_done
(
thd
);
rli
->
parallel
.
wait_for_done
(
thd
,
rli
);
/* Thread stopped. Print the current replication position to the log */
/* Thread stopped. Print the current replication position to the log */
{
{
...
@@ -4552,7 +4565,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
...
@@ -4552,7 +4565,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
get the correct position printed.)
*/
*/
if
(
opt_slave_parallel_threads
>
0
)
if
(
opt_slave_parallel_threads
>
0
)
rli
->
parallel
.
wait_for_done
(
thd
);
rli
->
parallel
.
wait_for_done
(
thd
,
rli
);
/*
/*
Some events set some playgrounds, which won't be cleared because thread
Some events set some playgrounds, which won't be cleared because thread
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment