ndb - bug#27203

  Allow readTablePk to stumble on scan+deleted tuple,
      reporting no-match instead of crash (in case scan is lock-owner)
parent 1bf8202f
...@@ -760,7 +760,7 @@ private: ...@@ -760,7 +760,7 @@ private:
void increaselistcont(Signal* signal); void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal); void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal); void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*); Uint32 readTablePk(Uint32 localkey1, Uint32 eh, OperationrecPtr);
Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner); Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal); void getdirindex(Signal* signal);
void commitdelete(Signal* signal); void commitdelete(Signal* signal);
......
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
ndbrequire(false); } } while(0) ndbrequire(false); } } while(0)
#else #else
#define vlqrequire(x) ndbrequire(x) #define vlqrequire(x) ndbrequire(x)
#define dump_lock_queue(x)
#endif #endif
...@@ -3184,7 +3185,7 @@ void Dbacc::getdirindex(Signal* signal) ...@@ -3184,7 +3185,7 @@ void Dbacc::getdirindex(Signal* signal)
}//Dbacc::getdirindex() }//Dbacc::getdirindex()
Uint32 Uint32
Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, Ptr<Operationrec> opPtr)
{ {
int ret; int ret;
Uint32 tableId = fragrecptr.p->myTableId; Uint32 tableId = fragrecptr.p->myTableId;
...@@ -3205,8 +3206,17 @@ Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op) ...@@ -3205,8 +3206,17 @@ Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op)
else else
{ {
ndbrequire(ElementHeader::getLocked(eh)); ndbrequire(ElementHeader::getLocked(eh));
ndbrequire((op->m_op_bits & Operationrec::OP_MASK) != ZSCAN_OP); if (unlikely((opPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP))
ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm); {
dump_lock_queue(opPtr);
ndbrequire(opPtr.p->nextParallelQue == RNIL);
ndbrequire(opPtr.p->nextSerialQue == RNIL);
ndbrequire(opPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED);
ndbrequire(opPtr.p->m_op_bits & Operationrec::OP_COMMIT_DELETE_CHECK);
ndbrequire((opPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING);
return 0;
}
ret = c_lqh->readPrimaryKeys(opPtr.p->userptr, ckeys, xfrm);
} }
jamEntry(); jamEntry();
ndbrequire(ret >= 0); ndbrequire(ret >= 0);
...@@ -3351,7 +3361,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) ...@@ -3351,7 +3361,7 @@ Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
if (! searchLocalKey) if (! searchLocalKey)
{ {
Uint32 len = readTablePk(localkey1, tgeElementHeader, Uint32 len = readTablePk(localkey1, tgeElementHeader,
lockOwnerPtr.p); lockOwnerPtr);
found = (len == operationRecPtr.p->xfrmtupkeylen) && found = (len == operationRecPtr.p->xfrmtupkeylen) &&
(memcmp(Tkeydata, ckeys, len << 2) == 0); (memcmp(Tkeydata, ckeys, len << 2) == 0);
} else { } else {
...@@ -6866,6 +6876,7 @@ void Dbacc::execACC_TO_REQ(Signal* signal) ...@@ -6866,6 +6876,7 @@ void Dbacc::execACC_TO_REQ(Signal* signal)
{ {
tatrOpPtr.p->transId1 = signal->theData[2]; tatrOpPtr.p->transId1 = signal->theData[2];
tatrOpPtr.p->transId2 = signal->theData[3]; tatrOpPtr.p->transId2 = signal->theData[3];
validate_lock_queue(tatrOpPtr);
} else { } else {
jam(); jam();
signal->theData[0] = cminusOne; signal->theData[0] = cminusOne;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment