ndb - bug#28161

  fix commit triggers with DD but not using DD
parent b5339a80
......@@ -35,7 +35,7 @@ class TupCommitReq {
friend bool printTUPCOMMITREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
public:
STATIC_CONST( SignalLength = 3 );
STATIC_CONST( SignalLength = 4 );
private:
......@@ -45,6 +45,7 @@ private:
Uint32 opPtr;
Uint32 gci;
Uint32 hashValue;
Uint32 diskpage;
};
#endif
......@@ -6348,6 +6348,7 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
tupCommitReq->opPtr = sig0;
tupCommitReq->gci = regTcPtr.p->gci;
tupCommitReq->hashValue = regTcPtr.p->hashValue;
tupCommitReq->diskpage = RNIL;
EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal,
TupCommitReq::SignalLength);
......
......@@ -2126,7 +2126,8 @@ private:
#endif
void checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTablePtr);
Tablerec* regTablePtr,
bool disk);
void fireImmediateTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
......@@ -2138,7 +2139,8 @@ private:
void fireDetachedTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
Operationrec* regOperPtr);
Operationrec* regOperPtr,
bool disk);
void executeTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
......@@ -2146,7 +2148,8 @@ private:
void executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* trigPtr,
Operationrec* regOperPtr);
Operationrec* regOperPtr,
bool disk = true);
bool readTriggerInfo(TupTriggerData* trigPtr,
Operationrec* regOperPtr,
......@@ -2157,8 +2160,9 @@ private:
Uint32* afterBuffer,
Uint32& noAfterWords,
Uint32* beforeBuffer,
Uint32& noBeforeWords);
Uint32& noBeforeWords,
bool disk);
void sendTrigAttrInfo(Signal* signal,
Uint32* data,
Uint32 dataLen,
......
......@@ -358,6 +358,7 @@ Dbtup::disk_page_commit_callback(Signal* signal,
tupCommitReq->opPtr= opPtrI;
tupCommitReq->hashValue= hash_value;
tupCommitReq->gci= gci;
tupCommitReq->diskpage = page_id;
regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
regOperPtr.p->m_commit_disk_callback_page= page_id;
......@@ -388,14 +389,15 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
c_operation_pool.getPtr(regOperPtr, opPtrI);
c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci);
Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
tupCommitReq->opPtr= opPtrI;
tupCommitReq->hashValue= hash_value;
tupCommitReq->gci= gci;
tupCommitReq->diskpage = page;
Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
ndbassert(regOperPtr.p->op_struct.m_load_diskpage_on_commit == 0);
regOperPtr.p->op_struct.m_wait_log_buffer= 0;
m_global_page_pool.getPtr(m_pgman.m_ptr, page);
......@@ -480,7 +482,16 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
req_struct.signal= signal;
req_struct.hash_value= hash_value;
req_struct.gci= gci;
regOperPtr.p->m_commit_disk_callback_page = tupCommitReq->diskpage;
#ifdef VM_TRACE
if (tupCommitReq->diskpage == RNIL)
{
m_pgman.m_ptr.setNull();
req_struct.m_disk_page_ptr.setNull();
}
#endif
ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
PagePtr page;
......@@ -628,8 +639,10 @@ skip_disk:
/**
* Perform "real" commit
*/
Uint32 disk = regOperPtr.p->m_commit_disk_callback_page;
set_change_mask_info(&req_struct, regOperPtr.p);
checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p);
checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p,
disk != RNIL);
if(regOperPtr.p->op_struct.op_type != ZDELETE)
{
......
......@@ -24,7 +24,6 @@
#include "AttributeOffset.hpp"
#include <AttributeHeader.hpp>
#include <Interpreter.hpp>
#include <signaldata/TupCommit.hpp>
#include <signaldata/TupKey.hpp>
#include <signaldata/AttrInfo.hpp>
#include <NdbSqlUtil.hpp>
......
......@@ -459,7 +459,8 @@ void Dbtup::checkDeferredTriggers(Signal* signal,
/* ---------------------------------------------------------------- */
void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTablePtr)
Tablerec* regTablePtr,
bool disk)
{
Uint32 save_type = regOperPtr->op_struct.op_type;
Tuple_header *save_ptr = req_struct->m_tuple_ptr;
......@@ -505,7 +506,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// If any fired immediate insert trigger then fetch after tuple
fireDetachedTriggers(req_struct,
regTablePtr->subscriptionInsertTriggers,
regOperPtr);
regOperPtr, disk);
break;
case(ZDELETE):
ljam();
......@@ -519,7 +520,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// FIRETRIGORD with the before tuple
fireDetachedTriggers(req_struct,
regTablePtr->subscriptionDeleteTriggers,
regOperPtr);
regOperPtr, disk);
break;
case(ZUPDATE):
ljam();
......@@ -533,7 +534,7 @@ void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
// and send two FIRETRIGORD one with before tuple and one with after tuple
fireDetachedTriggers(req_struct,
regTablePtr->subscriptionUpdateTriggers,
regOperPtr);
regOperPtr, disk);
break;
default:
ndbrequire(false);
......@@ -591,7 +592,8 @@ Dbtup::fireDeferredTriggers(Signal* signal,
void
Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
Operationrec* const regOperPtr)
Operationrec* const regOperPtr,
bool disk)
{
TriggerPtr trigPtr;
......@@ -612,7 +614,8 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
ljam();
executeTrigger(req_struct,
trigPtr.p,
regOperPtr);
regOperPtr,
disk);
}
triggerList.next(trigPtr);
}
......@@ -636,7 +639,8 @@ void Dbtup::executeTriggers(KeyReqStruct *req_struct,
void Dbtup::executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* const trigPtr,
Operationrec* const regOperPtr)
Operationrec* const regOperPtr,
bool disk)
{
/**
* The block below does not work together with GREP.
......@@ -703,7 +707,8 @@ void Dbtup::executeTrigger(KeyReqStruct *req_struct,
afterBuffer,
noAfterWords,
beforeBuffer,
noBeforeWords)) {
noBeforeWords,
disk)) {
ljam();
return;
}
......@@ -806,9 +811,9 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
Uint32* const afterBuffer,
Uint32& noAfterWords,
Uint32* const beforeBuffer,
Uint32& noBeforeWords)
Uint32& noBeforeWords,
bool disk)
{
//XXX this will not work with varsize attributes...
noAfterWords = 0;
noBeforeWords = 0;
Uint32 readBuffer[MAX_ATTRIBUTES_IN_TABLE];
......@@ -841,8 +846,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
c_undo_buffer.get_ptr(&req_struct->prevOpPtr.p->m_copy_tuple_location);
}
if (regTabPtr->need_expand())
prepare_read(req_struct, regTabPtr, true);
if (regTabPtr->need_expand(disk))
prepare_read(req_struct, regTabPtr, disk);
int ret = readAttributes(req_struct,
&tableDescriptor[regTabPtr->readKeyArray].tabDescr,
......@@ -937,8 +942,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
req_struct->m_tuple_ptr= (Tuple_header*)ptr;
}
if (regTabPtr->need_expand()) // no disk
prepare_read(req_struct, regTabPtr, true);
if (regTabPtr->need_expand(disk))
prepare_read(req_struct, regTabPtr, disk);
int ret = readAttributes(req_struct,
&readBuffer[0],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment