Commit 3551165c authored by stewart@willster.(none)'s avatar stewart@willster.(none)

Merge willster.(none):/home/stewart/Documents/MySQL/5.0/bug21154

into  willster.(none):/home/stewart/Documents/MySQL/5.1/bug13987

Merge 5.0-ndb into 5.1-ndb
parents 0b5ad7f3 abbb262c
......@@ -8,3 +8,7 @@ ndbd,1,localhost,52428800,26214400 ndbd,2,localhost,52428800,36700160 ndbd,3,loc
ndbd,1,localhost ndbd,2,localhost ndbd,3,localhost ndbd,4,localhost ndb_mgmd,5,localhost mysqld,6, mysqld,7, mysqld,8, mysqld,9, mysqld,10,
ndbd,2,localhost ndbd,3,localhost ndbd,4,localhost ndbd,5,localhost ndb_mgmd,6,localhost mysqld,1, mysqld,7, mysqld,8, mysqld,9, mysqld,10,
ndbd,3,localhost ndbd,4,localhost ndbd,5,localhost ndbd,6,localhost ndb_mgmd,1,localhost ndb_mgmd,2,localhost mysqld,11, mysqld,12, mysqld,13, mysqld,14, mysqld,15,
shm,3,4,35,3 shm,3,5,35,3 shm,3,6,35,3 shm,4,5,35,4 shm,4,6,35,4 shm,5,6,35,5 tcp,11,3,55,3 tcp,11,4,55,4 tcp,11,5,55,5 tcp,11,6,55,6 tcp,12,3,55,3 tcp,12,4,55,4 tcp,12,5,55,5 tcp,12,6,55,6 tcp,13,3,55,3 tcp,13,4,55,4 tcp,13,5,55,5 tcp,13,6,55,6 tcp,14,3,55,3 tcp,14,4,55,4 tcp,14,5,55,5 tcp,14,6,55,6 tcp,15,3,55,3 tcp,15,4,55,4 tcp,15,5,55,5 tcp,15,6,55,6 tcp,1,3,55,1 tcp,1,4,55,1 tcp,1,5,55,1 tcp,1,6,55,1 tcp,2,3,55,2 tcp,2,4,55,2 tcp,2,5,55,2 tcp,2,6,55,2
1 2 3
1 2 3
......@@ -15,3 +15,9 @@
--exec $NDB_TOOLS_DIR/ndb_config --defaults-group-suffix=.cluster0 --defaults-file=$MYSQL_TEST_DIR/std_data/ndb_config_mycnf2.cnf --query=type,nodeid,host --mycnf 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --defaults-group-suffix=.cluster1 --defaults-file=$MYSQL_TEST_DIR/std_data/ndb_config_mycnf2.cnf --query=type,nodeid,host --mycnf 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --defaults-group-suffix=.cluster2 --defaults-file=$MYSQL_TEST_DIR/std_data/ndb_config_mycnf2.cnf --query=type,nodeid,host --mycnf 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --defaults-group-suffix=.cluster2 --defaults-file=$MYSQL_TEST_DIR/std_data/ndb_config_mycnf2.cnf --ndb-shm --connections --query=type,nodeid1,nodeid2,group,nodeidserver --mycnf 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --no-defaults --query=nodeid --host=localhost --config-file=$NDB_BACKUP_DIR/config.ini 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --no-defaults --query=nodeid --host=1.2.3.4 --config-file=$NDB_BACKUP_DIR/config.ini 2> /dev/null
--exec $NDB_TOOLS_DIR/ndb_config --no-defaults --query=nodeid --host=127.0.0.1 --config-file=$NDB_BACKUP_DIR/config.ini 2> /dev/null
......@@ -1080,6 +1080,19 @@ extern "C" {
*/
int ndb_mgm_end_session(NdbMgmHandle handle);
/**
* ndb_mgm_get_fd
*
* get the file descriptor of the handle.
* INTERNAL ONLY.
* USE FOR TESTING. OTHER USES ARE NOT A GOOD IDEA.
*
* @param handle NDB management handle
* @return handle->socket
*
*/
int ndb_mgm_get_fd(NdbMgmHandle handle);
/**
* Get the node id of the mgm server we're connected to
*/
......
......@@ -132,6 +132,20 @@ extern "C" {
const char * value,
struct ndb_mgm_reply* reply);
Uint64 ndb_mgm_get_session_id(NdbMgmHandle handle);
struct NdbMgmSession {
Uint64 id;
Uint32 m_stopSelf;
Uint32 m_stop;
Uint32 nodeid;
Uint32 parser_buffer_len;
Uint32 parser_status;
};
int ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
struct NdbMgmSession *s, int *len);
#ifdef __cplusplus
}
#endif
......
......@@ -19,14 +19,22 @@
#include <ndb_global.h>
#include <NdbTCP.h>
#include <NdbMutex.h>
/**
* Input stream
*/
class InputStream {
public:
virtual ~InputStream() {}
InputStream() { m_mutex= NULL; };
virtual ~InputStream() {};
virtual char* gets(char * buf, int bufLen) = 0;
/**
* Set the mutex to be UNLOCKED when blocking (e.g. select(2))
*/
void set_mutex(NdbMutex *m) { m_mutex= m; };
protected:
NdbMutex *m_mutex;
};
class FileInputStream : public InputStream {
......@@ -42,6 +50,7 @@ extern FileInputStream Stdin;
class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket;
unsigned m_timeout;
bool m_startover;
public:
SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000);
virtual ~SocketInputStream() {}
......
......@@ -61,12 +61,15 @@ public:
/**
* Context for parse
*/
struct Context {
class Context {
public:
Context() { m_mutex= NULL; };
ParserStatus m_status;
const ParserRow<T> * m_currentCmd;
const ParserRow<T> * m_currentArg;
char * m_currentToken;
char m_tokenBuffer[512];
NdbMutex *m_mutex;
Vector<const ParserRow<T> *> m_aliasUsed;
};
......
......@@ -21,12 +21,17 @@
#include <NdbTCP.h>
#include <NdbMutex.h>
#ifdef __cplusplus
extern "C" {
#endif
int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
int readln_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
char * buf, int buflen, NdbMutex *mutex);
int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len);
int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...);
......
......@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){
SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned readTimeout)
: m_socket(socket) {
m_timeout = readTimeout;
: m_socket(socket) {
m_startover= true;
m_timeout = readTimeout;
}
char*
char*
SocketInputStream::gets(char * buf, int bufLen) {
buf[0] = 77;
assert(bufLen >= 2);
int res = readln_socket(m_socket, m_timeout, buf, bufLen - 1);
int offset= 0;
if(m_startover)
{
buf[0]= '\0';
m_startover= false;
}
else
offset= strlen(buf);
int res = readln_socket(m_socket, m_timeout, buf+offset, bufLen-offset, m_mutex);
if(res == 0)
{
buf[0]=0;
return buf;
}
m_startover= true;
if(res == -1)
return 0;
if(res == 0 && buf[0] == 77){ // select return 0
buf[0] = 0;
} else if(res == 0 && buf[0] == 0){ // only newline
buf[0] = '\n';
buf[1] = 0;
} else {
int len = strlen(buf);
buf[len + 1] = '\0';
buf[len] = '\n';
}
return buf;
}
......@@ -32,6 +32,7 @@ public:
char* gets(char * buf, int bufLen);
void push_back(const char *);
void set_mutex(NdbMutex *m) { in.set_mutex(m); };
private:
InputStream & in;
char * buffer;
......@@ -144,25 +145,32 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
{
DBUG_ENTER("ParserImpl::run");
input.set_mutex(ctx->m_mutex);
* pDst = 0;
bool ownStop = false;
if(stop == 0)
stop = &ownStop;
ctx->m_aliasUsed.clear();
const unsigned sz = sizeof(ctx->m_tokenBuffer);
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof;
DBUG_RETURN(false);
}
if(ctx->m_currentToken[0] == 0){
int last= strlen(ctx->m_currentToken);
if(last>0)
last--;
if(ctx->m_currentToken[last] !='\n'){
ctx->m_status = Parser<Dummy>::NoLine;
ctx->m_tokenBuffer[0]= '\0';
DBUG_RETURN(false);
}
if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine;
DBUG_RETURN(false);
......@@ -174,14 +182,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_status = Parser<Dummy>::UnknownCommand;
DBUG_RETURN(false);
}
Properties * p = new Properties();
bool invalidArgument = false;
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
while((! * stop) &&
!Eof(ctx->m_currentToken) &&
while((! * stop) &&
!Eof(ctx->m_currentToken) &&
!Empty(ctx->m_currentToken)){
if(ctx->m_currentToken[0] != 0){
trim(ctx->m_currentToken);
......@@ -193,7 +201,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
}
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
}
if(invalidArgument){
char buf[sz];
char * tmp;
......@@ -204,13 +212,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
}
DBUG_RETURN(false);
}
if(* stop){
delete p;
ctx->m_status = Parser<Dummy>::ExternalStop;
DBUG_RETURN(false);
}
if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p;
......@@ -226,9 +234,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
tmp.put("name", alias->name);
tmp.put("realName", alias->realName);
p->put("$ALIAS", i, &tmp);
}
}
p->put("$ALIAS", ctx->m_aliasUsed.size());
ctx->m_status = Parser<Dummy>::Ok;
* pDst = p;
DBUG_RETURN(true);
......
......@@ -49,7 +49,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C"
int
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
char * buf, int buflen){
char * buf, int buflen, NdbMutex *mutex){
if(buflen <= 1)
return 0;
......@@ -65,7 +65,12 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
timeout.tv_sec = (timeout_millis / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000;
if(mutex)
NdbMutex_Unlock(mutex);
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
if(mutex)
NdbMutex_Lock(mutex);
if(selectRes == 0){
return 0;
}
......@@ -75,7 +80,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
return -1;
}
buf[0] = 0;
const int t = recv(socket, buf, buflen, MSG_PEEK);
if(t < 1)
......@@ -87,27 +91,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
for(int i=0; i< t;i++)
{
if(buf[i] == '\n'){
recv(socket, buf, i+1, 0);
buf[i] = 0;
int r= recv(socket, buf, i+1, 0);
buf[i+1]= 0;
if(r < 1) {
fcntl(socket, F_SETFL, sock_flags);
return -1;
}
if(i > 0 && buf[i-1] == '\r'){
i--;
buf[i] = 0;
buf[i-1] = '\n';
buf[i]= '\0';
}
fcntl(socket, F_SETFL, sock_flags);
return t;
return r;
}
}
if(t == (buflen - 1)){
recv(socket, buf, t, 0);
buf[t] = 0;
fcntl(socket, F_SETFL, sock_flags);
return buflen;
}
return 0;
int r= recv(socket, buf, t, 0);
if(r>=0)
buf[r] = 0;
fcntl(socket, F_SETFL, sock_flags);
return r;
}
extern "C"
......
......@@ -82,6 +82,8 @@ int main(int argc, char** argv){
load_defaults("ndb_cpcd",load_default_groups,&argc,&argv);
if (handle_options(&argc, &argv, my_long_options, get_one_option)) {
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
exit(1);
......
......@@ -64,6 +64,8 @@ static const char* _bind_address = 0;
extern Uint32 g_start_type;
extern NdbNodeBitmask g_nowait_nodes;
const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
/**
* Arguments to NDB process
*/
......@@ -113,6 +115,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......@@ -120,7 +124,6 @@ static void usage()
bool
Configuration::init(int argc, char** argv)
{
const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
......
......@@ -570,6 +570,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
DBUG_RETURN(0);
}
/**
* Only used for low level testing
* Never to be used by end user.
* Or anybody who doesn't know exactly what they're doing.
*/
extern "C"
int
ndb_mgm_get_fd(NdbMgmHandle handle)
{
return handle->socket;
}
/**
* Disconnect from a mgm server
*/
......@@ -760,22 +772,16 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
if(strcmp("node status", buf) != 0) {
if(strcmp("node status\n", buf) != 0) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
if(!in.gets(buf, sizeof(buf)))
{
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
BaseString tmp(buf);
Vector<BaseString> split;
tmp.split(split, ":");
......@@ -783,7 +789,7 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
......@@ -2360,7 +2366,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){
SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
if (out.println("check connection"))
goto ndb_mgm_check_connection_error;
......@@ -2570,7 +2575,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle)
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
in.gets(buf, sizeof(buf));
DBUG_RETURN(0);
......@@ -2628,4 +2632,104 @@ int ndb_mgm_get_version(NdbMgmHandle handle,
DBUG_RETURN(1);
}
extern "C"
Uint64
ndb_mgm_get_session_id(NdbMgmHandle handle)
{
Uint64 session_id=0;
DBUG_ENTER("ndb_mgm_get_session_id");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get session id reply", NULL, ""),
MGM_ARG("id", Int, Mandatory, "Node ID"),
MGM_END()
};
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get session id", &args);
CHECK_REPLY(prop, 0);
if(!prop->get("id",&session_id)){
fprintf(handle->errstream, "Unable to get session id\n");
return 0;
}
delete prop;
DBUG_RETURN(session_id);
}
extern "C"
int
ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
struct NdbMgmSession *s, int *len)
{
int retval= 0;
DBUG_ENTER("ndb_mgm_get_session");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
args.put("id", id);
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get session reply", NULL, ""),
MGM_ARG("id", Int, Mandatory, "Node ID"),
MGM_ARG("m_stopSelf", Int, Optional, "m_stopSelf"),
MGM_ARG("m_stop", Int, Optional, "stop session"),
MGM_ARG("nodeid", Int, Optional, "allocated node id"),
MGM_ARG("parser_buffer_len", Int, Optional, "waiting in buffer"),
MGM_ARG("parser_status", Int, Optional, "parser status"),
MGM_END()
};
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get session", &args);
CHECK_REPLY(prop, 0);
Uint64 r_id;
int rlen= 0;
if(!prop->get("id",&r_id)){
fprintf(handle->errstream, "Unable to get session id\n");
goto err;
}
s->id= r_id;
rlen+=sizeof(s->id);
if(prop->get("m_stopSelf",&(s->m_stopSelf)))
rlen+=sizeof(s->m_stopSelf);
else
goto err;
if(prop->get("m_stop",&(s->m_stop)))
rlen+=sizeof(s->m_stop);
else
goto err;
if(prop->get("nodeid",&(s->nodeid)))
rlen+=sizeof(s->nodeid);
else
goto err;
if(prop->get("parser_buffer_len",&(s->parser_buffer_len)))
{
rlen+=sizeof(s->parser_buffer_len);
if(prop->get("parser_status",&(s->parser_status)))
rlen+=sizeof(s->parser_status);
}
*len= rlen;
retval= 1;
err:
delete prop;
DBUG_RETURN(retval);
}
template class Vector<const ParserRow<ParserDummy>*>;
......@@ -38,6 +38,7 @@ extern "C" int add_history(const char *command); /* From readline directory */
#include "ndb_mgmclient.hpp"
const char *progname = "ndb_mgm";
const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
static Ndb_mgmclient* com;
......@@ -87,6 +88,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......@@ -128,7 +131,6 @@ int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *_host = 0;
int _port = 0;
const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
......
......@@ -270,6 +270,13 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("length", Int, Mandatory, "Length"),
MGM_ARG("data", String, Mandatory, "Data"),
MGM_CMD("list sessions", &MgmApiSession::listSessions, ""),
MGM_CMD("get session id", &MgmApiSession::getSessionId, ""),
MGM_CMD("get session", &MgmApiSession::getSession, ""),
MGM_ARG("id", Int, Mandatory, "SessionID"),
MGM_END()
};
......@@ -282,7 +289,7 @@ struct PurgeStruct
NDB_TICKS tick;
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id)
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
......@@ -291,6 +298,9 @@ MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
m_stopSelf= 0;
m_ctx= NULL;
m_session_id= session_id;
m_mutex= NdbMutex_Create();
DBUG_VOID_RETURN;
}
......@@ -314,6 +324,7 @@ MgmApiSession::~MgmApiSession()
g_RestartServer= true;
if(m_stopSelf)
g_StopServer= true;
NdbMutex_Destroy(m_mutex);
DBUG_VOID_RETURN;
}
......@@ -323,11 +334,19 @@ MgmApiSession::runSession()
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx;
while(!m_stop) {
ctx.m_mutex= m_mutex;
m_ctx= &ctx;
bool stop= false;
while(!stop) {
NdbMutex_Lock(m_mutex);
m_parser->run(ctx, *this);
if(ctx.m_currentToken == 0)
{
NdbMutex_Unlock(m_mutex);
break;
}
switch(ctx.m_status) {
case Parser_t::UnknownCommand:
......@@ -348,13 +367,19 @@ MgmApiSession::runSession()
default:
break;
}
}
stop= m_stop;
NdbMutex_Unlock(m_mutex);
};
NdbMutex_Lock(m_mutex);
m_ctx= NULL;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
NdbMutex_Unlock(m_mutex);
DBUG_VOID_RETURN;
}
......@@ -1592,11 +1617,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
result = -1;
goto done;
}
m_mgmsrv.m_event_listner.add_listener(le);
m_stop = true;
m_socket = NDB_INVALID_SOCKET;
done:
m_output->println("listen event");
......@@ -1604,6 +1624,13 @@ done:
if(result != 0)
m_output->println("msg: %s", msg.c_str());
m_output->println("");
if(result==0)
{
m_mgmsrv.m_event_listner.add_listener(le);
m_stop = true;
m_socket = NDB_INVALID_SOCKET;
}
}
void
......@@ -1706,5 +1733,122 @@ MgmApiSession::report_event(Parser_t::Context &ctx,
m_output->println("");
}
void
MgmApiSession::list_session(SocketServer::Session *_s, void *data)
{
MgmApiSession *s= (MgmApiSession *)_s;
MgmApiSession *lister= (MgmApiSession*) data;
if(s!=lister)
NdbMutex_Lock(s->m_mutex);
Uint64 id= s->m_session_id;
lister->m_output->println("session: %llu",id);
lister->m_output->println("session.%llu.m_stopSelf: %d",id,s->m_stopSelf);
lister->m_output->println("session.%llu.m_stop: %d",id,s->m_stop);
lister->m_output->println("session.%llu.allocated.nodeid: %d",id,s->m_allocated_resources->get_nodeid());
if(s->m_ctx)
{
int l= strlen(s->m_ctx->m_tokenBuffer);
char *buf= (char*) malloc(2*l+1);
char *b= buf;
for(int i=0; i<l;i++)
if(s->m_ctx->m_tokenBuffer[i]=='\n')
{
*b++='\\';
*b++='n';
}
else
{
*b++= s->m_ctx->m_tokenBuffer[i];
}
*b= '\0';
lister->m_output->println("session.%llu.parser.buffer.len: %u",id,l);
lister->m_output->println("session.%llu.parser.buffer: %s",id,buf);
lister->m_output->println("session.%llu.parser.status: %d",id,s->m_ctx->m_status);
free(buf);
}
if(s!=lister)
NdbMutex_Unlock(s->m_mutex);
}
void
MgmApiSession::listSessions(Parser_t::Context &ctx,
Properties const &args) {
m_mgmsrv.get_socket_server()->foreachSession(list_session,(void*)this);
m_output->println("");
}
void
MgmApiSession::getSessionId(Parser_t::Context &ctx,
Properties const &args) {
m_output->println("get session id reply");
m_output->println("id: %llu",m_session_id);
m_output->println("");
}
struct get_session_param {
MgmApiSession *l;
Uint64 id;
int found;
};
void
MgmApiSession::get_session(SocketServer::Session *_s, void *data)
{
struct get_session_param *p= (struct get_session_param*)data;
MgmApiSession *s= (MgmApiSession *)_s;
if(s!=p->l)
NdbMutex_Lock(s->m_mutex);
if(p->id != s->m_session_id)
{
if(s!=p->l)
NdbMutex_Unlock(s->m_mutex);
return;
}
p->found= true;
p->l->m_output->println("id: %llu",s->m_session_id);
p->l->m_output->println("m_stopSelf: %d",s->m_stopSelf);
p->l->m_output->println("m_stop: %d",s->m_stop);
p->l->m_output->println("nodeid: %d",s->m_allocated_resources->get_nodeid());
if(s->m_ctx)
{
int l= strlen(s->m_ctx->m_tokenBuffer);
p->l->m_output->println("parser_buffer_len: %u",l);
p->l->m_output->println("parser_status: %d",s->m_ctx->m_status);
}
if(s!=p->l)
NdbMutex_Unlock(s->m_mutex);
}
void
MgmApiSession::getSession(Parser_t::Context &ctx,
Properties const &args) {
Uint64 id;
struct get_session_param p;
args.get("id", &id);
p.l= this;
p.id= id;
p.found= false;
m_output->println("get session reply");
m_mgmsrv.get_socket_server()->foreachSession(get_session,(void*)&p);
if(p.found==false)
m_output->println("id: 0");
m_output->println("");
}
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
......@@ -32,6 +32,8 @@ class MgmApiSession : public SocketServer::Session
{
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
static void list_session(SocketServer::Session *_s, void *data);
static void get_session(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;
......@@ -42,6 +44,11 @@ private:
MgmtSrvr::Allocated_resources *m_allocated_resources;
char m_err_str[1024];
int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop
NdbMutex *m_mutex;
// for listing sessions and other fun:
Parser_t::Context *m_ctx;
Uint64 m_session_id;
void getConfig_common(Parser_t::Context &ctx,
const class Properties &args,
......@@ -50,7 +57,7 @@ private:
{ return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); }
public:
MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock);
MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id);
virtual ~MgmApiSession();
void runSession();
......@@ -108,13 +115,20 @@ public:
void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args);
void report_event(Parser_t::Context &ctx, Properties const &args);
void listSessions(Parser_t::Context &ctx, Properties const &args);
void getSessionId(Parser_t::Context &ctx, Properties const &args);
void getSession(Parser_t::Context &ctx, Properties const &args);
};
class MgmApiService : public SocketServer::Service {
class MgmtSrvr * m_mgmsrv;
Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer
public:
MgmApiService(){
m_mgmsrv = 0;
m_next_session_id= 1;
}
void setMgm(class MgmtSrvr * mgmsrv){
......@@ -122,7 +136,7 @@ public:
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){
return new MgmApiSession(* m_mgmsrv, socket);
return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++);
}
};
......
......@@ -47,6 +47,7 @@
#define DEBUG(x) ndbout << x << endl;
const char progname[] = "mgmtsrvr";
const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
// copied from mysql.cc to get readline
extern "C" {
......@@ -183,6 +184,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......@@ -196,7 +199,6 @@ int main(int argc, char** argv)
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
......
......@@ -21,6 +21,8 @@
#include <NdbRestarter.hpp>
#include <Vector.hpp>
#include <random.h>
#include <mgmapi.h>
#include <mgmapi_debug.h>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
......@@ -167,6 +169,44 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){
return result;
}
int runTestApiSession(NDBT_Context* ctx, NDBT_Step* step)
{
char *mgm= ctx->getRemoteMgm();
Uint64 session_id= 0;
NdbMgmHandle h;
h= ndb_mgm_create_handle();
ndb_mgm_set_connectstring(h, mgm);
ndb_mgm_connect(h,0,0,0);
int s= ndb_mgm_get_fd(h);
session_id= ndb_mgm_get_session_id(h);
ndbout << "MGM Session id: " << session_id << endl;
write(s,"get",3);
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
struct NdbMgmSession sess;
int slen= sizeof(struct NdbMgmSession);
h= ndb_mgm_create_handle();
ndb_mgm_set_connectstring(h, mgm);
ndb_mgm_connect(h,0,0,0);
if(ndb_mgm_get_session(h,session_id,&sess,&slen))
{
ndbout << "Failed, session still exists" << endl;
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
return NDBT_FAILED;
}
else
{
ndbout << "SUCCESS: session is gone" << endl;
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
return NDBT_OK;
}
}
NDBT_TESTSUITE(testMgm);
......@@ -175,6 +215,11 @@ TESTCASE("SingleUserMode",
INITIALIZER(runTestSingleUserMode);
FINALIZER(runClearTable);
}
TESTCASE("ApiSessionFailure",
"Test failures in MGMAPI session"){
INITIALIZER(runTestApiSession);
}
NDBT_TESTSUITE_END(testMgm);
int main(int argc, const char** argv){
......
......@@ -27,6 +27,8 @@ static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
NDB_STD_OPTS_VARS;
const char *load_default_groups[]= { "mysql_cluster",0 };
static const char* _dbname = "TEST_DB";
static my_bool _transactional = false;
static my_bool _tupscan = 0;
......@@ -54,13 +56,14 @@ static void usage()
"tabname\n"\
"This program will delete all records in the specified table using scan delete.\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -32,6 +32,9 @@ NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
static int _unqualified = 0;
static int _partinfo = 0;
const char *load_default_groups[]= { "mysql_cluster",0 };
static int _retries = 0;
static struct my_option my_long_options[] =
{
......@@ -57,6 +60,8 @@ static void usage()
"This program list all properties of table(s) in NDB Cluster.\n"\
" ex: desc T1 T2 T4\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......@@ -65,7 +70,6 @@ static void print_part_info(Ndb* pNdb, NDBT_Table* pTab);
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -24,6 +24,9 @@
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
const char *load_default_groups[]= { "mysql_cluster",0 };
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
......@@ -38,13 +41,14 @@ static void usage()
"[<table> <index>]+\n"\
"This program will drop index(es) in Ndb\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -24,6 +24,9 @@
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
const char *load_default_groups[]= { "mysql_cluster",0 };
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
......@@ -37,14 +40,15 @@ static void usage()
char desc[] =
"tabname\n"\
"This program will drop one table in Ndb\n";
ndb_std_print_version();
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -34,6 +34,8 @@ static int _unqualified = 0;
static int _parsable = 0;
static int show_temp_status = 0;
const char *load_default_groups[]= { "mysql_cluster",0 };
static void
fatal(char const* fmt, ...)
{
......@@ -284,6 +286,8 @@ static void usage()
"To show all indexes for a table write table name as final argument\n"\
" ex: ndb_show_tables T1\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......@@ -291,7 +295,6 @@ static void usage()
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char* _tabname;
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -19,6 +19,8 @@
*/
#include <ndb_global.h>
#include <ndb_opts.h>
#include <my_sys.h>
#include <my_getopt.h>
#include <mysql_version.h>
......@@ -29,6 +31,7 @@
#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <ConfigInfo.hpp>
#include <NdbAutoPtr.hpp>
static int g_verbose = 0;
static int try_reconnect = 3;
......@@ -45,34 +48,17 @@ static const char * g_row_delimiter=" ";
static const char * g_config_file = 0;
static int g_mycnf = 0;
int g_print_full_config, opt_ndb_shm;
my_bool opt_core;
const char *load_default_groups[]= { "mysql_cluster",0 };
typedef ndb_mgm_configuration_iterator Iter;
NDB_STD_OPTS_VARS;
static void ndb_std_print_version()
{
printf("MySQL distrib %s, for %s (%s)\n",
MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE);
}
int g_print_full_config;
typedef ndb_mgm_configuration_iterator Iter;
static struct my_option my_long_options[] =
{
{ "usage", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "help", '?', "Display this help and exit.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "version", 'V', "Output version information and exit.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "ndb-connectstring", 256,
"Set connect string for connecting to ndb_mgmd. "
"Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". "
"Overrides specifying entries in NDB_CONNECTSTRING and my.cnf",
(gptr*) &g_connectstring, (gptr*) &g_connectstring,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "ndb-shm", 256, "Print nodes",
(gptr*) &opt_ndb_shm, (gptr*) &opt_ndb_shm,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
NDB_STD_OPTS("ndb_config"),
{ "nodes", 256, "Print nodes",
(gptr*) &g_nodes, (gptr*) &g_nodes,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
......@@ -114,24 +100,11 @@ static void usage()
char desc[] =
"This program will retreive config options for a ndb cluster\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
static my_bool
ndb_std_get_one_option(int optid,
const struct my_option *opt __attribute__((unused)),
char *argument)
{
switch (optid) {
case 'V':
ndb_std_print_version();
exit(0);
case '?':
usage();
exit(0);
}
return 0;
}
/**
* Match/Apply framework
......@@ -176,7 +149,6 @@ static ndb_mgm_configuration* load_configuration();
int
main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options,
......@@ -408,28 +380,36 @@ HostMatch::eval(const Iter& iter)
if(iter.get(m_key, &valc) == 0)
{
struct hostent *h1, *h2;
struct hostent *h1, *h2, copy1;
char *addr1;
h1 = gethostbyname(m_value.c_str());
if (h1 == NULL) {
return 0;
}
// gethostbyname returns a pointer to a static structure
// so we need to copy the results before doing the next call
memcpy(&copy1, h1, sizeof(struct hostent));
addr1 = (char *)malloc(copy1.h_length);
NdbAutoPtr<char> tmp_aptr(addr1);
memcpy(addr1, h1->h_addr, copy1.h_length);
h2 = gethostbyname(valc);
if (h2 == NULL) {
return 0;
}
if (h1->h_addrtype != h2->h_addrtype) {
if (copy1.h_addrtype != h2->h_addrtype) {
return 0;
}
if (h1->h_length != h2->h_length)
if (copy1.h_length != h2->h_length)
{
return 0;
}
return 0 == memcmp(h1->h_addr, h2->h_addr, h1->h_length);
return 0 == memcmp(addr1, h2->h_addr, copy1.h_length);
}
return 0;
......
......@@ -60,6 +60,8 @@ static int _restore_meta = 0;
static int _no_restore_disk = 0;
BaseString g_options("ndb_restore");
const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_restore"),
......@@ -240,6 +242,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
......
......@@ -43,6 +43,8 @@ static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
_order, _descending;
const char *load_default_groups[]= { "mysql_cluster",0 };
static int _tup = 0;
static int _dumpDisk = 0;
static int use_rowid = 0;
......@@ -103,13 +105,14 @@ static void usage()
"It can also be used to dump the content of a table to file \n"\
" ex: select_all --no-header --delimiter=';' T4 > T4.data\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _tabname;
int ho_error;
......
......@@ -37,6 +37,9 @@ NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
static int _parallelism = 240;
static int _lock = 0;
const char *load_default_groups[]= { "mysql_cluster",0 };
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
......@@ -57,13 +60,14 @@ static void usage()
"tabname1 ... tabnameN\n"\
"This program will count the number of records in tables\n";
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
......
......@@ -38,6 +38,9 @@ NDB_STD_OPTS_VARS;
static int _no_contact = 0;
static int _not_started = 0;
static int _timeout = 120;
const char *load_default_groups[]= { "mysql_cluster",0 };
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
......@@ -56,13 +59,14 @@ static struct my_option my_long_options[] =
static void usage()
{
ndb_std_print_version();
print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _hostName = NULL;
int ho_error;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment