wl2325, distribution of schema operations between mysql servers

parent aef3ead4
...@@ -8,8 +8,6 @@ a b c ...@@ -8,8 +8,6 @@ a b c
2 two two 2 two two
alter table t1 drop index c; alter table t1 drop index c;
select * from t1 where c = 'two'; select * from t1 where c = 'two';
ERROR HY000: Table definition has changed, please retry transaction
select * from t1 where c = 'two';
a b c a b c
2 two two 2 two two
drop table t1; drop table t1;
......
drop table if exists t1,t2;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop database if exists mysqltest;
reset master;
reset master;
create database mysqltest;
use mysqltest;
create table t1 (a int primary key) engine=ndb;
create table t2 (a int primary key) engine=ndb;
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # create database mysqltest
master-bin1.000001 # Query # # use `mysqltest`; create table t1 (a int primary key) engine=ndb
master-bin1.000001 # Query # # use `test`; create table t2 (a int primary key) engine=ndb
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # create database mysqltest
master-bin.000001 # Query # # use `mysqltest`; create table t1 (a int primary key) engine=ndb
master-bin.000001 # Query # # use `test`; create table t2 (a int primary key) engine=ndb
reset master;
reset master;
use mysqltest;
drop table test.t2;
create table t2 (a int primary key) engine=ndb;
alter table t2 add column (b int);
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster_replication.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `mysqltest`; drop table test.t2
master-bin1.000001 # Query # # use `mysqltest`; create table t2 (a int primary key) engine=ndb
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster_replication.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `mysqltest`; alter table t2 add column (b int)
reset master;
reset master;
ALTER DATABASE mysqltest CHARACTER SET latin1;
insert into t1 values (1);
drop table t1;
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # ALTER DATABASE mysqltest CHARACTER SET latin1
master-bin.000001 # Query # # BEGIN
master-bin.000001 # Table_map # # cluster_replication.apply_status
master-bin.000001 # Write_rows # #
master-bin.000001 # Table_map # # mysqltest.t1
master-bin.000001 # Write_rows # #
master-bin.000001 # Query # # COMMIT
master-bin.000001 # Query # # BEGIN
master-bin.000001 # Table_map # # cluster_replication.apply_status
master-bin.000001 # Write_rows # #
master-bin.000001 # Query # # COMMIT
master-bin.000001 # Query # # use `mysqltest`; drop table `t1`
reset master;
reset master;
insert into t2 values (1,2);
drop database mysqltest;
use test;
create table t1 (a int primary key) engine=ndb;
use test;
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster_replication.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Table_map # # mysqltest.t2
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster_replication.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # drop database mysqltest
master-bin1.000001 # Query # # use `test`; create table t1 (a int primary key) engine=ndb
reset master;
reset master;
CREATE LOGFILE GROUP lg1
ADD UNDOFILE 'undofile.dat'
INITIAL_SIZE 16M
UNDO_BUFFER_SIZE = 1M
ENGINE=NDB;
ALTER LOGFILE GROUP lg1
ADD UNDOFILE 'undofile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB;
CREATE TABLESPACE ts1
ADD DATAFILE 'datafile.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB;
ALTER TABLESPACE ts1
ADD DATAFILE 'datafile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB;
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile.dat'
ENGINE = NDB;
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile02.dat'
ENGINE = NDB;
DROP TABLESPACE ts1
ENGINE = NDB;
DROP LOGFILE GROUP lg1
ENGINE =NDB;
drop table t1;
show binlog events from 102;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin1.000001 # Query # # CREATE LOGFILE GROUP lg1
ADD UNDOFILE 'undofile.dat'
INITIAL_SIZE 16M
UNDO_BUFFER_SIZE = 1M
ENGINE=NDB
master-bin1.000001 # Query # # ALTER LOGFILE GROUP lg1
ADD UNDOFILE 'undofile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB
master-bin1.000001 # Query # # CREATE TABLESPACE ts1
ADD DATAFILE 'datafile.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB
master-bin1.000001 # Query # # ALTER TABLESPACE ts1
ADD DATAFILE 'datafile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB
master-bin1.000001 # Query # # ALTER TABLESPACE ts1
DROP DATAFILE 'datafile.dat'
ENGINE = NDB
master-bin1.000001 # Query # # ALTER TABLESPACE ts1
DROP DATAFILE 'datafile02.dat'
ENGINE = NDB
master-bin1.000001 # Query # # DROP TABLESPACE ts1
ENGINE = NDB
master-bin1.000001 # Query # # DROP LOGFILE GROUP lg1
ENGINE =NDB
master-bin1.000001 # Query # # BEGIN
master-bin1.000001 # Table_map # # cluster_replication.apply_status
master-bin1.000001 # Write_rows # #
master-bin1.000001 # Query # # COMMIT
master-bin1.000001 # Query # # use `test`; drop table `t1`
drop table if exists t1;
drop database if exists mysqltest; drop database if exists mysqltest;
drop table if exists t1;
drop database if exists mysqltest;
create database mysqltest;
create database mysqltest; create database mysqltest;
create table mysqltest.t1 (a int primary key, b int) engine=ndb; create table mysqltest.t1 (a int primary key, b int) engine=ndb;
use mysqltest; use mysqltest;
...@@ -10,18 +6,8 @@ show tables; ...@@ -10,18 +6,8 @@ show tables;
Tables_in_mysqltest Tables_in_mysqltest
t1 t1
drop database mysqltest; drop database mysqltest;
use mysqltest;
show tables;
Tables_in_mysqltest
create database mysqltest; create database mysqltest;
create table mysqltest.t1 (c int, d int primary key) engine=ndb;
use mysqltest; use mysqltest;
show tables; show tables;
Tables_in_mysqltest Tables_in_mysqltest
t1
drop database mysqltest; drop database mysqltest;
use mysqltest;
show tables;
Tables_in_mysqltest
drop table if exists t1;
drop database if exists mysqltest;
...@@ -50,15 +50,9 @@ a ...@@ -50,15 +50,9 @@ a
select * from t3; select * from t3;
a b c last_col a b c last_col
1 Hi! 89 Longtext column 1 Hi! 89 Longtext column
show status like 'handler_discover%';
Variable_name Value
Handler_discover 1
show tables like 't4'; show tables like 't4';
Tables_in_test (t4) Tables_in_test (t4)
t4 t4
show status like 'handler_discover%';
Variable_name Value
Handler_discover 2
show tables; show tables;
Tables_in_test Tables_in_test
t1 t1
......
...@@ -17,9 +17,6 @@ select * from t1 where c = 'two'; ...@@ -17,9 +17,6 @@ select * from t1 where c = 'two';
connection server1; connection server1;
alter table t1 drop index c; alter table t1 drop index c;
connection server2; connection server2;
--error 1412
select * from t1 where c = 'two';
--sleep 5
select * from t1 where c = 'two'; select * from t1 where c = 'two';
connection server1; connection server1;
drop table t1; drop table t1;
......
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/have_binlog_format_row.inc
--disable_warnings
connection server2;
drop table if exists t1,t2;
drop database if exists mysqltest;
connection server1;
drop table if exists t1,t2;
drop database if exists mysqltest;
--connection server1
reset master;
--connection server2
reset master;
--enable_warnings
--let $binlog_start=102
#
# basic test to see if ddl distribution works across
# multiple binlogs
#
# create database
--connection server1
create database mysqltest;
# create table
--connection server1
use mysqltest;
create table t1 (a int primary key) engine=ndb;
--connection server2
create table t2 (a int primary key) engine=ndb;
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
--connection server1
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# alter table
--connection server1
reset master;
--connection server2
reset master;
--connection server2
use mysqltest;
#alter table test.t2 rename t2;
drop table test.t2;
create table t2 (a int primary key) engine=ndb;
alter table t2 add column (b int);
--connections server1
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# alter database
--connection server1
reset master;
--connection server2
reset master;
--connection server2
ALTER DATABASE mysqltest CHARACTER SET latin1;
# drop table and drop should come after data events
--connection server2
insert into t1 values (1);
drop table t1;
--connection server1
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# drop database and drop should come after data events
--connection server1
reset master;
--connection server2
reset master;
--connection server1
insert into t2 values (1,2);
drop database mysqltest;
use test;
create table t1 (a int primary key) engine=ndb;
--connection server2
use test;
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
# logfile groups and table spaces
--connection server1
reset master;
--connection server2
reset master;
--connection server1
CREATE LOGFILE GROUP lg1
ADD UNDOFILE 'undofile.dat'
INITIAL_SIZE 16M
UNDO_BUFFER_SIZE = 1M
ENGINE=NDB;
ALTER LOGFILE GROUP lg1
ADD UNDOFILE 'undofile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB;
CREATE TABLESPACE ts1
ADD DATAFILE 'datafile.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB;
ALTER TABLESPACE ts1
ADD DATAFILE 'datafile02.dat'
INITIAL_SIZE = 4M
ENGINE=NDB;
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile.dat'
ENGINE = NDB;
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile02.dat'
ENGINE = NDB;
DROP TABLESPACE ts1
ENGINE = NDB;
DROP LOGFILE GROUP lg1
ENGINE =NDB;
drop table t1;
--connection server2
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
-- source include/have_ndb.inc -- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/not_embedded.inc -- source include/not_embedded.inc
--disable_warnings --disable_warnings
connection server1;
drop table if exists t1;
drop database if exists mysqltest;
connection server2;
drop table if exists t1;
drop database if exists mysqltest; drop database if exists mysqltest;
--enable_warnings --enable_warnings
...@@ -15,38 +9,16 @@ drop database if exists mysqltest; ...@@ -15,38 +9,16 @@ drop database if exists mysqltest;
# Check that all tables in a database are dropped when database is dropped # Check that all tables in a database are dropped when database is dropped
# #
connection server1;
create database mysqltest;
connection server2;
create database mysqltest; create database mysqltest;
create table mysqltest.t1 (a int primary key, b int) engine=ndb; create table mysqltest.t1 (a int primary key, b int) engine=ndb;
use mysqltest; use mysqltest;
show tables; show tables;
connection server1;
drop database mysqltest; drop database mysqltest;
connection server2;
use mysqltest;
show tables;
connection server1;
create database mysqltest; create database mysqltest;
create table mysqltest.t1 (c int, d int primary key) engine=ndb;
use mysqltest; use mysqltest;
show tables; show tables;
connection server2;
drop database mysqltest; drop database mysqltest;
connection server1;
use mysqltest;
show tables;
--disable_warnings
drop table if exists t1;
drop database if exists mysqltest;
--enable_warnings
# End of 4.1 tests # End of 4.1 tests
...@@ -66,9 +66,7 @@ create table t4 (pk int primary key, b int) engine=ndb; ...@@ -66,9 +66,7 @@ create table t4 (pk int primary key, b int) engine=ndb;
connection server1; connection server1;
select * from t1; select * from t1;
select * from t3; select * from t3;
show status like 'handler_discover%';
show tables like 't4'; show tables like 't4';
show status like 'handler_discover%';
show tables; show tables;
drop table t1, t2, t3, t4; drop table t1, t2, t3, t4;
......
...@@ -4382,7 +4382,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -4382,7 +4382,7 @@ int ha_ndbcluster::create(const char *name,
if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0) if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0)
{ {
/* this is only a serious error if the binlog is on */ /* this is only a serious error if the binlog is on */
if (share && ndb_binlog_thread_running > 0) if (share && ndb_binlog_running)
{ {
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG), ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
...@@ -4395,14 +4395,14 @@ int ha_ndbcluster::create(const char *name, ...@@ -4395,14 +4395,14 @@ int ha_ndbcluster::create(const char *name,
sql_print_information("NDB Binlog: CREATE TABLE Event: %s", sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
event_name.c_ptr()); event_name.c_ptr());
if (share && ndb_binlog_thread_running > 0 && if (share && ndb_binlog_running &&
ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0) ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
{ {
sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations." sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
" Event: %s", name2); " Event: %s", name2);
/* a warning has been issued to the client */ /* a warning has been issued to the client */
} }
if (share && ndb_binlog_thread_running <= 0) if (share && !ndb_binlog_running)
share->flags|= NSF_NO_BINLOG; share->flags|= NSF_NO_BINLOG;
ndbcluster_log_schema_op(current_thd, share, ndbcluster_log_schema_op(current_thd, share,
current_thd->query, current_thd->query_length, current_thd->query, current_thd->query_length,
...@@ -4686,7 +4686,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) ...@@ -4686,7 +4686,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
} }
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
NDB_SHARE *share= 0; NDB_SHARE *share= 0;
if (ndb_binlog_thread_running > 0 && if (ndb_binlog_running &&
(share= get_share(from, 0, false))) (share= get_share(from, 0, false)))
{ {
int r= rename_share(share, to); int r= rename_share(share, to);
...@@ -5866,18 +5866,8 @@ static bool ndbcluster_init() ...@@ -5866,18 +5866,8 @@ static bool ndbcluster_init()
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* start the ndb injector thread */ /* start the ndb injector thread */
if (opt_bin_log)
{
if (binlog_row_based)
{
if (ndbcluster_binlog_start()) if (ndbcluster_binlog_start())
goto ndbcluster_init_error; goto ndbcluster_init_error;
}
else
{
sql_print_error("NDB: only row based binary logging is supported");
}
}
#endif /* HAVE_NDB_BINLOG */ #endif /* HAVE_NDB_BINLOG */
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
...@@ -7440,8 +7430,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ...@@ -7440,8 +7430,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
ndbcluster_util_inited= 1; ndbcluster_util_inited= 1;
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* If running, signal injector thread that all is setup */ /* Signal injector thread that all is setup */
if (ndb_binlog_thread_running > 0)
pthread_cond_signal(&injector_cond); pthread_cond_signal(&injector_cond);
#endif #endif
...@@ -9360,6 +9349,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) ...@@ -9360,6 +9349,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
{ {
DBUG_ENTER("ha_ndbcluster::alter_tablespace"); DBUG_ENTER("ha_ndbcluster::alter_tablespace");
int is_tablespace= 0;
Ndb *ndb= check_ndb_in_thd(thd); Ndb *ndb= check_ndb_in_thd(thd);
if (ndb == NULL) if (ndb == NULL)
{ {
...@@ -9398,6 +9388,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) ...@@ -9398,6 +9388,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
DBUG_PRINT("error", ("createDatafile returned %d", error)); DBUG_PRINT("error", ("createDatafile returned %d", error));
goto ndberror; goto ndberror;
} }
is_tablespace= 1;
break; break;
} }
case (ALTER_TABLESPACE): case (ALTER_TABLESPACE):
...@@ -9441,6 +9432,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) ...@@ -9441,6 +9432,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
info->ts_alter_tablespace_type)); info->ts_alter_tablespace_type));
DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED); DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
} }
is_tablespace= 1;
break; break;
} }
case (CREATE_LOGFILE_GROUP): case (CREATE_LOGFILE_GROUP):
...@@ -9506,6 +9498,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) ...@@ -9506,6 +9498,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
{ {
goto ndberror; goto ndberror;
} }
is_tablespace= 1;
break; break;
} }
case (DROP_LOGFILE_GROUP): case (DROP_LOGFILE_GROUP):
...@@ -9531,6 +9524,20 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info) ...@@ -9531,6 +9524,20 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED); DBUG_RETURN(HA_ADMIN_NOT_IMPLEMENTED);
} }
} }
if (is_tablespace)
ndbcluster_log_schema_op(thd, 0,
thd->query, thd->query_length,
"", info->tablespace_name,
0, 0,
SOT_TABLESPACE);
else
ndbcluster_log_schema_op(thd, 0,
thd->query, thd->query_length,
"", info->logfile_group_name,
0, 0,
SOT_LOGFILE_GROUP);
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
ndberror: ndberror:
......
...@@ -41,6 +41,11 @@ ...@@ -41,6 +41,11 @@
0 if never started 0 if never started
*/ */
int ndb_binlog_thread_running= 0; int ndb_binlog_thread_running= 0;
/*
Flag showing if the ndb binlog should be created, if so == TRUE
FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
/* /*
Global reference to the ndb injector thread THD oject Global reference to the ndb injector thread THD oject
...@@ -237,7 +242,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -237,7 +242,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
share->op= 0; share->op= 0;
share->table= 0; share->table= 0;
if (ndb_binlog_thread_running <= 0) if (!ndb_binlog_running)
{ {
if (_table) if (_table)
{ {
...@@ -349,7 +354,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -349,7 +354,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
*/ */
static void ndbcluster_binlog_wait(THD *thd) static void ndbcluster_binlog_wait(THD *thd)
{ {
if (ndb_binlog_thread_running > 0) if (ndb_binlog_running)
{ {
DBUG_ENTER("ndbcluster_binlog_wait"); DBUG_ENTER("ndbcluster_binlog_wait");
const char *save_info= thd ? thd->proc_info : 0; const char *save_info= thd ? thd->proc_info : 0;
...@@ -358,7 +363,7 @@ static void ndbcluster_binlog_wait(THD *thd) ...@@ -358,7 +363,7 @@ static void ndbcluster_binlog_wait(THD *thd)
if (thd) if (thd)
thd->proc_info= "Waiting for ndbcluster binlog update to " thd->proc_info= "Waiting for ndbcluster binlog update to "
"reach current position"; "reach current position";
while (count && ndb_binlog_thread_running > 0 && while (count && ndb_binlog_running &&
ndb_latest_handled_binlog_epoch < wait_epoch) ndb_latest_handled_binlog_epoch < wait_epoch)
{ {
count--; count--;
...@@ -375,7 +380,7 @@ static void ndbcluster_binlog_wait(THD *thd) ...@@ -375,7 +380,7 @@ static void ndbcluster_binlog_wait(THD *thd)
*/ */
static int ndbcluster_reset_logs(THD *thd) static int ndbcluster_reset_logs(THD *thd)
{ {
if (ndb_binlog_thread_running <= 0) if (!ndb_binlog_running)
return 0; return 0;
DBUG_ENTER("ndbcluster_reset_logs"); DBUG_ENTER("ndbcluster_reset_logs");
...@@ -402,7 +407,7 @@ static int ndbcluster_reset_logs(THD *thd) ...@@ -402,7 +407,7 @@ static int ndbcluster_reset_logs(THD *thd)
static int static int
ndbcluster_binlog_index_purge_file(THD *thd, const char *file) ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
{ {
if (ndb_binlog_thread_running <= 0) if (!ndb_binlog_running)
return 0; return 0;
DBUG_ENTER("ndbcluster_binlog_index_purge_file"); DBUG_ENTER("ndbcluster_binlog_index_purge_file");
...@@ -427,6 +432,37 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, ...@@ -427,6 +432,37 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
DBUG_ENTER("ndbcluster_binlog_log_query"); DBUG_ENTER("ndbcluster_binlog_log_query");
DBUG_PRINT("enter", ("db: %s table_name: %s query: %s", DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
db, table_name, query)); db, table_name, query));
enum SCHEMA_OP_TYPE type;
int log= 0;
switch (binlog_command)
{
case LOGCOM_CREATE_TABLE:
type= SOT_CREATE_TABLE;
break;
case LOGCOM_ALTER_TABLE:
type= SOT_ALTER_TABLE;
break;
case LOGCOM_RENAME_TABLE:
type= SOT_RENAME_TABLE;
break;
case LOGCOM_DROP_TABLE:
type= SOT_DROP_TABLE;
break;
case LOGCOM_CREATE_DB:
type= SOT_CREATE_DB;
log= 1;
break;
case LOGCOM_ALTER_DB:
type= SOT_ALTER_DB;
log= 1;
break;
case LOGCOM_DROP_DB:
type= SOT_DROP_DB;
break;
}
if (log)
ndbcluster_log_schema_op(thd, 0, query, query_length,
db, table_name, 0, 0, type);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -499,7 +535,7 @@ static int ndbcluster_binlog_end(THD *thd) ...@@ -499,7 +535,7 @@ static int ndbcluster_binlog_end(THD *thd)
****************************************************************/ ****************************************************************/
static void ndbcluster_reset_slave(THD *thd) static void ndbcluster_reset_slave(THD *thd)
{ {
if (ndb_binlog_thread_running <= 0) if (!ndb_binlog_running)
return; return;
DBUG_ENTER("ndbcluster_reset_slave"); DBUG_ENTER("ndbcluster_reset_slave");
...@@ -835,7 +871,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -835,7 +871,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
enum SCHEMA_OP_TYPE type) enum SCHEMA_OP_TYPE type)
{ {
DBUG_ENTER("ndbcluster_log_schema_op"); DBUG_ENTER("ndbcluster_log_schema_op");
#ifdef NOT_YET
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
if (!thd_ndb) if (!thd_ndb)
{ {
...@@ -879,6 +914,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ...@@ -879,6 +914,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
break; break;
case SOT_ALTER_DB: case SOT_ALTER_DB:
break; break;
case SOT_TABLESPACE:
break;
case SOT_LOGFILE_GROUP:
break;
default: default:
abort(); /* should not happen, programming error */ abort(); /* should not happen, programming error */
} }
...@@ -1070,13 +1109,13 @@ end: ...@@ -1070,13 +1109,13 @@ end:
sql_print_error("NDB create table: timed out. Ignoring..."); sql_print_error("NDB create table: timed out. Ignoring...");
break; break;
} }
if (ndb_extra_logging)
sql_print_information("NDB create table: " sql_print_information("NDB create table: "
"waiting max %u sec for create table %s.", "waiting max %u sec for create table %s.",
max_timeout, share->key); max_timeout, share->key);
} }
(void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&share->mutex);
} }
#endif
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1315,11 +1354,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1315,11 +1354,18 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema_list->push_back(schema, mem_root); schema_list->push_back(schema, mem_root);
log_query= 0; log_query= 0;
break; break;
case SOT_CREATE_TABLE:
/* fall through */
case SOT_RENAME_TABLE: case SOT_RENAME_TABLE:
/* fall through */ /* fall through */
case SOT_ALTER_TABLE: case SOT_ALTER_TABLE:
/* fall through */
if (!ndb_binlog_running)
{
log_query= 1;
break; /* discovery will be handled by binlog */
}
/* fall through */
case SOT_CREATE_TABLE:
/* fall through */
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
if (ndb_create_table_from_engine(thd, schema->db, schema->name)) if (ndb_create_table_from_engine(thd, schema->db, schema->name))
{ {
...@@ -1329,12 +1375,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1329,12 +1375,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
schema->node_id); schema->node_id);
} }
pthread_mutex_unlock(&LOCK_open); pthread_mutex_unlock(&LOCK_open);
{
/* signal that schema operation has been handled */
DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
if (bitmap_is_set(&slock, node_id))
ndbcluster_update_slock(thd, schema->db, schema->name);
}
log_query= 1; log_query= 1;
break; break;
case SOT_DROP_DB: case SOT_DROP_DB:
...@@ -1374,14 +1414,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ...@@ -1374,14 +1414,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
case SOT_TABLESPACE:
case SOT_LOGFILE_GROUP:
log_query= 1;
break;
}
/* signal that schema operation has been handled */
if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
{
DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
if (bitmap_is_set(&slock, node_id))
ndbcluster_update_slock(thd, schema->db, schema->name);
} }
if (log_query) if (log_query)
{ {
char *thd_db_save= thd->db; char *thd_db_save= thd->db;
thd->db= schema->db; thd->db= schema->db;
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
schema->query_length, FALSE, schema->query_length, FALSE,
schema->name[0] == 0); schema->name[0] == 0 || thd->db[0] == 0);
thd->db= thd_db_save; thd->db= thd_db_save;
} }
} }
...@@ -1672,7 +1725,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ...@@ -1672,7 +1725,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
"allocating table share for %s failed", key); "allocating table share for %s failed", key);
} }
if (ndb_binlog_thread_running <= 0) if (!ndb_binlog_running)
{ {
share->flags|= NSF_NO_BINLOG; share->flags|= NSF_NO_BINLOG;
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
...@@ -2521,7 +2574,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2521,7 +2574,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
injector_thd= thd; injector_thd= thd;
injector_ndb= ndb; injector_ndb= ndb;
ndb_binlog_thread_running= 1; ndb_binlog_thread_running= 1;
if (opt_bin_log)
{
if (binlog_row_based)
{
ndb_binlog_running= TRUE;
}
else
{
sql_print_error("NDB: only row based binary logging is supported");
}
}
/* /*
We signal the thread that started us that we've finished We signal the thread that started us that we've finished
starting up. starting up.
...@@ -2562,6 +2625,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2562,6 +2625,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{ {
static char db[]= ""; static char db[]= "";
thd->db= db; thd->db= db;
if (ndb_binlog_running)
open_binlog_index(thd, &binlog_tables, &binlog_index); open_binlog_index(thd, &binlog_tables, &binlog_index);
if (!apply_status_share) if (!apply_status_share)
{ {
...@@ -2590,16 +2654,22 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2590,16 +2654,22 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->set_time(); thd->set_time();
/* wait for event or 1000 ms */ /* wait for event or 1000 ms */
Uint64 gci, schema_gci; Uint64 gci= 0, schema_gci;
int res= ndb->pollEvents(1000, &gci); int res= 0, tot_poll_wait= 1000;
int schema_res= schema_ndb->pollEvents(0, &schema_gci); if (ndb_binlog_running)
{
res= ndb->pollEvents(tot_poll_wait, &gci);
tot_poll_wait= 0;
}
int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci);
ndb_latest_received_binlog_epoch= gci; ndb_latest_received_binlog_epoch= gci;
while (gci > schema_gci && schema_res >= 0) while (gci > schema_gci && schema_res >= 0)
schema_res= schema_ndb->pollEvents(10, &schema_gci); schema_res= schema_ndb->pollEvents(10, &schema_gci);
if ((abort_loop || do_ndbcluster_binlog_close_connection) && if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci ||
!ndb_binlog_running))
break; /* Shutting down server */ break; /* Shutting down server */
if (binlog_index && binlog_index->s->version < refresh_version) if (binlog_index && binlog_index->s->version < refresh_version)
...@@ -2810,6 +2880,7 @@ err: ...@@ -2810,6 +2880,7 @@ err:
delete thd; delete thd;
ndb_binlog_thread_running= -1; ndb_binlog_thread_running= -1;
ndb_binlog_running= FALSE;
(void) pthread_cond_signal(&injector_cond); (void) pthread_cond_signal(&injector_cond);
DBUG_PRINT("exit", ("ndb_binlog_thread")); DBUG_PRINT("exit", ("ndb_binlog_thread"));
......
...@@ -38,7 +38,9 @@ enum SCHEMA_OP_TYPE ...@@ -38,7 +38,9 @@ enum SCHEMA_OP_TYPE
SOT_DROP_DB, SOT_DROP_DB,
SOT_CREATE_DB, SOT_CREATE_DB,
SOT_ALTER_DB, SOT_ALTER_DB,
SOT_CLEAR_SLOCK SOT_CLEAR_SLOCK,
SOT_TABLESPACE,
SOT_LOGFILE_GROUP
}; };
const uint max_ndb_nodes= 64; /* multiple of 32 */ const uint max_ndb_nodes= 64; /* multiple of 32 */
...@@ -104,7 +106,7 @@ extern NDB_SHARE *apply_status_share; ...@@ -104,7 +106,7 @@ extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share; extern NDB_SHARE *schema_share;
extern THD *injector_thd; extern THD *injector_thd;
extern int ndb_binlog_thread_running; extern my_bool ndb_binlog_running;
bool bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment