Commit ba55e5e0 authored by Sergei Golubchik's avatar Sergei Golubchik

merge mdev-4506 into 10.0

parents 598552db 97d50e21
......@@ -699,6 +699,41 @@ void *thd_get_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
*/
void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
const void *ha_data);
/**
Signal that the first part of handler commit is finished, and that the
committed transaction is now visible and has fixed commit ordering with
respect to other transactions. The commit need _not_ be durable yet, and
typically will not be when this call makes sense.
This call is optional, if the storage engine does not call it the upper
layer will after the handler commit() method is done. However, the storage
engine may choose to call it itself to increase the possibility for group
commit.
In-order parallel replication uses this to apply different transaction in
parallel, but delay the commits of later transactions until earlier
transactions have committed first, thus achieving increased performance on
multi-core systems while still preserving full transaction consistency.
The storage engine can call this from within the commit() method, typically
after the commit record has been written to the transaction log, but before
the log has been fsync()'ed. This will allow the next replicated transaction
to proceed to commit before the first one has done fsync() or similar. Thus,
it becomes possible for multiple sequential replicated transactions to share
a single fsync() inside the engine in group commit.
Note that this method should _not_ be called from within the commit_ordered()
method, or any other place in the storage engine. When commit_ordered() is
used (typically when binlog is enabled), the transaction coordinator takes
care of this and makes group commit in the storage engine possible without
any other action needed on the part of the storage engine. This function
thd_wakeup_subsequent_commits() is only needed when no transaction
coordinator is used, meaning a single storage engine and no binary log.
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);
#ifdef __cplusplus
}
#endif
......
......@@ -290,6 +290,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
struct mysql_event_general
{
unsigned int event_subclass;
......
......@@ -290,6 +290,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{
......
......@@ -243,6 +243,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,
......
......@@ -84,6 +84,7 @@ let $script=
s{Server ver:.*DOLLAR}{SERVER_VERSION, BINLOG_VERSION};
s{GTID [0-9]+-[0-9]+-[0-9]+}{GTID #-#-#};
s{\[[0-9]-[0-9]-[0-9]+\]}{[#-#-#]};
s{cid=[0-9]+}{cid=#};
s{SQL_LOAD-[a-z,0-9,-]*.[a-z]*}{SQL_LOAD-<SERVER UUID>-<MASTER server-id>-<file-id>.<extension>};
s{rand_seed1=[0-9]*,rand_seed2=[0-9]*}{rand_seed1=<seed 1>,rand_seed2=<seed 2>};
s{((?:master|slave|slave-relay)-bin\.[0-9]{6};pos=)[0-9]+DOLLAR}{DOLLAR1POS};
......
......@@ -41,6 +41,17 @@ The following options may be given as the first argument:
Type of BINLOG_CHECKSUM_ALG. Include checksum for log
events in the binary log. Possible values are NONE and
CRC32; default is NONE.
--binlog-commit-wait-count=#
If non-zero, binlog write will wait at most
binlog_commit_wait_usec microseconds for at least this
many commits to queue up for group commit to the binlog.
This can reduce I/O on the binlog and provide increased
opportunity for parallel apply on the slave, but too high
a value will decrease commit throughput.
--binlog-commit-wait-usec=#
Maximum time, in microseconds, to wait for more commits
to queue up for binlog group commit. Only takes effect if
the value of binlog_commit_wait_count is non-zero.
--binlog-direct-non-transactional-updates
Causes updates to non-transactional engines using
statement format to be written directly to binary log.
......@@ -861,6 +872,17 @@ The following options may be given as the first argument:
--slave-net-timeout=#
Number of seconds to wait for more data from any
master/slave connection before aborting the read
--slave-parallel-max-queued=#
Limit on how much memory SQL threads should use per
parallel replication thread when reading ahead in the
relay log looking for opportunities for parallel
replication. Only used when --slave-parallel-threads > 0.
--slave-parallel-threads=#
Alpha feature, to only be used by developers doing
testing! If non-zero, number of threads to spawn to apply
in parallel events on the slave that were group-committed
on the master or were logged with GTID in different
replication domains.
--slave-skip-errors=name
Tells the slave thread to continue replication when a
query event returns an error from the provided list
......@@ -1006,6 +1028,8 @@ bind-address (No default value)
binlog-annotate-row-events FALSE
binlog-cache-size 32768
binlog-checksum NONE
binlog-commit-wait-count 0
binlog-commit-wait-usec 100000
binlog-direct-non-transactional-updates FALSE
binlog-format STATEMENT
binlog-optimize-thread-scheduling TRUE
......@@ -1241,6 +1265,8 @@ slave-compressed-protocol FALSE
slave-exec-mode STRICT
slave-max-allowed-packet 1073741824
slave-net-timeout 3600
slave-parallel-max-queued 131072
slave-parallel-threads 0
slave-skip-errors (No default value)
slave-sql-verify-checksum TRUE
slave-transaction-retries 10
......
......@@ -31,6 +31,6 @@ a
1
2
3
InnoDB: Last MySQL binlog file position 0 922, file name ./master-bin.000001
InnoDB: Last MySQL binlog file position 0 926, file name ./master-bin.000001
SET DEBUG_SYNC= 'RESET';
DROP TABLE t1;
......@@ -32,6 +32,6 @@ a
1
2
3
InnoDB: Last MySQL binlog file position 0 922, file name ./master-bin.000001
InnoDB: Last MySQL binlog file position 0 926, file name ./master-bin.000001
SET DEBUG_SYNC= 'RESET';
DROP TABLE t1;
......@@ -38,14 +38,14 @@ order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES
wait/synch/cond/sql/COND_manager YES YES
wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES
wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
wait/synch/cond/sql/COND_server_started YES YES
wait/synch/cond/sql/COND_thread_cache YES YES
wait/synch/cond/sql/COND_thread_count YES YES
wait/synch/cond/sql/Delayed_insert::cond YES YES
wait/synch/cond/sql/Delayed_insert::cond_client YES YES
wait/synch/cond/sql/Event_scheduler::COND_state YES YES
wait/synch/cond/sql/Item_func_sleep::cond YES YES
select * from performance_schema.setup_instruments
where name='Wait';
select * from performance_schema.setup_instruments
......
......@@ -8,6 +8,7 @@ a
1
2
3
SET GLOBAL debug_dbug= '+d,incident_database_resync_on_replace,*';
REPLACE INTO t1 VALUES (4);
SELECT * FROM t1;
a
......
include/rpl_init.inc [topology=1->2]
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (1);
LOCK TABLE t1 WRITE;
SET gtid_domain_id=1;
INSERT INTO t1 VALUES (2);
SET gtid_domain_id=0;
INSERT INTO t2 VALUES (2);
INSERT INTO t2 VALUES (3);
BEGIN;
INSERT INTO t2 VALUES (4);
INSERT INTO t2 VALUES (5);
COMMIT;
INSERT INTO t2 VALUES (6);
SELECT * FROM t2 ORDER by a;
a
1
2
3
4
5
6
SELECT * FROM t1;
a
1
UNLOCK TABLES;
SELECT * FROM t1 ORDER BY a;
a
1
2
*** Test two transactions in different domains committed in opposite order on slave but in a single group commit. ***
include/stop_slave.inc
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
RETURN x;
END
||
SET sql_log_bin=1;
SET @old_format= @@SESSION.binlog_format;
SET binlog_format='statement';
SET gtid_domain_id=1;
INSERT INTO t2 VALUES (foo(10,
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'));
FLUSH LOGS;
SET sql_log_bin=0;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
RETURNS INT DETERMINISTIC
BEGIN
IF d1 != '' THEN
SET debug_sync = d1;
END IF;
IF d2 != '' THEN
SET debug_sync = d2;
END IF;
RETURN x;
END
||
SET sql_log_bin=1;
SET @old_format=@@GLOBAL.binlog_format;
SET GLOBAL binlog_format=statement;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
SET debug_sync='now WAIT_FOR ready1';
SET gtid_domain_id=2;
INSERT INTO t2 VALUES (foo(11,
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'));
SET gtid_domain_id=0;
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
a
10
11
SET debug_sync='now WAIT_FOR ready3';
SET debug_sync='now SIGNAL cont3';
SET debug_sync='now WAIT_FOR ready4';
SET debug_sync='now SIGNAL cont1';
SET debug_sync='now WAIT_FOR ready2';
SET debug_sync='now SIGNAL cont4';
SELECT * FROM t2 WHERE a >= 10 ORDER BY a;
a
10
11
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000002 # Binlog_checkpoint # # slave-bin.000002
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(11,
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4'))
slave-bin.000002 # Xid # # COMMIT /* XID */
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(10,
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1',
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2'))
slave-bin.000002 # Xid # # COMMIT /* XID */
FLUSH LOGS;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
include/start_slave.inc
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
BEGIN;
INSERT INTO t3 VALUES (2,102);
BEGIN;
INSERT INTO t3 VALUES (4,104);
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''));
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
SET binlog_format=statement;
INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''));
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
SET binlog_format=statement;
INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
SELECT * FROM t3 ORDER BY a;
a b
1 1
2 12
3 3
4 14
5 5
6 16
7 7
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000002 # Binlog_checkpoint # # master-bin.000002
master-bin.000002 # Gtid # # GTID #-#-#
master-bin.000002 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
master-bin.000002 # Gtid # # BEGIN GTID #-#-#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7)
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=#
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
master-bin.000002 # Xid # # COMMIT /* XID */
SET debug_sync='now WAIT_FOR slave_queued3';
ROLLBACK;
SET debug_sync='now WAIT_FOR slave_queued1';
ROLLBACK;
SET debug_sync='now WAIT_FOR slave_queued2';
SET debug_sync='now SIGNAL slave_cont1';
SELECT * FROM t3 ORDER BY a;
a b
1 1
2 12
3 3
4 14
5 5
6 16
7 7
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000003 # Binlog_checkpoint # # slave-bin.000003
slave-bin.000003 # Gtid # # GTID #-#-#
slave-bin.000003 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
slave-bin.000003 # Gtid # # BEGIN GTID #-#-#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7)
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14,
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=#
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
*** Test STOP SLAVE in parallel mode ***
include/stop_slave.inc
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
SET sql_log_bin=1;
BEGIN;
INSERT INTO t2 VALUES (20);
INSERT INTO t1 VALUES (20);
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
BEGIN;
INSERT INTO t2 VALUES (21);
START SLAVE;
STOP SLAVE;
ROLLBACK;
include/wait_for_slave_to_stop.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
20
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
a
20
21
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
a b
20 20
include/start_slave.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
20
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
a
20
21
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
a b
20 20
21 21
22 22
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP function foo;
DROP TABLE t1,t2,t3;
include/rpl_end.inc
--loose-debug=+d,incident_database_resync_on_replace
......@@ -7,12 +7,19 @@ CREATE TABLE t1 (a INT);
INSERT INTO t1 VALUES (1),(2),(3);
SELECT * FROM t1;
let $debug_save= `SELECT @@GLOBAL.debug`;
SET GLOBAL debug_dbug= '+d,incident_database_resync_on_replace,*';
# This will generate an incident log event and store it in the binary
# log before the replace statement.
REPLACE INTO t1 VALUES (4);
--save_master_pos
SELECT * FROM t1;
--disable_query_log
eval SET GLOBAL debug_dbug= '$debug_save';
--enable_query_log
connection slave;
# Wait until SQL thread stops with error LOST_EVENT on master
call mtr.add_suppression("Slave SQL.*The incident LOST_EVENTS occured on the master.* 1590");
......
This diff is collapsed.
SET @save_binlog_commit_wait_count= @@GLOBAL.binlog_commit_wait_count;
SELECT @@GLOBAL.binlog_commit_wait_count as 'must be zero because of default';
must be zero because of default
0
SELECT @@SESSION.binlog_commit_wait_count as 'no session var';
ERROR HY000: Variable 'binlog_commit_wait_count' is a GLOBAL variable
SET GLOBAL binlog_commit_wait_count= 0;
SET GLOBAL binlog_commit_wait_count= DEFAULT;
SET GLOBAL binlog_commit_wait_count= 10;
SELECT @@GLOBAL.binlog_commit_wait_count;
@@GLOBAL.binlog_commit_wait_count
10
SET GLOBAL binlog_commit_wait_count = @save_binlog_commit_wait_count;
SET @save_binlog_commit_wait_usec= @@GLOBAL.binlog_commit_wait_usec;
SELECT @@GLOBAL.binlog_commit_wait_usec as 'check default';
check default
100000
SELECT @@SESSION.binlog_commit_wait_usec as 'no session var';
ERROR HY000: Variable 'binlog_commit_wait_usec' is a GLOBAL variable
SET GLOBAL binlog_commit_wait_usec= 0;
SET GLOBAL binlog_commit_wait_usec= DEFAULT;
SET GLOBAL binlog_commit_wait_usec= 10000;
SELECT @@GLOBAL.binlog_commit_wait_usec;
@@GLOBAL.binlog_commit_wait_usec
10000
SET GLOBAL binlog_commit_wait_usec = @save_binlog_commit_wait_usec;
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued;
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default';
Check default
131072
SELECT @@SESSION.slave_parallel_max_queued as 'no session var';
ERROR HY000: Variable 'slave_parallel_max_queued' is a GLOBAL variable
SET GLOBAL slave_parallel_max_queued= 0;
SET GLOBAL slave_parallel_max_queued= DEFAULT;
SET GLOBAL slave_parallel_max_queued= 65536;
SELECT @@GLOBAL.slave_parallel_max_queued;
@@GLOBAL.slave_parallel_max_queued
65536
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued;
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
must be zero because of default
0
SELECT @@SESSION.slave_parallel_threads as 'no session var';
ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable
SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT;
SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads;
@@GLOBAL.slave_parallel_threads
10
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
--source include/not_embedded.inc
SET @save_binlog_commit_wait_count= @@GLOBAL.binlog_commit_wait_count;
SELECT @@GLOBAL.binlog_commit_wait_count as 'must be zero because of default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.binlog_commit_wait_count as 'no session var';
SET GLOBAL binlog_commit_wait_count= 0;
SET GLOBAL binlog_commit_wait_count= DEFAULT;
SET GLOBAL binlog_commit_wait_count= 10;
SELECT @@GLOBAL.binlog_commit_wait_count;
SET GLOBAL binlog_commit_wait_count = @save_binlog_commit_wait_count;
--source include/not_embedded.inc
SET @save_binlog_commit_wait_usec= @@GLOBAL.binlog_commit_wait_usec;
SELECT @@GLOBAL.binlog_commit_wait_usec as 'check default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.binlog_commit_wait_usec as 'no session var';
SET GLOBAL binlog_commit_wait_usec= 0;
SET GLOBAL binlog_commit_wait_usec= DEFAULT;
SET GLOBAL binlog_commit_wait_usec= 10000;
SELECT @@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec = @save_binlog_commit_wait_usec;
--source include/not_embedded.inc
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued;
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.slave_parallel_max_queued as 'no session var';
SET GLOBAL slave_parallel_max_queued= 0;
SET GLOBAL slave_parallel_max_queued= DEFAULT;
SET GLOBAL slave_parallel_max_queued= 65536;
SELECT @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued;
--source include/not_embedded.inc
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads;
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
SELECT @@SESSION.slave_parallel_threads as 'no session var';
SET GLOBAL slave_parallel_threads= 0;
SET GLOBAL slave_parallel_threads= DEFAULT;
SET GLOBAL slave_parallel_threads= 10;
SELECT @@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads;
......@@ -841,7 +841,6 @@ static int findopt(char *optpat, uint length,
{
uint count;
const struct my_option *opt= *opt_res;
my_bool is_prefix= FALSE;
DBUG_ENTER("findopt");
for (count= 0; opt->name; opt++)
......@@ -857,8 +856,6 @@ static int findopt(char *optpat, uint length,
/* We only need to know one prev */
count= 1;
*ffname= opt->name;
if (opt->name[length])
is_prefix= TRUE;
}
else if (strcmp(*ffname, opt->name))
{
......
......@@ -91,7 +91,7 @@ SET (SQL_SOURCE
threadpool_common.cc
../sql-common/mysql_async.c
my_apc.cc my_apc.h
rpl_gtid.cc
rpl_gtid.cc rpl_parallel.cc
table_cache.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_builtin.cc
${GEN_SOURCES}
......
......@@ -1256,6 +1256,8 @@ int ha_commit_trans(THD *thd, bool all)
bool need_prepare_ordered, need_commit_ordered;
my_xid xid;
DBUG_ENTER("ha_commit_trans");
DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
thd, (ulong) thd->variables.option_bits, all));
/* Just a random warning to test warnings pushed during autocommit. */
DBUG_EXECUTE_IF("warn_during_ha_commit_trans",
......@@ -1320,6 +1322,8 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans && (rw_ha_count > 0);
MDL_request mdl_request;
DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
is_real_trans, rw_trans, rw_ha_count));
if (rw_trans)
{
......@@ -1468,8 +1472,11 @@ int ha_commit_one_phase(THD *thd, bool all)
transaction.all.ha_list, see why in trans_register_ha()).
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
int res;
DBUG_ENTER("ha_commit_one_phase");
int res= commit_one_phase_2(thd, all, trans, is_real_trans);
if (is_real_trans && (res= thd->wait_for_prior_commit()))
DBUG_RETURN(res);
res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
......@@ -1508,7 +1515,10 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
{
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
DBUG_RETURN(error);
}
......@@ -1583,7 +1593,10 @@ int ha_rollback_trans(THD *thd, bool all)
}
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
if (all)
thd->transaction_rollback_request= FALSE;
......
This diff is collapsed.
......@@ -45,6 +45,15 @@ class TC_LOG
virtual int open(const char *opt_name)=0;
virtual void close()=0;
/*
Transaction coordinator 2-phase commit.
Must invoke the run_prepare_ordered and run_commit_ordered methods, as
described below for these methods.
In addition, must invoke THD::wait_for_prior_commit(), or equivalent
wait, to ensure that one commit waits for another if registered to do so.
*/
virtual int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered,
bool need_commit_ordered) = 0;
......@@ -76,9 +85,11 @@ protected:
prepare_ordered() or commit_ordered() methods.
*/
extern mysql_mutex_t LOCK_prepare_ordered;
extern mysql_cond_t COND_prepare_ordered;
extern mysql_mutex_t LOCK_commit_ordered;
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
extern PSI_cond_key key_COND_prepare_ordered;
#endif
class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
......@@ -397,6 +408,7 @@ private:
class binlog_cache_mngr;
struct rpl_gtid;
struct wait_for_commit;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
......@@ -445,6 +457,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
group commit, only used when opt_optimize_thread_scheduling is not set.
*/
bool check_purge;
/* Flag used to optimise around wait_for_prior_commit. */
bool queued_by_other;
ulong binlog_id;
};
......@@ -525,7 +539,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
int new_file_impl(bool need_lock);
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry);
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
......@@ -672,6 +687,7 @@ public:
}
void set_max_size(ulong max_size_arg);
void signal_update();
void wait_for_sufficient_commits();
void wait_for_update_relay_log(THD* thd);
int wait_for_update_bin_log(THD* thd, const struct timespec * timeout);
void init(ulong max_size);
......@@ -777,7 +793,8 @@ public:
inline uint32 get_open_count() { return open_count; }
void set_status_variables(THD *thd);
bool is_xidlist_idle();
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional,
uint64 commit_id);
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -145,6 +145,7 @@ public:
{
return m_rows_buf && m_cols.bitmap;
}
bool is_part_of_group() { return 1; }
uint m_row_count; /* The number of rows added to the event */
......@@ -195,15 +196,15 @@ protected:
const uchar *m_curr_row_end; /* One-after the end of the current row */
uchar *m_key; /* Buffer to keep key value during searches */
int find_row(const Relay_log_info *const);
int write_row(const Relay_log_info *const, const bool);
int find_row(rpl_group_info *);
int write_row(rpl_group_info *, const bool);
// Unpack the current row into m_table->record[0]
int unpack_current_row(const Relay_log_info *const rli)
int unpack_current_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row,
int const result= ::unpack_row(rgi, m_table, m_width, m_curr_row,
m_rows_end, &m_cols,
&m_curr_row_end, &m_master_reclength);
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
......@@ -214,9 +215,9 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
virtual int do_update_pos(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
......@@ -267,7 +268,7 @@ private:
0 if execution succeeded, 1 if execution failed.
*/
virtual int do_exec_row(const Relay_log_info *const rli) = 0;
virtual int do_exec_row(rpl_group_info *rgi) = 0;
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
/********** END OF CUT & PASTE FROM Rows_log_event **********/
......@@ -275,7 +276,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int do_apply_event(Old_rows_log_event*,const Relay_log_info*);
int do_apply_event(Old_rows_log_event*, rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
......@@ -324,7 +325,7 @@ private:
RETURN VALUE
Error code, if something went wrong, 0 otherwise.
*/
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start,
uchar const **row_end) = 0;
......@@ -387,7 +388,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
/********** END OF CUT & PASTE FROM Write_rows_log_event **********/
......@@ -403,13 +404,13 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
......@@ -463,7 +464,7 @@ protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
/********** END OF CUT & PASTE FROM Update_rows_log_event **********/
......@@ -481,13 +482,13 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
......@@ -538,7 +539,7 @@ protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_before_row_operations(const Slave_reporting_capability *const);
virtual int do_after_row_operations(const Slave_reporting_capability *const,int);
virtual int do_exec_row(const Relay_log_info *const);
virtual int do_exec_row(rpl_group_info *);
#endif
/********** END CUT & PASTE FROM Delete_rows_log_event **********/
......@@ -556,13 +557,13 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(const Relay_log_info *rli)
{ return Old_rows_log_event::do_apply_event(this,rli); }
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*,
virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*,
uchar const *row_start, uchar const **row_end);
virtual int do_exec_row(TABLE *table);
#endif
......
......@@ -469,10 +469,11 @@ uint lower_case_table_names;
ulong tc_heuristic_recover= 0;
int32 thread_count;
int32 thread_running;
int32 slave_open_temp_tables;
ulong thread_created;
ulong back_log, connect_timeout, concurrency, server_id;
ulong what_to_log;
ulong slow_launch_time, slave_open_temp_tables;
ulong slow_launch_time;
ulong open_files_limit, max_binlog_size;
ulong slave_trans_retries;
uint slave_net_timeout;
......@@ -492,6 +493,7 @@ my_atomic_rwlock_t global_query_id_lock;
my_atomic_rwlock_t thread_running_lock;
my_atomic_rwlock_t thread_count_lock;
my_atomic_rwlock_t statistics_lock;
my_atomic_rwlock_t slave_executed_entries_lock;
ulong aborted_threads, aborted_connects;
ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
......@@ -544,6 +546,11 @@ ulong rpl_recovery_rank=0;
*/
ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
1e010, 1e011, 1e012, 1e013, 1e014, 1e015, 1e016, 1e017, 1e018, 1e019,
......@@ -843,19 +850,20 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_data_lock, key_master_info_run_lock,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_sleep_lock,
key_rpl_group_info_sleep_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOG_INFO_lock,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready;
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_rpl_gtid_state;
......@@ -903,6 +911,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
......@@ -914,7 +923,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
{ &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
{ &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
{ &key_rpl_group_info_sleep_lock, "Rpl_group_info::sleep_lock", 0},
{ &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
{ &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
{ &key_TABLE_SHARE_LOCK_share, "TABLE_SHARE::LOCK_share", 0},
......@@ -926,7 +935,10 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0}
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
......@@ -961,13 +973,16 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_relay_log_info_sleep_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
static PSI_cond_info all_server_conds[]=
{
......@@ -988,6 +1003,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL},
......@@ -1002,18 +1018,22 @@ static PSI_cond_info all_server_conds[]=
{ &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0},
{ &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0},
{ &key_relay_log_info_stop_cond, "Relay_log_info::stop_cond", 0},
{ &key_relay_log_info_sleep_cond, "Relay_log_info::sleep_cond", 0},
{ &key_rpl_group_info_sleep_cond, "Rpl_group_info::sleep_cond", 0},
{ &key_TABLE_SHARE_cond, "TABLE_SHARE::cond", 0},
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_init;
key_thread_slave_init, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]=
{
......@@ -1039,7 +1059,8 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL}
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
#ifdef HAVE_MMAP
......@@ -2040,6 +2061,7 @@ void clean_up(bool print_message)
my_atomic_rwlock_destroy(&thread_running_lock);
my_atomic_rwlock_destroy(&thread_count_lock);
my_atomic_rwlock_destroy(&statistics_lock);
my_atomic_rwlock_destroy(&slave_executed_entries_lock);
free_charsets();
mysql_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("got thread count lock"));
......@@ -2122,6 +2144,7 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_server_started);
mysql_cond_destroy(&COND_server_started);
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
DBUG_VOID_RETURN;
}
......@@ -4339,6 +4362,7 @@ static int init_thread_environment()
&LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
......@@ -7727,7 +7751,7 @@ SHOW_VAR status_vars[]= {
{"Select_range", (char*) offsetof(STATUS_VAR, select_range_count_), SHOW_LONG_STATUS},
{"Select_range_check", (char*) offsetof(STATUS_VAR, select_range_check_count_), SHOW_LONG_STATUS},
{"Select_scan", (char*) offsetof(STATUS_VAR, select_scan_count_), SHOW_LONG_STATUS},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_INT},
#ifdef HAVE_REPLICATION
{"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_SIMPLE_FUNC},
{"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_SIMPLE_FUNC},
......@@ -8003,6 +8027,7 @@ static int mysql_init_variables(void)
my_atomic_rwlock_init(&thread_running_lock);
my_atomic_rwlock_init(&thread_count_lock);
my_atomic_rwlock_init(&statistics_lock);
my_atomic_rwlock_init(slave_executed_entries_lock);
strmov(server_version, MYSQL_SERVER_VERSION);
threads.empty();
thread_cache.empty();
......@@ -9283,6 +9308,7 @@ PSI_stage_info stage_slave_waiting_event_from_coordinator= { 0, "Waiting for an
PSI_stage_info stage_binlog_waiting_background_tasks= { 0, "Waiting for background binlog tasks", 0};
PSI_stage_info stage_binlog_processing_checkpoint_notify= { 0, "Processing binlog checkpoint notification", 0};
PSI_stage_info stage_binlog_stopping_background_thread= { 0, "Stopping binlog background thread", 0};
PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work from SQL thread", 0};
#ifdef HAVE_PSI_INTERFACE
......
......@@ -155,7 +155,7 @@ extern ulong delayed_insert_timeout;
extern ulong delayed_insert_limit, delayed_queue_size;
extern ulong delayed_insert_threads, delayed_insert_writes;
extern ulong delayed_rows_in_use,delayed_insert_errors;
extern ulong slave_open_temp_tables;
extern int32 slave_open_temp_tables;
extern ulonglong query_cache_size;
extern ulong query_cache_limit;
extern ulong query_cache_min_res_unit;
......@@ -178,6 +178,10 @@ extern ulong slave_max_allowed_packet;
extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
extern ulong back_log;
extern ulong executed_events;
extern char language[FN_REFLEN];
......@@ -253,15 +257,16 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_relay_log_info_sleep_lock,
key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
extern PSI_mutex_key key_TABLE_SHARE_LOCK_share, key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_mutex_key key_LOCK_rpl_gtid_state;
......@@ -284,16 +289,20 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_relay_log_info_sleep_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init;
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
......@@ -424,6 +433,7 @@ extern PSI_stage_info stage_slave_waiting_workers_to_exit;
extern PSI_stage_info stage_binlog_waiting_background_tasks;
extern PSI_stage_info stage_binlog_processing_checkpoint_notify;
extern PSI_stage_info stage_binlog_stopping_background_thread;
extern PSI_stage_info stage_waiting_for_work_from_sql_thread;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
Statement instrumentation keys (sql).
......@@ -500,6 +510,7 @@ extern mysql_cond_t COND_manager;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;
extern my_atomic_rwlock_t slave_executed_entries_lock;
extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher,
*opt_ssl_key, *opt_ssl_crl, *opt_ssl_crlpath;
......@@ -642,6 +653,20 @@ inline void thread_safe_decrement32(int32 *value, my_atomic_rwlock_t *lock)
my_atomic_rwlock_wrunlock(lock);
}
inline void thread_safe_increment64(int64 *value, my_atomic_rwlock_t *lock)
{
my_atomic_rwlock_wrlock(lock);
(void) my_atomic_add64(value, 1);
my_atomic_rwlock_wrunlock(lock);
}
inline void thread_safe_decrement64(int64 *value, my_atomic_rwlock_t *lock)
{
my_atomic_rwlock_wrlock(lock);
(void) my_atomic_add64(value, -1);
my_atomic_rwlock_wrunlock(lock);
}
inline void
inc_thread_running()
{
......
......@@ -62,27 +62,28 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
int
rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;
DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
/*
Update the GTID position, if we have it and did not already update
it in a GTID transaction.
*/
if ((sub_id= rli->gtid_sub_id))
if ((sub_id= rgi->gtid_sub_id))
{
rli->gtid_sub_id= 0;
if (record_gtid(thd, &rli->current_gtid, sub_id, false, false))
return 1;
update_state_hash(sub_id, &rli->current_gtid);
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid);
}
return 0;
DBUG_RETURN(0);
}
rpl_slave_state::rpl_slave_state()
: inited(false), loaded(false)
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
......@@ -152,6 +153,9 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
list_elem->seq_no= seq_no;
elem->add(list_elem);
if (last_sub_id < sub_id)
last_sub_id= sub_id;
return 0;
}
......@@ -168,7 +172,6 @@ rpl_slave_state::get_element(uint32 domain_id)
if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
return NULL;
elem->list= NULL;
elem->last_sub_id= 0;
elem->domain_id= domain_id;
if (my_hash_insert(&hash, (uchar *)elem))
{
......@@ -311,6 +314,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
DBUG_ENTER("record_gtid");
if (unlikely(!loaded))
{
......@@ -321,7 +325,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
We already complained loudly about this, but we can try to continue
until the DBA fixes it.
*/
return 0;
DBUG_RETURN(0);
}
if (!in_statement)
......@@ -330,7 +334,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
return 1;
DBUG_RETURN(1);
} );
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
......@@ -349,8 +353,11 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->no_replicate= 1;
table->s->is_gtid_slave_pos= TRUE; // TEMPORARY CODE
if (!in_transaction)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
}
bitmap_set_all(table->write_set);
......@@ -485,7 +492,7 @@ end:
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
return err;
DBUG_RETURN(err);
}
......@@ -493,12 +500,9 @@ uint64
rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
element *elem;
lock();
elem= get_element(domain_id);
if (elem)
sub_id= ++elem->last_sub_id;
sub_id= ++last_sub_id;
unlock();
return sub_id;
......
......@@ -60,7 +60,6 @@ struct rpl_slave_state
struct element
{
struct list_element *list;
uint64 last_sub_id;
uint32 domain_id;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
......@@ -68,8 +67,6 @@ struct rpl_slave_state
{
l->next= list;
list= l;
if (last_sub_id < l->sub_id)
last_sub_id= l->sub_id;
}
};
......@@ -78,6 +75,7 @@ struct rpl_slave_state
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_slave_state;
uint64 last_sub_id;
bool inited;
bool loaded;
......@@ -108,7 +106,7 @@ struct rpl_slave_state
int put_back_list(uint32 domain_id, list_element *list);
void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
int record_and_update_gtid(THD *thd, Relay_log_info *rli);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
};
......
This diff is collapsed.
#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H
#include "log_event.h"
struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
struct rpl_parallel_thread *next; /* For free list. */
struct rpl_parallel_thread_pool *pool;
THD *thd;
struct rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
rpl_group_info *rgi;
ulonglong future_event_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
char future_event_master_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
my_off_t future_event_master_log_pos;
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
void enqueue(queued_event *qev)
{
if (last_in_queue)
last_in_queue->next= qev;
else
event_queue= qev;
last_in_queue= qev;
queued_size+= qev->event_size;
}
void dequeue(queued_event *list)
{
queued_event *tmp;
DBUG_ASSERT(list == event_queue);
event_queue= last_in_queue= NULL;
for (tmp= list; tmp; tmp= tmp->next)
queued_size-= tmp->event_size;
}
};
struct rpl_parallel_thread_pool {
uint32 count;
struct rpl_parallel_thread **threads;
struct rpl_parallel_thread *free_list;
mysql_mutex_t LOCK_rpl_thread_pool;
mysql_cond_t COND_rpl_thread_pool;
bool changing;
bool inited;
rpl_parallel_thread_pool();
int init(uint32 size);
void destroy();
struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
};
struct rpl_parallel_entry {
uint32 domain_id;
uint32 last_server_id;
uint64 last_seq_no;
uint64 last_commit_id;
bool active;
rpl_parallel_thread *rpl_thread;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
*/
uint64 last_committed_sub_id;
mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry;
/*
The sub_id of the last event group in this replication domain that was
queued for execution by a worker thread.
*/
uint64 current_sub_id;
rpl_group_info *current_group_info;
/*
The sub_id of the last event group in the previous batch of group-committed
transactions.
When we spawn parallel worker threads for the next group-committed batch,
they first need to wait for this sub_id to be committed before it is safe
to start executing them.
*/
uint64 prev_groupcommit_sub_id;
};
struct rpl_parallel {
HASH domain_hash;
rpl_parallel_entry *current;
bool sql_thread_stopping;
rpl_parallel();
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
};
extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count,
bool skip_check= false);
#endif /* RPL_PARALLEL_H */
......@@ -186,7 +186,7 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
unpack_row(Relay_log_info const *rli,
unpack_row(rpl_group_info *rgi,
TABLE *table, uint const colcnt,
uchar const *const row_data, uchar const *const row_buffer_end,
MY_BITMAP const *cols,
......@@ -214,18 +214,18 @@ unpack_row(Relay_log_info const *rli,
uint i= 0;
table_def *tabledef= NULL;
TABLE *conv_table= NULL;
bool table_found= rli && rli->get_table_data(table, &tabledef, &conv_table);
bool table_found= rgi && rgi->get_table_data(table, &tabledef, &conv_table);
DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p",
table_found, tabledef, conv_table));
DBUG_ASSERT(table_found);
/*
If rli is NULL it means that there is no source table and that the
If rgi is NULL it means that there is no source table and that the
row shall just be unpacked without doing any checks. This feature
is used by MySQL Backup, but can be used for other purposes as
well.
*/
if (rli && !table_found)
if (rgi && !table_found)
DBUG_RETURN(HA_ERR_GENERIC);
for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
......@@ -313,7 +313,7 @@ unpack_row(Relay_log_info const *rli,
(int) (pack_ptr - old_pack_ptr)));
if (!pack_ptr)
{
rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
"Could not read field '%s' of table '%s.%s'",
f->field_name, table->s->db.str,
table->s->table_name.str);
......
......@@ -21,7 +21,7 @@
#include <rpl_reporting.h>
#include "my_global.h" /* uchar */
class Relay_log_info;
struct rpl_group_info;
struct TABLE;
typedef struct st_bitmap MY_BITMAP;
......@@ -31,7 +31,7 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols,
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int unpack_row(Relay_log_info const *rli,
int unpack_row(rpl_group_info *rgi,
TABLE *table, uint const colcnt,
uchar const *const row_data, uchar const *row_buffer_end,
MY_BITMAP const *cols,
......
......@@ -88,7 +88,7 @@ pack_row_old(TABLE *table, MY_BITMAP const* cols,
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
unpack_row_old(Relay_log_info *rli,
unpack_row_old(rpl_group_info *rgi,
TABLE *table, uint const colcnt, uchar *record,
uchar const *row, const uchar *row_buffer_end,
MY_BITMAP const *cols,
......@@ -141,7 +141,7 @@ unpack_row_old(Relay_log_info *rli,
f->move_field_offset(-offset);
if (!ptr)
{
rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT,
"Could not read field `%s` of table `%s`.`%s`",
f->field_name, table->s->db.str,
table->s->table_name.str);
......@@ -183,7 +183,7 @@ unpack_row_old(Relay_log_info *rli,
if (event_type == WRITE_ROWS_EVENT &&
((*field_ptr)->flags & mask) == mask)
{
rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
rgi->rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD,
"Field `%s` of table `%s`.`%s` "
"has no default value and cannot be NULL",
(*field_ptr)->field_name, table->s->db.str,
......
......@@ -23,7 +23,7 @@ size_t pack_row_old(TABLE *table, MY_BITMAP const* cols,
uchar *row_data, const uchar *record);
#ifdef HAVE_REPLICATION
int unpack_row_old(Relay_log_info *rli,
int unpack_row_old(rpl_group_info *rgi,
TABLE *table, uint const colcnt, uchar *record,
uchar const *row, uchar const *row_buffer_end,
MY_BITMAP const *cols,
......
This diff is collapsed.
This diff is collapsed.
......@@ -1183,20 +1183,20 @@ bool Deferred_log_events::is_empty()
return array.elements == 0;
}
bool Deferred_log_events::execute(Relay_log_info *rli)
bool Deferred_log_events::execute(rpl_group_info *rgi)
{
bool res= false;
DBUG_ENTER("Deferred_log_events::execute");
DBUG_ASSERT(rli->deferred_events_collecting);
DBUG_ASSERT(rgi->deferred_events_collecting);
rli->deferred_events_collecting= false;
rgi->deferred_events_collecting= false;
for (uint i= 0; !res && i < array.elements; i++)
{
Log_event *ev= (* (Log_event **)
dynamic_array_ptr(&array, i));
res= ev->apply_event(rli);
res= ev->apply_event(rgi);
}
rli->deferred_events_collecting= true;
rgi->deferred_events_collecting= true;
DBUG_RETURN(res);
}
......
......@@ -283,7 +283,7 @@ public:
/* queue for exection at Query-log-event time prior the Query */
int add(Log_event *ev);
bool is_empty();
bool execute(Relay_log_info *rli);
bool execute(struct rpl_group_info *rgi);
void rewind();
bool is_last(Log_event *ev) { return ev == last_added; };
};
......
......@@ -7071,3 +7071,7 @@ ER_CANNOT_GRANT_ROLE
ER_CANNOT_REVOKE_ROLE
eng "Cannot revoke role '%s' from: %s."
rum "Rolul '%s' nu poate fi revocat de la: %s."
ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE
eng "Cannot change @@slave_parallel_threads while another change is in progress"
ER_PRIOR_COMMIT_FAILED
eng "Commit failed due to failure of an earlier commit on which this one depends"
This diff is collapsed.
......@@ -51,6 +51,7 @@
class Relay_log_info;
class Master_info;
class Master_info_index;
struct rpl_parallel_thread;
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
......@@ -227,9 +228,12 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
int rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt);
pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(Relay_log_info *rli, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
......
......@@ -1173,6 +1173,9 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
ret= SP_OK;
if (table->file->ha_write_row(table->record[0]))
ret= SP_WRITE_ROW_FAILED;
/* Make change permanent and avoid 'table is marked as crashed' errors */
table->file->extra(HA_EXTRA_FLUSH);
if (ret == SP_OK)
sp_cache_invalidate();
......@@ -1262,6 +1265,8 @@ sp_drop_routine(THD *thd, stored_procedure_type type, sp_name *name)
{
if (table->file->ha_delete_row(table->record[0]))
ret= SP_DELETE_ROW_FAILED;
/* Make change permanent and avoid 'table is marked as crashed' errors */
table->file->extra(HA_EXTRA_FLUSH);
}
if (ret == SP_OK)
......@@ -1372,6 +1377,8 @@ sp_update_routine(THD *thd, stored_procedure_type type, sp_name *name,
ret= SP_WRITE_ROW_FAILED;
else
ret= 0;
/* Make change permanent and avoid 'table is marked as crashed' errors */
table->file->extra(HA_EXTRA_FLUSH);
}
if (ret == SP_OK)
......@@ -1546,7 +1553,11 @@ sp_drop_db_routines(THD *thd, char *db)
if (nxtres != HA_ERR_END_OF_FILE)
ret= SP_KEY_NOT_FOUND;
if (deleted)
{
sp_cache_invalidate();
/* Make change permanent and avoid 'table is marked as crashed' errors */
table->file->extra(HA_EXTRA_FLUSH);
}
}
table->file->ha_index_end();
......
......@@ -57,6 +57,7 @@
#include "sql_table.h" // build_table_filename
#include "datadict.h" // dd_frm_is_view()
#include "sql_hset.h" // Hash_set
#include "rpl_rli.h" // rpl_group_info
#ifdef __WIN__
#include <io.h>
#endif
......@@ -644,11 +645,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection)
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
DBUG_ENTER("mark_temp_tables_as_free_for_reuse");
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
thd->unlock_temporary_tables();
if (thd->rgi_slave)
{
/*
Temporary tables are shared with other by sql execution threads.
As a safety messure, clear the pointer to the common area.
*/
thd->temporary_tables= 0;
}
DBUG_VOID_RETURN;
}
......@@ -662,6 +676,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
void mark_tmp_table_for_reuse(TABLE *table)
{
DBUG_ENTER("mark_tmp_table_for_reuse");
DBUG_ASSERT(table->s->tmp_table);
table->query_id= 0;
......@@ -692,6 +707,7 @@ void mark_tmp_table_for_reuse(TABLE *table)
LOCK TABLES is allowed (but ignored) for a temporary table.
*/
table->reginfo.lock_type= TL_WRITE;
DBUG_VOID_RETURN;
}
......@@ -1031,6 +1047,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table)
/*
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread
Temporary tables created in a sql slave is closed by
Relay_log_info::close_temporary_tables()
*/
bool close_temporary_tables(THD *thd)
......@@ -1045,6 +1065,7 @@ bool close_temporary_tables(THD *thd)
if (!thd->temporary_tables)
DBUG_RETURN(FALSE);
DBUG_ASSERT(!thd->rgi_slave);
if (!mysql_bin_log.is_open())
{
......@@ -1479,26 +1500,16 @@ TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name)
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
{
const char *key;
const char *tmp_key;
char key[MAX_DBKEY_LENGTH];
uint key_length;
char key_suffix[TMP_TABLE_KEY_EXTRA];
TABLE *table;
key_length= get_table_def_key(tl, &key);
int4store(key_suffix, thd->variables.server_id);
int4store(key_suffix + 4, thd->variables.pseudo_thread_id);
key_length= get_table_def_key(tl, &tmp_key);
memcpy(key, tmp_key, key_length);
int4store(key + key_length, thd->variables.server_id);
int4store(key + key_length + 4, thd->variables.pseudo_thread_id);
for (table= thd->temporary_tables; table; table= table->next)
{
if ((table->s->table_cache_key.length == key_length +
TMP_TABLE_KEY_EXTRA) &&
!memcmp(table->s->table_cache_key.str, key, key_length) &&
!memcmp(table->s->table_cache_key.str + key_length, key_suffix,
TMP_TABLE_KEY_EXTRA))
return table;
}
return NULL;
return find_temporary_table(thd, key, key_length + TMP_TABLE_KEY_EXTRA);
}
......@@ -1512,16 +1523,42 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
TABLE *result= 0;
if (!thd->have_temporary_tables())
return NULL;
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
return table;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
}
return NULL;
thd->unlock_temporary_tables();
return result;
}
......@@ -1570,6 +1607,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
/* Table might be in use by some outer statement. */
if (table->query_id && table->query_id != thd->query_id)
{
DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu",
(ulong) table->query_id, (ulong) thd->query_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(-1);
}
......@@ -1598,6 +1638,7 @@ void close_temporary_table(THD *thd, TABLE *table,
table->s->db.str, table->s->table_name.str,
(long) table, table->alias.c_ptr()));
thd->lock_temporary_tables();
if (table->prev)
{
table->prev->next= table->next;
......@@ -1617,12 +1658,14 @@ void close_temporary_table(THD *thd, TABLE *table,
if (thd->temporary_tables)
table->next->prev= 0;
}
if (thd->slave_thread)
if (thd->rgi_slave)
{
/* natural invariant of temporary_tables */
DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables);
slave_open_temp_tables--;
thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock);
table->in_use= 0; // No statistics
}
thd->unlock_temporary_tables();
close_temporary(table, free_share, delete_table);
DBUG_VOID_RETURN;
}
......@@ -5387,14 +5430,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
if (add_to_temporary_tables_list)
{
thd->lock_temporary_tables();
/* growing temp list at the head */
tmp_table->next= thd->temporary_tables;
if (tmp_table->next)
tmp_table->next->prev= tmp_table;
thd->temporary_tables= tmp_table;
thd->temporary_tables->prev= 0;
if (thd->slave_thread)
slave_open_temp_tables++;
if (thd->rgi_slave)
{
thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock);
}
thd->unlock_temporary_tables();
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,
......@@ -5549,9 +5596,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
*/
DBUG_ASSERT(!tl->derived && !tl->schema_table);
if (tl->open_type == OT_BASE_ONLY)
if (tl->open_type == OT_BASE_ONLY || !thd->have_temporary_tables())
{
DBUG_PRINT("info", ("skip_temporary is set"));
DBUG_PRINT("info", ("skip_temporary is set or no temporary tables"));
DBUG_RETURN(FALSE);
}
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment