BUG#11898 ndb_mgmd not releasing resources, added "ping" on add_listener

    + added close of some fd's
    + debug prints
parent 9be420ee
......@@ -455,11 +455,13 @@ static int do_event_thread;
static void*
event_thread_run(void* m)
{
DBUG_ENTER("event_thread_run");
NdbMgmHandle handle= *(NdbMgmHandle*)m;
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
int fd = ndb_mgm_listen_event(handle, filter);
if (fd > 0)
if (fd != NDB_INVALID_SOCKET)
{
do_event_thread= 1;
char *tmp= 0;
......@@ -468,20 +470,26 @@ event_thread_run(void* m)
do {
if (tmp == 0) NdbSleep_MilliSleep(10);
if((tmp = in.gets(buf, 1024)))
ndbout << tmp;
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
ndbout << tmp;
}
} while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
}
else
{
do_event_thread= -1;
}
return NULL;
DBUG_RETURN(NULL);
}
bool
CommandInterpreter::connect()
{
DBUG_ENTER("CommandInterpreter::connect");
if(!m_connected)
{
if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
......@@ -512,8 +520,19 @@ CommandInterpreter::connect()
do_event_thread == 0 ||
do_event_thread == -1)
{
printf("Warning, event thread startup failed, degraded printouts as result\n");
DBUG_PRINT("info",("Warning, event thread startup failed, "
"degraded printouts as result, errno=%d",
errno));
printf("Warning, event thread startup failed, "
"degraded printouts as result, errno=%d\n", errno);
do_event_thread= 0;
if (m_event_thread)
{
void *res;
NdbThread_WaitFor(m_event_thread, &res);
NdbThread_Destroy(&m_event_thread);
}
ndb_mgm_disconnect(m_mgmsrv2);
}
}
else
......@@ -521,6 +540,8 @@ CommandInterpreter::connect()
printf("Warning, event connect failed, degraded printouts as result\n");
}
m_connected= true;
DBUG_PRINT("info",("Connected to Management Server at: %s:%d",
host,port));
if (m_verbose)
{
printf("Connected to Management Server at: %s:%d\n",
......@@ -528,12 +549,13 @@ CommandInterpreter::connect()
}
}
}
return m_connected;
DBUG_RETURN(m_connected);
}
bool
CommandInterpreter::disconnect()
{
DBUG_ENTER("CommandInterpreter::disconnect");
if (m_event_thread) {
void *res;
do_event_thread= 0;
......@@ -550,7 +572,7 @@ CommandInterpreter::disconnect()
}
m_connected= false;
}
return true;
DBUG_RETURN(true);
}
//*****************************************************************************
......
......@@ -60,6 +60,7 @@ public:
}
void add_listener(const Event_listener&);
void check_listeners();
void update_max_log_level(const LogLevel&);
void update_log_level(const LogLevel&);
......@@ -764,16 +765,6 @@ private:
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
ConfigRetriever *m_config_retriever;
public:
/**
* This method does not exist
*/
struct Area51 {
class TransporterFacade * theFacade;
class TransporterRegistry * theRegistry;
};
Area51 getStuff();
};
inline
......
......@@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = {
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm) {
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock);
m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
DBUG_VOID_RETURN;
}
MgmApiSession::~MgmApiSession()
{
DBUG_ENTER("MgmApiSession::~MgmApiSession");
if (m_input)
delete m_input;
if (m_output)
......@@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession()
delete m_parser;
if (m_allocated_resources)
delete m_allocated_resources;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
}
void
MgmApiSession::runSession() {
MgmApiSession::runSession()
{
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx;
while(!m_stop) {
m_parser->run(ctx, *this);
......@@ -301,8 +314,13 @@ MgmApiSession::runSession() {
break;
}
}
if(m_socket >= 0)
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
}
#ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT
......@@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Uint32 threshold;
LogLevel::EventCategory cat;
Logger::LoggerLevel severity;
int i;
int i, n;
DBUG_ENTER("Ndb_mgmd_event_service::log");
DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId));
......@@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Vector<NDB_SOCKET_TYPE> copy;
m_clients.lock();
for(i = m_clients.size() - 1; i >= 0; i--){
if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){
if(m_clients[i].m_socket != NDB_INVALID_SOCKET &&
println_socket(m_clients[i].m_socket,
MAX_WRITE_TIMEOUT, m_text) == -1){
copy.push_back(m_clients[i].m_socket);
for(i = m_clients.size() - 1; i >= 0; i--)
{
if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat))
{
int fd= m_clients[i].m_socket;
if(fd != NDB_INVALID_SOCKET &&
println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1)
{
copy.push_back(fd);
m_clients.erase(i, false);
}
}
}
m_clients.unlock();
for(i = 0; (unsigned)i < copy.size(); i++){
NDB_CLOSE_SOCKET(copy[i]);
}
if ((n= (int)copy.size()))
{
for(i= 0; i < n; i++)
NDB_CLOSE_SOCKET(copy[i]);
if(copy.size()){
LogLevel tmp; tmp.clear();
m_clients.lock();
for(i = 0; (unsigned)i < m_clients.size(); i++){
for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
}
m_clients.unlock();
update_log_level(tmp);
}
......@@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
}
void
Ndb_mgmd_event_service::add_listener(const Event_listener& client){
Ndb_mgmd_event_service::check_listeners()
{
int i, n= 0;
DBUG_ENTER("Ndb_mgmd_event_service::check_listeners");
m_clients.lock();
for(i= m_clients.size() - 1; i >= 0; i--)
{
int fd= m_clients[i].m_socket;
DBUG_PRINT("info",("%d %d",i,fd));
char buf[1];
buf[0]=0;
if (fd != NDB_INVALID_SOCKET &&
println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1)
{
NDB_CLOSE_SOCKET(fd);
m_clients.erase(i, false);
n=1;
}
}
if (n)
{
LogLevel tmp; tmp.clear();
for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
update_log_level(tmp);
}
m_clients.unlock();
DBUG_VOID_RETURN;
}
void
Ndb_mgmd_event_service::add_listener(const Event_listener& client)
{
DBUG_ENTER("Ndb_mgmd_event_service::add_listener");
DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket));
check_listeners();
m_clients.push_back(client);
update_max_log_level(client.m_logLevel);
DBUG_VOID_RETURN;
}
void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment