diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp index 7cf18a6685defc24c9dd407d5571f028ca8a51e0..483e08179c0fa44e892d77b9eb048f06d7c0c8e7 100644 --- a/ndb/include/ndbapi/NdbResultSet.hpp +++ b/ndb/include/ndbapi/NdbResultSet.hpp @@ -96,6 +96,11 @@ public: */ void close(); + /** + * Restart + */ + int restart(); + /** * Transfer scan operation to an updating transaction. Use this function * when a scan has found a record that you want to update. diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 6ebf5a083f87ed74d213d89e1a960203518c0d28..c7ae029e742f8a2a84da331a7342fa83b61cef65 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -157,6 +157,8 @@ protected: NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); Uint32 m_ordered; + + int restart(); }; inline diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp index 2c5d4a43c4c729f5e57fa0eb2d61eb20e27aa79b..b286c9fd7c93a049edb748ce71cc4e4afc320fe4 100644 --- a/ndb/src/ndbapi/NdbResultSet.cpp +++ b/ndb/src/ndbapi/NdbResultSet.cpp @@ -89,3 +89,8 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ return -1; return 0; } + +int +NdbResultSet::restart(){ + return m_operation->restart(); +} diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index fcb3e137a47973c8ee732f46b874efc98d636d9e..7dcad95bf5b10508e5e8072cb63a9708204c5cdf 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -470,6 +470,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed) if(DEBUG_NEXT_RESULT) ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last); + if(DEBUG_NEXT_RESULT) + ndbout_c("nextResult(%d) idx=%d last=%d", + fetchAllowed, + idx, last); + /** * Check next buckets */ @@ -1395,3 +1400,88 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ tSignal.setLength(4+1); return tp->sendSignal(&tSignal, nodeId); } + +int +NdbScanOperation::restart(){ + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; + + if(seq != tp->getNodeSequence(nodeId)){ + theNdbCon->theReleaseOnClose = true; + return -1; + } + + while(m_sent_receivers_count){ + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; + return -1; + } + } + + if(m_api_receivers_count+m_conf_receivers_count){ + // Send close scan + if(send_next_scan(0, true) == -1) // Close scan + return -1; + } + + /** + * wait for close scan conf + */ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; + return -1; + } + } + + /** + * Reset receivers + */ + const Uint32 parallell = theParallelism; + + for(Uint32 i = 0; i<parallell; i++){ + m_receivers[i]->m_list_index = i; + m_prepared_receivers[i] = m_receivers[i]->getId(); + m_sent_receivers[i] = m_receivers[i]; + m_conf_receivers[i] = 0; + m_api_receivers[i] = 0; + m_receivers[i]->prepareSend(); + } + + m_api_receivers_count = 0; + m_current_api_receiver = 0; + m_sent_receivers_count = parallell; + m_conf_receivers_count = 0; + + if(m_ordered){ + m_current_api_receiver = parallell; + } + + if (doSendScan(nodeId) == -1) + return -1; + + return 0; +} diff --git a/ndb/test/ndbapi/testScan.cpp b/ndb/test/ndbapi/testScan.cpp index 0a4fa96dd2d3e6bb0ac2e61d5fb0ac6a0e4056e4..de60d68f2132bf7aebcedcd7990155a82a888a16 100644 --- a/ndb/test/ndbapi/testScan.cpp +++ b/ndb/test/ndbapi/testScan.cpp @@ -881,6 +881,93 @@ int runCheckInactivityBeforeClose(NDBT_Context* ctx, NDBT_Step* step){ } +int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){ + int loops = ctx->getNumLoops(); + int records = ctx->getNumRecords(); + Ndb * pNdb = GETNDB(step); + const NdbDictionary::Table* pTab = ctx->getTab(); + + HugoCalculator calc(* pTab); + NDBT_ResultRow tmpRow(* pTab); + + int i = 0; + while (i<loops && !ctx->isTestStopped()) { + g_info << i++ << ": "; + const int record = (rand() % records); + g_info << " row=" << record; + + NdbConnection* pCon = pNdb->startTransaction(); + NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName()); + if (pOp == NULL) { + ERR(pCon->getNdbError()); + return NDBT_FAILED; + } + + NdbResultSet* rs = pOp->readTuples(); + if( rs == 0 ) { + ERR(pCon->getNdbError()); + return NDBT_FAILED; + } + + int check = pOp->interpret_exit_ok(); + if( check == -1 ) { + ERR(pCon->getNdbError()); + return NDBT_FAILED; + } + + // Define attributes to read + for(int a = 0; a<pTab->getNoOfColumns(); a++){ + if((tmpRow.attributeStore(a) = + pOp->getValue(pTab->getColumn(a)->getName())) == 0) { + ERR(pCon->getNdbError()); + return NDBT_FAILED; + } + } + + check = pCon->execute(NoCommit); + if( check == -1 ) { + ERR(pCon->getNdbError()); + return NDBT_FAILED; + } + + int res; + int row = 0; + while(row < record && (res = rs->nextResult()) == 0) { + if(calc.verifyRowValues(&tmpRow) != 0){ + abort(); + return NDBT_FAILED; + } + row++; + } + if(row != record){ + ERR(pCon->getNdbError()); + abort(); + return NDBT_FAILED; + } + g_info << " restarting" << endl; + if((res = rs->restart()) != 0){ + ERR(pCon->getNdbError()); + abort(); + return NDBT_FAILED; + } + + row = 0; + while((res = rs->nextResult()) == 0) { + if(calc.verifyRowValues(&tmpRow) != 0){ + abort(); + return NDBT_FAILED; + } + row++; + } + if(res != 1 || row != records){ + ERR(pCon->getNdbError()); + abort(); + return NDBT_FAILED; + } + pCon->close(); + } + return NDBT_OK; +} NDBT_TESTSUITE(testScan); @@ -1304,6 +1391,12 @@ TESTCASE("ScanReadWhileNodeIsDown", STEP(runStopAndStartNode); FINALIZER(runClearTable); } +TESTCASE("ScanRestart", + "Verify restart functionallity"){ + INITIALIZER(runLoadTable); + STEP(runScanRestart); + FINALIZER(runClearTable); +} NDBT_TESTSUITE_END(testScan); int main(int argc, const char** argv){