Commit 27f7a6c4 authored by joreland@mysql.com

BUG#9891 - ndb lcp

Crash if ACC_CONTOPREQ was sent while ACC_LCPCONF was still in the job buffer.
  If ACC_LCPCONF had arrived earlier (before TUP_LCPSTARTED),
  operations could lock up,
  but they would be restarted on the next LCP.

-- LQH
1) Better check for LCP started, which also returns true
   if ACC or TUP has already completed
    
2) Remove incorrect if statement that prevented operations from
   being started if ACC had completed

-- ACC
Make sure all ACC_CONTOPCONF signals are sent before releasing the LCP record,
  i.e. use noOfLcpConf == 4 (2 ACC_LCPCONF + 2 ACC_CONTOPCONF)

Also check for noOfLcpConf == 4 when sending ACC_CONTOPCONF
parent 45a07db5
...@@ -8486,7 +8486,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal) ...@@ -8486,7 +8486,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
break; break;
}//switch }//switch
lcpConnectptr.p->noOfLcpConf++; lcpConnectptr.p->noOfLcpConf++;
ndbrequire(lcpConnectptr.p->noOfLcpConf <= 2); ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
fragrecptr.p->fragState = ACTIVEFRAG; fragrecptr.p->fragState = ACTIVEFRAG;
rlpPageptr.i = fragrecptr.p->zeroPagePtr; rlpPageptr.i = fragrecptr.p->zeroPagePtr;
ptrCheckGuard(rlpPageptr, cpagesize, page8); ptrCheckGuard(rlpPageptr, cpagesize, page8);
...@@ -8504,7 +8504,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal) ...@@ -8504,7 +8504,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
}//for }//for
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB); sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
if (lcpConnectptr.p->noOfLcpConf == 2) { if (lcpConnectptr.p->noOfLcpConf == 4) {
jam(); jam();
releaseLcpConnectRec(signal); releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot; rootfragrecptr.i = fragrecptr.p->myroot;
...@@ -8535,6 +8535,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal) ...@@ -8535,6 +8535,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
/* LOCAL FRAG ID */ /* LOCAL FRAG ID */
tresult = 0; tresult = 0;
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec); ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
{
sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
signal->getLength());
return;
}
ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE); ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
rootfragrecptr.i = lcpConnectptr.p->rootrecptr; rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec); ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
...@@ -8568,6 +8575,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal) ...@@ -8568,6 +8575,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
}//while }//while
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA); sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
lcpConnectptr.p->noOfLcpConf++;
if (lcpConnectptr.p->noOfLcpConf == 4) {
jam();
releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->rootState = ACTIVEROOT;
}//if
return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */ return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
}//Dbacc::execACC_CONTOPREQ() }//Dbacc::execACC_CONTOPREQ()
......
...@@ -968,7 +968,6 @@ public: ...@@ -968,7 +968,6 @@ public:
enum LcpState { enum LcpState {
LCP_IDLE = 0, LCP_IDLE = 0,
LCP_STARTED = 1,
LCP_COMPLETED = 2, LCP_COMPLETED = 2,
LCP_WAIT_FRAGID = 3, LCP_WAIT_FRAGID = 3,
LCP_WAIT_TUP_PREPLCP = 4, LCP_WAIT_TUP_PREPLCP = 4,
...@@ -2266,7 +2265,7 @@ private: ...@@ -2266,7 +2265,7 @@ private:
void sendCopyActiveConf(Signal* signal,Uint32 tableId); void sendCopyActiveConf(Signal* signal,Uint32 tableId);
void checkLcpCompleted(Signal* signal); void checkLcpCompleted(Signal* signal);
void checkLcpHoldop(Signal* signal); void checkLcpHoldop(Signal* signal);
void checkLcpStarted(Signal* signal); bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal); void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal); void getNextFragForLcp(Signal* signal);
void initLcpLocAcc(Signal* signal, Uint32 fragId); void initLcpLocAcc(Signal* signal, Uint32 fragId);
......
...@@ -10351,8 +10351,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal) ...@@ -10351,8 +10351,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal)
void Dblqh::lcpStartedLab(Signal* signal) void Dblqh::lcpStartedLab(Signal* signal)
{ {
checkLcpStarted(signal); if (checkLcpStarted(signal))
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) { {
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO * THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO
...@@ -10432,26 +10432,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal) ...@@ -10432,26 +10432,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal)
lcpPtr.i = signal->theData[1]; lcpPtr.i = signal->theData[1];
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord); ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED); ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) { restartOperationsLab(signal);
jam();
/***********************************************************************/
/* THIS SIGNAL CAN ONLY BE RECEIVED WHEN FRAGMENT IS BLOCKED AND
* THE LOCAL CHECKPOINT HAS BEEN STARTED. THE BLOCKING WILL BE
* REMOVED AS SOON AS ALL OPERATIONS HAVE BEEN STARTED.
***********************************************************************/
restartOperationsLab(signal);
} else if (lcpPtr.p->lcpState == LcpRecord::LCP_BLOCKED_COMP) {
jam();
/*******************************************************************>
* THE CHECKPOINT IS COMPLETED BUT HAS NOT YET STARTED UP
* ALL OPERATIONS AGAIN.
* WE PERFORM THIS START-UP BEFORE CONTINUING WITH THE NEXT
* FRAGMENT OF THE LOCAL CHECKPOINT TO AVOID ANY STRANGE ERRORS.
*******************************************************************> */
restartOperationsLab(signal);
} else {
ndbrequire(false);
}
}//Dblqh::execLQH_RESTART_OP() }//Dblqh::execLQH_RESTART_OP()
void Dblqh::restartOperationsLab(Signal* signal) void Dblqh::restartOperationsLab(Signal* signal)
...@@ -11000,7 +10981,8 @@ void Dblqh::checkLcpHoldop(Signal* signal) ...@@ -11000,7 +10981,8 @@ void Dblqh::checkLcpHoldop(Signal* signal)
* *
* SUBROUTINE SHORT NAME = CLS * SUBROUTINE SHORT NAME = CLS
* ========================================================================== */ * ========================================================================== */
void Dblqh::checkLcpStarted(Signal* signal) bool
Dblqh::checkLcpStarted(Signal* signal)
{ {
LcpLocRecordPtr clsLcpLocptr; LcpLocRecordPtr clsLcpLocptr;
...@@ -11010,7 +10992,7 @@ void Dblqh::checkLcpStarted(Signal* signal) ...@@ -11010,7 +10992,7 @@ void Dblqh::checkLcpStarted(Signal* signal)
do { do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){ if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){
return; return false;
}//if }//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc; clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++; i++;
...@@ -11021,12 +11003,13 @@ void Dblqh::checkLcpStarted(Signal* signal) ...@@ -11021,12 +11003,13 @@ void Dblqh::checkLcpStarted(Signal* signal)
do { do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){ if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){
return; return false;
}//if }//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc; clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++; i++;
} while (clsLcpLocptr.i != RNIL); } while (clsLcpLocptr.i != RNIL);
lcpPtr.p->lcpState = LcpRecord::LCP_STARTED;
return true;
}//Dblqh::checkLcpStarted() }//Dblqh::checkLcpStarted()
/* ========================================================================== /* ==========================================================================
...@@ -11187,20 +11170,12 @@ void Dblqh::sendAccContOp(Signal* signal) ...@@ -11187,20 +11170,12 @@ void Dblqh::sendAccContOp(Signal* signal)
do { do {
ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
sacLcpLocptr.p->accContCounter = 0; sacLcpLocptr.p->accContCounter = 0;
if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED){ /* ------------------------------------------------------------------- */
/* ------------------------------------------------------------------- */ /*SEND START OPERATIONS TO ACC AGAIN */
/*SEND START OPERATIONS TO ACC AGAIN */ /* ------------------------------------------------------------------- */
/* ------------------------------------------------------------------- */ signal->theData[0] = lcpPtr.p->lcpAccptr;
signal->theData[0] = lcpPtr.p->lcpAccptr; signal->theData[1] = sacLcpLocptr.p->locFragid;
signal->theData[1] = sacLcpLocptr.p->locFragid; sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
count++;
} else if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_COMPLETED){
signal->theData[0] = sacLcpLocptr.i;
sendSignal(reference(), GSN_ACC_CONTOPCONF, signal, 1, JBB);
} else {
ndbrequire(false);
}
sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc; sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc;
} while (sacLcpLocptr.i != RNIL); } while (sacLcpLocptr.i != RNIL);
...@@ -11236,9 +11211,18 @@ void Dblqh::sendStartLcp(Signal* signal) ...@@ -11236,9 +11211,18 @@ void Dblqh::sendStartLcp(Signal* signal)
signal->theData[0] = stlLcpLocptr.i; signal->theData[0] = stlLcpLocptr.i;
signal->theData[1] = cownref; signal->theData[1] = cownref;
signal->theData[2] = stlLcpLocptr.p->tupRef; signal->theData[2] = stlLcpLocptr.p->tupRef;
sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA); if(ERROR_INSERTED(5077))
sendSignalWithDelay(fragptr.p->tupBlockref, GSN_TUP_LCPREQ,
signal, 5000, 3);
else
sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc; stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
} while (stlLcpLocptr.i != RNIL); } while (stlLcpLocptr.i != RNIL);
if(ERROR_INSERTED(5077))
{
ndbout_c("Delayed TUP_LCPREQ with 5 sec");
}
}//Dblqh::sendStartLcp() }//Dblqh::sendStartLcp()
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment