Commit 6d1e55f5 authored by unknown's avatar unknown

MDEV-4506: Parallel replication: Intermediate commit.

A few fixes following tests. Now can apply one INSERT event in
a separate worker thread.
parent 26a9fbc4
--source include/have_binlog_format_statement.inc
connect (s1,127.0.0.1,root,,test,$MASTER_MYPORT,);
connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,);
--connection s1
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM;
SET sql_log_bin=1;
--connection s2
SELECT @@server_id;
SET sql_log_bin=0;
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM;
SET sql_log_bin=1;
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
master_user='root', master_use_gtid=current_pos;
--connection s1
INSERT INTO t1 VALUES (1);
--connection s2
query_vertical SHOW SLAVE STATUS;
--source include/start_slave.inc
SELECT * FROM t1;
--sleep 1
SELECT * FROM t1;
--source include/stop_slave.inc
--connection s1
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
--connection s2
RESET SLAVE ALL;
SET sql_log_bin=0;
DROP TABLE t1;
SET sql_log_bin=1;
...@@ -13,9 +13,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -13,9 +13,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt) struct rpl_parallel_thread *rpt)
{ {
int err; int err;
Relay_log_info *rli= qev->rli;
thd->rli_slave= rli;
thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */ /* ToDo: Access to thd, and what about rli, split out a parallel part? */
err= apply_event_and_update_pos(qev->ev, thd, qev->rli, rpt); mysql_mutex_lock(&rli->data_lock);
err= apply_event_and_update_pos(qev->ev, thd, rli, rpt);
/* ToDo: error handling. */ /* ToDo: error handling. */
/* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */
} }
...@@ -108,7 +112,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -108,7 +112,7 @@ handle_rpl_parallel_thread(void *arg)
} }
} }
rpt_handle_event(events, thd, rpt); rpt_handle_event(events, thd, rpt);
free(events); my_free(events);
events= next; events= next;
} }
...@@ -313,6 +317,7 @@ rpl_parallel_thread_pool::destroy() ...@@ -313,6 +317,7 @@ rpl_parallel_thread_pool::destroy()
rpl_parallel_change_thread_count(this, 0, true); rpl_parallel_change_thread_count(this, 0, true);
mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_mutex_destroy(&LOCK_rpl_thread_pool);
mysql_cond_destroy(&COND_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool);
inited= false;
} }
...@@ -325,8 +330,8 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) ...@@ -325,8 +330,8 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
while ((rpt= free_list) == NULL) while ((rpt= free_list) == NULL)
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
free_list= rpt->next; free_list= rpt->next;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->current_entry= entry; rpt->current_entry= entry;
return rpt; return rpt;
...@@ -383,6 +388,9 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd) ...@@ -383,6 +388,9 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd)
rpl_parallel_thread *cur_thread; rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev; rpl_parallel_thread::queued_event *qev;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0)))) MYF(0))))
{ {
......
...@@ -5800,7 +5800,8 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5800,7 +5800,8 @@ static Log_event* next_event(Relay_log_info* rli)
llstr(my_b_tell(cur_log),llbuf1), llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2))); llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); DBUG_ASSERT(opt_slave_parallel_threads > 0 ||
my_b_tell(cur_log) == rli->event_relay_log_pos);
} }
#endif #endif
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment