ndb: removed TransporterFacade singleton

parent 615699a8
......@@ -792,7 +792,7 @@ private:
bool theBlobFlag;
Uint8 thePendingBlobOps;
static void sendTC_COMMIT_ACK(NdbApiSignal *,
static void sendTC_COMMIT_ACK(class TransporterFacade *, NdbApiSignal *,
Uint32 transId1, Uint32 transId2,
Uint32 aBlockRef);
......
......@@ -560,8 +560,7 @@ MgmtSrvr::start(BaseString &error_string)
DBUG_RETURN(false);
}
}
theFacade= TransporterFacade::theFacadeInstance
= new TransporterFacade();
theFacade= new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......
......@@ -139,7 +139,7 @@ Ndb::NDB_connect(Uint32 tNode)
//***************************************************************************
int tReturnCode;
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theImpl->m_transporter_facade;
DBUG_ENTER("Ndb::NDB_connect");
......@@ -609,7 +609,7 @@ Ndb::NdbTamper(TamperType aAction, int aNode)
tSignal.setData(tNdbConn->ptr2int(),2);
tSignal.setData(theMyRef,3); // Set return block reference
tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theImpl->m_transporter_facade;
if (tAction == 3) {
tp->lock_mutex();
tp->sendSignal(&tSignal, aNode);
......
......@@ -60,6 +60,7 @@ public:
Ndb &m_ndb;
Ndb_cluster_connection_impl &m_ndb_cluster_connection;
TransporterFacade *m_transporter_facade;
NdbDictionaryImpl m_dictionary;
......
......@@ -59,7 +59,7 @@ NdbOperation::doSend(int aNodeId, Uint32 lastFlag)
int tSignalCount = 0;
assert(theTCREQ != NULL);
setLastFlag(theTCREQ, lastFlag);
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
tReturnCode = tp->sendSignal(theTCREQ, aNodeId);
tSignalCount++;
if (tReturnCode == -1) {
......
......@@ -101,7 +101,7 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
Uint32& batch_byte_size,
Uint32& first_batch_size)
{
TransporterFacade *tp= TransporterFacade::instance();
TransporterFacade *tp= m_ndb->theImpl->m_transporter_facade;
Uint32 max_scan_batch_size= tp->get_scan_batch_size();
Uint32 max_batch_byte_size= tp->get_batch_byte_size();
Uint32 max_batch_size= tp->get_batch_size();
......
......@@ -367,7 +367,7 @@ NdbScanOperation::getFirstATTRINFOScan()
int
NdbScanOperation::executeCursor(int nodeId){
NdbTransaction * tCon = theNdbCon;
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
Guard guard(tp->theMutexPtr);
Uint32 magic = tCon->theMagicNumber;
......@@ -469,7 +469,7 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
}
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
......@@ -609,7 +609,7 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
......@@ -664,7 +664,7 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
m_conf_receivers_count,
m_sent_receivers_count);
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
......@@ -828,7 +828,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
req->requestInfo = tmp;
tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
ptr[0].sz = theParallelism;
......@@ -1382,7 +1382,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
if(fetchNeeded){
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
......@@ -1525,7 +1525,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
m_sent_receivers_count = last + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
tSignal.setLength(4+1);
int ret= tp->sendSignal(&tSignal, nodeId);
return ret;
......@@ -1658,7 +1658,7 @@ int
NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
......@@ -1693,7 +1693,7 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
......
......@@ -706,7 +706,7 @@ NdbTransaction::sendTC_HBREP() // Send a TC_HBREP signal;
tcHbRep->transId1 = tTransId1;
tcHbRep->transId2 = tTransId2;
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
tp->lock_mutex();
const int res = tp->sendSignal(tSignal,theDBnode);
tp->unlock_mutex();
......@@ -812,7 +812,7 @@ NdbTransaction::sendROLLBACK() // Send a TCROLLBACKREQ signal;
*************************************************************************/
NdbApiSignal tSignal(tNdb->theMyRef);
Uint32 tTransId1, tTransId2;
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
int tReturnCode;
tTransId1 = (Uint32) theTransactionId;
......@@ -859,7 +859,7 @@ NdbTransaction::sendCOMMIT() // Send a TC_COMMITREQ signal;
{
NdbApiSignal tSignal(theNdb->theMyRef);
Uint32 tTransId1, tTransId2;
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
int tReturnCode;
tTransId1 = (Uint32) theTransactionId;
......
......@@ -79,7 +79,7 @@ Ndb::init(int aMaxNoOfTransactions)
DBUG_RETURN(-1);
}//if
theInitState = StartingInit;
TransporterFacade * theFacade = TransporterFacade::instance();
TransporterFacade * theFacade = theImpl->m_transporter_facade;
theFacade->lock_mutex();
const int tBlockNo = theFacade->open(this,
......@@ -147,7 +147,7 @@ error_handler:
ndbout << "error_handler" << endl;
releaseTransactionArrays();
delete theDictionary;
TransporterFacade::instance()->close(theNdbBlockNumber, 0);
theImpl->m_transporter_facade->close(theNdbBlockNumber, 0);
DBUG_RETURN(-1);
}
......@@ -185,7 +185,7 @@ void Ndb::connected(Uint32 ref)
assert(theMyRef == numberToRef(theNdbBlockNumber, tmpTheNode));
}
TransporterFacade * theFacade = TransporterFacade::instance();
TransporterFacade * theFacade = theImpl->m_transporter_facade;
int i, n= 0;
for (i = 1; i < MAX_NDB_NODES; i++){
if (theFacade->getIsDbNode(i)){
......@@ -258,7 +258,7 @@ Ndb::report_node_failure_completed(Uint32 node_id)
// node failed
// eventOperations in the ndb object should be notified
theEventBuffer->report_node_failure(node_id);
if(!TransporterFacade::instance()->theClusterMgr->isClusterAlive())
if(!theImpl->m_transporter_facade->theClusterMgr->isClusterAlive())
{
// cluster is unavailable,
// eventOperations in the ndb object should be notified
......@@ -374,10 +374,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
if(TcKeyConf::getMarkerFlag(keyConf->confInfo)){
NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
keyConf->transId1,
keyConf->transId2,
aTCRef);
NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
theCommitAckSignal,
keyConf->transId1,
keyConf->transId2,
aTCRef);
}
return;
......@@ -453,10 +454,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
#endif
}
if(tFirstData & 1){
NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
failConf->transId1,
failConf->transId2,
aTCRef);
NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
theCommitAckSignal,
failConf->transId1,
failConf->transId2,
aTCRef);
}
return;
}
......@@ -525,10 +527,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
if(tFirstData & 1){
NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
commitConf->transId1,
commitConf->transId2,
aTCRef);
NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
theCommitAckSignal,
commitConf->transId1,
commitConf->transId2,
aTCRef);
}
return;
}
......@@ -860,10 +863,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}//if
if(TcIndxConf::getMarkerFlag(indxConf->confInfo)){
NdbTransaction::sendTC_COMMIT_ACK(theCommitAckSignal,
indxConf->transId1,
indxConf->transId2,
aTCRef);
NdbTransaction::sendTC_COMMIT_ACK(theImpl->m_transporter_facade,
theCommitAckSignal,
indxConf->transId1,
indxConf->transId2,
aTCRef);
}
return;
}
......@@ -905,7 +909,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
its conditional wait. This will wake up this thread so that it
can continue its work.
*/
TransporterFacade *tp= TransporterFacade::instance();
TransporterFacade *tp= theImpl->m_transporter_facade;
if (tp->get_poll_owner() != t_waiter)
{
/*
......@@ -969,7 +973,7 @@ Ndb::completedTransaction(NdbTransaction* aCon)
if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0;
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = theImpl->m_transporter_facade;
NdbWaiter *t_waiter= &theImpl->theWaiter;
if (tp->get_poll_owner() != t_waiter) {
/*
......@@ -1138,7 +1142,7 @@ Ndb::sendPrepTrans(int forceSend)
and we keep a small space for messages like that.
*/
Uint32 i;
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theImpl->m_transporter_facade;
Uint32 no_of_prep_trans = theNoOfPreparedTransactions;
for (i = 0; i < no_of_prep_trans; i++) {
NdbTransaction * a_con = thePreparedTransactionsArray[i];
......@@ -1279,9 +1283,9 @@ Remark: First send all prepared operations and then check if there are any
void
Ndb::sendPreparedTransactions(int forceSend)
{
TransporterFacade::instance()->lock_mutex();
theImpl->m_transporter_facade->lock_mutex();
sendPrepTrans(forceSend);
TransporterFacade::instance()->unlock_mutex();
theImpl->m_transporter_facade->unlock_mutex();
return;
}//Ndb::sendPreparedTransactions()
......@@ -1302,7 +1306,7 @@ Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSen
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
PollGuard pg(theImpl->m_transporter_facade, &theImpl->theWaiter,
theNdbBlockNumber);
sendPrepTrans(forceSend);
return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
......@@ -1346,7 +1350,7 @@ Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup)
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
PollGuard pg(theImpl->m_transporter_facade, &theImpl->theWaiter,
theNdbBlockNumber);
return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
}
......@@ -1370,7 +1374,7 @@ Ndb::sendRecSignal(Uint16 node_id,
int return_code;
Uint32 read_conn_seq;
TransporterFacade* tp = TransporterFacade::instance();
TransporterFacade* tp = theImpl->m_transporter_facade;
Uint32 send_size = 1; // Always sends one signal only
// Protected area
/*
......@@ -1411,16 +1415,16 @@ Ndb::sendRecSignal(Uint16 node_id,
}//Ndb::sendRecSignal()
void
NdbTransaction::sendTC_COMMIT_ACK(NdbApiSignal * aSignal,
Uint32 transId1, Uint32 transId2,
Uint32 aTCRef){
NdbTransaction::sendTC_COMMIT_ACK(TransporterFacade *tp,
NdbApiSignal * aSignal,
Uint32 transId1, Uint32 transId2,
Uint32 aTCRef){
#ifdef MARKER_TRACE
ndbout_c("Sending TC_COMMIT_ACK(0x%.8x, 0x%.8x) to -> %d",
transId1,
transId2,
refToNode(aTCRef));
#endif
TransporterFacade *tp = TransporterFacade::instance();
aSignal->theTrace = TestOrd::TraceAPI;
aSignal->theReceiversBlockNumber = DBTC;
aSignal->theVerId_signalNumber = GSN_TC_COMMIT_ACK;
......@@ -1437,7 +1441,7 @@ int
NdbImpl::send_event_report(Uint32 *data, Uint32 length)
{
NdbApiSignal aSignal(m_ndb.theMyRef);
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = m_transporter_facade;
aSignal.theTrace = TestOrd::TraceAPI;
aSignal.theReceiversBlockNumber = CMVMI;
aSignal.theVerId_signalNumber = GSN_EVENT_REP;
......
......@@ -108,7 +108,7 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theImpl->m_schemaname.assign(aSchema);
theImpl->update_prefix();
theImpl->theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
theImpl->theWaiter.m_mutex = theImpl->m_transporter_facade->theMutexPtr;
// Signal that the constructor has finished OK
if (theInitState == NotConstructed)
......@@ -148,8 +148,8 @@ Ndb::~Ndb()
delete theEventBuffer;
if (TransporterFacade::instance() != NULL && theNdbBlockNumber > 0){
TransporterFacade::instance()->close(theNdbBlockNumber, theFirstTransId);
if (theImpl->m_transporter_facade != NULL && theNdbBlockNumber > 0){
theImpl->m_transporter_facade->close(theNdbBlockNumber, theFirstTransId);
}
releaseTransactionArrays();
......@@ -205,9 +205,10 @@ NdbImpl::NdbImpl(Ndb_cluster_connection *ndb_cluster_connection,
Ndb& ndb)
: m_ndb(ndb),
m_ndb_cluster_connection(ndb_cluster_connection->m_impl),
m_transporter_facade(ndb_cluster_connection->m_impl.m_transporter_facade),
m_dictionary(ndb),
theCurrentConnectIndex(0),
theNdbObjectIdMap(ndb_cluster_connection->m_impl.m_transporter_facade->theMutexPtr,
theNdbObjectIdMap(m_transporter_facade->theMutexPtr,
1024,1024),
theNoOfDBnodes(0),
m_ev_op(0)
......
......@@ -57,8 +57,6 @@ static int indexToNumber(int index)
#define TRP_DEBUG(t)
#endif
TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
/*****************************************************************************
* Call back functions
*****************************************************************************/
......@@ -116,7 +114,6 @@ reportConnect(void * callbackObj, NodeId nodeId){
ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
#endif
((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
// TransporterFacade::instance()->reportConnected(nodeId);
}
/**
......@@ -128,7 +125,6 @@ reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
#endif
((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
//TransporterFacade::instance()->reportDisconnected(nodeId);
}
void
......@@ -392,7 +388,7 @@ int
TransporterFacade::start_instance(int nodeId,
const ndb_mgm_configuration* props)
{
if (! theFacadeInstance->init(nodeId, props)) {
if (! init(nodeId, props)) {
return -1;
}
......@@ -418,8 +414,7 @@ TransporterFacade::start_instance(int nodeId,
void
TransporterFacade::stop_instance(){
DBUG_ENTER("TransporterFacade::stop_instance");
if(theFacadeInstance)
theFacadeInstance->doStop();
doStop();
DBUG_VOID_RETURN;
}
......
......@@ -57,9 +57,8 @@ public:
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
static TransporterFacade* instance();
int start_instance(int, const ndb_mgm_configuration*);
static void stop_instance();
void stop_instance();
/**
* Register this block for sending/receiving signals
......@@ -277,8 +276,6 @@ private:
public:
NdbMutex* theMutexPtr;
private:
static TransporterFacade* theFacadeInstance;
public:
GlobalDictCache m_globalDictCache;
......@@ -302,12 +299,6 @@ class PollGuard
bool m_locked;
};
inline
TransporterFacade*
TransporterFacade::instance()
{
return theFacadeInstance;
}
inline
void
......
......@@ -189,7 +189,7 @@ Ndb_cluster_connection::node_id()
int Ndb_cluster_connection::get_no_ready()
{
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = m_impl.m_transporter_facade;
if (tp == 0 || tp->ownId() == 0)
return -1;
......@@ -214,7 +214,7 @@ Ndb_cluster_connection::wait_until_ready(int timeout,
int timeout_after_first_alive)
{
DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
TransporterFacade *tp = TransporterFacade::instance();
TransporterFacade *tp = m_impl.m_transporter_facade;
if (tp == 0)
{
DBUG_RETURN(-1);
......@@ -293,9 +293,7 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
NdbMgmHandle h= m_config_retriever->get_mgmHandle();
ndb_mgm_set_name(h, m_name);
}
m_transporter_facade=
TransporterFacade::theFacadeInstance=
new TransporterFacade();
m_transporter_facade= new TransporterFacade();
DBUG_VOID_RETURN;
}
......@@ -303,7 +301,10 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
{
DBUG_ENTER("~Ndb_cluster_connection");
TransporterFacade::stop_instance();
if (m_transporter_facade != 0)
{
m_transporter_facade->stop_instance();
}
if (m_connect_thread)
{
void *status;
......@@ -315,9 +316,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
if (m_transporter_facade != 0)
{
delete m_transporter_facade;
if (m_transporter_facade != TransporterFacade::theFacadeInstance)
abort();
TransporterFacade::theFacadeInstance= 0;
m_transporter_facade = 0;
}
if (m_config_retriever)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment