Commit d5072ac1 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0

into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0

parents 3357d89b 293539ca
...@@ -36,8 +36,16 @@ ...@@ -36,8 +36,16 @@
#define MAKE_VERSION(A,B,C) (((A) << 16) | ((B) << 8) | ((C) << 0)) #define MAKE_VERSION(A,B,C) (((A) << 16) | ((B) << 8) | ((C) << 0))
#define NDB_VERSION_D MAKE_VERSION(NDB_VERSION_MAJOR, NDB_VERSION_MINOR, NDB_VERSION_BUILD) #define NDB_VERSION_D MAKE_VERSION(NDB_VERSION_MAJOR, NDB_VERSION_MINOR, NDB_VERSION_BUILD)
#define NDB_VERSION_STRING_BUF_SZ 100
#define NDB_VERSION_STRING (getVersionString(NDB_VERSION, NDB_VERSION_STATUS)) #ifdef __cplusplus
extern "C"
#else
extern
#endif
char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
#define NDB_VERSION_STRING (getVersionString(NDB_VERSION, NDB_VERSION_STATUS, \
ndb_version_string_buf, \
sizeof(ndb_version_string_buf)))
#define NDB_VERSION ndbGetOwnVersion() #define NDB_VERSION ndbGetOwnVersion()
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#define NDB_NONBLOCK FNDELAY #define NDB_NONBLOCK FNDELAY
#define NDB_SOCKET_TYPE int #define NDB_SOCKET_TYPE int
#define NDB_INVALID_SOCKET -1 #define NDB_INVALID_SOCKET -1
#define NDB_CLOSE_SOCKET(x) close(x) #define _NDB_CLOSE_SOCKET(x) close(x)
/** /**
* socklen_t not defined in the header files of OSE * socklen_t not defined in the header files of OSE
...@@ -52,7 +52,7 @@ typedef int socklen_t; ...@@ -52,7 +52,7 @@ typedef int socklen_t;
#define EWOULDBLOCK WSAEWOULDBLOCK #define EWOULDBLOCK WSAEWOULDBLOCK
#define NDB_SOCKET_TYPE SOCKET #define NDB_SOCKET_TYPE SOCKET
#define NDB_INVALID_SOCKET INVALID_SOCKET #define NDB_INVALID_SOCKET INVALID_SOCKET
#define NDB_CLOSE_SOCKET(x) closesocket(x) #define _NDB_CLOSE_SOCKET(x) closesocket(x)
#else #else
...@@ -64,7 +64,7 @@ typedef int socklen_t; ...@@ -64,7 +64,7 @@ typedef int socklen_t;
#define NDB_NONBLOCK O_NONBLOCK #define NDB_NONBLOCK O_NONBLOCK
#define NDB_SOCKET_TYPE int #define NDB_SOCKET_TYPE int
#define NDB_INVALID_SOCKET -1 #define NDB_INVALID_SOCKET -1
#define NDB_CLOSE_SOCKET(x) ::close(x) #define _NDB_CLOSE_SOCKET(x) ::close(x)
#define InetErrno errno #define InetErrno errno
...@@ -89,6 +89,12 @@ extern "C" { ...@@ -89,6 +89,12 @@ extern "C" {
*/ */
int Ndb_getInAddr(struct in_addr * dst, const char *address); int Ndb_getInAddr(struct in_addr * dst, const char *address);
#ifdef DBUG_OFF
#define NDB_CLOSE_SOCKET(fd) _NDB_CLOSE_SOCKET(fd)
#else
int NDB_CLOSE_SOCKET(int fd);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -41,7 +41,13 @@ public: ...@@ -41,7 +41,13 @@ public:
protected: protected:
friend class SocketServer; friend class SocketServer;
friend void* sessionThread_C(void*); friend void* sessionThread_C(void*);
Session(NDB_SOCKET_TYPE sock): m_socket(sock){ m_stop = m_stopped = false;} Session(NDB_SOCKET_TYPE sock): m_socket(sock)
{
DBUG_ENTER("SocketServer::Session");
DBUG_PRINT("enter",("NDB_SOCKET: %d", m_socket));
m_stop = m_stopped = false;
DBUG_VOID_RETURN;
}
bool m_stop; // Has the session been ordered to stop? bool m_stop; // Has the session been ordered to stop?
bool m_stopped; // Has the session stopped? bool m_stopped; // Has the session stopped?
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
my_bool opt_ndb_optimized_node_selection my_bool opt_ndb_optimized_node_selection
int opt_ndb_nodeid; int opt_ndb_nodeid;
bool opt_endinfo= 0;
my_bool opt_ndb_shm; my_bool opt_ndb_shm;
const char *opt_ndb_connectstring= 0; const char *opt_ndb_connectstring= 0;
const char *opt_connect_str= 0; const char *opt_connect_str= 0;
...@@ -119,6 +120,7 @@ ndb_std_get_one_option(int optid, ...@@ -119,6 +120,7 @@ ndb_std_get_one_option(int optid,
{ {
DBUG_PUSH("d:t"); DBUG_PUSH("d:t");
} }
opt_endinfo= 1;
break; break;
#endif #endif
case 'V': case 'V':
......
...@@ -30,7 +30,8 @@ extern "C" { ...@@ -30,7 +30,8 @@ extern "C" {
Uint32 makeVersion(Uint32 major, Uint32 minor, Uint32 build); Uint32 makeVersion(Uint32 major, Uint32 minor, Uint32 build);
const char* getVersionString(Uint32 version, const char * status); const char* getVersionString(Uint32 version, const char * status,
char *buf, unsigned sz);
void ndbPrintVersion(); void ndbPrintVersion();
Uint32 ndbGetOwnVersion(); Uint32 ndbGetOwnVersion();
......
...@@ -31,6 +31,7 @@ LogHandler::LogHandler() : ...@@ -31,6 +31,7 @@ LogHandler::LogHandler() :
m_last_message[0]= 0; m_last_message[0]= 0;
m_last_log_time= 0; m_last_log_time= 0;
m_now= 0; m_now= 0;
m_last_level= (Logger::LoggerLevel)-1;
} }
LogHandler::~LogHandler() LogHandler::~LogHandler()
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
ConfigRetriever::ConfigRetriever(const char * _connect_string, ConfigRetriever::ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 node_type) Uint32 version, Uint32 node_type)
{ {
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
m_version = version; m_version = version;
m_node_type = node_type; m_node_type = node_type;
_ownNodeId= 0; _ownNodeId= 0;
...@@ -55,23 +57,26 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string, ...@@ -55,23 +57,26 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string,
if (m_handle == 0) { if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle"); setError(CR_ERROR, "Unable to allocate mgm handle");
return; DBUG_VOID_RETURN;
} }
if (ndb_mgm_set_connectstring(m_handle, _connect_string)) if (ndb_mgm_set_connectstring(m_handle, _connect_string))
{ {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return; DBUG_VOID_RETURN;
} }
resetError(); resetError();
DBUG_VOID_RETURN;
} }
ConfigRetriever::~ConfigRetriever() ConfigRetriever::~ConfigRetriever()
{ {
DBUG_ENTER("ConfigRetriever::~ConfigRetriever");
if (m_handle) { if (m_handle) {
ndb_mgm_disconnect(m_handle); ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle); ndb_mgm_destroy_handle(&m_handle);
} }
DBUG_VOID_RETURN;
} }
Uint32 Uint32
......
...@@ -23,33 +23,37 @@ ...@@ -23,33 +23,37 @@
NdbMutex* NdbMutex_Create(void) NdbMutex* NdbMutex_Create(void)
{ {
DBUG_ENTER("NdbMutex_Create");
NdbMutex* pNdbMutex; NdbMutex* pNdbMutex;
int result; int result;
pNdbMutex = (NdbMutex*)NdbMem_Allocate(sizeof(NdbMutex)); pNdbMutex = (NdbMutex*)NdbMem_Allocate(sizeof(NdbMutex));
DBUG_PRINT("info",("NdbMem_Allocate 0x%lx",pNdbMutex));
if (pNdbMutex == NULL) if (pNdbMutex == NULL)
return NULL; DBUG_RETURN(NULL);
result = pthread_mutex_init(pNdbMutex, NULL); result = pthread_mutex_init(pNdbMutex, NULL);
assert(result == 0); assert(result == 0);
return pNdbMutex; DBUG_RETURN(pNdbMutex);
} }
int NdbMutex_Destroy(NdbMutex* p_mutex) int NdbMutex_Destroy(NdbMutex* p_mutex)
{ {
DBUG_ENTER("NdbMutex_Destroy");
int result; int result;
if (p_mutex == NULL) if (p_mutex == NULL)
return -1; DBUG_RETURN(-1);
result = pthread_mutex_destroy(p_mutex); result = pthread_mutex_destroy(p_mutex);
free(p_mutex);
DBUG_PRINT("info",("NdbMem_Free 0x%lx",p_mutex));
NdbMem_Free(p_mutex);
return result; DBUG_RETURN(result);
} }
......
...@@ -54,6 +54,15 @@ Ndb_getInAddr(struct in_addr * dst, const char *address) { ...@@ -54,6 +54,15 @@ Ndb_getInAddr(struct in_addr * dst, const char *address) {
return -1; //DBUG_RETURN(-1); return -1; //DBUG_RETURN(-1);
} }
#ifndef DBUG_OFF
extern "C"
int NDB_CLOSE_SOCKET(int fd)
{
DBUG_PRINT("info", ("NDB_CLOSE_SOCKET(%d)", fd));
return _NDB_CLOSE_SOCKET(fd);
}
#endif
#if 0 #if 0
int int
Ndb_getInAddr(struct in_addr * dst, const char *address) { Ndb_getInAddr(struct in_addr * dst, const char *address) {
......
...@@ -56,6 +56,7 @@ ndb_thread_wrapper(void* _ss){ ...@@ -56,6 +56,7 @@ ndb_thread_wrapper(void* _ss){
void *ret; void *ret;
struct NdbThread * ss = (struct NdbThread *)_ss; struct NdbThread * ss = (struct NdbThread *)_ss;
ret= (* ss->func)(ss->object); ret= (* ss->func)(ss->object);
DBUG_POP();
NdbThread_Exit(ret); NdbThread_Exit(ret);
} }
/* will never be reached */ /* will never be reached */
...@@ -70,6 +71,7 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func, ...@@ -70,6 +71,7 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func,
const char* p_thread_name, const char* p_thread_name,
NDB_THREAD_PRIO thread_prio) NDB_THREAD_PRIO thread_prio)
{ {
DBUG_ENTER("NdbThread_Create");
struct NdbThread* tmpThread; struct NdbThread* tmpThread;
int result; int result;
pthread_attr_t thread_attr; pthread_attr_t thread_attr;
...@@ -77,11 +79,13 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func, ...@@ -77,11 +79,13 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func,
(void)thread_prio; /* remove warning for unused parameter */ (void)thread_prio; /* remove warning for unused parameter */
if (p_thread_func == NULL) if (p_thread_func == NULL)
return 0; DBUG_RETURN(NULL);
tmpThread = (struct NdbThread*)NdbMem_Allocate(sizeof(struct NdbThread)); tmpThread = (struct NdbThread*)NdbMem_Allocate(sizeof(struct NdbThread));
if (tmpThread == NULL) if (tmpThread == NULL)
return NULL; DBUG_RETURN(NULL);
DBUG_PRINT("info",("thread_name: %s", p_thread_name));
strnmov(tmpThread->thread_name,p_thread_name,sizeof(tmpThread->thread_name)); strnmov(tmpThread->thread_name,p_thread_name,sizeof(tmpThread->thread_name));
...@@ -108,16 +112,20 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func, ...@@ -108,16 +112,20 @@ struct NdbThread* NdbThread_Create(NDB_THREAD_FUNC *p_thread_func,
assert(result==0); assert(result==0);
pthread_attr_destroy(&thread_attr); pthread_attr_destroy(&thread_attr);
return tmpThread; DBUG_PRINT("exit",("ret: %lx", tmpThread));
DBUG_RETURN(tmpThread);
} }
void NdbThread_Destroy(struct NdbThread** p_thread) void NdbThread_Destroy(struct NdbThread** p_thread)
{ {
if (*p_thread != NULL){ DBUG_ENTER("NdbThread_Destroy");
if (*p_thread != NULL){
DBUG_PRINT("enter",("*p_thread: %lx", * p_thread));
free(* p_thread); free(* p_thread);
* p_thread = 0; * p_thread = 0;
} }
DBUG_VOID_RETURN;
} }
......
...@@ -74,7 +74,9 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) ...@@ -74,7 +74,9 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
TransporterRegistry::TransporterRegistry(void * callback, TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters, unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) { unsigned sizeOfLongSignalMemory)
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
nodeIdSpecified = false; nodeIdSpecified = false;
maxTransporters = _maxTransporters; maxTransporters = _maxTransporters;
...@@ -112,6 +114,8 @@ TransporterRegistry::TransporterRegistry(void * callback, ...@@ -112,6 +114,8 @@ TransporterRegistry::TransporterRegistry(void * callback,
theOSEReceiver = 0; theOSEReceiver = 0;
theOSEJunkSocketSend = 0; theOSEJunkSocketSend = 0;
theOSEJunkSocketRecv = 0; theOSEJunkSocketRecv = 0;
DBUG_VOID_RETURN;
} }
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h) void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
...@@ -135,7 +139,9 @@ void TransporterRegistry::set_mgm_handle(NdbMgmHandle h) ...@@ -135,7 +139,9 @@ void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
TransporterRegistry::~TransporterRegistry() { TransporterRegistry::~TransporterRegistry()
{
DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
removeAll(); removeAll();
...@@ -157,6 +163,8 @@ TransporterRegistry::~TransporterRegistry() { ...@@ -157,6 +163,8 @@ TransporterRegistry::~TransporterRegistry() {
#endif #endif
if (m_mgm_handle) if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle); ndb_mgm_destroy_handle(&m_mgm_handle);
DBUG_VOID_RETURN;
} }
void void
......
...@@ -141,7 +141,10 @@ split(char * buf, char ** name, char ** value){ ...@@ -141,7 +141,10 @@ split(char * buf, char ** name, char ** value){
bool bool
ParserImpl::run(Context * ctx, const class Properties ** pDst, ParserImpl::run(Context * ctx, const class Properties ** pDst,
volatile bool * stop) const { volatile bool * stop) const
{
DBUG_ENTER("ParserImpl::run");
* pDst = 0; * pDst = 0;
bool ownStop = false; bool ownStop = false;
if(stop == 0) if(stop == 0)
...@@ -153,24 +156,24 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -153,24 +156,24 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){ if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof; ctx->m_status = Parser<Dummy>::Eof;
return false; DBUG_RETURN(false);
} }
if(ctx->m_currentToken[0] == 0){ if(ctx->m_currentToken[0] == 0){
ctx->m_status = Parser<Dummy>::NoLine; ctx->m_status = Parser<Dummy>::NoLine;
return false; DBUG_RETURN(false);
} }
if(Empty(ctx->m_currentToken)){ if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine; ctx->m_status = Parser<Dummy>::EmptyLine;
return false; DBUG_RETURN(false);
} }
trim(ctx->m_currentToken); trim(ctx->m_currentToken);
ctx->m_currentCmd = matchCommand(ctx, ctx->m_currentToken, m_rows); ctx->m_currentCmd = matchCommand(ctx, ctx->m_currentToken, m_rows);
if(ctx->m_currentCmd == 0){ if(ctx->m_currentCmd == 0){
ctx->m_status = Parser<Dummy>::UnknownCommand; ctx->m_status = Parser<Dummy>::UnknownCommand;
return false; DBUG_RETURN(false);
} }
Properties * p = new Properties(); Properties * p = new Properties();
...@@ -200,19 +203,19 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -200,19 +203,19 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
tmp = input.gets(buf, sz); tmp = input.gets(buf, sz);
} while((! * stop) && !Eof(tmp) && !Empty(tmp)); } while((! * stop) && !Eof(tmp) && !Empty(tmp));
} }
return false; DBUG_RETURN(false);
} }
if(* stop){ if(* stop){
delete p; delete p;
ctx->m_status = Parser<Dummy>::ExternalStop; ctx->m_status = Parser<Dummy>::ExternalStop;
return false; DBUG_RETURN(false);
} }
if(!checkMandatory(ctx, p)){ if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument; ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p; delete p;
return false; DBUG_RETURN(false);
} }
/** /**
...@@ -229,7 +232,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -229,7 +232,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_status = Parser<Dummy>::Ok; ctx->m_status = Parser<Dummy>::Ok;
* pDst = p; * pDst = p;
return true; DBUG_RETURN(true);
} }
const ParserImpl::DummyRow* const ParserImpl::DummyRow*
......
...@@ -57,6 +57,8 @@ SocketClient::init() ...@@ -57,6 +57,8 @@ SocketClient::init()
return false; return false;
} }
DBUG_PRINT("info",("NDB_SOCKET: %d", m_sockfd));
return true; return true;
} }
......
...@@ -64,6 +64,8 @@ SocketServer::tryBind(unsigned short port, const char * intface) { ...@@ -64,6 +64,8 @@ SocketServer::tryBind(unsigned short port, const char * intface) {
return false; return false;
} }
DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
const int on = 1; const int on = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) == -1) { (const char*)&on, sizeof(on)) == -1) {
...@@ -104,6 +106,8 @@ SocketServer::setup(SocketServer::Service * service, ...@@ -104,6 +106,8 @@ SocketServer::setup(SocketServer::Service * service,
DBUG_RETURN(false); DBUG_RETURN(false);
} }
DBUG_PRINT("info",("NDB_SOCKET: %d", sock));
const int on = 1; const int on = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) == -1) { (const char*)&on, sizeof(on)) == -1) {
......
...@@ -38,22 +38,24 @@ Uint32 makeVersion(Uint32 major, Uint32 minor, Uint32 build) { ...@@ -38,22 +38,24 @@ Uint32 makeVersion(Uint32 major, Uint32 minor, Uint32 build) {
} }
const char * getVersionString(Uint32 version, const char * status) { char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
char buff[100]; const char * getVersionString(Uint32 version, const char * status,
char *buf, unsigned sz)
{
if (status && status[0] != 0) if (status && status[0] != 0)
basestring_snprintf(buff, sizeof(buff), basestring_snprintf(buf, sz,
"Version %d.%d.%d (%s)", "Version %d.%d.%d (%s)",
getMajor(version), getMajor(version),
getMinor(version), getMinor(version),
getBuild(version), getBuild(version),
status); status);
else else
basestring_snprintf(buff, sizeof(buff), basestring_snprintf(buf, sz,
"Version %d.%d.%d", "Version %d.%d.%d",
getMajor(version), getMajor(version),
getMinor(version), getMinor(version),
getBuild(version)); getBuild(version));
return strdup(buff); return buf;
} }
typedef enum { typedef enum {
......
...@@ -257,8 +257,8 @@ private: ...@@ -257,8 +257,8 @@ private:
void hbReceivedLab(Signal* signal); void hbReceivedLab(Signal* signal);
void sendCmRegrefLab(Signal* signal, BlockReference ref, void sendCmRegrefLab(Signal* signal, BlockReference ref,
CmRegRef::ErrorCode); CmRegRef::ErrorCode);
void systemErrorBecauseOtherNodeFailed(Signal* signal, NodeId); void systemErrorBecauseOtherNodeFailed(Signal* signal, Uint32 line, NodeId);
void systemErrorLab(Signal* signal, void systemErrorLab(Signal* signal, Uint32 line,
const char* message = NULL); const char* message = NULL);
void prepFailReqLab(Signal* signal); void prepFailReqLab(Signal* signal);
void prepFailConfLab(Signal* signal); void prepFailConfLab(Signal* signal);
......
...@@ -76,7 +76,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* signal) ...@@ -76,7 +76,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* signal)
void Qmgr::execCM_NODEINFOREF(Signal* signal) void Qmgr::execCM_NODEINFOREF(Signal* signal)
{ {
jamEntry(); jamEntry();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
return; return;
}//Qmgr::execCM_NODEINFOREF() }//Qmgr::execCM_NODEINFOREF()
...@@ -121,7 +121,7 @@ void Qmgr::execCONTINUEB(Signal* signal) ...@@ -121,7 +121,7 @@ void Qmgr::execCONTINUEB(Signal* signal)
default: default:
jam(); jam();
// ZCOULD_NOT_OCCUR_ERROR; // ZCOULD_NOT_OCCUR_ERROR;
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
return; return;
break; break;
}//switch }//switch
...@@ -593,7 +593,7 @@ void Qmgr::execCM_REGCONF(Signal* signal) ...@@ -593,7 +593,7 @@ void Qmgr::execCM_REGCONF(Signal* signal)
jam(); jam();
char buf[128]; char buf[128];
BaseString::snprintf(buf,sizeof(buf),"incompatible version own=0x%x other=0x%x, shutting down", NDB_VERSION, cmRegConf->presidentVersion); BaseString::snprintf(buf,sizeof(buf),"incompatible version own=0x%x other=0x%x, shutting down", NDB_VERSION, cmRegConf->presidentVersion);
systemErrorLab(signal, buf); systemErrorLab(signal, __LINE__, buf);
return; return;
} }
...@@ -688,7 +688,7 @@ void Qmgr::execCM_REGREF(Signal* signal) ...@@ -688,7 +688,7 @@ void Qmgr::execCM_REGREF(Signal* signal)
switch (TrefuseReason) { switch (TrefuseReason) {
case CmRegRef::ZINCOMPATIBLE_VERSION: case CmRegRef::ZINCOMPATIBLE_VERSION:
jam(); jam();
systemErrorLab(signal, "incompatible version, connection refused by running ndb node"); systemErrorLab(signal, __LINE__, "incompatible version, connection refused by running ndb node");
break; break;
case CmRegRef::ZBUSY: case CmRegRef::ZBUSY:
case CmRegRef::ZBUSY_TO_PRES: case CmRegRef::ZBUSY_TO_PRES:
...@@ -1751,7 +1751,7 @@ void Qmgr::execAPI_FAILCONF(Signal* signal) ...@@ -1751,7 +1751,7 @@ void Qmgr::execAPI_FAILCONF(Signal* signal)
if (failedNodePtr.p->rcv[0] == failedNodePtr.p->rcv[1]) { if (failedNodePtr.p->rcv[0] == failedNodePtr.p->rcv[1]) {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
} else { } else {
jam(); jam();
failedNodePtr.p->rcv[0] = 0; failedNodePtr.p->rcv[0] = 0;
...@@ -1763,7 +1763,7 @@ void Qmgr::execAPI_FAILCONF(Signal* signal) ...@@ -1763,7 +1763,7 @@ void Qmgr::execAPI_FAILCONF(Signal* signal)
ndbout << "failedNodePtr.p->failState = " ndbout << "failedNodePtr.p->failState = "
<< (Uint32)(failedNodePtr.p->failState) << endl; << (Uint32)(failedNodePtr.p->failState) << endl;
#endif #endif
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
}//if }//if
return; return;
}//Qmgr::execAPI_FAILCONF() }//Qmgr::execAPI_FAILCONF()
...@@ -1780,7 +1780,7 @@ void Qmgr::execNDB_FAILCONF(Signal* signal) ...@@ -1780,7 +1780,7 @@ void Qmgr::execNDB_FAILCONF(Signal* signal)
failedNodePtr.p->failState = NORMAL; failedNodePtr.p->failState = NORMAL;
} else { } else {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
}//if }//if
if (cpresident == getOwnNodeId()) { if (cpresident == getOwnNodeId()) {
jam(); jam();
...@@ -1931,20 +1931,13 @@ void Qmgr::execAPI_REGREQ(Signal* signal) ...@@ -1931,20 +1931,13 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
#endif #endif
bool compatability_check; bool compatability_check;
switch(getNodeInfo(apiNodePtr.i).getType()){ NodeInfo::NodeType type= getNodeInfo(apiNodePtr.i).getType();
switch(type){
case NodeInfo::API: case NodeInfo::API:
compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version); compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version);
if (!compatability_check)
infoEvent("Connection attempt from api or mysqld id=%d with %s "
"incompatible with %s", apiNodePtr.i,
getVersionString(version,""), NDB_VERSION_STRING);
break; break;
case NodeInfo::MGM: case NodeInfo::MGM:
compatability_check = ndbCompatible_ndb_mgmt(NDB_VERSION, version); compatability_check = ndbCompatible_ndb_mgmt(NDB_VERSION, version);
if (!compatability_check)
infoEvent("Connection attempt from management server id=%d with %s "
"incompatible with %s", apiNodePtr.i,
getVersionString(version,""), NDB_VERSION_STRING);
break; break;
case NodeInfo::REP: case NodeInfo::REP:
// compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version); // compatability_check = ndbCompatible_ndb_api(NDB_VERSION, version);
...@@ -1953,13 +1946,19 @@ void Qmgr::execAPI_REGREQ(Signal* signal) ...@@ -1953,13 +1946,19 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
case NodeInfo::INVALID: case NodeInfo::INVALID:
default: default:
sendApiRegRef(signal, ref, ApiRegRef::WrongType); sendApiRegRef(signal, ref, ApiRegRef::WrongType);
infoEvent("Invalid connection attempt with type %d", infoEvent("Invalid connection attempt with type %d", type);
getNodeInfo(apiNodePtr.i).getType());
return; return;
} }
if (!compatability_check) { if (!compatability_check) {
jam(); jam();
char buf[NDB_VERSION_STRING_BUF_SZ];
infoEvent("Connection attempt from %s id=%d with %s "
"incompatible with %s",
type == NodeInfo::API ? "api or mysqld" : "management server",
apiNodePtr.i,
getVersionString(version,"",buf,sizeof(buf)),
NDB_VERSION_STRING);
apiNodePtr.p->phase = ZAPI_INACTIVE; apiNodePtr.p->phase = ZAPI_INACTIVE;
sendApiRegRef(signal, ref, ApiRegRef::UnsupportedVersion); sendApiRegRef(signal, ref, ApiRegRef::UnsupportedVersion);
return; return;
...@@ -2085,7 +2084,7 @@ void Qmgr::failReportLab(Signal* signal, Uint16 aFailedNode, ...@@ -2085,7 +2084,7 @@ void Qmgr::failReportLab(Signal* signal, Uint16 aFailedNode,
ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec); ptrCheckGuard(failedNodePtr, MAX_NODES, nodeRec);
if (failedNodePtr.i == getOwnNodeId()) { if (failedNodePtr.i == getOwnNodeId()) {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
return; return;
}//if }//if
...@@ -2093,7 +2092,7 @@ void Qmgr::failReportLab(Signal* signal, Uint16 aFailedNode, ...@@ -2093,7 +2092,7 @@ void Qmgr::failReportLab(Signal* signal, Uint16 aFailedNode,
ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec);
if (myNodePtr.p->phase != ZRUNNING) { if (myNodePtr.p->phase != ZRUNNING) {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
return; return;
}//if }//if
TnoFailedNodes = cnoFailedNodes; TnoFailedNodes = cnoFailedNodes;
...@@ -2172,7 +2171,7 @@ void Qmgr::execPREP_FAILREQ(Signal* signal) ...@@ -2172,7 +2171,7 @@ void Qmgr::execPREP_FAILREQ(Signal* signal)
ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec); ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec);
if (myNodePtr.p->phase != ZRUNNING) { if (myNodePtr.p->phase != ZRUNNING) {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
return; return;
}//if }//if
...@@ -2675,7 +2674,7 @@ void Qmgr::execREAD_NODESREQ(Signal* signal) ...@@ -2675,7 +2674,7 @@ void Qmgr::execREAD_NODESREQ(Signal* signal)
ReadNodesConf::SignalLength, JBB); ReadNodesConf::SignalLength, JBB);
}//Qmgr::execREAD_NODESREQ() }//Qmgr::execREAD_NODESREQ()
void Qmgr::systemErrorBecauseOtherNodeFailed(Signal* signal, void Qmgr::systemErrorBecauseOtherNodeFailed(Signal* signal, Uint32 line,
NodeId failedNodeId) { NodeId failedNodeId) {
jam(); jam();
...@@ -2687,11 +2686,11 @@ void Qmgr::systemErrorBecauseOtherNodeFailed(Signal* signal, ...@@ -2687,11 +2686,11 @@ void Qmgr::systemErrorBecauseOtherNodeFailed(Signal* signal,
"Node was shutdown during startup because node %d failed", "Node was shutdown during startup because node %d failed",
failedNodeId); failedNodeId);
progError(__LINE__, ERR_SR_OTHERNODEFAILED, buf); progError(line, ERR_SR_OTHERNODEFAILED, buf);
} }
void Qmgr::systemErrorLab(Signal* signal, const char * message) void Qmgr::systemErrorLab(Signal* signal, Uint32 line, const char * message)
{ {
jam(); jam();
// Broadcast that this node is failing to other nodes // Broadcast that this node is failing to other nodes
...@@ -2699,7 +2698,7 @@ void Qmgr::systemErrorLab(Signal* signal, const char * message) ...@@ -2699,7 +2698,7 @@ void Qmgr::systemErrorLab(Signal* signal, const char * message)
// If it's known why shutdown occured // If it's known why shutdown occured
// an error message has been passed to this function // an error message has been passed to this function
progError(__LINE__, 0, message); progError(line, 0, message);
return; return;
}//Qmgr::systemErrorLab() }//Qmgr::systemErrorLab()
...@@ -2867,7 +2866,7 @@ Uint16 Qmgr::translateDynamicIdToNodeId(Signal* signal, UintR TdynamicId) ...@@ -2867,7 +2866,7 @@ Uint16 Qmgr::translateDynamicIdToNodeId(Signal* signal, UintR TdynamicId)
}//for }//for
if (TtdiNodeId == ZNIL) { if (TtdiNodeId == ZNIL) {
jam(); jam();
systemErrorLab(signal); systemErrorLab(signal, __LINE__);
}//if }//if
return TtdiNodeId; return TtdiNodeId;
}//Qmgr::translateDynamicIdToNodeId() }//Qmgr::translateDynamicIdToNodeId()
......
...@@ -265,6 +265,9 @@ static const Properties * ...@@ -265,6 +265,9 @@ static const Properties *
ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
const char *cmd, const Properties *cmd_args) const char *cmd, const Properties *cmd_args)
{ {
DBUG_ENTER("ndb_mgm_call");
DBUG_PRINT("enter",("handle->socket: %d, cmd: %s",
handle->socket, cmd));
SocketOutputStream out(handle->socket); SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout); SocketInputStream in(handle->socket, handle->read_timeout);
...@@ -331,7 +334,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ...@@ -331,7 +334,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
<< "' status=" << (Uint32)ctx.m_status << "' status=" << (Uint32)ctx.m_status
<< ", curr=" << ctx.m_currentToken << ", curr=" << ctx.m_currentToken
<< endl; << endl;
DBUG_PRINT("info",("parser.parse returned NULL")); DBUG_PRINT("info",("ctx.status: %d, ctx.m_currentToken: %s",
ctx.m_status, ctx.m_currentToken));
} }
#ifdef MGMAPI_LOG #ifdef MGMAPI_LOG
else { else {
...@@ -341,7 +345,7 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, ...@@ -341,7 +345,7 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
p->print(handle->logfile, "IN: "); p->print(handle->logfile, "IN: ");
} }
#endif #endif
return p; DBUG_RETURN(p);
} }
/** /**
......
...@@ -369,6 +369,9 @@ int ndb_logevent_get_next(const NdbLogEventHandle h, ...@@ -369,6 +369,9 @@ int ndb_logevent_get_next(const NdbLogEventHandle h,
Properties p; Properties p;
char buf[256]; char buf[256];
struct timeval start_time;
gettimeofday(&start_time, 0);
/* header */ /* header */
while (1) { while (1) {
if (in.gets(buf,sizeof(buf)) == 0) if (in.gets(buf,sizeof(buf)) == 0)
...@@ -383,7 +386,23 @@ int ndb_logevent_get_next(const NdbLogEventHandle h, ...@@ -383,7 +386,23 @@ int ndb_logevent_get_next(const NdbLogEventHandle h,
} }
if ( strcmp("log event reply\n", buf) == 0 ) if ( strcmp("log event reply\n", buf) == 0 )
break; break;
ndbout_c("skipped: %s", buf);
if ( strcmp("<PING>\n", buf) )
ndbout_c("skipped: %s", buf);
struct timeval now;
gettimeofday(&now, 0);
unsigned elapsed_ms=
(now.tv_sec-start_time.tv_sec)*1000 +
(now.tv_usec-start_time.tv_usec)/1000;
if (elapsed_ms >= timeout_in_milliseconds)
{
// timed out
return 0;
}
new (&in) SocketInputStream(h->socket, timeout_in_milliseconds-elapsed_ms);
} }
/* read name-value pairs into properties object */ /* read name-value pairs into properties object */
......
...@@ -455,11 +455,13 @@ static int do_event_thread; ...@@ -455,11 +455,13 @@ static int do_event_thread;
static void* static void*
event_thread_run(void* m) event_thread_run(void* m)
{ {
DBUG_ENTER("event_thread_run");
NdbMgmHandle handle= *(NdbMgmHandle*)m; NdbMgmHandle handle= *(NdbMgmHandle*)m;
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 }; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
int fd = ndb_mgm_listen_event(handle, filter); int fd = ndb_mgm_listen_event(handle, filter);
if (fd > 0) if (fd != NDB_INVALID_SOCKET)
{ {
do_event_thread= 1; do_event_thread= 1;
char *tmp= 0; char *tmp= 0;
...@@ -468,15 +470,20 @@ event_thread_run(void* m) ...@@ -468,15 +470,20 @@ event_thread_run(void* m)
do { do {
if (tmp == 0) NdbSleep_MilliSleep(10); if (tmp == 0) NdbSleep_MilliSleep(10);
if((tmp = in.gets(buf, 1024))) if((tmp = in.gets(buf, 1024)))
ndbout << tmp; {
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
ndbout << tmp;
}
} while(do_event_thread); } while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
} }
else else
{ {
do_event_thread= -1; do_event_thread= -1;
} }
return NULL; DBUG_RETURN(NULL);
} }
bool bool
...@@ -516,9 +523,19 @@ CommandInterpreter::connect() ...@@ -516,9 +523,19 @@ CommandInterpreter::connect()
do_event_thread == 0 || do_event_thread == 0 ||
do_event_thread == -1) do_event_thread == -1)
{ {
DBUG_PRINT("warning",("thread not started")); DBUG_PRINT("info",("Warning, event thread startup failed, "
printf("Warning, event thread startup failed, degraded printouts as result\n"); "degraded printouts as result, errno=%d",
errno));
printf("Warning, event thread startup failed, "
"degraded printouts as result, errno=%d\n", errno);
do_event_thread= 0; do_event_thread= 0;
if (m_event_thread)
{
void *res;
NdbThread_WaitFor(m_event_thread, &res);
NdbThread_Destroy(&m_event_thread);
}
ndb_mgm_disconnect(m_mgmsrv2);
} }
} }
else else
...@@ -548,6 +565,7 @@ CommandInterpreter::connect() ...@@ -548,6 +565,7 @@ CommandInterpreter::connect()
bool bool
CommandInterpreter::disconnect() CommandInterpreter::disconnect()
{ {
DBUG_ENTER("CommandInterpreter::disconnect");
if (m_event_thread) { if (m_event_thread) {
void *res; void *res;
do_event_thread= 0; do_event_thread= 0;
...@@ -564,7 +582,7 @@ CommandInterpreter::disconnect() ...@@ -564,7 +582,7 @@ CommandInterpreter::disconnect()
} }
m_connected= false; m_connected= false;
} }
return true; DBUG_RETURN(true);
} }
//***************************************************************************** //*****************************************************************************
......
...@@ -44,7 +44,9 @@ static Ndb_mgmclient* com; ...@@ -44,7 +44,9 @@ static Ndb_mgmclient* com;
extern "C" extern "C"
void void
handler(int sig){ handler(int sig)
{
DBUG_ENTER("handler");
switch(sig){ switch(sig){
case SIGPIPE: case SIGPIPE:
/** /**
...@@ -54,6 +56,7 @@ handler(int sig){ ...@@ -54,6 +56,7 @@ handler(int sig){
com->disconnect(); com->disconnect();
break; break;
} }
DBUG_VOID_RETURN;
} }
NDB_STD_OPTS_VARS; NDB_STD_OPTS_VARS;
...@@ -163,7 +166,8 @@ int main(int argc, char** argv){ ...@@ -163,7 +166,8 @@ int main(int argc, char** argv){
com->execute(opt_execute_str,_try_reconnect, &ret); com->execute(opt_execute_str,_try_reconnect, &ret);
} }
delete com; delete com;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return ret; return ret;
} }
...@@ -569,10 +569,11 @@ MgmtSrvr::check_start() ...@@ -569,10 +569,11 @@ MgmtSrvr::check_start()
bool bool
MgmtSrvr::start(BaseString &error_string) MgmtSrvr::start(BaseString &error_string)
{ {
DBUG_ENTER("MgmtSrvr::start");
if (_props == NULL) { if (_props == NULL) {
if (!check_start()) { if (!check_start()) {
error_string.append("MgmtSrvr.cpp: check_start() failed."); error_string.append("MgmtSrvr.cpp: check_start() failed.");
return false; DBUG_RETURN(false);
} }
} }
theFacade= TransporterFacade::theFacadeInstance theFacade= TransporterFacade::theFacadeInstance
...@@ -581,12 +582,12 @@ MgmtSrvr::start(BaseString &error_string) ...@@ -581,12 +582,12 @@ MgmtSrvr::start(BaseString &error_string)
if(theFacade == 0) { if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL."); DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
error_string.append("MgmtSrvr.cpp: theFacade is NULL."); error_string.append("MgmtSrvr.cpp: theFacade is NULL.");
return false; DBUG_RETURN(false);
} }
if ( theFacade->start_instance if ( theFacade->start_instance
(_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) { (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0."); DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
return false; DBUG_RETURN(false);
} }
MGM_REQUIRE(_blockNumber == 1); MGM_REQUIRE(_blockNumber == 1);
...@@ -602,7 +603,7 @@ MgmtSrvr::start(BaseString &error_string) ...@@ -602,7 +603,7 @@ MgmtSrvr::start(BaseString &error_string)
error_string.append("MgmtSrvr.cpp: _blockNumber is -1."); error_string.append("MgmtSrvr.cpp: _blockNumber is -1.");
theFacade->stop_instance(); theFacade->stop_instance();
theFacade = 0; theFacade = 0;
return false; DBUG_RETURN(false);
} }
TransporterRegistry *reg = theFacade->get_registry(); TransporterRegistry *reg = theFacade->get_registry();
...@@ -624,7 +625,6 @@ MgmtSrvr::start(BaseString &error_string) ...@@ -624,7 +625,6 @@ MgmtSrvr::start(BaseString &error_string)
DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str())); DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str()));
} }
_ownReference = numberToRef(_blockNumber, _ownNodeId); _ownReference = numberToRef(_blockNumber, _ownNodeId);
startEventLog(); startEventLog();
...@@ -644,7 +644,7 @@ MgmtSrvr::start(BaseString &error_string) ...@@ -644,7 +644,7 @@ MgmtSrvr::start(BaseString &error_string)
"MgmtSrvr_Service", "MgmtSrvr_Service",
NDB_THREAD_PRIO_LOW); NDB_THREAD_PRIO_LOW);
return true; DBUG_RETURN(true);
} }
...@@ -658,6 +658,7 @@ MgmtSrvr::~MgmtSrvr() ...@@ -658,6 +658,7 @@ MgmtSrvr::~MgmtSrvr()
if(theFacade != 0){ if(theFacade != 0){
theFacade->stop_instance(); theFacade->stop_instance();
delete theFacade;
theFacade = 0; theFacade = 0;
} }
...@@ -2604,19 +2605,6 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) ...@@ -2604,19 +2605,6 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
} }
/*****************************************************************************
* Area 51 ???
*****************************************************************************/
MgmtSrvr::Area51
MgmtSrvr::getStuff()
{
Area51 ret;
ret.theFacade = theFacade;
ret.theRegistry = theFacade->theTransporterRegistry;
return ret;
}
NodeId NodeId
MgmtSrvr::getPrimaryNode() const { MgmtSrvr::getPrimaryNode() const {
#if 0 #if 0
......
...@@ -61,6 +61,7 @@ public: ...@@ -61,6 +61,7 @@ public:
} }
void add_listener(const Event_listener&); void add_listener(const Event_listener&);
void check_listeners();
void update_max_log_level(const LogLevel&); void update_max_log_level(const LogLevel&);
void update_log_level(const LogLevel&); void update_log_level(const LogLevel&);
...@@ -775,16 +776,6 @@ private: ...@@ -775,16 +776,6 @@ private:
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type); int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
ConfigRetriever *m_config_retriever; ConfigRetriever *m_config_retriever;
public:
/**
* This method does not exist
*/
struct Area51 {
class TransporterFacade * theFacade;
class TransporterRegistry * theRegistry;
};
Area51 getStuff();
}; };
inline inline
......
...@@ -272,15 +272,19 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -272,15 +272,19 @@ ParserRow<MgmApiSession> commands[] = {
}; };
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm) { : SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock); m_input = new SocketInputStream(sock);
m_output = new SocketOutputStream(sock); m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true); m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
DBUG_VOID_RETURN;
} }
MgmApiSession::~MgmApiSession() MgmApiSession::~MgmApiSession()
{ {
DBUG_ENTER("MgmApiSession::~MgmApiSession");
if (m_input) if (m_input)
delete m_input; delete m_input;
if (m_output) if (m_output)
...@@ -289,10 +293,19 @@ MgmApiSession::~MgmApiSession() ...@@ -289,10 +293,19 @@ MgmApiSession::~MgmApiSession()
delete m_parser; delete m_parser;
if (m_allocated_resources) if (m_allocated_resources)
delete m_allocated_resources; delete m_allocated_resources;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
} }
void void
MgmApiSession::runSession() { MgmApiSession::runSession()
{
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx; Parser_t::Context ctx;
while(!m_stop) { while(!m_stop) {
m_parser->run(ctx, *this); m_parser->run(ctx, *this);
...@@ -321,7 +334,12 @@ MgmApiSession::runSession() { ...@@ -321,7 +334,12 @@ MgmApiSession::runSession() {
} }
} }
if(m_socket != NDB_INVALID_SOCKET) if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket); NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
} }
#ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT #ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT
...@@ -1259,7 +1277,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1259,7 +1277,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
LogLevel::EventCategory cat; LogLevel::EventCategory cat;
Logger::LoggerLevel severity; Logger::LoggerLevel severity;
EventLoggerBase::EventTextFunction textF; EventLoggerBase::EventTextFunction textF;
int i; int i, n;
DBUG_ENTER("Ndb_mgmd_event_service::log"); DBUG_ENTER("Ndb_mgmd_event_service::log");
DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId)); DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId));
...@@ -1286,19 +1304,22 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1286,19 +1304,22 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Vector<NDB_SOCKET_TYPE> copy; Vector<NDB_SOCKET_TYPE> copy;
m_clients.lock(); m_clients.lock();
for(i = m_clients.size() - 1; i >= 0; i--){ for(i = m_clients.size() - 1; i >= 0; i--)
if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){ {
if(m_clients[i].m_socket != NDB_INVALID_SOCKET) if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat))
{
int fd= m_clients[i].m_socket;
if(fd != NDB_INVALID_SOCKET)
{ {
int r; int r;
if (m_clients[i].m_parsable) if (m_clients[i].m_parsable)
r= println_socket(m_clients[i].m_socket, r= println_socket(fd,
MAX_WRITE_TIMEOUT, str.c_str()); MAX_WRITE_TIMEOUT, str.c_str());
else else
r= println_socket(m_clients[i].m_socket, r= println_socket(fd,
MAX_WRITE_TIMEOUT, m_text); MAX_WRITE_TIMEOUT, m_text);
if (r == -1) { if (r == -1) {
copy.push_back(m_clients[i].m_socket); copy.push_back(fd);
m_clients.erase(i, false); m_clients.erase(i, false);
} }
} }
...@@ -1306,16 +1327,15 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1306,16 +1327,15 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
} }
m_clients.unlock(); m_clients.unlock();
for(i = 0; (unsigned)i < copy.size(); i++){ if ((n= (int)copy.size()))
NDB_CLOSE_SOCKET(copy[i]); {
} for(i= 0; i < n; i++)
NDB_CLOSE_SOCKET(copy[i]);
if(copy.size()){
LogLevel tmp; tmp.clear(); LogLevel tmp; tmp.clear();
m_clients.lock(); m_clients.lock();
for(i = 0; (unsigned)i < m_clients.size(); i++){ for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel); tmp.set_max(m_clients[i].m_logLevel);
}
m_clients.unlock(); m_clients.unlock();
update_log_level(tmp); update_log_level(tmp);
} }
...@@ -1343,9 +1363,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) ...@@ -1343,9 +1363,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
} }
void void
Ndb_mgmd_event_service::add_listener(const Event_listener& client){ Ndb_mgmd_event_service::check_listeners()
{
int i, n= 0;
DBUG_ENTER("Ndb_mgmd_event_service::check_listeners");
m_clients.lock();
for(i= m_clients.size() - 1; i >= 0; i--)
{
int fd= m_clients[i].m_socket;
DBUG_PRINT("info",("%d %d",i,fd));
char buf[1];
buf[0]=0;
if (fd != NDB_INVALID_SOCKET &&
println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1)
{
NDB_CLOSE_SOCKET(fd);
m_clients.erase(i, false);
n=1;
}
}
if (n)
{
LogLevel tmp; tmp.clear();
for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
update_log_level(tmp);
}
m_clients.unlock();
DBUG_VOID_RETURN;
}
void
Ndb_mgmd_event_service::add_listener(const Event_listener& client)
{
DBUG_ENTER("Ndb_mgmd_event_service::add_listener");
DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket));
check_listeners();
m_clients.push_back(client); m_clients.push_back(client);
update_max_log_level(client.m_logLevel); update_max_log_level(client.m_logLevel);
DBUG_VOID_RETURN;
} }
void void
......
...@@ -96,16 +96,17 @@ read_and_execute(Ndb_mgmclient* com, const char * prompt, int _try_reconnect) ...@@ -96,16 +96,17 @@ read_and_execute(Ndb_mgmclient* com, const char * prompt, int _try_reconnect)
* @struct MgmGlobals * @struct MgmGlobals
* @brief Global Variables used in the management server * @brief Global Variables used in the management server
*****************************************************************************/ *****************************************************************************/
/** Command line arguments */
static int opt_daemon; // NOT bool, bool need not be int
static int opt_non_interactive;
static int opt_interactive;
static const char * opt_config_filename= 0;
struct MgmGlobals { struct MgmGlobals {
MgmGlobals(); MgmGlobals();
~MgmGlobals(); ~MgmGlobals();
/** Command line arguments */
int daemon; // NOT bool, bool need not be int
int non_interactive;
int interactive;
const char * config_filename;
/** Stuff found in environment or in local config */ /** Stuff found in environment or in local config */
NodeId localNodeId; NodeId localNodeId;
bool use_specific_ip; bool use_specific_ip;
...@@ -120,7 +121,7 @@ struct MgmGlobals { ...@@ -120,7 +121,7 @@ struct MgmGlobals {
}; };
int g_no_nodeid_checks= 0; int g_no_nodeid_checks= 0;
static MgmGlobals glob; static MgmGlobals *glob= 0;
/****************************************************************************** /******************************************************************************
* Function prototypes * Function prototypes
...@@ -144,14 +145,14 @@ static struct my_option my_long_options[] = ...@@ -144,14 +145,14 @@ static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndb_mgmd"), NDB_STD_OPTS("ndb_mgmd"),
{ "config-file", 'f', "Specify cluster configuration file", { "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, (gptr*) &opt_config_filename, (gptr*) &opt_config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0, (gptr*) &opt_daemon, (gptr*) &opt_daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "interactive", OPT_INTERACTIVE, { "interactive", OPT_INTERACTIVE,
"Run interactive. Not supported but provided for testing purposes", "Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0, (gptr*) &opt_interactive, (gptr*) &opt_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-nodeid-checks", OPT_NO_NODEID_CHECKS, { "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
"Do not provide any node id checks", "Do not provide any node id checks",
...@@ -159,7 +160,7 @@ static struct my_option my_long_options[] = ...@@ -159,7 +160,7 @@ static struct my_option my_long_options[] =
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "nodaemon", OPT_NO_DAEMON, { "nodaemon", OPT_NO_DAEMON,
"Don't run as daemon, but don't read from stdin", "Don't run as daemon, but don't read from stdin",
(gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0, (gptr*) &opt_non_interactive, (gptr*) &opt_non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
...@@ -182,6 +183,7 @@ static void usage() ...@@ -182,6 +183,7 @@ static void usage()
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
glob= new MgmGlobals;
/** /**
* OSE specific. Enable shared ownership of file system resources. * OSE specific. Enable shared ownership of file system resources.
...@@ -205,40 +207,40 @@ int main(int argc, char** argv) ...@@ -205,40 +207,40 @@ int main(int argc, char** argv)
ndb_std_get_one_option))) ndb_std_get_one_option)))
exit(ho_error); exit(ho_error);
if (glob.interactive || if (opt_interactive ||
glob.non_interactive) { opt_non_interactive) {
glob.daemon= 0; opt_daemon= 0;
} }
glob.socketServer = new SocketServer(); glob->socketServer = new SocketServer();
MgmApiService * mapi = new MgmApiService(); MgmApiService * mapi = new MgmApiService();
glob.mgmObject = new MgmtSrvr(glob.socketServer, glob->mgmObject = new MgmtSrvr(glob->socketServer,
glob.config_filename, opt_config_filename,
opt_connect_str); opt_connect_str);
if (glob.mgmObject->init()) if (glob->mgmObject->init())
goto error_end; goto error_end;
my_setwd(NdbConfig_get_path(0), MYF(0)); my_setwd(NdbConfig_get_path(0), MYF(0));
glob.localNodeId= glob.mgmObject->getOwnNodeId(); glob->localNodeId= glob->mgmObject->getOwnNodeId();
if (glob.localNodeId == 0) { if (glob->localNodeId == 0) {
goto error_end; goto error_end;
} }
glob.port= glob.mgmObject->getPort(); glob->port= glob->mgmObject->getPort();
if (glob.port == 0) if (glob->port == 0)
goto error_end; goto error_end;
glob.interface_name = 0; glob->interface_name = 0;
glob.use_specific_ip = false; glob->use_specific_ip = false;
if(!glob.use_specific_ip){ if(!glob->use_specific_ip){
int count= 5; // no of retries for tryBind int count= 5; // no of retries for tryBind
while(!glob.socketServer->tryBind(glob.port, glob.interface_name)){ while(!glob->socketServer->tryBind(glob->port, glob->interface_name)){
if (--count > 0) { if (--count > 0) {
NdbSleep_MilliSleep(1000); NdbSleep_MilliSleep(1000);
continue; continue;
...@@ -247,19 +249,20 @@ int main(int argc, char** argv) ...@@ -247,19 +249,20 @@ int main(int argc, char** argv)
"Please check if the port is already used,\n" "Please check if the port is already used,\n"
"(perhaps a ndb_mgmd is already running),\n" "(perhaps a ndb_mgmd is already running),\n"
"and if you are executing on the correct computer", "and if you are executing on the correct computer",
(glob.interface_name ? glob.interface_name : "*"), glob.port); (glob->interface_name ? glob->interface_name : "*"), glob->port);
goto error_end; goto error_end;
} }
free(glob.interface_name); free(glob->interface_name);
glob.interface_name = 0; glob->interface_name = 0;
} }
if(!glob.socketServer->setup(mapi, &glob.port, glob.interface_name)){ if(!glob->socketServer->setup(mapi, &glob->port, glob->interface_name))
{
ndbout_c("Unable to setup management port: %d!\n" ndbout_c("Unable to setup management port: %d!\n"
"Please check if the port is already used,\n" "Please check if the port is already used,\n"
"(perhaps a ndb_mgmd is already running),\n" "(perhaps a ndb_mgmd is already running),\n"
"and if you are executing on the correct computer", "and if you are executing on the correct computer",
glob.port); glob->port);
delete mapi; delete mapi;
goto error_end; goto error_end;
} }
...@@ -267,12 +270,12 @@ int main(int argc, char** argv) ...@@ -267,12 +270,12 @@ int main(int argc, char** argv)
/* Construct a fake connectstring to connect back to ourselves */ /* Construct a fake connectstring to connect back to ourselves */
char connect_str[20]; char connect_str[20];
if(!opt_connect_str) { if(!opt_connect_str) {
snprintf(connect_str,20,"localhost:%u",glob.mgmObject->getPort()); snprintf(connect_str,20,"localhost:%u",glob->mgmObject->getPort());
opt_connect_str= connect_str; opt_connect_str= connect_str;
} }
glob.mgmObject->set_connect_string(opt_connect_str); glob->mgmObject->set_connect_string(opt_connect_str);
if(!glob.mgmObject->check_start()){ if(!glob->mgmObject->check_start()){
ndbout_c("Unable to check start management server."); ndbout_c("Unable to check start management server.");
ndbout_c("Probably caused by illegal initial configuration file."); ndbout_c("Probably caused by illegal initial configuration file.");
goto error_end; goto error_end;
...@@ -283,7 +286,7 @@ int main(int argc, char** argv) ...@@ -283,7 +286,7 @@ int main(int argc, char** argv)
* config info * config info
*/ */
int mgm_connect_result; int mgm_connect_result;
mgm_connect_result = glob.mgmObject->get_config_retriever()-> mgm_connect_result = glob->mgmObject->get_config_retriever()->
do_connect(0,0,0); do_connect(0,0,0);
if(mgm_connect_result<0) { if(mgm_connect_result<0) {
...@@ -292,11 +295,10 @@ int main(int argc, char** argv) ...@@ -292,11 +295,10 @@ int main(int argc, char** argv)
ndbout_c("This is probably a bug."); ndbout_c("This is probably a bug.");
} }
if (opt_daemon) {
if (glob.daemon) {
// Become a daemon // Become a daemon
char *lockfile= NdbConfig_PidFileName(glob.localNodeId); char *lockfile= NdbConfig_PidFileName(glob->localNodeId);
char *logfile= NdbConfig_StdoutFileName(glob.localNodeId); char *logfile= NdbConfig_StdoutFileName(glob->localNodeId);
NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile); NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile);
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) { if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
...@@ -310,7 +312,7 @@ int main(int argc, char** argv) ...@@ -310,7 +312,7 @@ int main(int argc, char** argv)
#endif #endif
{ {
BaseString error_string; BaseString error_string;
if(!glob.mgmObject->start(error_string)){ if(!glob->mgmObject->start(error_string)){
ndbout_c("Unable to start management server."); ndbout_c("Unable to start management server.");
ndbout_c("Probably caused by illegal initial configuration file."); ndbout_c("Probably caused by illegal initial configuration file.");
ndbout_c(error_string.c_str()); ndbout_c(error_string.c_str());
...@@ -318,8 +320,8 @@ int main(int argc, char** argv) ...@@ -318,8 +320,8 @@ int main(int argc, char** argv)
} }
} }
//glob.mgmObject->saveConfig(); //glob->mgmObject->saveConfig();
mapi->setMgm(glob.mgmObject); mapi->setMgm(glob->mgmObject);
char msg[256]; char msg[256];
BaseString::snprintf(msg, sizeof(msg), BaseString::snprintf(msg, sizeof(msg),
...@@ -328,20 +330,20 @@ int main(int argc, char** argv) ...@@ -328,20 +330,20 @@ int main(int argc, char** argv)
g_eventLogger.info(msg); g_eventLogger.info(msg);
BaseString::snprintf(msg, 256, "Id: %d, Command port: %d", BaseString::snprintf(msg, 256, "Id: %d, Command port: %d",
glob.localNodeId, glob.port); glob->localNodeId, glob->port);
ndbout_c(msg); ndbout_c(msg);
g_eventLogger.info(msg); g_eventLogger.info(msg);
g_StopServer = false; g_StopServer = false;
glob.socketServer->startServer(); glob->socketServer->startServer();
#if ! defined NDB_OSE && ! defined NDB_SOFTOSE #if ! defined NDB_OSE && ! defined NDB_SOFTOSE
if(glob.interactive) { if(opt_interactive) {
BaseString con_str; BaseString con_str;
if(glob.interface_name) if(glob->interface_name)
con_str.appfmt("host=%s:%d", glob.interface_name, glob.port); con_str.appfmt("host=%s:%d", glob->interface_name, glob->port);
else else
con_str.appfmt("localhost:%d", glob.port); con_str.appfmt("localhost:%d", glob->port);
Ndb_mgmclient com(con_str.c_str(), 1); Ndb_mgmclient com(con_str.c_str(), 1);
while(g_StopServer != true && read_and_execute(&com, "ndb_mgm> ", 1)); while(g_StopServer != true && read_and_execute(&com, "ndb_mgm> ", 1));
} else } else
...@@ -352,23 +354,23 @@ int main(int argc, char** argv) ...@@ -352,23 +354,23 @@ int main(int argc, char** argv)
} }
g_eventLogger.info("Shutting down server..."); g_eventLogger.info("Shutting down server...");
glob.socketServer->stopServer(); glob->socketServer->stopServer();
glob.mgmObject->get_config_retriever()->disconnect(); glob->mgmObject->get_config_retriever()->disconnect();
glob.socketServer->stopSessions(true); glob->socketServer->stopSessions(true);
g_eventLogger.info("Shutdown complete"); g_eventLogger.info("Shutdown complete");
delete glob;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return 0; return 0;
error_end: error_end:
delete glob;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return 1; return 1;
} }
MgmGlobals::MgmGlobals(){ MgmGlobals::MgmGlobals(){
// Default values // Default values
port = 0; port = 0;
config_filename = NULL;
interface_name = 0; interface_name = 0;
daemon = 1;
non_interactive = 0;
interactive = 0;
socketServer = 0; socketServer = 0;
mgmObject = 0; mgmObject = 0;
} }
......
...@@ -64,16 +64,21 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -64,16 +64,21 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
theStop(0), theStop(0),
theFacade(_facade) theFacade(_facade)
{ {
DBUG_ENTER("ClusterMgr::ClusterMgr");
ndbSetOwnVersion(); ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create(); clusterMgrThreadMutex = NdbMutex_Create();
noOfAliveNodes= 0; noOfAliveNodes= 0;
noOfConnectedNodes= 0; noOfConnectedNodes= 0;
theClusterMgrThread= 0; theClusterMgrThread= 0;
DBUG_VOID_RETURN;
} }
ClusterMgr::~ClusterMgr(){ ClusterMgr::~ClusterMgr()
{
DBUG_ENTER("ClusterMgr::~ClusterMgr");
doStop(); doStop();
NdbMutex_Destroy(clusterMgrThreadMutex); NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
} }
void void
...@@ -152,7 +157,6 @@ ClusterMgr::doStop( ){ ...@@ -152,7 +157,6 @@ ClusterMgr::doStop( ){
if (theClusterMgrThread) { if (theClusterMgrThread) {
NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread); NdbThread_Destroy(&theClusterMgrThread);
theClusterMgrThread= 0;
} }
NdbMutex_Unlock(clusterMgrThreadMutex); NdbMutex_Unlock(clusterMgrThreadMutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -481,6 +485,8 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){ ...@@ -481,6 +485,8 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){
ArbitMgr::ArbitMgr(TransporterFacade & _fac) ArbitMgr::ArbitMgr(TransporterFacade & _fac)
: theFacade(_fac) : theFacade(_fac)
{ {
DBUG_ENTER("ArbitMgr::ArbitMgr");
theThreadMutex = NdbMutex_Create(); theThreadMutex = NdbMutex_Create();
theInputCond = NdbCondition_Create(); theInputCond = NdbCondition_Create();
theInputMutex = NdbMutex_Create(); theInputMutex = NdbMutex_Create();
...@@ -498,13 +504,17 @@ ArbitMgr::ArbitMgr(TransporterFacade & _fac) ...@@ -498,13 +504,17 @@ ArbitMgr::ArbitMgr(TransporterFacade & _fac)
memset(&theChooseReq1, 0, sizeof(theChooseReq1)); memset(&theChooseReq1, 0, sizeof(theChooseReq1));
memset(&theChooseReq2, 0, sizeof(theChooseReq2)); memset(&theChooseReq2, 0, sizeof(theChooseReq2));
memset(&theStopOrd, 0, sizeof(theStopOrd)); memset(&theStopOrd, 0, sizeof(theStopOrd));
DBUG_VOID_RETURN;
} }
ArbitMgr::~ArbitMgr() ArbitMgr::~ArbitMgr()
{ {
DBUG_ENTER("ArbitMgr::~ArbitMgr");
NdbMutex_Destroy(theThreadMutex); NdbMutex_Destroy(theThreadMutex);
NdbCondition_Destroy(theInputCond); NdbCondition_Destroy(theInputCond);
NdbMutex_Destroy(theInputMutex); NdbMutex_Destroy(theInputMutex);
DBUG_VOID_RETURN;
} }
// Start arbitrator thread. This is kernel request. // Start arbitrator thread. This is kernel request.
...@@ -521,7 +531,7 @@ ArbitMgr::doStart(const Uint32* theData) ...@@ -521,7 +531,7 @@ ArbitMgr::doStart(const Uint32* theData)
sendSignalToThread(aSignal); sendSignalToThread(aSignal);
void* value; void* value;
NdbThread_WaitFor(theThread, &value); NdbThread_WaitFor(theThread, &value);
theThread = NULL; NdbThread_Destroy(&theThread);
theState = StateInit; theState = StateInit;
theInputFull = false; theInputFull = false;
} }
...@@ -560,7 +570,7 @@ ArbitMgr::doStop(const Uint32* theData) ...@@ -560,7 +570,7 @@ ArbitMgr::doStop(const Uint32* theData)
sendSignalToThread(aSignal); sendSignalToThread(aSignal);
void* value; void* value;
NdbThread_WaitFor(theThread, &value); NdbThread_WaitFor(theThread, &value);
theThread = NULL; NdbThread_Destroy(&theThread);
theState = StateInit; theState = StateInit;
} }
NdbMutex_Unlock(theThreadMutex); NdbMutex_Unlock(theThreadMutex);
......
...@@ -395,12 +395,10 @@ TransporterFacade::doStop(){ ...@@ -395,12 +395,10 @@ TransporterFacade::doStop(){
if (theReceiveThread) { if (theReceiveThread) {
NdbThread_WaitFor(theReceiveThread, &status); NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_Destroy(&theReceiveThread); NdbThread_Destroy(&theReceiveThread);
theReceiveThread= 0;
} }
if (theSendThread) { if (theSendThread) {
NdbThread_WaitFor(theSendThread, &status); NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theSendThread); NdbThread_Destroy(&theSendThread);
theSendThread= 0;
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -435,7 +433,7 @@ void TransporterFacade::threadMainSend(void) ...@@ -435,7 +433,7 @@ void TransporterFacade::threadMainSend(void)
theTransporterRegistry->stopSending(); theTransporterRegistry->stopSending();
m_socket_server.stopServer(); m_socket_server.stopServer();
m_socket_server.stopSessions(); m_socket_server.stopSessions(true);
theTransporterRegistry->stop_clients(); theTransporterRegistry->stop_clients();
} }
...@@ -477,6 +475,8 @@ TransporterFacade::TransporterFacade() : ...@@ -477,6 +475,8 @@ TransporterFacade::TransporterFacade() :
theReceiveThread(NULL), theReceiveThread(NULL),
m_fragmented_signal_id(0) m_fragmented_signal_id(0)
{ {
DBUG_ENTER("TransporterFacade::TransporterFacade");
theOwnId = 0; theOwnId = 0;
theMutexPtr = NdbMutex_Create(); theMutexPtr = NdbMutex_Create();
...@@ -493,11 +493,15 @@ TransporterFacade::TransporterFacade() : ...@@ -493,11 +493,15 @@ TransporterFacade::TransporterFacade() :
m_max_trans_id = 0; m_max_trans_id = 0;
theClusterMgr = new ClusterMgr(* this); theClusterMgr = new ClusterMgr(* this);
DBUG_VOID_RETURN;
} }
bool bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{ {
DBUG_ENTER("TransporterFacade::init");
theOwnId = nodeId; theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(this); theTransporterRegistry = new TransporterRegistry(this);
...@@ -506,7 +510,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -506,7 +510,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
* theTransporterRegistry); * theTransporterRegistry);
if(res <= 0){ if(res <= 0){
TRP_DEBUG( "configureTransporters returned 0 or less" ); TRP_DEBUG( "configureTransporters returned 0 or less" );
return false; DBUG_RETURN(false);
} }
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE); ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
...@@ -516,7 +520,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -516,7 +520,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
iter.first(); iter.first();
if(iter.find(CFG_NODE_ID, nodeId)){ if(iter.find(CFG_NODE_ID, nodeId)){
TRP_DEBUG( "Node info missing from config." ); TRP_DEBUG( "Node info missing from config." );
return false; DBUG_RETURN(false);
} }
Uint32 rank = 0; Uint32 rank = 0;
...@@ -542,7 +546,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -542,7 +546,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
if (!theTransporterRegistry->start_service(m_socket_server)){ if (!theTransporterRegistry->start_service(m_socket_server)){
ndbout_c("Unable to start theTransporterRegistry->start_service"); ndbout_c("Unable to start theTransporterRegistry->start_service");
return false; DBUG_RETURN(false);
} }
theReceiveThread = NdbThread_Create(runReceiveResponse_C, theReceiveThread = NdbThread_Create(runReceiveResponse_C,
...@@ -562,7 +566,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -562,7 +566,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut); signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif #endif
return true; DBUG_RETURN(true);
} }
...@@ -683,8 +687,10 @@ TransporterFacade::open(void* objRef, ...@@ -683,8 +687,10 @@ TransporterFacade::open(void* objRef,
DBUG_RETURN(r); DBUG_RETURN(r);
} }
TransporterFacade::~TransporterFacade(){ TransporterFacade::~TransporterFacade()
{
DBUG_ENTER("TransporterFacade::~TransporterFacade");
NdbMutex_Lock(theMutexPtr); NdbMutex_Lock(theMutexPtr);
delete theClusterMgr; delete theClusterMgr;
delete theArbitMgr; delete theArbitMgr;
...@@ -694,6 +700,7 @@ TransporterFacade::~TransporterFacade(){ ...@@ -694,6 +700,7 @@ TransporterFacade::~TransporterFacade(){
#ifdef API_TRACE #ifdef API_TRACE
signalLogger.setOutputStream(0); signalLogger.setOutputStream(0);
#endif #endif
DBUG_VOID_RETURN;
} }
void void
......
...@@ -255,8 +255,9 @@ main(int argc, char** argv) ...@@ -255,8 +255,9 @@ main(int argc, char** argv)
const BackupFormat::FileHeader & tmp = metaData.getFileHeader(); const BackupFormat::FileHeader & tmp = metaData.getFileHeader();
const Uint32 version = tmp.NdbVersion; const Uint32 version = tmp.NdbVersion;
char buf[NDB_VERSION_STRING_BUF_SZ];
ndbout << "Ndb version in backup files: " ndbout << "Ndb version in backup files: "
<< getVersionString(version, 0) << endl; << getVersionString(version, 0, buf, sizeof(buf)) << endl;
/** /**
* check wheater we can restore the backup (right version). * check wheater we can restore the backup (right version).
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment