misc (memleak) fixes:

    added using ndb_end and showing mem allocation stat at exit
    added init of variable
    added destruction of LocalConfig to still memleak
    added delete of theFacade on exit
    stopSessions(true) on socket server to ensure destuction of threads
    added destruction of arbit manager thread
    added true option to stopSessions in transporter facade to ensure destructions of threads
parent 8afeb337
......@@ -28,6 +28,7 @@
const char *opt_connect_str= 0;\
my_bool opt_ndb_optimized_node_selection
bool opt_endinfo= 0;
my_bool opt_ndb_shm;
#define OPT_NDB_CONNECTSTRING 'c'
......@@ -93,6 +94,7 @@ ndb_std_get_one_option(int optid,
{
DBUG_PUSH(argument);
}
opt_endinfo= 1;
break;
case 'V':
ndb_std_print_version();
......
......@@ -31,6 +31,7 @@ LogHandler::LogHandler() :
m_last_message[0]= 0;
m_last_log_time= 0;
m_now= 0;
m_last_level= (Logger::LoggerLevel)-1;
}
LogHandler::~LogHandler()
......
......@@ -143,6 +143,7 @@ extern "C"
NdbMgmHandle
ndb_mgm_create_handle()
{
DBUG_ENTER("ndb_mgm_create_handle");
NdbMgmHandle h =
(NdbMgmHandle)my_malloc(sizeof(ndb_mgm_handle),MYF(MY_WME));
h->connected = 0;
......@@ -162,17 +163,20 @@ ndb_mgm_create_handle()
h->logfile = 0;
#endif
return h;
DBUG_PRINT("exit",("ret: %lx", h));
DBUG_RETURN(h);
}
extern "C"
int
ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
{
handle->cfg.~LocalConfig();
new (&(handle->cfg)) LocalConfig;
if (!handle->cfg.init(mgmsrv, 0) ||
handle->cfg.ids.size() == 0)
{
handle->cfg.~LocalConfig();
new (&(handle->cfg)) LocalConfig;
handle->cfg.init(0, 0); /* reset the LocalCongig */
SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
......@@ -189,8 +193,11 @@ extern "C"
void
ndb_mgm_destroy_handle(NdbMgmHandle * handle)
{
DBUG_ENTER("ndb_mgm_destroy_handle");
if(!handle)
return;
DBUG_PRINT("enter",("*handle: %lx", *handle));
if((* handle)->connected){
ndb_mgm_disconnect(* handle);
}
......@@ -203,6 +210,7 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
(*handle)->cfg.~LocalConfig();
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
DBUG_VOID_RETURN;
}
/*****************************************************************************
......@@ -251,6 +259,9 @@ static const Properties *
ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
const char *cmd, const Properties *cmd_args)
{
DBUG_ENTER("ndb_mgm_call");
DBUG_PRINT("enter",("handle->socket: %d, cmd: %s",
handle->socket, cmd));
SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout);
......@@ -310,6 +321,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
/**
* Print some info about why the parser returns NULL
*/
DBUG_PRINT("info",("ctx.status: %d, ctx.m_currentToken: %s",
ctx.m_status, ctx.m_currentToken));
//ndbout << " status=" << ctx.m_status << ", curr="
//<< ctx.m_currentToken << endl;
}
......@@ -321,9 +334,9 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
p->print(handle->logfile, "IN: ");
}
#endif
return p;
DBUG_RETURN(p);
#else
return parser.parse(ctx, session);
DBUG_RETURN(parser.parse(ctx, session));
#endif
}
......
......@@ -44,7 +44,9 @@ static Ndb_mgmclient* com;
extern "C"
void
handler(int sig){
handler(int sig)
{
DBUG_ENTER("handler");
switch(sig){
case SIGPIPE:
/**
......@@ -54,6 +56,7 @@ handler(int sig){
com->disconnect();
break;
}
DBUG_VOID_RETURN;
}
NDB_STD_OPTS_VARS;
......@@ -166,7 +169,8 @@ int main(int argc, char** argv){
com->execute(opt_execute_str,_try_reconnect, &ret);
}
delete com;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return ret;
}
......@@ -571,10 +571,11 @@ MgmtSrvr::check_start()
bool
MgmtSrvr::start(BaseString &error_string)
{
DBUG_ENTER("MgmtSrvr::start");
if (_props == NULL) {
if (!check_start()) {
error_string.append("MgmtSrvr.cpp: check_start() failed.");
return false;
DBUG_RETURN(false);
}
}
theFacade= TransporterFacade::theFacadeInstance= new TransporterFacade();
......@@ -582,12 +583,12 @@ MgmtSrvr::start(BaseString &error_string)
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
error_string.append("MgmtSrvr.cpp: theFacade is NULL.");
return false;
DBUG_RETURN(false);
}
if ( theFacade->start_instance
(_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
return false;
DBUG_RETURN(false);
}
MGM_REQUIRE(_blockNumber == 1);
......@@ -603,7 +604,7 @@ MgmtSrvr::start(BaseString &error_string)
error_string.append("MgmtSrvr.cpp: _blockNumber is -1.");
theFacade->stop_instance();
theFacade = 0;
return false;
DBUG_RETURN(false);
}
_ownReference = numberToRef(_blockNumber, _ownNodeId);
......@@ -625,7 +626,7 @@ MgmtSrvr::start(BaseString &error_string)
"MgmtSrvr_Service",
NDB_THREAD_PRIO_LOW);
return true;
DBUG_RETURN(true);
}
......@@ -639,6 +640,7 @@ MgmtSrvr::~MgmtSrvr()
if(theFacade != 0){
theFacade->stop_instance();
delete theFacade;
theFacade = 0;
}
......@@ -2583,19 +2585,6 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
}
/*****************************************************************************
* Area 51 ???
*****************************************************************************/
MgmtSrvr::Area51
MgmtSrvr::getStuff()
{
Area51 ret;
ret.theFacade = theFacade;
ret.theRegistry = theFacade->theTransporterRegistry;
return ret;
}
NodeId
MgmtSrvr::getPrimaryNode() const {
#if 0
......
......@@ -53,16 +53,16 @@ const char progname[] = "mgmtsrvr";
* @struct MgmGlobals
* @brief Global Variables used in the management server
******************************************************************************/
/** Command line arguments */
static int opt_daemon; // NOT bool, bool need not be int
static int opt_non_interactive;
static int opt_interactive;
static const char * opt_config_filename= 0;
struct MgmGlobals {
MgmGlobals();
~MgmGlobals();
/** Command line arguments */
int daemon; // NOT bool, bool need not be int
int non_interactive;
int interactive;
const char * config_filename;
/** Stuff found in environment or in local config */
NodeId localNodeId;
bool use_specific_ip;
......@@ -77,7 +77,7 @@ struct MgmGlobals {
};
int g_no_nodeid_checks= 0;
static MgmGlobals glob;
static MgmGlobals *glob= 0;
/******************************************************************************
* Function prototypes
......@@ -108,14 +108,14 @@ static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_mgmd"),
{ "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
(gptr*) &opt_config_filename, (gptr*) &opt_config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
(gptr*) &opt_daemon, (gptr*) &opt_daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "interactive", OPT_INTERACTIVE,
"Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
(gptr*) &opt_interactive, (gptr*) &opt_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-nodeid-checks", OPT_NO_NODEID_CHECKS,
"Do not provide any node id checks",
......@@ -123,13 +123,13 @@ static struct my_option my_long_options[] =
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "nodaemon", OPT_NO_DAEMON,
"Don't run as daemon, but don't read from stdin",
(gptr*) &glob.non_interactive, (gptr*) &glob.non_interactive, 0,
(gptr*) &opt_non_interactive, (gptr*) &opt_non_interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
#if NDB_VERSION_MAJOR <= 4
{ "config-file", 'c',
"-c provided for backwards compatability, will be removed in 5.0."
" Use -f instead",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
(gptr*) &opt_config_filename, (gptr*) &opt_config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
#endif
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
......@@ -167,6 +167,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv)
{
NDB_INIT(argv[0]);
glob= new MgmGlobals;
/**
* OSE specific. Enable shared ownership of file system resources.
......@@ -186,40 +187,40 @@ int main(int argc, char** argv)
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
exit(ho_error);
if (glob.interactive ||
glob.non_interactive) {
glob.daemon= 0;
if (opt_interactive ||
opt_non_interactive) {
opt_daemon= 0;
}
glob.socketServer = new SocketServer();
glob->socketServer = new SocketServer();
MgmApiService * mapi = new MgmApiService();
glob.mgmObject = new MgmtSrvr(glob.socketServer,
glob.config_filename,
glob->mgmObject = new MgmtSrvr(glob->socketServer,
opt_config_filename,
opt_connect_str);
if (glob.mgmObject->init())
if (glob->mgmObject->init())
goto error_end;
my_setwd(NdbConfig_get_path(0), MYF(0));
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0) {
glob->localNodeId= glob->mgmObject->getOwnNodeId();
if (glob->localNodeId == 0) {
goto error_end;
}
glob.port= glob.mgmObject->getPort();
glob->port= glob->mgmObject->getPort();
if (glob.port == 0)
if (glob->port == 0)
goto error_end;
glob.interface_name = 0;
glob.use_specific_ip = false;
glob->interface_name = 0;
glob->use_specific_ip = false;
if(!glob.use_specific_ip){
if(!glob->use_specific_ip){
int count= 5; // no of retries for tryBind
while(!glob.socketServer->tryBind(glob.port, glob.interface_name)){
while(!glob->socketServer->tryBind(glob->port, glob->interface_name)){
if (--count > 0) {
NdbSleep_MilliSleep(1000);
continue;
......@@ -228,33 +229,33 @@ int main(int argc, char** argv)
"Please check if the port is already used,\n"
"(perhaps a ndb_mgmd is already running),\n"
"and if you are executing on the correct computer",
(glob.interface_name ? glob.interface_name : "*"), glob.port);
(glob->interface_name ? glob->interface_name : "*"), glob->port);
goto error_end;
}
free(glob.interface_name);
glob.interface_name = 0;
free(glob->interface_name);
glob->interface_name = 0;
}
if(!glob.socketServer->setup(mapi, glob.port, glob.interface_name)){
if(!glob->socketServer->setup(mapi, glob->port, glob->interface_name)){
ndbout_c("Unable to setup management port: %d!\n"
"Please check if the port is already used,\n"
"(perhaps a ndb_mgmd is already running),\n"
"and if you are executing on the correct computer",
glob.port);
glob->port);
delete mapi;
goto error_end;
}
if(!glob.mgmObject->check_start()){
if(!glob->mgmObject->check_start()){
ndbout_c("Unable to check start management server.");
ndbout_c("Probably caused by illegal initial configuration file.");
goto error_end;
}
if (glob.daemon) {
if (opt_daemon) {
// Become a daemon
char *lockfile= NdbConfig_PidFileName(glob.localNodeId);
char *logfile= NdbConfig_StdoutFileName(glob.localNodeId);
char *lockfile= NdbConfig_PidFileName(glob->localNodeId);
char *logfile= NdbConfig_StdoutFileName(glob->localNodeId);
NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile);
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
......@@ -268,7 +269,7 @@ int main(int argc, char** argv)
#endif
{
BaseString error_string;
if(!glob.mgmObject->start(error_string)){
if(!glob->mgmObject->start(error_string)){
ndbout_c("Unable to start management server.");
ndbout_c("Probably caused by illegal initial configuration file.");
ndbout_c(error_string.c_str());
......@@ -276,8 +277,8 @@ int main(int argc, char** argv)
}
}
//glob.mgmObject->saveConfig();
mapi->setMgm(glob.mgmObject);
//glob->mgmObject->saveConfig();
mapi->setMgm(glob->mgmObject);
char msg[256];
BaseString::snprintf(msg, sizeof(msg),
......@@ -286,16 +287,16 @@ int main(int argc, char** argv)
g_eventLogger.info(msg);
BaseString::snprintf(msg, 256, "Id: %d, Command port: %d",
glob.localNodeId, glob.port);
glob->localNodeId, glob->port);
ndbout_c(msg);
g_eventLogger.info(msg);
g_StopServer = false;
glob.socketServer->startServer();
glob->socketServer->startServer();
#if ! defined NDB_OSE && ! defined NDB_SOFTOSE
if(glob.interactive) {
CommandInterpreter com(* glob.mgmObject);
if(opt_interactive) {
CommandInterpreter com(* glob->mgmObject);
while(com.readAndExecute());
} else
#endif
......@@ -305,22 +306,22 @@ int main(int argc, char** argv)
}
g_eventLogger.info("Shutting down server...");
glob.socketServer->stopServer();
glob.socketServer->stopSessions();
glob->socketServer->stopServer();
glob->socketServer->stopSessions(true);
g_eventLogger.info("Shutdown complete");
delete glob;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return 0;
error_end:
delete glob;
ndb_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
return 1;
}
MgmGlobals::MgmGlobals(){
// Default values
port = 0;
config_filename = NULL;
interface_name = 0;
daemon = 1;
non_interactive = 0;
interactive = 0;
socketServer = 0;
mgmObject = 0;
}
......
......@@ -64,16 +64,21 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
theStop(0),
theFacade(_facade)
{
DBUG_ENTER("ClusterMgr::ClusterMgr");
ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create();
noOfAliveNodes= 0;
noOfConnectedNodes= 0;
theClusterMgrThread= 0;
DBUG_VOID_RETURN;
}
ClusterMgr::~ClusterMgr(){
ClusterMgr::~ClusterMgr()
{
DBUG_ENTER("ClusterMgr::~ClusterMgr");
doStop();
NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
}
void
......@@ -152,7 +157,6 @@ ClusterMgr::doStop( ){
if (theClusterMgrThread) {
NdbThread_WaitFor(theClusterMgrThread, &status);
NdbThread_Destroy(&theClusterMgrThread);
theClusterMgrThread= 0;
}
NdbMutex_Unlock(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
......@@ -468,6 +472,8 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){
ArbitMgr::ArbitMgr(TransporterFacade & _fac)
: theFacade(_fac)
{
DBUG_ENTER("ArbitMgr::ArbitMgr");
theThreadMutex = NdbMutex_Create();
theInputCond = NdbCondition_Create();
theInputMutex = NdbMutex_Create();
......@@ -485,13 +491,17 @@ ArbitMgr::ArbitMgr(TransporterFacade & _fac)
memset(&theChooseReq1, 0, sizeof(theChooseReq1));
memset(&theChooseReq2, 0, sizeof(theChooseReq2));
memset(&theStopOrd, 0, sizeof(theStopOrd));
DBUG_VOID_RETURN;
}
ArbitMgr::~ArbitMgr()
{
DBUG_ENTER("ArbitMgr::~ArbitMgr");
NdbMutex_Destroy(theThreadMutex);
NdbCondition_Destroy(theInputCond);
NdbMutex_Destroy(theInputMutex);
DBUG_VOID_RETURN;
}
// Start arbitrator thread. This is kernel request.
......@@ -508,7 +518,7 @@ ArbitMgr::doStart(const Uint32* theData)
sendSignalToThread(aSignal);
void* value;
NdbThread_WaitFor(theThread, &value);
theThread = NULL;
NdbThread_Destroy(&theThread);
theState = StateInit;
theInputFull = false;
}
......@@ -547,7 +557,7 @@ ArbitMgr::doStop(const Uint32* theData)
sendSignalToThread(aSignal);
void* value;
NdbThread_WaitFor(theThread, &value);
theThread = NULL;
NdbThread_Destroy(&theThread);
theState = StateInit;
}
NdbMutex_Unlock(theThreadMutex);
......
......@@ -395,12 +395,10 @@ TransporterFacade::doStop(){
if (theReceiveThread) {
NdbThread_WaitFor(theReceiveThread, &status);
NdbThread_Destroy(&theReceiveThread);
theReceiveThread= 0;
}
if (theSendThread) {
NdbThread_WaitFor(theSendThread, &status);
NdbThread_Destroy(&theSendThread);
theSendThread= 0;
}
DBUG_VOID_RETURN;
}
......@@ -435,7 +433,7 @@ void TransporterFacade::threadMainSend(void)
theTransporterRegistry->stopSending();
m_socket_server.stopServer();
m_socket_server.stopSessions();
m_socket_server.stopSessions(true);
theTransporterRegistry->stop_clients();
}
......@@ -477,6 +475,8 @@ TransporterFacade::TransporterFacade() :
theReceiveThread(NULL),
m_fragmented_signal_id(0)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
theOwnId = 0;
theMutexPtr = NdbMutex_Create();
......@@ -493,11 +493,15 @@ TransporterFacade::TransporterFacade() :
m_max_trans_id = 0;
theClusterMgr = new ClusterMgr(* this);
DBUG_VOID_RETURN;
}
bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
DBUG_ENTER("TransporterFacade::init");
theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(this);
......@@ -506,7 +510,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
* theTransporterRegistry);
if(res <= 0){
TRP_DEBUG( "configureTransporters returned 0 or less" );
return false;
DBUG_RETURN(false);
}
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
......@@ -524,7 +528,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
iter.first();
if(iter.find(CFG_NODE_ID, nodeId)){
TRP_DEBUG( "Node info missing from config." );
return false;
DBUG_RETURN(false);
}
Uint32 rank = 0;
......@@ -553,7 +557,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
if (!theTransporterRegistry->start_service(m_socket_server)){
ndbout_c("Unable to start theTransporterRegistry->start_service");
return false;
DBUG_RETURN(false);
}
theReceiveThread = NdbThread_Create(runReceiveResponse_C,
......@@ -573,7 +577,7 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif
return true;
DBUG_RETURN(true);
}
......@@ -694,8 +698,10 @@ TransporterFacade::open(void* objRef,
DBUG_RETURN(r);
}
TransporterFacade::~TransporterFacade(){
TransporterFacade::~TransporterFacade()
{
DBUG_ENTER("TransporterFacade::~TransporterFacade");
NdbMutex_Lock(theMutexPtr);
delete theClusterMgr;
delete theArbitMgr;
......@@ -705,6 +711,7 @@ TransporterFacade::~TransporterFacade(){
#ifdef API_TRACE
signalLogger.setOutputStream(0);
#endif
DBUG_VOID_RETURN;
}
void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment