Merge dev3-240.dev.cn.tlan:/home/justin.he/mysql/mysql-5.0/bug27640-5.0-ndb

into  dev3-240.dev.cn.tlan:/home/justin.he/mysql/mysql-5.1/bug27640-5.1-new-ndb
parents afdee6e0 51fe11b4
use test;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
Connected to Management Server at: :
Waiting for completed, this may take several minutes
Backup started from node
Backup started from node completed
StartGCP: StopGCP:
#Records: #LogRecords:
Data: bytes Log: bytes
create table t1
(pk int key
,a1 BIT(1), a2 BIT(5), a3 BIT(33), a4 BIT(63), a5 BIT(64)
,b1 TINYINT, b2 TINYINT UNSIGNED
,c1 SMALLINT, c2 SMALLINT UNSIGNED
,d1 INT, d2 INT UNSIGNED
,e1 BIGINT, e2 BIGINT UNSIGNED
,f1 CHAR(1) BINARY, f2 CHAR(32) BINARY, f3 CHAR(255) BINARY
,g1 VARCHAR(32) BINARY, g2 VARCHAR(255) BINARY, g3 VARCHAR(1000) BINARY
,h1 BINARY(1), h2 BINARY(8), h3 BINARY(255)
,i1 VARBINARY(32), i2 VARBINARY(255), i3 VARBINARY(1000)
) engine ndb;
insert into t1 values
(1
,0x1, 0x17, 0x789a, 0x789abcde, 0xfedc0001
,127, 255
,32767, 65535
,2147483647, 4294967295
,9223372036854775807, 18446744073709551615
,'1','12345678901234567890123456789012','123456789'
,'1','12345678901234567890123456789012','123456789'
,0x12,0x123456789abcdef0, 0x012345
,0x12,0x123456789abcdef0, 0x00123450
);
insert into t1 values
(2
,0, 0, 0, 0, 0
,-128, 0
,-32768, 0
,-2147483648, 0
,-9223372036854775808, 0
,'','',''
,'','',''
,0x0,0x0,0x0
,0x0,0x0,0x0
);
insert into t1 values
(3
,NULL,NULL,NULL,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
);
Connected to Management Server at: :
Waiting for completed, this may take several minutes
Backup started from node
Backup started from node completed
StartGCP: StopGCP:
#Records: #LogRecords:
Data: bytes Log: bytes
-- source include/have_ndb.inc
-- source include/ndb_default_cluster.inc
-- source include/not_embedded.inc
--disable_warnings
use test;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
--enable_warnings
#NO.1 test output of backup
--exec $NDB_TOOLS_DIR/../src/mgmclient/ndb_mgm -e "start backup" |sed -e 's/[0-9]//g' |sed -e 's/localhost//g' |sed -e 's/\.\.\.*//g'
create table t1
(pk int key
,a1 BIT(1), a2 BIT(5), a3 BIT(33), a4 BIT(63), a5 BIT(64)
,b1 TINYINT, b2 TINYINT UNSIGNED
,c1 SMALLINT, c2 SMALLINT UNSIGNED
,d1 INT, d2 INT UNSIGNED
,e1 BIGINT, e2 BIGINT UNSIGNED
,f1 CHAR(1) BINARY, f2 CHAR(32) BINARY, f3 CHAR(255) BINARY
,g1 VARCHAR(32) BINARY, g2 VARCHAR(255) BINARY, g3 VARCHAR(1000) BINARY
,h1 BINARY(1), h2 BINARY(8), h3 BINARY(255)
,i1 VARBINARY(32), i2 VARBINARY(255), i3 VARBINARY(1000)
) engine ndb;
insert into t1 values
(1
,0x1, 0x17, 0x789a, 0x789abcde, 0xfedc0001
,127, 255
,32767, 65535
,2147483647, 4294967295
,9223372036854775807, 18446744073709551615
,'1','12345678901234567890123456789012','123456789'
,'1','12345678901234567890123456789012','123456789'
,0x12,0x123456789abcdef0, 0x012345
,0x12,0x123456789abcdef0, 0x00123450
);
insert into t1 values
(2
,0, 0, 0, 0, 0
,-128, 0
,-32768, 0
,-2147483648, 0
,-9223372036854775808, 0
,'','',''
,'','',''
,0x0,0x0,0x0
,0x0,0x0,0x0
);
insert into t1 values
(3
,NULL,NULL,NULL,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
,NULL,NULL,NULL
);
#NO.2 test output of backup after some simple SQL operations
--exec $NDB_TOOLS_DIR/../src/mgmclient/ndb_mgm -e "start backup" |sed -e 's/[0-9]//g' |sed -e 's/localhost//g' |sed -e 's/\.\.\.*//g'
......@@ -173,5 +173,5 @@ private:
STATIC_CONST(MAX_TEXT_LENGTH = 256);
};
extern void getRestartAction(Uint32 action, BaseString &str);
#endif
......@@ -15,9 +15,17 @@
#include <ndb_global.h>
#include <my_sys.h>
//#define HAVE_GLOBAL_REPLICATION
#include <Vector.hpp>
#ifdef HAVE_GLOBAL_REPLICATION
#include "../rep/repapi/repapi.h"
#endif
#include <mgmapi.h>
#include <util/BaseString.hpp>
#include <ndbd_exit_codes.h>
class MgmtSrvr;
......@@ -160,6 +168,11 @@ private:
int m_verbose;
int try_reconnect;
int m_error;
#ifdef HAVE_GLOBAL_REPLICATION
NdbRepHandle m_repserver;
const char *rep_host;
bool rep_connected;
#endif
struct NdbThread* m_event_thread;
NdbMutex *m_print_mutex;
};
......@@ -224,6 +237,10 @@ extern "C" {
#include <NdbMem.h>
#include <EventLogger.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
#include <signaldata/GrepImpl.hpp>
#ifdef HAVE_GLOBAL_REPLICATION
#endif // HAVE_GLOBAL_REPLICATION
#include "MgmtErrorReporter.hpp"
#include <Parser.hpp>
#include <SocketServer.hpp>
......@@ -251,6 +268,9 @@ static const char* helpText =
"---------------------------------------------------------------------------\n"
"HELP Print help text\n"
"HELP COMMAND Print detailed help for COMMAND(e.g. SHOW)\n"
#ifdef HAVE_GLOBAL_REPLICATION
"HELP REPLICATION Help for global replication\n"
#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
"HELP DEBUG Help for debug compiled version\n"
#endif
......@@ -274,6 +294,9 @@ static const char* helpText =
"EXIT SINGLE USER MODE Exit single user mode\n"
"<id> STATUS Print status\n"
"<id> CLUSTERLOG {<category>=<level>}+ Set log level for cluster log\n"
#ifdef HAVE_GLOBAL_REPLICATION
"REP CONNECT <host:port> Connect to REP server on host:port\n"
#endif
"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n"
"CONNECT [<connectstring>] Connect to management server (reconnect if already connected)\n"
"QUIT Quit management client\n"
......@@ -573,6 +596,39 @@ static const char* helpTextQuit =
;
#ifdef HAVE_GLOBAL_REPLICATION
static const char* helpTextRep =
"---------------------------------------------------------------------------\n"
" NDB Cluster -- Management Client -- Help for Global Replication\n"
"---------------------------------------------------------------------------\n"
"Commands should be executed on the standby NDB Cluster\n"
"These features are in an experimental release state.\n"
"\n"
"Simple Commands:\n"
"REP START Start Global Replication\n"
"REP START REQUESTOR Start Global Replication Requestor\n"
"REP STATUS Show Global Replication status\n"
"REP STOP Stop Global Replication\n"
"REP STOP REQUESTOR Stop Global Replication Requestor\n"
"\n"
"Advanced Commands:\n"
"REP START <protocol> Starts protocol\n"
"REP STOP <protocol> Stops protocol\n"
"<protocol> = TRANSFER | APPLY | DELETE\n"
"\n"
#ifdef VM_TRACE // DEBUG ONLY
"Debugging commands:\n"
"REP DELETE Removes epochs stored in primary and standy systems\n"
"REP DROP <tableid> Drop a table in SS identified by table id\n"
"REP SLOWSTOP Stop Replication (Tries to synchonize with primary)\n"
"REP FASTSTOP Stop Replication (Stops in consistent state)\n"
"<component> = SUBSCRIPTION\n"
" METALOG | METASCAN | DATALOG | DATASCAN\n"
" REQUESTOR | TRANSFER | APPLY | DELETE\n"
#endif
;
#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
static const char* helpTextDebug =
"---------------------------------------------------------------------------\n"
......@@ -625,6 +681,10 @@ struct st_cmd_help {
{"PURGE STALE SESSIONS", helpTextPurgeStaleSessions},
{"CONNECT", helpTextConnect},
{"QUIT", helpTextQuit},
#ifdef HAVE_GLOBAL_REPLICATION
{"REPLICATION", helpTextRep},
{"REP", helpTextRep},
#endif // HAVE_GLOBAL_REPLICATION
#ifdef VM_TRACE // DEBUG ONLY
{"DEBUG", helpTextDebug},
#endif //VM_TRACE
......@@ -664,6 +724,11 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
m_event_thread= NULL;
try_reconnect = 0;
m_print_mutex= NdbMutex_Create();
#ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL;
m_repserver = NULL;
rep_connected = false;
#endif
}
/*
......@@ -704,6 +769,113 @@ CommandInterpreter::printError()
}
}
/*
* print log event from mgmsrv to console screen
*/
static void
printLogEvent(struct ndb_logevent* event)
{
switch (event->type) {
/**
* NDB_MGM_EVENT_CATEGORY_BACKUP
*/
case NDB_LE_BackupStarted:
ndbout_c("Backup %d started from node %d",
event->BackupStarted.backup_id, event->BackupStarted.starting_node);
break;
case NDB_LE_BackupFailedToStart:
ndbout_c("Backup request from %d failed to start. Error: %d",
event->BackupFailedToStart.starting_node, event->BackupFailedToStart.error);
break;
case NDB_LE_BackupCompleted:
ndbout_c("Backup %u started from node %u completed\n"
" StartGCP: %u StopGCP: %u\n"
" #Records: %u #LogRecords: %u\n"
" Data: %u bytes Log: %u bytes",
event->BackupCompleted.backup_id, event->BackupCompleted.starting_node,
event->BackupCompleted.start_gci, event->BackupCompleted.stop_gci,
event->BackupCompleted.n_records, event->BackupCompleted.n_log_records,
event->BackupCompleted.n_bytes, event->BackupCompleted.n_log_bytes);
break;
case NDB_LE_BackupAborted:
ndbout_c("Backup %d started from %d has been aborted. Error: %d",
event->BackupAborted.backup_id, event->BackupAborted.starting_node,
event->BackupAborted.error);
break;
/**
* NDB_MGM_EVENT_CATEGORY_STARTUP
*/
case NDB_LE_NDBStartStarted:
ndbout_c("Start initiated (version %d.%d.%d)",
getMajor(event->NDBStartStarted.version),
getMinor(event->NDBStartStarted.version),
getBuild(event->NDBStartStarted.version));
break;
case NDB_LE_NDBStartCompleted:
ndbout_c("Started (version %d.%d.%d)",
getMajor(event->NDBStartCompleted.version),
getMinor(event->NDBStartCompleted.version),
getBuild(event->NDBStartCompleted.version));
break;
case NDB_LE_NDBStopStarted:
ndbout_c("%s shutdown initiated",
(event->NDBStopStarted.stoptype == 1 ? "Cluster" : "Node"));
break;
case NDB_LE_NDBStopCompleted:
{
BaseString action_str("");
BaseString signum_str("");
getRestartAction(event->NDBStopCompleted.action, action_str);
if (event->NDBStopCompleted.signum)
signum_str.appfmt(" Initiated by signal %d.",
event->NDBStopCompleted.signum);
ndbout_c("Node shutdown completed%s.%s",
action_str.c_str(),
signum_str.c_str());
}
break;
case NDB_LE_NDBStopForced:
{
BaseString action_str("");
BaseString reason_str("");
BaseString sphase_str("");
int signum = event->NDBStopForced.signum;
int error = event->NDBStopForced.error;
int sphase = event->NDBStopForced.sphase;
int extra = event->NDBStopForced.extra;
getRestartAction(event->NDBStopForced.action, action_str);
if (signum)
reason_str.appfmt(" Initiated by signal %d.", signum);
if (error)
{
ndbd_exit_classification cl;
ndbd_exit_status st;
const char *msg = ndbd_exit_message(error, &cl);
const char *cl_msg = ndbd_exit_classification_message(cl, &st);
const char *st_msg = ndbd_exit_status_message(st);
reason_str.appfmt(" Caused by error %d: \'%s(%s). %s\'.",
error, msg, cl_msg, st_msg);
if (extra != 0)
reason_str.appfmt(" (extra info %d)", extra);
}
if (sphase < 255)
sphase_str.appfmt(" Occured during startphase %u.", sphase);
ndbout_c("Forced node shutdown completed%s.%s%s",
action_str.c_str(), sphase_str.c_str(),
reason_str.c_str());
}
break;
case NDB_LE_NDBStopAborted:
ndbout_c("Node shutdown aborted");
break;
/**
* default nothing to print
*/
default:
break;
}
}
//*****************************************************************************
//*****************************************************************************
......@@ -720,30 +892,21 @@ event_thread_run(void* p)
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
1, NDB_MGM_EVENT_CATEGORY_STARTUP,
0 };
int fd = ndb_mgm_listen_event(handle, filter);
if (fd != NDB_INVALID_SOCKET)
NdbLogEventHandle log_handle= NULL;
struct ndb_logevent log_event;
log_handle= ndb_mgm_create_logevent_handle(handle, filter);
if (log_handle)
{
do_event_thread= 1;
char *tmp= 0;
char buf[1024];
do {
SocketInputStream in(fd,2000);
if((tmp = in.gets(buf, sizeof(buf))))
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
if(tmp && strlen(tmp))
{
Guard g(printmutex);
ndbout << tmp;
}
}
else if(in.timedout() && ndb_mgm_check_connection(handle)<0)
{
break;
}
if (ndb_logevent_get_next(log_handle, &log_event, 2000) <= 0)
continue;
Guard g(printmutex);
printLogEvent(&log_event);
} while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
ndb_mgm_destroy_logevent_handle(&log_handle);
}
else
{
......@@ -1005,7 +1168,14 @@ CommandInterpreter::execute_impl(const char *_line, bool interactive)
else if (strcasecmp(firstToken, "PURGE") == 0) {
m_error = executePurge(allAfterFirstToken);
DBUG_RETURN(true);
}
}
#ifdef HAVE_GLOBAL_REPLICATION
else if(strcasecmp(firstToken, "REPLICATION") == 0 ||
strcasecmp(firstToken, "REP") == 0) {
m_error = executeRep(allAfterFirstToken);
DBUG_RETURN(true);
}
#endif // HAVE_GLOBAL_REPLICATION
else if(strcasecmp(firstToken, "ENTER") == 0 &&
allAfterFirstToken != NULL &&
strncasecmp(allAfterFirstToken, "SINGLE USER MODE ",
......@@ -1481,6 +1651,7 @@ CommandInterpreter::executePurge(char* parameters)
return -1;
}
int i;
char *str;
if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
......@@ -1559,8 +1730,8 @@ CommandInterpreter::executeShow(char* parameters)
case NDB_MGM_NODE_TYPE_UNKNOWN:
ndbout << "Error: Unknown Node Type" << endl;
return -1;
case NDB_MGM_NODE_TYPE_MAX:
break; /* purify: deadcode */
case NDB_MGM_NODE_TYPE_REP:
abort();
}
}
......@@ -1598,6 +1769,7 @@ CommandInterpreter::executeConnect(char* parameters, bool interactive)
{
BaseString *basestring = NULL;
int retval;
disconnect();
if (!emptyString(parameters)) {
basestring= new BaseString(parameters);
......@@ -1634,15 +1806,7 @@ CommandInterpreter::executeClusterLog(char* parameters)
char * item = strtok_r(tmpString, " ", &tmpPtr);
int enable;
ndb_mgm_severity enabled[NDB_MGM_EVENT_SEVERITY_ALL] =
{{NDB_MGM_EVENT_SEVERITY_ON,0},
{NDB_MGM_EVENT_SEVERITY_DEBUG,0},
{NDB_MGM_EVENT_SEVERITY_INFO,0},
{NDB_MGM_EVENT_SEVERITY_WARNING,0},
{NDB_MGM_EVENT_SEVERITY_ERROR,0},
{NDB_MGM_EVENT_SEVERITY_CRITICAL,0},
{NDB_MGM_EVENT_SEVERITY_ALERT,0}};
ndb_mgm_get_clusterlog_severity_filter(m_mgmsrv, &enabled[0], NDB_MGM_EVENT_SEVERITY_ALL);
const unsigned int *enabled= ndb_mgm_get_logfilter(m_mgmsrv);
if(enabled == NULL) {
ndbout << "Couldn't get status" << endl;
printError();
......@@ -1655,25 +1819,25 @@ CommandInterpreter::executeClusterLog(char* parameters)
********************/
if (strcasecmp(item, "INFO") == 0) {
DBUG_PRINT("info",("INFO"));
if(enabled[0].value == 0)
if(enabled[0] == 0)
{
ndbout << "Cluster logging is disabled." << endl;
m_error = 0;
DBUG_VOID_RETURN;
}
#if 0
for(i = 0; i<DB_MGM_EVENT_SEVERITY_ALL;i++)
printf("enabled[%d] = %d\n", i, enabled[i].value);
for(i = 0; i<7;i++)
printf("enabled[%d] = %d\n", i, enabled[i]);
#endif
ndbout << "Severities enabled: ";
for(i = 1; i < (int)NDB_MGM_EVENT_SEVERITY_ALL; i++) {
const char *str= ndb_mgm_get_event_severity_string(enabled[i].category);
const char *str= ndb_mgm_get_event_severity_string((ndb_mgm_event_severity)i);
if (str == 0)
{
DBUG_ASSERT(false);
continue;
}
if(enabled[i].value)
if(enabled[i])
ndbout << BaseString(str).ndb_toupper() << " ";
}
ndbout << endl;
......@@ -1966,9 +2130,6 @@ CommandInterpreter::executeRestart(Vector<BaseString> &command_list,
return -1;
}
if (!nostart)
ndbout_c("Shutting down nodes with \"-n, no start\" option, to subsequently start the nodes.");
result= ndb_mgm_restart3(m_mgmsrv, no_of_nodes, node_ids,
initialstart, nostart, abort, &need_disconnect);
......@@ -2043,6 +2204,7 @@ CommandInterpreter::executeStatus(int processId,
ndb_mgm_node_status status;
Uint32 startPhase, version;
bool system;
struct ndb_mgm_cluster_state *cl;
cl = ndb_mgm_get_status(m_mgmsrv);
......@@ -2060,19 +2222,6 @@ CommandInterpreter::executeStatus(int processId,
ndbout << processId << ": Node not found" << endl;
return -1;
}
if (cl->node_states[i].node_type != NDB_MGM_NODE_TYPE_NDB){
if (cl->node_states[i].version != 0){
version = cl->node_states[i].version;
ndbout << "Node "<< cl->node_states[i].node_id <<": connected" ;
ndbout_c(" (Version %d.%d.%d)",
getMajor(version) ,
getMinor(version),
getBuild(version));
}else
ndbout << "Node "<< cl->node_states[i].node_id <<": not connected" << endl;
return 0;
}
status = cl->node_states[i].node_status;
startPhase = cl->node_states[i].start_phase;
version = cl->node_states[i].version;
......@@ -2467,7 +2616,6 @@ CommandInterpreter::executeEventReporting(int processId,
return retval;
}
/*****************************************************************************
* Backup
*****************************************************************************/
......@@ -2476,8 +2624,7 @@ CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
{
struct ndb_mgm_reply reply;
unsigned int backupId;
int fd = -1;
Vector<BaseString> args;
{
BaseString(parameters).split(args);
......@@ -2494,8 +2641,6 @@ CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
if (sz == 2 && args[1] == "NOWAIT")
{
flags = 0;
result = ndb_mgm_start_backup(m_mgmsrv, 0, &backupId, &reply);
goto END_BACKUP;
}
else if (sz == 1 || (sz == 3 && args[1] == "WAIT" && args[2] == "COMPLETED"))
{
......@@ -2513,67 +2658,80 @@ CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
return -1;
}
/**
* If interactive...event listner is already running
*/
NdbLogEventHandle log_handle= NULL;
struct ndb_logevent log_event;
if (flags == 2 && !interactive)
{
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0, 0 };
fd = ndb_mgm_listen_event(m_mgmsrv, filter);
if (fd < 0)
log_handle = ndb_mgm_create_logevent_handle(m_mgmsrv, filter);
if (!log_handle)
{
ndbout << "Initializing start of backup failed" << endl;
printError();
return fd;
return -1;
}
}
result = ndb_mgm_start_backup(m_mgmsrv, flags, &backupId, &reply);
END_BACKUP:
if (result != 0) {
ndbout << "Backup failed" << endl;
printError();
if (fd >= 0)
close(fd);
if (log_handle)
ndb_mgm_destroy_logevent_handle(&log_handle);
return result;
}
if (fd >= 0)
/**
* If interactive, event listner thread is already running
*/
if (log_handle && !interactive)
{
char *tmp;
char buf[1024];
{
SocketInputStream in(fd);
int count = 0;
do {
tmp = in.gets(buf, 1024);
if(tmp)
{
ndbout << tmp;
unsigned int id;
if(sscanf(tmp, "%*[^:]: Backup %d ", &id) == 1 && id == backupId){
count++;
}
}
} while(count < 2);
}
SocketInputStream in(fd, 10);
int count = 0;
int retry = 0;
do {
tmp = in.gets(buf, 1024);
if(tmp && tmp[0] != 0)
if (ndb_logevent_get_next(log_handle, &log_event, 60000) > 0)
{
ndbout << tmp;
int print = 0;
switch (log_event.type) {
case NDB_LE_BackupStarted:
if (log_event.BackupStarted.backup_id == backupId)
print = 1;
break;
case NDB_LE_BackupCompleted:
if (log_event.BackupCompleted.backup_id == backupId)
print = 1;
break;
case NDB_LE_BackupAborted:
if (log_event.BackupAborted.backup_id == backupId)
print = 1;
break;
default:
break;
}
if (print)
{
Guard g(m_print_mutex);
printLogEvent(&log_event);
count++;
}
}
} while(tmp && tmp[0] != 0);
close(fd);
else
{
retry++;
}
} while(count < 2 && retry < 3);
if (retry >= 3)
ndbout << "get backup event failed for " << retry << " times" << endl;
ndb_mgm_destroy_logevent_handle(&log_handle);
}
return 0;
}
int
CommandInterpreter::executeAbortBackup(char* parameters)
{
......@@ -2604,4 +2762,233 @@ CommandInterpreter::executeAbortBackup(char* parameters)
return -1;
}
#ifdef HAVE_GLOBAL_REPLICATION
/*****************************************************************************
* Global Replication
*
* For information about the different commands, see
* GrepReq::Request in file signaldata/grepImpl.cpp.
*
* Below are commands as of 2003-07-05 (may change!):
* START = 0, ///< Start Global Replication (all phases)
* START_METALOG = 1, ///< Start Global Replication (all phases)
* START_METASCAN = 2, ///< Start Global Replication (all phases)
* START_DATALOG = 3, ///< Start Global Replication (all phases)
* START_DATASCAN = 4, ///< Start Global Replication (all phases)
* START_REQUESTOR = 5, ///< Start Global Replication (all phases)
* ABORT = 6, ///< Immediate stop (removes subscription)
* SLOW_STOP = 7, ///< Stop after finishing applying current GCI epoch
* FAST_STOP = 8, ///< Stop after finishing applying all PS GCI epochs
* START_TRANSFER = 9, ///< Start SS-PS transfer
* STOP_TRANSFER = 10, ///< Stop SS-PS transfer
* START_APPLY = 11, ///< Start applying GCI epochs in SS
* STOP_APPLY = 12, ///< Stop applying GCI epochs in SS
* STATUS = 13, ///< Status
* START_SUBSCR = 14,
* REMOVE_BUFFERS = 15,
* DROP_TABLE = 16
*****************************************************************************/
int
CommandInterpreter::executeRep(char* parameters)
{
if (emptyString(parameters)) {
ndbout << helpTextRep;
return 0;
}
char * line = my_strdup(parameters,MYF(MY_WME));
My_auto_ptr<char> ap1((char*)line);
char * firstToken = strtok(line, " ");
struct ndb_rep_reply reply;
unsigned int repId;
if (!strcasecmp(firstToken, "CONNECT")) {
char * host = strtok(NULL, "\0");
for (unsigned int i = 0; i < strlen(host); ++i) {
host[i] = tolower(host[i]);
}
if(host == NULL)
{
ndbout_c("host:port must be specified.");
return -1;
}
if(rep_connected) {
if(m_repserver != NULL) {
ndb_rep_disconnect(m_repserver);
rep_connected = false;
}
}
if(m_repserver == NULL)
m_repserver = ndb_rep_create_handle();
if(ndb_rep_connect(m_repserver, host) < 0){
ndbout_c("Failed to connect to %s", host);
return -1;
}
else
rep_connected=true;
return 0;
if(!rep_connected) {
ndbout_c("Not connected to REP server");
return -1;
}
}
/********
* START
********/
if (!strcasecmp(firstToken, "START")) {
unsigned int req;
char *startType = strtok(NULL, "\0");
if (startType == NULL) {
req = GrepReq::START;
} else if (!strcasecmp(startType, "SUBSCRIPTION")) {
req = GrepReq::START_SUBSCR;
} else if (!strcasecmp(startType, "METALOG")) {
req = GrepReq::START_METALOG;
} else if (!strcasecmp(startType, "METASCAN")) {
req = GrepReq::START_METASCAN;
} else if (!strcasecmp(startType, "DATALOG")) {
req = GrepReq::START_DATALOG;
} else if (!strcasecmp(startType, "DATASCAN")) {
req = GrepReq::START_DATASCAN;
} else if (!strcasecmp(startType, "REQUESTOR")) {
req = GrepReq::START_REQUESTOR;
} else if (!strcasecmp(startType, "TRANSFER")) {
req = GrepReq::START_TRANSFER;
} else if (!strcasecmp(startType, "APPLY")) {
req = GrepReq::START_APPLY;
} else if (!strcasecmp(startType, "DELETE")) {
req = GrepReq::START_DELETE;
} else {
ndbout_c("Illegal argument to command 'REPLICATION START'");
return -1;
}
int result = ndb_rep_command(m_repserver, req, &repId, &reply);
if (result != 0) {
ndbout << "Start of Global Replication failed" << endl;
return -1;
} else {
ndbout << "Start of Global Replication ordered" << endl;
}
return 0;
}
/********
* STOP
********/
if (!strcasecmp(firstToken, "STOP")) {
unsigned int req;
char *startType = strtok(NULL, " ");
unsigned int epoch = 0;
if (startType == NULL) {
/**
* Stop immediately
*/
req = GrepReq::STOP;
} else if (!strcasecmp(startType, "EPOCH")) {
char *strEpoch = strtok(NULL, "\0");
if(strEpoch == NULL) {
ndbout_c("Epoch expected!");
return -1;
}
req = GrepReq::STOP;
epoch=atoi(strEpoch);
} else if (!strcasecmp(startType, "SUBSCRIPTION")) {
req = GrepReq::STOP_SUBSCR;
} else if (!strcasecmp(startType, "METALOG")) {
req = GrepReq::STOP_METALOG;
} else if (!strcasecmp(startType, "METASCAN")) {
req = GrepReq::STOP_METASCAN;
} else if (!strcasecmp(startType, "DATALOG")) {
req = GrepReq::STOP_DATALOG;
} else if (!strcasecmp(startType, "DATASCAN")) {
req = GrepReq::STOP_DATASCAN;
} else if (!strcasecmp(startType, "REQUESTOR")) {
req = GrepReq::STOP_REQUESTOR;
} else if (!strcasecmp(startType, "TRANSFER")) {
req = GrepReq::STOP_TRANSFER;
} else if (!strcasecmp(startType, "APPLY")) {
req = GrepReq::STOP_APPLY;
} else if (!strcasecmp(startType, "DELETE")) {
req = GrepReq::STOP_DELETE;
} else {
ndbout_c("Illegal argument to command 'REPLICATION STOP'");
return -1;
}
int result = ndb_rep_command(m_repserver, req, &repId, &reply, epoch);
if (result != 0) {
ndbout << "Stop command failed" << endl;
return -1;
} else {
ndbout << "Stop ordered" << endl;
}
return 0;
}
/*********
* STATUS
*********/
if (!strcasecmp(firstToken, "STATUS")) {
struct rep_state repstate;
int result =
ndb_rep_get_status(m_repserver, &repId, &reply, &repstate);
if (result != 0) {
ndbout << "Status request of Global Replication failed" << endl;
return -1;
} else {
ndbout << "Status request of Global Replication ordered" << endl;
ndbout << "See printout at one of the DB nodes" << endl;
ndbout << "(Better status report is under development.)" << endl;
ndbout << " SubscriptionId " << repstate.subid
<< " SubscriptionKey " << repstate.subkey << endl;
}
return 0;
}
/*********
* QUERY (see repapi.h for querable counters)
*********/
if (!strcasecmp(firstToken, "QUERY")) {
char *query = strtok(NULL, "\0");
int queryCounter=-1;
if(query != NULL) {
queryCounter = atoi(query);
}
struct rep_state repstate;
unsigned repId = 0;
int result = ndb_rep_query(m_repserver, (QueryCounter)queryCounter,
&repId, &reply, &repstate);
if (result != 0) {
ndbout << "Query repserver failed" << endl;
return -1;
} else {
ndbout << "Query repserver sucessful" << endl;
ndbout_c("repstate : QueryCounter %d, f=%d l=%d"
" nodegroups %d" ,
repstate.queryCounter,
repstate.first[0], repstate.last[0],
repstate.no_of_nodegroups );
}
return 0;
}
return 0;
}
#endif // HAVE_GLOBAL_REPLICATION
template class Vector<char const*>;
......@@ -21,7 +21,8 @@ libndbmgmclient_la_LIBADD = ../mgmapi/libmgmapi.la \
../common/logger/liblogger.la \
../common/portlib/libportlib.la \
../common/util/libgeneral.la \
../common/portlib/libportlib.la
../common/portlib/libportlib.la \
../common/debugger/libtrace.la
ndb_mgm_SOURCES = main.cpp
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment