Commit 91c28daf authored by unknown's avatar unknown

Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-ndb

into  orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug21017

parents 74aca9aa 06b1f15d
...@@ -274,36 +274,48 @@ Backup::execCONTINUEB(Signal* signal) ...@@ -274,36 +274,48 @@ Backup::execCONTINUEB(Signal* signal)
BackupRecordPtr ptr; BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptr_I); c_backupPool.getPtr(ptr, ptr_I);
TablePtr tabPtr;
ptr.p->tables.getPtr(tabPtr, tabPtr_I);
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
BackupFilePtr filePtr; if (tabPtr_I == RNIL)
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
Uint32 * dst;
if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
{ {
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4); closeFiles(signal, ptr);
return; return;
} }
jam();
TablePtr tabPtr;
ptr.p->tables.getPtr(tabPtr, tabPtr_I);
jam();
if(tabPtr.p->fragments.getSize())
{
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
BackupFormat::CtlFile::FragmentInfo * fragInfo = BackupFilePtr filePtr;
(BackupFormat::CtlFile::FragmentInfo*)dst; ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
fragInfo->SectionLength = htonl(sz);
fragInfo->TableId = htonl(fragPtr.p->tableId);
fragInfo->FragmentNo = htonl(fragPtr_I);
fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF);
fragInfo->FilePosHigh = htonl(0 >> 32);
filePtr.p->operation.dataBuffer.updateWritePtr(sz); const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
Uint32 * dst;
if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
{
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
return;
}
BackupFormat::CtlFile::FragmentInfo * fragInfo =
(BackupFormat::CtlFile::FragmentInfo*)dst;
fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
fragInfo->SectionLength = htonl(sz);
fragInfo->TableId = htonl(fragPtr.p->tableId);
fragInfo->FragmentNo = htonl(fragPtr_I);
fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF);
fragInfo->FilePosHigh = htonl(0 >> 32);
filePtr.p->operation.dataBuffer.updateWritePtr(sz);
fragPtr_I++;
}
fragPtr_I++;
if (fragPtr_I == tabPtr.p->fragments.getSize()) if (fragPtr_I == tabPtr.p->fragments.getSize())
{ {
signal->theData[0] = tabPtr.p->tableId; signal->theData[0] = tabPtr.p->tableId;
...@@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal) ...@@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal)
TablePtr tabPtr; TablePtr tabPtr;
ptr.p->tables.first(tabPtr); ptr.p->tables.first(tabPtr);
if (tabPtr.i == RNIL)
{
closeFiles(signal, ptr);
return;
}
signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO; signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
signal->theData[1] = ptr.i; signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i; signal->theData[2] = tabPtr.i;
......
...@@ -1389,7 +1389,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], ...@@ -1389,7 +1389,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[],
MGM_END() MGM_END()
}; };
CHECK_HANDLE(handle, -1); CHECK_HANDLE(handle, -1);
const char *hostname= ndb_mgm_get_connected_host(handle); const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle); int port= ndb_mgm_get_connected_port(handle);
SocketClient s(hostname, port); SocketClient s(hostname, port);
...@@ -1411,19 +1411,20 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], ...@@ -1411,19 +1411,20 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[],
} }
args.put("filter", tmp.c_str()); args.put("filter", tmp.c_str());
} }
int tmp = handle->socket; int tmp = handle->socket;
handle->socket = sockfd; handle->socket = sockfd;
const Properties *reply; const Properties *reply;
reply = ndb_mgm_call(handle, stat_reply, "listen event", &args); reply = ndb_mgm_call(handle, stat_reply, "listen event", &args);
handle->socket = tmp; handle->socket = tmp;
if(reply == NULL) { if(reply == NULL) {
close(sockfd); close(sockfd);
CHECK_REPLY(reply, -1); CHECK_REPLY(reply, -1);
} }
delete reply;
return sockfd; return sockfd;
} }
......
...@@ -173,8 +173,15 @@ private: ...@@ -173,8 +173,15 @@ private:
bool rep_connected; bool rep_connected;
#endif #endif
struct NdbThread* m_event_thread; struct NdbThread* m_event_thread;
NdbMutex *m_print_mutex;
}; };
struct event_thread_param {
NdbMgmHandle *m;
NdbMutex **p;
};
NdbMutex* print_mutex;
/* /*
* Facade object for CommandInterpreter * Facade object for CommandInterpreter
...@@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) ...@@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
m_connected= false; m_connected= false;
m_event_thread= 0; m_event_thread= 0;
try_reconnect = 0; try_reconnect = 0;
m_print_mutex= NdbMutex_Create();
#ifdef HAVE_GLOBAL_REPLICATION #ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL; rep_host = NULL;
m_repserver = NULL; m_repserver = NULL;
...@@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) ...@@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
CommandInterpreter::~CommandInterpreter() CommandInterpreter::~CommandInterpreter()
{ {
disconnect(); disconnect();
NdbMutex_Destroy(m_print_mutex);
} }
static bool static bool
...@@ -444,11 +453,13 @@ CommandInterpreter::printError() ...@@ -444,11 +453,13 @@ CommandInterpreter::printError()
static int do_event_thread; static int do_event_thread;
static void* static void*
event_thread_run(void* m) event_thread_run(void* p)
{ {
DBUG_ENTER("event_thread_run"); DBUG_ENTER("event_thread_run");
NdbMgmHandle handle= *(NdbMgmHandle*)m; struct event_thread_param param= *(struct event_thread_param*)p;
NdbMgmHandle handle= *(param.m);
NdbMutex* printmutex= *(param.p);
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
1, NDB_MGM_EVENT_CATEGORY_STARTUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP,
...@@ -466,7 +477,11 @@ event_thread_run(void* m) ...@@ -466,7 +477,11 @@ event_thread_run(void* m)
{ {
const char ping_token[]= "<PING>"; const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
ndbout << tmp; if(tmp && strlen(tmp))
{
Guard g(printmutex);
ndbout << tmp;
}
} }
} while(do_event_thread); } while(do_event_thread);
NDB_CLOSE_SOCKET(fd); NDB_CLOSE_SOCKET(fd);
...@@ -519,8 +534,11 @@ CommandInterpreter::connect() ...@@ -519,8 +534,11 @@ CommandInterpreter::connect()
assert(m_event_thread == 0); assert(m_event_thread == 0);
assert(do_event_thread == 0); assert(do_event_thread == 0);
do_event_thread= 0; do_event_thread= 0;
struct event_thread_param p;
p.m= &m_mgmsrv2;
p.p= &m_print_mutex;
m_event_thread = NdbThread_Create(event_thread_run, m_event_thread = NdbThread_Create(event_thread_run,
(void**)&m_mgmsrv2, (void**)&p,
32768, 32768,
"CommandInterpreted_event_thread", "CommandInterpreted_event_thread",
NDB_THREAD_PRIO_LOW); NDB_THREAD_PRIO_LOW);
...@@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, ...@@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect,
int result= execute_impl(_line); int result= execute_impl(_line);
if (error) if (error)
*error= m_error; *error= m_error;
return result; return result;
} }
...@@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line) ...@@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line)
DBUG_RETURN(true); DBUG_RETURN(true);
if (strcasecmp(firstToken, "SHOW") == 0) { if (strcasecmp(firstToken, "SHOW") == 0) {
Guard g(m_print_mutex);
executeShow(allAfterFirstToken); executeShow(allAfterFirstToken);
DBUG_RETURN(true); DBUG_RETURN(true);
} }
...@@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ...@@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun,
ndbout_c("Trying to start all nodes of system."); ndbout_c("Trying to start all nodes of system.");
ndbout_c("Use ALL STATUS to see the system start-up phases."); ndbout_c("Use ALL STATUS to see the system start-up phases.");
} else { } else {
Guard g(m_print_mutex);
struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
if(cl == 0){ if(cl == 0){
ndbout_c("Unable get status from management server"); ndbout_c("Unable get status from management server");
...@@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters) ...@@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters)
if(it == 0){ if(it == 0){
ndbout_c("Unable to create config iterator"); ndbout_c("Unable to create config iterator");
ndb_mgm_destroy_configuration(conf);
return; return;
} }
NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it); NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it);
...@@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters) ...@@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters)
print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0); print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0);
print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0); print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0);
// ndbout << helpTextShow; // ndbout << helpTextShow;
ndb_mgm_destroy_configuration(conf);
return; return;
} else if (strcasecmp(parameters, "PROPERTIES") == 0 || } else if (strcasecmp(parameters, "PROPERTIES") == 0 ||
strcasecmp(parameters, "PROP") == 0) { strcasecmp(parameters, "PROP") == 0) {
......
...@@ -77,7 +77,6 @@ ...@@ -77,7 +77,6 @@
}\ }\
} }
extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks; extern int g_no_nodeid_checks;
extern my_bool opt_core; extern my_bool opt_core;
...@@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) ...@@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort)
#include <ClusterMgr.hpp> #include <ClusterMgr.hpp>
void
MgmtSrvr::updateStatus()
{
theFacade->theClusterMgr->forceHB();
}
int int
MgmtSrvr::status(int nodeId, MgmtSrvr::status(int nodeId,
ndb_mgm_node_status * _status, ndb_mgm_node_status * _status,
...@@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (found_matching_type && !found_free_node) { if (found_matching_type && !found_free_node) {
// we have a temporary error which might be due to that // we have a temporary error which might be due to that
// we have got the latest connect status from db-nodes. Force update. // we have got the latest connect status from db-nodes. Force update.
global_flag_send_heartbeat_now= 1; updateStatus();
} }
BaseString type_string, type_c_string; BaseString type_string, type_c_string;
...@@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() ...@@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
if (!m_reserved_nodes.isclear()) { if (!m_reserved_nodes.isclear()) {
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
// node has been reserved, force update signal to ndb nodes // node has been reserved, force update signal to ndb nodes
global_flag_send_heartbeat_now= 1; m_mgmsrv.updateStatus();
char tmp_str[128]; char tmp_str[128];
m_mgmsrv.m_reserved_nodes.getText(tmp_str); m_mgmsrv.m_reserved_nodes.getText(tmp_str);
......
...@@ -490,6 +490,8 @@ public: ...@@ -490,6 +490,8 @@ public:
void get_connected_nodes(NodeBitmask &connected_nodes) const; void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; } SocketServer *get_socket_server() { return m_socket_server; }
void updateStatus();
//************************************************************************** //**************************************************************************
private: private:
//************************************************************************** //**************************************************************************
......
...@@ -982,6 +982,7 @@ printNodeStatus(OutputStream *output, ...@@ -982,6 +982,7 @@ printNodeStatus(OutputStream *output,
MgmtSrvr &mgmsrv, MgmtSrvr &mgmsrv,
enum ndb_mgm_node_type type) { enum ndb_mgm_node_type type) {
NodeId nodeId = 0; NodeId nodeId = 0;
mgmsrv.updateStatus();
while(mgmsrv.getNextNodeId(&nodeId, type)) { while(mgmsrv.getNextNodeId(&nodeId, type)) {
enum ndb_mgm_node_status status; enum ndb_mgm_node_status status;
Uint32 startPhase = 0, Uint32 startPhase = 0,
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#include <mgmapi_configuration.hpp> #include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h> #include <mgmapi_config_parameters.h>
int global_flag_send_heartbeat_now= 0; //#define DEBUG_REG
// Just a C wrapper for threadMain // Just a C wrapper for threadMain
extern "C" extern "C"
...@@ -67,6 +67,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -67,6 +67,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
DBUG_ENTER("ClusterMgr::ClusterMgr"); DBUG_ENTER("ClusterMgr::ClusterMgr");
ndbSetOwnVersion(); ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create(); clusterMgrThreadMutex = NdbMutex_Create();
waitForHBCond= NdbCondition_Create();
waitingForHB= false;
noOfAliveNodes= 0; noOfAliveNodes= 0;
noOfConnectedNodes= 0; noOfConnectedNodes= 0;
theClusterMgrThread= 0; theClusterMgrThread= 0;
...@@ -77,7 +79,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -77,7 +79,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
ClusterMgr::~ClusterMgr() ClusterMgr::~ClusterMgr()
{ {
DBUG_ENTER("ClusterMgr::~ClusterMgr"); DBUG_ENTER("ClusterMgr::~ClusterMgr");
doStop(); doStop();
NdbCondition_Destroy(waitForHBCond);
NdbMutex_Destroy(clusterMgrThreadMutex); NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -163,6 +166,70 @@ ClusterMgr::doStop( ){ ...@@ -163,6 +166,70 @@ ClusterMgr::doStop( ){
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void
ClusterMgr::forceHB()
{
theFacade.lock_mutex();
if(waitingForHB)
{
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
theFacade.unlock_mutex();
return;
}
waitingForHB= true;
NodeBitmask ndb_nodes;
ndb_nodes.clear();
waitForHBFromNodes.clear();
for(Uint32 i = 0; i < MAX_NODES; i++)
{
if(!theNodes[i].defined)
continue;
if(theNodes[i].m_info.m_type == NodeInfo::DB)
{
ndb_nodes.set(i);
const ClusterMgr::Node &node= getNodeInfo(i);
waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
}
}
waitForHBFromNodes.bitAND(ndb_nodes);
#ifdef DEBUG_REG
char buf[128];
ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
signal.theVerId_signalNumber = GSN_API_REGREQ;
signal.theReceiversBlockNumber = QMGR;
signal.theTrace = 0;
signal.theLength = ApiRegReq::SignalLength;
ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
req->version = NDB_VERSION;
int nodeId= 0;
for(int i=0;
NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i));
i= nodeId+1)
{
#ifdef DEBUG_REG
ndbout << "FORCE HB to " << nodeId << endl;
#endif
theFacade.sendSignalUnCond(&signal, nodeId);
}
NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
waitingForHB= false;
#ifdef DEBUG_REG
ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
theFacade.unlock_mutex();
}
void void
ClusterMgr::threadMain( ){ ClusterMgr::threadMain( ){
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
...@@ -184,9 +251,6 @@ ClusterMgr::threadMain( ){ ...@@ -184,9 +251,6 @@ ClusterMgr::threadMain( ){
/** /**
* Start of Secure area for use of Transporter * Start of Secure area for use of Transporter
*/ */
int send_heartbeat_now= global_flag_send_heartbeat_now;
global_flag_send_heartbeat_now= 0;
theFacade.lock_mutex(); theFacade.lock_mutex();
for (int i = 1; i < MAX_NODES; i++){ for (int i = 1; i < MAX_NODES; i++){
/** /**
...@@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){ ...@@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){
} }
theNode.hbCounter += timeSlept; theNode.hbCounter += timeSlept;
if (theNode.hbCounter >= theNode.hbFrequency || if (theNode.hbCounter >= theNode.hbFrequency) {
send_heartbeat_now) {
/** /**
* It is now time to send a new Heartbeat * It is now time to send a new Heartbeat
*/ */
...@@ -226,7 +289,7 @@ ClusterMgr::threadMain( ){ ...@@ -226,7 +289,7 @@ ClusterMgr::threadMain( ){
if (theNode.m_info.m_type == NodeInfo::REP) { if (theNode.m_info.m_type == NodeInfo::REP) {
signal.theReceiversBlockNumber = API_CLUSTERMGR; signal.theReceiversBlockNumber = API_CLUSTERMGR;
} }
#if 0 #ifdef DEBUG_REG
ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif #endif
theFacade.sendSignalUnCond(&signal, nodeId); theFacade.sendSignalUnCond(&signal, nodeId);
...@@ -278,7 +341,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ ...@@ -278,7 +341,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
const NodeId nodeId = refToNode(apiRegReq->ref); const NodeId nodeId = refToNode(apiRegReq->ref);
#if 0 #ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif #endif
...@@ -319,7 +382,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ ...@@ -319,7 +382,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef); const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
#if 0 #ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif #endif
...@@ -351,6 +414,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ ...@@ -351,6 +414,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
if (node.m_info.m_type != NodeInfo::REP) { if (node.m_info.m_type != NodeInfo::REP) {
node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
} }
if(waitingForHB)
{
waitForHBFromNodes.clear(nodeId);
if(waitForHBFromNodes.isclear())
{
waitingForHB= false;
NdbCondition_Broadcast(waitForHBCond);
}
}
} }
void void
...@@ -379,6 +453,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ ...@@ -379,6 +453,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){
default: default:
break; break;
} }
waitForHBFromNodes.clear(nodeId);
if(waitForHBFromNodes.isclear())
NdbCondition_Signal(waitForHBCond);
} }
void void
......
...@@ -49,7 +49,9 @@ public: ...@@ -49,7 +49,9 @@ public:
void doStop(); void doStop();
void startThread(); void startThread();
void forceHB();
private: private:
void threadMain(); void threadMain();
...@@ -85,7 +87,11 @@ private: ...@@ -85,7 +87,11 @@ private:
Uint32 noOfConnectedNodes; Uint32 noOfConnectedNodes;
Node theNodes[MAX_NODES]; Node theNodes[MAX_NODES];
NdbThread* theClusterMgrThread; NdbThread* theClusterMgrThread;
NodeBitmask waitForHBFromNodes; // used in forcing HBs
NdbCondition* waitForHBCond;
bool waitingForHB;
/** /**
* Used for controlling start/stop of the thread * Used for controlling start/stop of the thread
*/ */
......
...@@ -4175,10 +4175,15 @@ static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length) ...@@ -4175,10 +4175,15 @@ static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
acc_row_size+= 4 + /*safety margin*/ 4; acc_row_size+= 4 + /*safety margin*/ 4;
#endif #endif
ulonglong acc_fragment_size= 512*1024*1024; ulonglong acc_fragment_size= 512*1024*1024;
/*
* if not --with-big-tables then max_rows is ulong
* the warning in this case is misleading though
*/
ulonglong big_max_rows = (ulonglong)max_rows;
#if MYSQL_VERSION_ID >= 50100 #if MYSQL_VERSION_ID >= 50100
no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1; no_fragments= (big_max_rows*acc_row_size)/acc_fragment_size+1;
#else #else
no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1 no_fragments= ((big_max_rows*acc_row_size)/acc_fragment_size+1
+1/*correct rounding*/)/2; +1/*correct rounding*/)/2;
#endif #endif
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment