ndb - bug#31477 post-merge fixes in 5.1

parent fc351758
...@@ -1558,7 +1558,7 @@ public: ...@@ -1558,7 +1558,7 @@ public:
/* /*
* TUX checks if tuple is visible to scan. * TUX checks if tuple is visible to scan.
*/ */
bool tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savePointId); bool tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageIndex, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savepointId);
int load_diskpage(Signal*, Uint32 opRec, Uint32 fragPtrI, int load_diskpage(Signal*, Uint32 opRec, Uint32 fragPtrI,
Uint32 local_key, Uint32 flags); Uint32 local_key, Uint32 flags);
...@@ -3041,16 +3041,15 @@ inline ...@@ -3041,16 +3041,15 @@ inline
bool Dbtup::find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId) bool Dbtup::find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId)
{ {
while (true) { while (true) {
if (savepointId > loopOpPtr.p->savePointId) { if (savepointId > loopOpPtr.p->savepointId) {
jam(); jam();
return true; return true;
} }
// note 5.0 has reversed next/prev pointers loopOpPtr.i = loopOpPtr.p->prevActiveOp;
loopOpPtr.i = loopOpPtr.p->nextActiveOp;
if (loopOpPtr.i == RNIL) { if (loopOpPtr.i == RNIL) {
break; break;
} }
ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec); c_operation_pool.getPtr(loopOpPtr);
} }
return false; return false;
} }
......
...@@ -358,21 +358,7 @@ Dbtup::setup_read(KeyReqStruct *req_struct, ...@@ -358,21 +358,7 @@ Dbtup::setup_read(KeyReqStruct *req_struct,
dirty= false; dirty= false;
} }
OperationrecPtr prevOpPtr = currOpPtr; bool found= find_savepoint(currOpPtr, savepointId);
bool found= false;
while(true)
{
if (savepointId > currOpPtr.p->savepointId) {
found= true;
break;
}
if (currOpPtr.p->is_first_operation()){
break;
}
prevOpPtr= currOpPtr;
currOpPtr.i = currOpPtr.p->prevActiveOp;
c_operation_pool.getPtr(currOpPtr);
}
Uint32 currOp= currOpPtr.p->op_struct.op_type; Uint32 currOp= currOpPtr.p->op_struct.op_type;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define DBTUP_C #define DBTUP_C
#define DBTUP_INDEX_CPP #define DBTUP_INDEX_CPP
#include <Dblqh.hpp>
#include "Dbtup.hpp" #include "Dbtup.hpp"
#include <RefConvert.hpp> #include <RefConvert.hpp>
#include <ndb_limits.h> #include <ndb_limits.h>
...@@ -330,7 +331,14 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn ...@@ -330,7 +331,14 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn
* clear to do the full check here. * clear to do the full check here.
*/ */
bool bool
Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savePointId) Dbtup::tuxQueryTh(Uint32 fragPtrI,
Uint32 pageId,
Uint32 pageIndex,
Uint32 tupVersion,
Uint32 transId1,
Uint32 transId2,
bool dirty,
Uint32 savepointId)
{ {
jamEntry(); jamEntry();
FragrecordPtr fragPtr; FragrecordPtr fragPtr;
...@@ -341,30 +349,40 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupV ...@@ -341,30 +349,40 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupV
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr; PagePtr pagePtr;
pagePtr.i = pageId; pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page); c_page_pool.getPtr(pagePtr);
KeyReqStruct req_struct;
{
Operationrec tmpOp;
tmpOp.m_tuple_location.m_page_no = pageId;
tmpOp.m_tuple_location.m_page_idx = pageIndex;
setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
}
Tuple_header* tuple_ptr = req_struct.m_tuple_ptr;
OperationrecPtr currOpPtr; OperationrecPtr currOpPtr;
currOpPtr.i = pagePtr.p->pageWord[pageOffset]; currOpPtr.i = tuple_ptr->m_operation_ptr_i;
if (currOpPtr.i == RNIL) { if (currOpPtr.i == RNIL) {
ljam(); jam();
// tuple has no operation, any scan can see it // tuple has no operation, any scan can see it
return true; return true;
} }
ptrCheckGuard(currOpPtr, cnoOfOprec, operationrec); c_operation_pool.getPtr(currOpPtr);
const bool sameTrans = const bool sameTrans =
transId1 == currOpPtr.p->transid1 && c_lqh->is_same_trans(currOpPtr.p->userpointer, transId1, transId2);
transId2 == currOpPtr.p->transid2;
bool res = false; bool res = false;
OperationrecPtr loopOpPtr = currOpPtr; OperationrecPtr loopOpPtr = currOpPtr;
if (!sameTrans) { if (!sameTrans) {
ljam(); jam();
if (!dirty) { if (!dirty) {
ljam(); jam();
if (currOpPtr.p->prevActiveOp == RNIL) { if (currOpPtr.p->nextActiveOp == RNIL) {
ljam(); jam();
// last op - TUX makes ACC lock request in same timeslice // last op - TUX makes ACC lock request in same timeslice
res = true; res = true;
} }
...@@ -372,33 +390,33 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupV ...@@ -372,33 +390,33 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupV
else { else {
// loop to first op (returns false) // loop to first op (returns false)
find_savepoint(loopOpPtr, 0); find_savepoint(loopOpPtr, 0);
const Uint32 op_type = loopOpPtr.p->optype; const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
if (op_type != ZINSERT) { if (op_type != ZINSERT) {
ljam(); jam();
// read committed version from the page // read committed version
const Uint32 origVersion = pagePtr.p->pageWord[pageOffset + 1]; const Uint32 origVersion = tuple_ptr->get_tuple_version();
if (origVersion == tupVersion) { if (origVersion == tupVersion) {
ljam(); jam();
res = true; res = true;
} }
} }
} }
} }
else { else {
ljam(); jam();
// for own trans, ignore dirty flag // for own trans, ignore dirty flag
if (find_savepoint(loopOpPtr, savePointId)) { if (find_savepoint(loopOpPtr, savepointId)) {
ljam(); jam();
const Uint32 op_type = loopOpPtr.p->optype; const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
if (op_type != ZDELETE) { if (op_type != ZDELETE) {
ljam(); jam();
// check if this op has produced the scanned version // check if this op has produced the scanned version
Uint32 loopVersion = loopOpPtr.p->tupVersion; Uint32 loopVersion = loopOpPtr.p->tupVersion;
if (loopVersion == tupVersion) { if (loopVersion == tupVersion) {
ljam(); jam();
res = true; res = true;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment