ndb - bug#19293 and family

  introduce acc per row logical mutex to fix difficult error handling cases
  
parent d435cb85
......@@ -17,14 +17,13 @@
#ifndef DBACC_H
#define DBACC_H
#ifdef VM_TRACE
#define ACC_SAFE_QUEUE
#endif
#include <pc.hpp>
#include <SimulatedBlock.hpp>
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#ifdef DBACC_C
// Debug Macros
#define dbgWord32(ptr, ind, val)
......@@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3
#define ZSCAN_OP 5
/**
* Check kernel_types for other operation types
*/
#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2
#define ZTABLESIZE 16
......@@ -477,6 +479,7 @@ struct Fragmentrec {
/* OPERATIONREC */
/* --------------------------------------------------------------------------------- */
struct Operationrec {
Uint32 m_op_bits;
Uint32 localdata[2];
Uint32 elementIsforward;
Uint32 elementPage;
......@@ -485,36 +488,62 @@ struct Operationrec {
Uint32 fragptr;
Uint32 hashvaluePart;
Uint32 hashValue;
Uint32 insertDeleteLen;
Uint32 nextLockOwnerOp;
Uint32 nextOp;
Uint32 nextParallelQue;
Uint32 nextSerialQue;
union {
Uint32 nextSerialQue;
Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
};
Uint32 prevOp;
Uint32 prevLockOwnerOp;
Uint32 prevParallelQue;
Uint32 prevSerialQue;
union {
Uint32 prevParallelQue;
Uint32 m_lo_last_parallel_op_ptr_i;
};
union {
Uint32 prevSerialQue;
Uint32 m_lo_last_serial_op_ptr_i;
};
Uint32 scanRecPtr;
Uint32 transId1;
Uint32 transId2;
State opState;
Uint32 userptr;
State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
Uint8 elementIsDisappeared;
Uint8 insertIsDone;
Uint8 lockMode;
Uint8 lockOwner;
Uint8 nodeType;
Uint8 operation;
Uint8 opSimple;
Uint8 dirtyRead;
Uint8 commitDeleteCheckFlag;
Uint8 isAccLockReq;
enum OpBits {
OP_MASK = 0x0000F // 4 bits for operation type
,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
// before me
,OP_LOCK_OWNER = 0x00040
,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
,OP_DIRTY_READ = 0x00100
,OP_LOCK_REQ = 0x00200 // isAccLockReq
,OP_COMMIT_DELETE_CHECK = 0x00400
,OP_INSERT_IS_DONE = 0x00800
,OP_ELEMENT_DISAPPEARED = 0x01000
,OP_STATE_MASK = 0xF0000
,OP_STATE_IDLE = 0xF0000
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_STATE_RUNNING_ABORT = 0x20000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
};
bool is_same_trans(const Operationrec* op) const {
return
transId1 == op->transId1 && transId2 == op->transId2;
}
}; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr;
......@@ -577,7 +606,6 @@ struct ScanRec {
Uint32 scanUserblockref;
Uint32 scanMask;
Uint8 scanLockMode;
Uint8 scanKeyinfoFlag;
Uint8 scanTimer;
Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag;
......@@ -602,7 +630,8 @@ public:
virtual ~Dbacc();
// pointer to TUP instance in this thread
Dbtup* c_tup;
class Dbtup* c_tup;
class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal);
......@@ -640,7 +669,8 @@ private:
void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck();
void report_dealloc(Signal* signal, const Operationrec* opPtrP);
typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
void initFragAdd(Signal*, FragmentrecPtr);
......@@ -679,14 +709,30 @@ private:
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal);
Uint32 placeReadInLockQueue(Signal* signal);
void placeSerialQueueRead(Signal* signal);
void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*);
void moveLastParallelQueue(Signal* signal);
void moveLastParallelQueueWrite(Signal* signal);
Uint32 placeWriteInLockQueue(Signal* signal);
void placeSerialQueueWrite(Signal* signal);
#ifdef VM_TRACE
Uint32 getNoParallelTransactionFull(const Operationrec*);
#endif
#ifdef ACC_SAFE_QUEUE
bool validate_lock_queue(OperationrecPtr opPtr);
Uint32 get_parallel_head(OperationrecPtr opPtr);
void dump_lock_queue(OperationrecPtr loPtr);
#else
bool validate_lock_queue(OperationrecPtr) { return true;}
#endif
public:
void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
void startNext(Signal* signal, OperationrecPtr lastOp);
private:
Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal);
......@@ -716,8 +762,8 @@ private:
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 localkey1);
void getElement(Signal* signal);
Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal);
void deleteElement(Signal* signal);
......@@ -726,12 +772,17 @@ private:
void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal);
void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal);
void copyOpInfo(Signal* signal);
void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal);
void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
void startNew(Signal* signal, OperationrecPtr newOwner);
void abortWaitingOperation(Signal*, OperationrecPtr);
void abortExecutedOperation(Signal*, OperationrecPtr);
void takeOutFragWaitQue(Signal* signal);
void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op);
void allocOverflowPage(Signal* signal);
......@@ -780,8 +831,8 @@ private:
void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal);
void accIsLockedLab(Signal* signal);
void insertExistElemLab(Signal* signal);
void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal);
......@@ -840,8 +891,6 @@ private:
Operationrec *operationrec;
OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr;
OperationrecPtr copyInOperPtr;
OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr;
......@@ -885,8 +934,6 @@ private:
Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr;
Page8Ptr priPageptr;
Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr;
Page8Ptr isoPageptr;
......@@ -926,8 +973,6 @@ private:
Tabrec *tabrec;
TabrecPtr tabptr;
Uint32 ctablesize;
Uint32 tpwiElementptr;
Uint32 tpriElementptr;
Uint32 tgseElementptr;
Uint32 tgseContainerptr;
Uint32 trlHead;
......@@ -961,8 +1006,6 @@ private:
Uint32 tdelForward;
Uint32 tiopPageId;
Uint32 tipPageId;
Uint32 tgeLocked;
Uint32 tgeResult;
Uint32 tgeContainerptr;
Uint32 tgeElementptr;
Uint32 tgeForward;
......
......@@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx):
&fragrecptr,
&operationRecPtr,
&idrOperationRecPtr,
&copyInOperPtr,
&copyOperPtr,
&mlpqOperPtr,
&queOperPtr,
&readWriteOpPtr,
......@@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx):
&lcnPageptr,
&lcnCopyPageptr,
&lupPageptr,
&priPageptr,
&pwiPageptr,
&ciPageidptr,
&gsePageidptr,
&isoPageptr,
......
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -33,8 +33,6 @@
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#include "../dbacc/Dbacc.hpp"
#ifdef DBLQH_C
// Constants
/* ------------------------------------------------------------------------- */
......@@ -2556,8 +2554,19 @@ private:
Dbtup* c_tup;
Dbacc* c_acc;
/**
* Read primary key from tup
*/
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
/**
* Read primary key from operation
*/
public:
Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
private:
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
......@@ -2924,6 +2933,11 @@ public:
}
DLHashTable<ScanRecord> c_scanTakeOverHash;
#ifdef ERROR_INSERT
inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
#endif
};
inline
......@@ -2991,10 +3005,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal);
if (ERROR_INSERTED(5712))
if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key;
}
inline
bool
Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
{
return (ERROR_INSERTED(5712) &&
(regTcPtr->operation == ZINSERT ||
regTcPtr->operation == ZDELETE)) ||
ERROR_INSERTED(5713);
}
#endif
......@@ -67,48 +67,70 @@
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state;
return out;
}
static
NdbOut &
operator<<(NdbOut& out, Operation_t op)
{
switch(op){
case ZREAD: out << "READ"; break;
case ZREAD_EX: out << "READ-EX"; break;
case ZINSERT: out << "INSERT"; break;
case ZUPDATE: out << "UPDATE"; break;
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
}
return out;
}
#else
#define DEBUG(x)
#endif
......@@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0;
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h>
NdbOut * tracenrout = 0;
static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1
......@@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0;
#define CLEAR_TRACENR_FLAG
#endif
#ifdef ERROR_INSERT
static NdbOut * traceopout = 0;
#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
#else
#define TRACE_OP(x) {}
#endif
/* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */
/* */
......@@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal)
name = NdbConfig_SignalLogFileName(getOwnNodeId());
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif
#ifdef ERROR_INSERT
traceopout = &ndbout;
#endif
return;
break;
......@@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED:
jam();
/* -------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* -------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* ------------------------------------------------------------------------- */
break;
default:
ndbrequire(false);
break;
}//switch
}//Dblqh::execTUPKEYCONF()
/* ************> */
......@@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
TRACE_OP(regTcPtr, "TUPKEYREF");
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
......@@ -2568,7 +2604,12 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
else if (getNodeState().startLevel == NodeState::SL_STARTED)
{
if (terrorCode == 899)
ndbout << "899: " << regTcPtr->m_row_id << endl;
}
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
......@@ -3606,57 +3647,7 @@ void Dblqh::endgettupkeyLab(Signal* signal)
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return;
}//if
//#define TRACE_LQHKEYREQ
#ifdef TRACE_LQHKEYREQ
{
ndbout << (regTcPtr->operation == ZREAD ? "READ" :
regTcPtr->operation == ZUPDATE ? "UPDATE" :
regTcPtr->operation == ZINSERT ? "INSERT" :
regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
<< "(" << (int)regTcPtr->operation << ")"
<< " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
<< ", " << refToNode(regTcPtr->clientBlockref) << ")"
<< " table=" << regTcPtr->tableref << " ";
ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
ndbout << "attr=[" << hex;
for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
while(i < regTcPtr->totReclenAi)
{
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
ndbrequire(dataLen != 0);
ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
for(Uint32 j= 0; j<dataLen; j++, i++)
ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
}
ndbout << "]" << endl;
}
#endif
/* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
......@@ -3763,6 +3754,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ----------------------------------------------------------------- */
if (TRACENR_FLAG)
{
TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
......@@ -3847,6 +3839,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
signal->theData[8] = sig2;
signal->theData[9] = sig3;
signal->theData[10] = sig4;
TRACE_OP(regTcPtr.p, "ACC");
if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11);
}//if
......@@ -4133,7 +4128,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
jam();
ndbrequire(rowid == 0);
signal->theData[0] = accPtr;
signal->theData[1] = false;
signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry();
return;
......@@ -4144,16 +4139,18 @@ Dblqh::nr_copy_delete_row(Signal* signal,
*/
ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id;
signal->theData[0] = regTcPtr.p->accConnectrec;
c_acc->execACCKEY_ORD(signal, accPtr);
signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry();
ndbrequire(regTcPtr.p->m_dealloc == 1);
int ret = c_tup->nr_delete(signal, regTcPtr.i,
fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id,
regTcPtr.p->gci);
jamEntry();
if (ret)
{
ndbassert(ret == 1);
......@@ -4167,7 +4164,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
}
TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
regTcPtr.p->m_dealloc = 0;
regTcPtr.p->m_row_id = save;
fragptr = fragPtr;
......@@ -4274,6 +4271,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
}
}
Uint32
Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
{
TcConnectionrecPtr regTcPtr;
DatabufPtr regDatabufptr;
Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
jamEntry();
regTcPtr.i = opPtrI;
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
Uint32 tableId = regTcPtr.p->tableref;
Uint32 keyLen = regTcPtr.p->primKeyLen;
regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
if (keyLen > 4)
{
tmp += 4;
Uint32 pos = 4;
do {
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
regDatabufptr.i = regDatabufptr.p->nextDatabuf;
tmp += sizeof(regDatabufptr.p->data) >> 2;
pos += sizeof(regDatabufptr.p->data) >> 2;
} while(pos < keyLen);
}
if (xfrm)
{
jam();
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
}
return keyLen;
}
/* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */
......@@ -4447,10 +4483,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
* ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS;
#ifdef TRACE_LQHKEYREQ
ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
<< endl;
#endif
Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6);
......@@ -4506,70 +4538,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1;
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << "INSERT " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ")";
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "] ";
}
if(regTcPtr->m_use_rowid)
ndbout << " " << regTcPtr->m_row_id;
}
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
{
Local_key lk; lk.assref(local_key);
ndbout << "DELETE " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ") " << lk;
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
}
}
TRACE_OP(regTcPtr, "TUPKEYREQ");
regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << endl;
}
}//Dblqh::execACCKEYCONF()
void
......@@ -4654,27 +4629,37 @@ void Dblqh::tupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
Uint32 readLen = tupKeyConf->readLength;
Uint32 writeLen = tupKeyConf->writeLength;
Uint32 accOp = regTcPtr->accConnectrec;
c_acc->execACCKEY_ORD(signal, accOp);
TRACE_OP(regTcPtr, "TUPKEYCONF");
if (regTcPtr->simpleRead) {
jam();
/* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET
* THE OPERATION IS A SIMPLE READ.
* WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
* (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
* ---------------------------------------------------------------------- */
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
* READ LENGTH
* --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal);
return;
}//if
if (tupKeyConf->readLength != 0) {
if (readLen != 0)
{
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = tupKeyConf->readLength;
regTcPtr->readlenAi = readLen;
}//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength;
regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
......@@ -5597,6 +5582,8 @@ void Dblqh::releaseOprec(Signal* signal)
if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref;
......@@ -5818,6 +5805,10 @@ void Dblqh::execCOMMIT(Signal* signal)
ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMMIT");
commitReqLab(signal, gci);
return;
}//if
......@@ -5937,6 +5928,10 @@ void Dblqh::execCOMPLETE(Signal* signal)
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMPLETE");
if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam();
......@@ -6313,12 +6308,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TRACENR(endl);
}
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else {
if(!dirtyOp){
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
......@@ -6362,6 +6361,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI)
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
TRACE_OP(tcPtr, "ACC_COMMITREQ");
Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
......@@ -6670,6 +6671,8 @@ void Dblqh::execABORT(Signal* signal)
regTcPtr->commitAckMarker = RNIL;
}
TRACE_OP(regTcPtr, "ABORT");
abortStateHandlerLab(signal);
return;
......@@ -7087,23 +7090,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP.
* ------------------------------------------------------------------------ */
* ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p;
fragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(fragptr);
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
TRACE_OP(regTcPtr, "ACC ABORT");
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec;
signal->theData[1] = true;
signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
/* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they
* are executed.
* ----------------------------------------------------------------------- */
if (signal->theData[1] == RNIL)
{
jam();
/* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they
* are executed.
* --------------------------------------------------------------------- */
return;
}
execACC_ABORTCONF(signal);
return;
}//Dblqh::abortContinueAfterBlockedLab()
......@@ -7117,6 +7127,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
TRACE_OP(regTcPtr, "ACC_ABORTCONF");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
continueAbortLab(signal);
return;
}//Dblqh::execACC_ABORTCONF()
......@@ -8837,7 +8852,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
}//if
// If accOperationPtr == RNIL no record was returned by ACC
if (nextScanConf->accOperationPtr == RNIL) {
Uint32 accOpPtr = nextScanConf->accOperationPtr;
if (accOpPtr == RNIL)
{
jam();
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
......@@ -8871,7 +8888,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
jam();
set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows,
nextScanConf->accOperationPtr);
accOpPtr);
jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
......@@ -9071,12 +9089,24 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
if ((scanptr.p->scanLockHold == ZTRUE) && rows)
{
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
......@@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4;
scanptr.p->m_curr_batch_size_rows++;
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) {
......@@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
return;
} else {
jam();
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
......@@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
if ((scanptr.p->scanLockHold == ZTRUE) && rows)
{
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
......@@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanptr.p->scanReleaseCounter = 1;
} else {
jam();
scanptr.p->m_curr_batch_size_rows++;
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->scanReleaseCounter = rows + 1;
}//if
/* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY
......@@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
return;
}//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
if (scanptr.p->m_curr_batch_size_rows > 0) {
if (rows) {
if (time_passed > 1) {
/* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
......@@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API.
* ----------------------------------------------------------------------- */
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
......@@ -9872,6 +9914,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0,
0,
......@@ -10074,7 +10118,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
initCopyTc(signal, ZDELETE);
set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
tcConP->gci = nextScanConf->gci;
tcConP->primKeyLen = 0;
tcConP->totSendlenAi = 0;
tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
......@@ -10197,6 +10241,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p;
Uint32 rows = scanP->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
ndbassert(accOpPtr != (Uint32)-1);
c_acc->execACCKEY_ORD(signal, accOpPtr);
if (tcConnectptr.p->errorCode != 0) {
jam();
closeCopyLab(signal);
......@@ -18538,6 +18588,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
}
ndbrequire(arg != 2308);
}
#ifdef ERROR_INSERT
if (arg == 5712 || arg == 5713)
{
if (arg == 5712)
{
traceopout = &ndbout;
}
else if (arg == 5713)
{
traceopout = tracenrout;
}
SET_ERROR_INSERT_VALUE(arg);
}
#endif
}//Dblqh::execDUMP_STATE_ORD()
......@@ -18702,3 +18767,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place,
logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
}
#if defined ERROR_INSERT
void
Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
{
(* traceopout)
<< "[ " << hex << regTcPtr->transid[0]
<< " " << hex << regTcPtr->transid[1] << " ] " << dec
<< pos
<< " " << (Operation_t)regTcPtr->operation
<< " " << regTcPtr->tableref
<< "(" << regTcPtr->fragmentid << ")"
<< "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
{
(* traceopout) << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
(* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
}
(* traceopout) << "] ";
}
if (regTcPtr->m_use_rowid)
(* traceopout) << " " << regTcPtr->m_row_id;
(* traceopout) << endl;
}
#endif
......@@ -297,7 +297,6 @@ enum TransState {
};
enum TupleState {
TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3
......@@ -305,7 +304,6 @@ enum TupleState {
enum State {
NOT_INITIALIZED = 0,
COMMON_AREA_PAGES = 1,
IDLE = 17,
ACTIVE = 18,
SYSTEM_RESTART = 19,
......@@ -1441,7 +1439,6 @@ private:
void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal);
void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);
......
......@@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
}
}
void Dbtup::execTUP_ALLOCREQ(Signal* signal)
{
OperationrecPtr regOperPtr;
jamEntry();
regOperPtr.i= signal->theData[0];
c_operation_pool.getPtr(regOperPtr);
regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
//ndbout_c("execTUP_ALLOCREQ");
signal->theData[0]= 0;
signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
return;
mem_error:
jam();
signal->theData[0]= ZMEM_NOMEM_ERROR;
return;
}
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr)
......@@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal,
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p;
if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT)
if(local_key == ~(Uint32)0)
{
jam();
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
return 1;
}
}
jam();
Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
......@@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2;
regOperPtr->m_tuple_location.m_page_idx= sig3;
Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex;
......@@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4;
Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen;
......@@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
copyAttrinfo(regOperPtr, &cinBuffer[0]);
if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT)
Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
if(Roptype == ZINSERT && localkey == ~0)
{
// No tuple allocatated yet
goto do_insert;
......@@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}
void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
if(cnt1)
{
// Disk part is 32-bit aligned
char *varptr = req_struct->m_var_data[MM].m_data_ptr;
ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
}
else
{
ptr -= Tuple_header::HeaderSize;
}
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
dst->m_var_len_offset= cnt2;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
// Set all null bits
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
req_struct->m_tuple_ptr->m_header_bits =
(Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
......@@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal,
Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
bool disk_insert = regOperPtr.p->is_first_operation() && disk;
bool mem_insert = regOperPtr.p->is_first_operation();
bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
......@@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal,
if(mem_insert)
{
jam();
ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
}
else
{
if (!regOperPtr.p->is_first_operation())
{
Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE);
tup_version= prevOp->tupVersion + 1;
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
Operationrec* prevOp= req_struct->prevOpPtr.p;
ndbassert(prevOp->op_struct.op_type == ZDELETE);
tup_version= prevOp->tupVersion + 1;
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else
......@@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (disk_insert)
{
int res;
if (unlikely(!mem_insert))
{
sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
}
if (ERROR_INSERTED(4015))
{
......@@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal,
}
if (unlikely(ptr == 0))
{
jam();
goto alloc_rowid_error;
}
}
......@@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal,
(varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
else if(!rowid || !regOperPtr.p->is_first_operation())
else
{
int ret;
if (ERROR_INSERTED(4020))
......@@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal,
req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
if ((req_struct->m_row_id.m_page_no == frag_page_id &&
req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
{
ndbout_c("no mem insert but rowid (same)");
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
// no mem insert, but rowid
ndbrequire(false);
}
}
base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1);
......
......@@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);
......
......@@ -665,9 +665,9 @@ main(int argc, const char** argv){
for(Uint32 i = 0; i < 12; i++)
{
if(i == 6 || i == 8 || i == 10)
if(false && (i == 6 || i == 8 || i == 10))
continue;
BaseString name("bug_9749");
name.appfmt("_%d", i);
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment