Bug #26783 replication status unknown after cluster or mysqld failure

- update the ndb_apply_status table with binlog info
parent 77d37e58
......@@ -2956,3 +2956,4 @@ win/vs71cache.txt
win/vs8cache.txt
zlib/*.ds?
zlib/*.vcproj
client/rpl_constants.h
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
create table t1 (a int key, b int) engine innodb;
create table t2 (a int key, b int) engine innodb;
alter table t1 engine ndb;
alter table t2 engine ndb;
insert into t1 values (1,2);
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
@start_pos:=start_pos @end_pos:=end_pos
<start_pos> <end_pos>
show binlog events from <start_pos> limit 1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 <start_pos> Query 1 # use `test`; insert into t1 values (1,2)
show binlog events from <start_pos> limit 1,1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Xid 1 445 COMMIT /* XID */
begin;
insert into t1 values (2,3);
insert into t2 values (3,4);
commit;
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
@start_pos:=start_pos @end_pos:=end_pos
<start_pos> <end_pos>
show binlog events from <start_pos> limit 1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 <start_pos> Query 1 # use `test`; BEGIN
show binlog events from <start_pos> limit 1,2;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # use `test`; insert into t1 values (2,3)
master-bin.000001 # Query # # use `test`; insert into t2 values (3,4)
show binlog events from <start_pos> limit 3,1;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Xid 1 <end_pos> COMMIT /* XID */
--source include/have_ndb.inc
--source include/have_innodb.inc
--source include/have_binlog_format_mixed_or_statement.inc
--source include/master-slave.inc
--connection master
create table t1 (a int key, b int) engine innodb;
create table t2 (a int key, b int) engine innodb;
--sync_slave_with_master
--connection slave
alter table t1 engine ndb;
alter table t2 engine ndb;
# check binlog position without begin
--connection master
insert into t1 values (1,2);
--sync_slave_with_master
--connection slave
--replace_column 1 <start_pos> 2 <end_pos>
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
--let $start_pos = `select @start_pos`
--let $end_pos = `select @end_pos`
--connection master
# here is actually a bug, since there is no begin statement, the
# query is autocommitted, and end_pos shows end of the insert and not
# end of the commit
--replace_result $start_pos <start_pos>
--replace_column 5 #
--eval show binlog events from $start_pos limit 1
--replace_result $start_pos <start_pos> $end_pos <end_pos>
--replace_column 2 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
--eval show binlog events from $start_pos limit 1,1
# check binlog position with begin
--connection master
begin;
insert into t1 values (2,3);
insert into t2 values (3,4);
commit;
--sync_slave_with_master
--connection slave
--replace_column 1 <start_pos> 2 <end_pos>
select @start_pos:=start_pos, @end_pos:=end_pos from mysql.ndb_apply_status;
--let $start_pos = `select @start_pos`
--let $end_pos = `select @end_pos`
--connection master
--replace_result $start_pos <start_pos>
--replace_column 5 #
--eval show binlog events from $start_pos limit 1
--replace_result $start_pos <start_pos>
--replace_column 2 # 4 # 5 #
--eval show binlog events from $start_pos limit 1,2
--replace_result $start_pos <start_pos> $end_pos <end_pos>
--replace_column 2 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
--eval show binlog events from $start_pos limit 3,1
......@@ -4123,6 +4123,58 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
- refresh list of the indexes for the table if needed (if altered)
*/
#ifdef HAVE_NDB_BINLOG
extern MASTER_INFO *active_mi;
static int ndbcluster_update_apply_status(THD *thd, int do_update)
{
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *ndbtab;
NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
ndb->setDatabaseName(NDB_REP_DB);
Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
if (!(ndbtab= ndbtab_g.get_table()))
{
return -1;
}
NdbOperation *op= 0;
int r= 0;
r|= (op= trans->getNdbOperation(ndbtab)) == 0;
DBUG_ASSERT(r == 0);
if (do_update)
r|= op->updateTuple();
else
r|= op->writeTuple();
DBUG_ASSERT(r == 0);
// server_id
r|= op->equal(0u, (Uint64)thd->server_id);
DBUG_ASSERT(r == 0);
if (!do_update)
{
// epoch
r|= op->setValue(1u, (Uint64)0);
DBUG_ASSERT(r == 0);
}
// log_name
char tmp_buf[FN_REFLEN];
ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
active_mi->rli.group_master_log_name,
strlen(active_mi->rli.group_master_log_name));
r|= op->setValue(2u, tmp_buf);
DBUG_ASSERT(r == 0);
// start_pos
r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
DBUG_ASSERT(r == 0);
// end_pos
r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos +
((Uint64)active_mi->rli.future_event_relay_log_pos -
(Uint64)active_mi->rli.group_relay_log_pos));
DBUG_ASSERT(r == 0);
return 0;
}
#endif /* HAVE_NDB_BINLOG */
int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
int error=0;
......@@ -4173,6 +4225,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
trans_register_ha(thd, FALSE, ndbcluster_hton);
}
else
......@@ -4189,6 +4242,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd_ndb->init_open_tables();
thd_ndb->all= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
thd_ndb->trans_options= 0;
trans_register_ha(thd, TRUE, ndbcluster_hton);
/*
......@@ -4229,7 +4283,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
// Start of transaction
m_rows_changed= 0;
m_ops_pending= 0;
#ifdef HAVE_NDB_BINLOG
if (m_share == ndb_apply_status_share && thd->slave_thread)
thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
#endif
// TODO remove double pointers...
m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
m_table_info= &m_thd_ndb_share->stat;
......@@ -4372,6 +4429,11 @@ static int ndbcluster_commit(handlerton *hton, THD *thd, bool all)
"stmt" : "all"));
DBUG_ASSERT(ndb && trans);
#ifdef HAVE_NDB_BINLOG
if (thd->slave_thread)
ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
#endif /* HAVE_NDB_BINLOG */
if (execute_commit(thd,trans) != 0)
{
const NdbError err= trans->getNdbError();
......
......@@ -593,6 +593,11 @@ enum THD_NDB_OPTIONS
TNO_NO_LOG_SCHEMA_OP= 1 << 0
};
enum THD_NDB_TRANS_OPTIONS
{
TNTO_INJECTED_APPLY_STATUS= 1 << 0
};
struct Ndb_local_table_statistics {
int no_uncommitted_rows_count;
ulong last_count;
......@@ -620,6 +625,7 @@ class Thd_ndb
NdbTransaction *stmt;
int error;
uint32 options;
uint32 trans_options;
List<NDB_SHARE> changed_tables;
uint query_state;
HASH open_tables;
......
......@@ -953,8 +953,8 @@ static void ndbcluster_get_schema(NDB_SHARE *share,
/*
helper function to pack a ndb varchar
*/
static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz)
char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz)
{
switch (col->getArrayType())
{
......
......@@ -181,6 +181,8 @@ int ndbcluster_find_all_files(THD *thd);
void ndb_unpack_record(TABLE *table, NdbValue *value,
MY_BITMAP *defined, byte *buf);
char *ndb_pack_varchar(const NDBCOL *col, char *buf,
const char *str, int sz);
NDB_SHARE *ndbcluster_get_share(const char *key,
TABLE *table,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment