ndb - interface change of handling of abort option

1) move AbortOption from NdbTransaction to NdbOperation

2) let each operation have a "default" abort option dependant on
   operation type

   - read - AO_IgnoreError
   - dml - AbortOnError
   - scan take over - AbortOnError

3) Changed default value to execute() from AbortOnError to DefaultAbortOption, which does not change the operations abort-option.

   Another value to execute(AO) is equivalent to setting AO on each operation before calling execute

4) execute() does _only_ return -1 if transaction has been aborted
   otherwise, you need to check each operation for error code
parent 7d54ea6b
......@@ -260,13 +260,14 @@ static int ndb_to_mysql_error(const NdbError *ndberr)
int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
{
int res= trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError,
NdbOperation::AO_IgnoreError,
h->m_force_send);
if (res == 0)
return 0;
if (res == -1)
return -1;
const NdbError &err= trans->getNdbError();
if (err.classification != NdbError::ConstraintViolation &&
if (err.classification != NdbError::NoError &&
err.classification != NdbError::ConstraintViolation &&
err.classification != NdbError::NoDataFound)
return res;
......@@ -286,7 +287,7 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans,
return h->m_ignore_no_key ?
execute_no_commit_ignore_no_key(h,trans) :
trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
NdbOperation::AbortOnError,
h->m_force_send);
}
......@@ -299,7 +300,7 @@ int execute_commit(ha_ndbcluster *h, NdbTransaction *trans)
return 0;
#endif
return trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
NdbOperation::AbortOnError,
h->m_force_send);
}
......@@ -312,7 +313,7 @@ int execute_commit(THD *thd, NdbTransaction *trans)
return 0;
#endif
return trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
NdbOperation::AbortOnError,
thd->variables.ndb_force_send);
}
......@@ -327,7 +328,7 @@ int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans,
#endif
h->release_completed_operations(trans, force_release);
return trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AO_IgnoreError,
NdbOperation::AO_IgnoreError,
h->m_force_send);
}
......@@ -1731,7 +1732,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
ERR_RETURN(trans->getNdbError());
}
if (execute_no_commit_ie(this,trans,false) != 0)
if ((res = execute_no_commit_ie(this,trans,false)) != 0 ||
op->getNdbError().code)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
......@@ -2003,7 +2005,8 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((res= define_read_attrs(buf, op)))
DBUG_RETURN(res);
if (execute_no_commit_ie(this,trans,false) != 0)
if (execute_no_commit_ie(this,trans,false) != 0 ||
op->getNdbError().code)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
......@@ -7742,7 +7745,7 @@ ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb, const
(char*)&var_mem);
if (pTrans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError,
NdbOperation::AbortOnError,
TRUE) == -1)
{
error= pTrans->getNdbError();
......@@ -8000,7 +8003,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
!op->readTuple(lm) &&
!set_primary_key(op, multi_range_curr->start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(AO_IgnoreError), TRUE) &&
(!m_use_partition_function ||
(op->setPartitionId(part_spec.start_part), true)))
curr += reclength;
......@@ -8022,8 +8024,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
!op->readTuple(lm) &&
!set_index_key(op, key_info, multi_range_curr->start_key.key) &&
!define_read_attrs(curr, op) &&
(op->setAbortOption(AO_IgnoreError), TRUE))
!define_read_attrs(curr, op))
curr += reclength;
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
......
......@@ -98,6 +98,19 @@ public:
#endif
};
/**
* How should transaction be handled if operation fails
*
* For READ, default is AO_IgnoreError
* DML, default is AbortOnError
* CommittedRead does _only_ support AO_IgnoreError
*/
enum AbortOption {
DefaultAbortOption = -1,///< Use default as specified by op-type
AbortOnError = 0, ///< Abort transaction on failed operation
AO_IgnoreError = 2 ///< Transaction continues on failed operation
};
/**
* Define the NdbOperation to be a standard operation of type insertTuple.
* When calling NdbTransaction::execute, this operation
......@@ -777,8 +790,13 @@ public:
*/
LockMode getLockMode() const { return theLockMode; }
/**
* Get/set abort option
*/
AbortOption getAbortOption() const;
int setAbortOption(AbortOption);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
void setAbortOption(Int8 ao) { m_abortOption = ao; }
/**
* Set/get partition key
......@@ -857,7 +875,8 @@ protected:
int doSend(int ProcessorId, Uint32 lastFlag);
virtual int prepareSend(Uint32 TC_ConnectPtr,
Uint64 TransactionId);
Uint64 TransactionId,
AbortOption);
virtual void setLastFlag(NdbApiSignal* signal, Uint32 lastFlag);
int prepareSendInterpreted(); // Help routine to prepare*
......
......@@ -21,6 +21,7 @@
#include "NdbError.hpp"
#include "NdbDictionary.hpp"
#include "Ndb.hpp"
#include "NdbOperation.hpp"
class NdbTransaction;
class NdbOperation;
......@@ -45,11 +46,12 @@ typedef void (* NdbAsynchCallback)(int, NdbTransaction*, void*);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
enum AbortOption {
CommitIfFailFree= 0,
TryCommit= 0,
AbortOnError= 0,
CommitAsMuchAsPossible= 2,
AO_IgnoreError= 2
DefaultAbortOption = NdbOperation::DefaultAbortOption,
CommitIfFailFree = NdbOperation::AbortOnError,
TryCommit = NdbOperation::AbortOnError,
AbortOnError= NdbOperation::AbortOnError,
CommitAsMuchAsPossible = NdbOperation::AO_IgnoreError,
AO_IgnoreError= NdbOperation::AO_IgnoreError
};
enum ExecType {
NoExecTypeDef = -1,
......@@ -145,20 +147,6 @@ class NdbTransaction
public:
/**
* Commit type of transaction
*/
enum AbortOption {
AbortOnError= ///< Abort transaction on failed operation
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
::AbortOnError
#endif
,AO_IgnoreError= ///< Transaction continues on failed operation
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
::AO_IgnoreError
#endif
};
/**
* Execution type of transaction
*/
......@@ -317,13 +305,15 @@ public:
* @return 0 if successful otherwise -1.
*/
int execute(ExecType execType,
AbortOption abortOption = AbortOnError,
NdbOperation::AbortOption = NdbOperation::DefaultAbortOption,
int force = 0 );
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
int execute(::ExecType execType,
::AbortOption abortOption = ::AbortOnError,
int force = 0 )
{ return execute ((ExecType)execType,(AbortOption)abortOption,force); }
::AbortOption abortOption = ::DefaultAbortOption,
int force = 0 ) {
return execute ((ExecType)execType,
(NdbOperation::AbortOption)abortOption,
force); }
#endif
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
......@@ -354,14 +344,14 @@ public:
void executeAsynchPrepare(ExecType execType,
NdbAsynchCallback callback,
void* anyObject,
AbortOption abortOption = AbortOnError);
NdbOperation::AbortOption = NdbOperation::DefaultAbortOption);
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
void executeAsynchPrepare(::ExecType execType,
NdbAsynchCallback callback,
void* anyObject,
::AbortOption abortOption = ::AbortOnError)
{ executeAsynchPrepare((ExecType)execType, callback, anyObject,
(AbortOption)abortOption); }
::AbortOption ao = ::DefaultAbortOption) {
executeAsynchPrepare((ExecType)execType, callback, anyObject,
(NdbOperation::AbortOption)ao); }
#endif
/**
......@@ -380,14 +370,14 @@ public:
void executeAsynch(ExecType aTypeOfExec,
NdbAsynchCallback aCallback,
void* anyObject,
AbortOption abortOption = AbortOnError);
NdbOperation::AbortOption = NdbOperation::DefaultAbortOption);
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
void executeAsynch(::ExecType aTypeOfExec,
NdbAsynchCallback aCallback,
void* anyObject,
::AbortOption abortOption= ::AbortOnError)
::AbortOption abortOption= ::DefaultAbortOption)
{ executeAsynch((ExecType)aTypeOfExec, aCallback, anyObject,
(AbortOption)abortOption); }
(NdbOperation::AbortOption)abortOption); }
#endif
#endif
/**
......@@ -589,7 +579,7 @@ private:
void init(); // Initialize connection object for new transaction
int executeNoBlobs(ExecType execType,
AbortOption abortOption = AbortOnError,
NdbOperation::AbortOption = NdbOperation::DefaultAbortOption,
int force = 0 );
/**
......@@ -643,7 +633,7 @@ private:
int sendCOMMIT(); // Send a TC_COMMITREQ signal;
void setGCI(int GCI); // Set the global checkpoint identity
int OpCompleteFailure(Uint8 abortoption, bool setFailure = true);
int OpCompleteFailure(NdbOperation*);
int OpCompleteSuccess();
void CompletedOperations(); // Move active ops to list of completed
......@@ -733,7 +723,6 @@ private:
Uint32 theNoOfOpSent; // How many operations have been sent
Uint32 theNoOfOpCompleted; // How many operations have completed
Uint32 theNoOfOpFetched; // How many operations was actually fetched
Uint32 theMyRef; // Our block reference
Uint32 theTCConPtr; // Transaction Co-ordinator connection pointer.
Uint64 theTransactionId; // theTransactionId of the transaction
......@@ -757,7 +746,6 @@ private:
bool theTransactionIsStarted;
bool theInUseState;
bool theSimpleState;
Uint8 m_abortOption; // Type of commi
enum ListState {
NotInList,
......
......@@ -1134,7 +1134,7 @@ NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
......@@ -1170,7 +1170,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
......@@ -1194,7 +1194,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
......@@ -1217,7 +1217,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
setErrorCode(tOp);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
n++;
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
......@@ -1253,7 +1253,7 @@ NdbBlob::deletePartsUnknown(Uint32 part)
setErrorCode(tOp);
DBUG_RETURN(-1);
}
tOp->m_abortOption= NdbTransaction::AO_IgnoreError;
tOp->m_abortOption= NdbOperation::AO_IgnoreError;
n++;
}
DBUG_PRINT("info", ("bat=%u", bat));
......@@ -1589,7 +1589,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
DBUG_RETURN(-1);
}
if (isWriteOp()) {
tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
tOp->m_abortOption = NdbOperation::AO_IgnoreError;
}
theHeadInlineReadOp = tOp;
// execute immediately
......@@ -1635,7 +1635,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
DBUG_RETURN(-1);
}
if (isWriteOp()) {
tOp->m_abortOption = NdbTransaction::AO_IgnoreError;
tOp->m_abortOption = NdbOperation::AO_IgnoreError;
}
theHeadInlineReadOp = tOp;
// execute immediately
......@@ -1808,7 +1808,7 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
setErrorCode(NdbBlobImpl::ErrAbort);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
DBUG_PRINT("info", ("added op to update head+inline"));
}
DBUG_RETURN(0);
......@@ -1838,7 +1838,7 @@ NdbBlob::preCommit()
setErrorCode(NdbBlobImpl::ErrAbort);
DBUG_RETURN(-1);
}
tOp->m_abortOption = NdbTransaction::AbortOnError;
tOp->m_abortOption = NdbOperation::AbortOnError;
DBUG_PRINT("info", ("added op to update head+inline"));
}
}
......
......@@ -428,7 +428,7 @@ NdbIndexStat::records_in_range(const NdbDictionary::Index* index, NdbIndexScanOp
DBUG_RETURN(-1);
}
if (trans->execute(NdbTransaction::NoCommit,
NdbTransaction::AbortOnError, forceSend) == -1) {
NdbOperation::AbortOnError, forceSend) == -1) {
m_error = trans->getNdbError();
DBUG_PRINT("error", ("trans:%d op:%d", trans->getNdbError().code,
op->getNdbError().code));
......
......@@ -45,6 +45,7 @@ NdbOperation::insertTuple()
tNdbCon->theSimpleState = 0;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
......@@ -65,6 +66,7 @@ NdbOperation::updateTuple()
theOperationType = UpdateRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
......@@ -85,12 +87,35 @@ NdbOperation::writeTuple()
theOperationType = WriteRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
return -1;
}//if
}//NdbOperation::writeTuple()
/*****************************************************************************
* int deleteTuple();
*****************************************************************************/
int
NdbOperation::deleteTuple()
{
NdbTransaction* tNdbCon = theNdbCon;
int tErrorLine = theErrorLine;
if (theStatus == Init) {
theStatus = OperationDefined;
tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
return -1;
}//if
}//NdbOperation::deleteTuple()
/******************************************************************************
* int readTuple();
*****************************************************************************/
......@@ -125,6 +150,7 @@ NdbOperation::readTuple()
theOperationType = ReadRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Read;
m_abortOption = AO_IgnoreError;
return 0;
} else {
setErrorCode(4200);
......@@ -132,27 +158,6 @@ NdbOperation::readTuple()
}//if
}//NdbOperation::readTuple()
/*****************************************************************************
* int deleteTuple();
*****************************************************************************/
int
NdbOperation::deleteTuple()
{
NdbTransaction* tNdbCon = theNdbCon;
int tErrorLine = theErrorLine;
if (theStatus == Init) {
theStatus = OperationDefined;
tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
return -1;
}//if
}//NdbOperation::deleteTuple()
/******************************************************************************
* int readTupleExclusive();
*****************************************************************************/
......@@ -167,6 +172,7 @@ NdbOperation::readTupleExclusive()
theOperationType = ReadExclusive;
theErrorLine = tErrorLine++;
theLockMode = LM_Exclusive;
m_abortOption = AO_IgnoreError;
return 0;
} else {
setErrorCode(4200);
......@@ -223,6 +229,7 @@ NdbOperation::committedRead()
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
m_abortOption = AO_IgnoreError;
return 0;
} else {
setErrorCode(4200);
......@@ -246,6 +253,7 @@ NdbOperation::dirtyUpdate()
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
......@@ -269,6 +277,7 @@ NdbOperation::dirtyWrite()
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
theLockMode = LM_CommittedRead;
m_abortOption = AbortOnError;
return 0;
} else {
setErrorCode(4200);
......@@ -291,6 +300,7 @@ NdbOperation::interpretedUpdateTuple()
theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
theErrorLine = tErrorLine++;
m_abortOption = AbortOnError;
initInterpreter();
return 0;
} else {
......@@ -315,6 +325,7 @@ NdbOperation::interpretedDeleteTuple()
theErrorLine = tErrorLine++;
theAI_LenInCurrAI = 25;
theLockMode = LM_Exclusive;
m_abortOption = AbortOnError;
initInterpreter();
return 0;
} else {
......
......@@ -100,7 +100,9 @@ Parameters: aTC_ConnectPtr: the Connect pointer to TC.
Remark: Puts the the data into TCKEYREQ signal and optional KEYINFO and ATTRINFO signals.
***************************************************************************/
int
NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
NdbOperation::prepareSend(Uint32 aTC_ConnectPtr,
Uint64 aTransId,
AbortOption ao)
{
Uint32 tTransId1, tTransId2;
Uint32 tReqInfo;
......@@ -148,8 +150,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
//-------------------------------------------------------------
TcKeyReq * const tcKeyReq = CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
Uint32 tTableId = m_currentTable->m_id;
Uint32 tSchemaVersion = m_currentTable->m_version;
Uint32 tTableId = m_accessTable->m_id;
Uint32 tSchemaVersion = m_accessTable->m_version;
tcKeyReq->apiConnectPtr = aTC_ConnectPtr;
tcKeyReq->apiOperationPtr = ptr2int();
......@@ -199,16 +201,16 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
OperationType tOperationType = theOperationType;
Uint32 tTupKeyLen = theTupKeyLen;
Uint8 abortOption =
m_abortOption != -1 ? m_abortOption : theNdbCon->m_abortOption;
Uint8 abortOption = (ao == DefaultAbortOption) ? m_abortOption : ao;
tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator);
tcKeyReq->setOperationType(tReqInfo, tOperationType);
tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen);
// A simple read is always ignore error
abortOption = tSimpleIndicator ? (Uint8) AO_IgnoreError : abortOption;
abortOption = tSimpleState ? AO_IgnoreError : abortOption;
tcKeyReq->setAbortOption(tReqInfo, abortOption);
m_abortOption = abortOption;
Uint8 tDistrKeyIndicator = theDistrKeyIndicator_;
Uint8 tScanIndicator = theScanInfo & 1;
......@@ -544,21 +546,16 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
return -1;
}//if
AbortOption ao = (AbortOption)
(m_abortOption != -1 ? m_abortOption : theNdbCon->m_abortOption);
setErrorCode(aSignal->readData(4));
theStatus = Finished;
theReceiver.m_received_result_length = ~0;
theStatus = Finished;
// blobs want this
if (m_abortOption != AO_IgnoreError)
// not simple read
if(! (theOperationType == ReadRequest && theSimpleIndicator))
{
theNdbCon->theReturnStatus = NdbTransaction::ReturnFailure;
theNdbCon->OpCompleteFailure(this);
return -1;
}
theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), ao);
if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read
return theNdbCon->OpCompleteFailure(ao, m_abortOption != AO_IgnoreError);
/**
* If TCKEYCONF has arrived
......@@ -566,23 +563,8 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
*/
if(theReceiver.m_expected_result_length)
{
return theNdbCon->OpCompleteFailure(AbortOnError);
return theNdbCon->OpCompleteFailure(this);
}
return -1;
}
void
NdbOperation::handleFailedAI_ElemLen()
{
NdbRecAttr* tRecAttr = theReceiver.theFirstRecAttr;
while (tRecAttr != NULL) {
tRecAttr->setNULL();
tRecAttr = tRecAttr->next();
}//while
}//NdbOperation::handleFailedAI_ElemLen()
......@@ -996,6 +996,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
newOp->theTupKeyLen = len;
newOp->theOperationType = opType;
newOp->m_abortOption = AbortOnError;
switch (opType) {
case (ReadRequest):
newOp->theLockMode = theLockMode;
......
......@@ -57,7 +57,6 @@ NdbTransaction::NdbTransaction( Ndb* aNdb ) :
theCompletedLastOp(NULL),
theNoOfOpSent(0),
theNoOfOpCompleted(0),
theNoOfOpFetched(0),
theMyRef(0),
theTCConPtr(0),
theTransactionId(0),
......@@ -132,7 +131,6 @@ NdbTransaction::init()
theNdb->theImpl->m_ndb_cluster_connection.get_latest_trans_gci();
theCommitStatus = Started;
theCompletionStatus = NotCompleted;
m_abortOption = AbortOnError;
theError.code = 0;
theErrorLine = 0;
......@@ -177,12 +175,9 @@ void
NdbTransaction::setOperationErrorCodeAbort(int error, int abortOption)
{
DBUG_ENTER("NdbTransaction::setOperationErrorCodeAbort");
if (abortOption == -1)
abortOption = m_abortOption;
if (theTransactionIsStarted == false) {
theCommitStatus = Aborted;
} else if ((abortOption == AbortOnError) &&
(theCommitStatus != Committed) &&
} else if ((theCommitStatus != Committed) &&
(theCommitStatus != Aborted)) {
theCommitStatus = NeedAbort;
}//if
......@@ -264,7 +259,7 @@ Remark: Initialise connection object for new transaction.
*****************************************************************************/
int
NdbTransaction::execute(ExecType aTypeOfExec,
AbortOption abortOption,
NdbOperation::AbortOption abortOption,
int forceSend)
{
NdbError savedError= theError;
......@@ -355,41 +350,15 @@ NdbTransaction::execute(ExecType aTypeOfExec,
theCompletedLastOp = NULL;
}
if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
if (executeNoBlobs(tExecType,
NdbOperation::DefaultAbortOption,
forceSend) == -1)
{
ret = -1;
if(savedError.code==0)
savedError= theError;
/**
* If AO_IgnoreError, error codes arent always set on individual
* operations, making postExecute impossible
*/
if (abortOption == AO_IgnoreError)
{
if (theCompletedFirstOp != NULL)
{
if (tCompletedFirstOp != NULL)
{
tCompletedLastOp->next(theCompletedFirstOp);
theCompletedFirstOp = tCompletedFirstOp;
}
}
else
{
theCompletedFirstOp = tCompletedFirstOp;
theCompletedLastOp = tCompletedLastOp;
}
if (tPrepOp != NULL && tRestOp != NULL) {
if (theFirstOpInList == NULL)
theFirstOpInList = tRestOp;
else
theLastOpInList->next(tRestOp);
theLastOpInList = tLastOp;
}
DBUG_RETURN(-1);
}
}
#ifdef ndb_api_crash_on_complex_blob_abort
assert(theFirstOpInList == NULL && theLastOpInList == NULL);
......@@ -448,8 +417,8 @@ NdbTransaction::execute(ExecType aTypeOfExec,
}
int
NdbTransaction::executeNoBlobs(ExecType aTypeOfExec,
AbortOption abortOption,
NdbTransaction::executeNoBlobs(NdbTransaction::ExecType aTypeOfExec,
NdbOperation::AbortOption abortOption,
int forceSend)
{
DBUG_ENTER("NdbTransaction::executeNoBlobs");
......@@ -528,10 +497,10 @@ Parameters : aTypeOfExec: Type of execute.
Remark: Prepare a part of a transaction in an asynchronous manner.
*****************************************************************************/
void
NdbTransaction::executeAsynchPrepare( ExecType aTypeOfExec,
NdbTransaction::executeAsynchPrepare(NdbTransaction::ExecType aTypeOfExec,
NdbAsynchCallback aCallback,
void* anyObject,
AbortOption abortOption)
NdbOperation::AbortOption abortOption)
{
DBUG_ENTER("NdbTransaction::executeAsynchPrepare");
DBUG_PRINT("enter", ("aTypeOfExec: %d, aCallback: 0x%lx, anyObject: Ox%lx",
......@@ -571,7 +540,6 @@ NdbTransaction::executeAsynchPrepare( ExecType aTypeOfExec,
theReturnStatus = ReturnSuccess;
theCallbackFunction = aCallback;
theCallbackObject = anyObject;
m_abortOption = abortOption;
m_waitForReply = true;
tNdb->thePreparedTransactionsArray[tnoOfPreparedTransactions] = this;
theTransArrayIndex = tnoOfPreparedTransactions;
......@@ -666,8 +634,7 @@ NdbTransaction::executeAsynchPrepare( ExecType aTypeOfExec,
while (tOp) {
int tReturnCode;
NdbOperation* tNextOp = tOp->next();
tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId);
tReturnCode = tOp->prepareSend(theTCConPtr, theTransactionId, abortOption);
if (tReturnCode == -1) {
theSendStatus = sendABORTfail;
DBUG_VOID_RETURN;
......@@ -1802,12 +1769,6 @@ from other transactions.
(theLastExecOpInList->theCommitIndicator == 1)){
if (m_abortOption == AO_IgnoreError && theError.code != 0){
/**
* There's always a TCKEYCONF when using IgnoreError
*/
return -1;
}
/**********************************************************************/
// We sent the transaction with Commit flag set and received a CONF with
// no Commit flag set. This is clearly an anomaly.
......@@ -1981,13 +1942,6 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf,
} else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){
if (m_abortOption == AO_IgnoreError && theError.code != 0){
/**
* There's always a TCKEYCONF when using IgnoreError
*/
return -1;
}
/**********************************************************************/
// We sent the transaction with Commit flag set and received a CONF with
// no Commit flag set. This is clearly an anomaly.
......@@ -2011,41 +1965,6 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf,
return -1;
}//NdbTransaction::receiveTCINDXCONF()
/*****************************************************************************
int receiveTCINDXREF( NdbApiSignal* aSignal)
Return Value: Return 0 : send was succesful.
Return -1: In all other case.
Parameters: aSignal: the signal object that contains the
TCINDXREF signal from TC.
Remark: Handles the reception of the TCINDXREF signal.
*****************************************************************************/
int
NdbTransaction::receiveTCINDXREF( NdbApiSignal* aSignal)
{
if(checkState_TransId(aSignal->getDataPtr()+1)){
theError.code = aSignal->readData(4); // Override any previous errors
/**********************************************************************/
/* A serious error has occured. This could be due to deadlock or */
/* lack of resources or simply a programming error in NDB. This */
/* transaction will be aborted. Actually it has already been */
/* and we only need to report completion and return with the */
/* error code to the application. */
/**********************************************************************/
theCompletionStatus = NdbTransaction::CompletedFailure;
theCommitStatus = NdbTransaction::Aborted;
theReturnStatus = NdbTransaction::ReturnFailure;
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
abort();
#endif
}
return -1;
}//NdbTransaction::receiveTCINDXREF()
/*******************************************************************************
int OpCompletedFailure();
......@@ -2055,36 +1974,15 @@ Parameters: aErrorCode: The error code.
Remark: An operation was completed with failure.
*******************************************************************************/
int
NdbTransaction::OpCompleteFailure(Uint8 abortOption, bool setFailure)
NdbTransaction::OpCompleteFailure(NdbOperation* op)
{
Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent;
if (setFailure)
theCompletionStatus = NdbTransaction::CompletedFailure;
tNoComp++;
theNoOfOpCompleted = tNoComp;
if (tNoComp == tNoSent) {
//------------------------------------------------------------------------
//If the transaction consists of only simple reads we can set
//Commit state Aborted. Otherwise this simple operation cannot
//decide the success of the whole transaction since a simple
//operation is not really part of that transaction.
//------------------------------------------------------------------------
if (abortOption == AO_IgnoreError){
/**
* There's always a TCKEYCONF when using IgnoreError
*/
return -1;
}
return 0; // Last operation received
} else if (tNoComp > tNoSent) {
setOperationErrorCodeAbort(4113); // Too many operations,
// stop waiting for more
return 0;
} else {
return -1; // Continue waiting for more signals
}//if
return (tNoComp == tNoSent) ? 0 : -1;
}//NdbTransaction::OpCompleteFailure()
/******************************************************************************
......
......@@ -107,8 +107,8 @@ public:
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async(Ndb*, NdbTransaction::ExecType, NdbOperation::AbortOption = NdbOperation::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbOperation::AbortOption = NdbOperation::AbortOnError);
int wait_async(Ndb*, int timeout = -1);
......
......@@ -1250,6 +1250,274 @@ int runScan_4006(NDBT_Context* ctx, NDBT_Step* step){
return result;
}
char pkIdxName[255];
int createPkIndex(NDBT_Context* ctx, NDBT_Step* step){
bool orderedIndex = ctx->getProperty("OrderedIndex", (unsigned)0);
const NdbDictionary::Table* pTab = ctx->getTab();
Ndb* pNdb = GETNDB(step);
bool logged = ctx->getProperty("LoggedIndexes", 1);
// Create index
BaseString::snprintf(pkIdxName, 255, "IDC_PK_%s", pTab->getName());
if (orderedIndex)
ndbout << "Creating " << ((logged)?"logged ": "temporary ") << "ordered index "
<< pkIdxName << " (";
else
ndbout << "Creating " << ((logged)?"logged ": "temporary ") << "unique index "
<< pkIdxName << " (";
NdbDictionary::Index pIdx(pkIdxName);
pIdx.setTable(pTab->getName());
if (orderedIndex)
pIdx.setType(NdbDictionary::Index::OrderedIndex);
else
pIdx.setType(NdbDictionary::Index::UniqueHashIndex);
for (int c = 0; c< pTab->getNoOfColumns(); c++){
const NdbDictionary::Column * col = pTab->getColumn(c);
if(col->getPrimaryKey()){
pIdx.addIndexColumn(col->getName());
ndbout << col->getName() <<" ";
}
}
pIdx.setStoredIndex(logged);
ndbout << ") ";
if (pNdb->getDictionary()->createIndex(pIdx) != 0){
ndbout << "FAILED!" << endl;
const NdbError err = pNdb->getDictionary()->getNdbError();
ERR(err);
return NDBT_FAILED;
}
ndbout << "OK!" << endl;
return NDBT_OK;
}
int createPkIndex_Drop(NDBT_Context* ctx, NDBT_Step* step){
const NdbDictionary::Table* pTab = ctx->getTab();
Ndb* pNdb = GETNDB(step);
// Drop index
ndbout << "Dropping index " << pkIdxName << " ";
if (pNdb->getDictionary()->dropIndex(pkIdxName,
pTab->getName()) != 0){
ndbout << "FAILED!" << endl;
ERR(pNdb->getDictionary()->getNdbError());
return NDBT_FAILED;
} else {
ndbout << "OK!" << endl;
}
return NDBT_OK;
}
static
int
op_row(NdbTransaction* pTrans, HugoOperations& hugoOps,
const NdbDictionary::Table* pTab, int op, int row)
{
NdbOperation * pOp = 0;
switch(op){
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
pOp = pTrans->getNdbOperation(pTab->getName());
break;
case 9:
return 0;
case 6:
case 7:
case 8:
case 10:
case 11:
pOp = pTrans->getNdbIndexOperation(pkIdxName, pTab->getName());
default:
break;
}
switch(op){
case 0:
case 6:
pOp->readTuple();
break;
case 1:
case 7:
pOp->committedRead();
break;
case 2:
case 8:
pOp->readTupleExclusive();
break;
case 3:
case 9:
pOp->insertTuple();
break;
case 4:
case 10:
pOp->updateTuple();
break;
case 5:
case 11:
pOp->deleteTuple();
break;
default:
abort();
}
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() == true){
if(hugoOps.equalForAttr(pOp, a, row) != 0){
return NDBT_FAILED;
}
}
}
switch(op){
case 0:
case 1:
case 2:
case 6:
case 7:
case 8:
for(int a = 0; a<pTab->getNoOfColumns(); a++){
pOp->getValue(a);
}
break;
case 3:
case 4:
case 10:
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getPrimaryKey() == false){
if(hugoOps.setValueForAttr(pOp, a, row, 2) != 0){
return NDBT_FAILED;
}
}
}
break;
case 5:
case 11:
pOp->deleteTuple();
break;
case 9:
default:
abort();
}
return NDBT_OK;
}
static void print(int op)
{
const char * str = 0;
switch(op){
case 0: str = "pk read-sh"; break;
case 1: str = "pk read-nl"; break;
case 2: str = "pk read-ex"; break;
case 3: str = "pk insert "; break;
case 4: str = "pk update "; break;
case 5: str = "pk delete "; break;
case 6: str = "uk read-sh"; break;
case 7: str = "uk read-nl"; break;
case 8: str = "uk read-ex"; break;
case 9: str = "noop "; break;
case 10: str = "uk update "; break;
case 11: str = "uk delete "; break;
default:
abort();
}
printf("%s ", str);
}
int
runTestIgnoreError(NDBT_Context* ctx, NDBT_Step* step)
{
int result = NDBT_OK;
Uint32 loops = ctx->getNumRecords();
const NdbDictionary::Table* pTab = ctx->getTab();
HugoOperations hugoOps(*pTab);
HugoTransactions hugoTrans(*pTab);
Ndb* pNdb = GETNDB(step);
struct {
ExecType et;
AbortOption ao;
} tests[] = {
{ Commit, AbortOnError },
{ Commit, AO_IgnoreError },
{ NoCommit, AbortOnError },
{ NoCommit, AO_IgnoreError },
};
printf("case: <op1> <op2> c/nc ao/ie\n");
Uint32 tno = 0;
for (Uint32 op1 = 0; op1 < 12; op1++)
{
for (Uint32 op2 = op1; op2 < 12; op2++)
{
int ret;
NdbTransaction* pTrans = 0;
for (Uint32 i = 0; i<4; i++, tno++)
{
if (loops != 1000 && loops != tno)
continue;
ExecType et = tests[i].et;
AbortOption ao = tests[i].ao;
printf("%.3d : ", tno);
print(op1);
print(op2);
switch(et){
case Commit: printf("c "); break;
case NoCommit: printf("nc "); break;
}
switch(ao){
case AbortOnError: printf("aoe "); break;
case AO_IgnoreError: printf("ie "); break;
}
printf(": ");
hugoTrans.loadTable(pNdb, 1);
pTrans = pNdb->startTransaction();
op_row(pTrans, hugoOps, pTab, op1, 0);
ret = pTrans->execute(et, ao);
pTrans->close();
printf("%d ", ret);
hugoTrans.clearTable(pNdb);
hugoTrans.loadTable(pNdb, 1);
pTrans = pNdb->startTransaction();
op_row(pTrans, hugoOps, pTab, op1, 1);
ret = pTrans->execute(et, ao);
pTrans->close();
printf("%d ", ret);
hugoTrans.clearTable(pNdb);
hugoTrans.loadTable(pNdb, 1);
pTrans = pNdb->startTransaction();
op_row(pTrans, hugoOps, pTab, op1, 0);
op_row(pTrans, hugoOps, pTab, op2, 1);
ret = pTrans->execute(et, ao);
pTrans->close();
printf("%d\n", ret);
hugoTrans.clearTable(pNdb);
hugoTrans.clearTable(pNdb);
}
}
}
return NDBT_OK;
}
template class Vector<NdbScanOperation*>;
......@@ -1343,6 +1611,12 @@ TESTCASE("Scan_4006",
INITIALIZER(runScan_4006);
FINALIZER(runClearTable);
}
TESTCASE("IgnoreError", ""){
INITIALIZER(createPkIndex);
STEP(runTestIgnoreError);
FINALIZER(runClearTable);
FINALIZER(createPkIndex_Drop);
}
NDBT_TESTSUITE_END(testNdbApi);
int main(int argc, const char** argv){
......
......@@ -458,7 +458,7 @@ HugoOperations::callback(int res, NdbTransaction* pCon)
int
HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
NdbOperation::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
......@@ -473,7 +473,7 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
int
HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
NdbOperation::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment