Commit e8c6b766 authored by knielsen@ymer.(none)'s avatar knielsen@ymer.(none)

BUG#28073: Infinite loop in lock queue.

In a certain code path the NDBD could loop infinitely in the lock queue.
parent aca5ac19
......@@ -1321,7 +1321,7 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
* We must check if there are many transactions in parallel queue...
*/
OperationrecPtr tmp;
tmp.i = loPtr.p->nextParallelQue;
tmp= loPtr;
while (tmp.i != RNIL)
{
ptrCheckGuard(tmp, coprecsize, operationrec);
......@@ -1333,6 +1333,7 @@ Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
*/
return;
}
tmp.i = tmp.p->nextParallelQue;
}
upgrade:
......
......@@ -27,7 +27,8 @@ public:
const NdbDictionary::Index* idx = 0);
~HugoOperations();
int startTransaction(Ndb*);
int startTransaction(Ndb*, const NdbDictionary::Table *table= 0,
const char *keyData= 0, Uint32 keyLen= 0);
int setTransaction(NdbTransaction*,bool not_null_ok= false);
int closeTransaction(Ndb*);
NdbTransaction* getTransaction();
......
......@@ -1272,6 +1272,74 @@ runBug25090(NDBT_Context* ctx, NDBT_Step* step){
return NDBT_OK;
}
int
runBug28073(NDBT_Context *ctx, NDBT_Step* step)
{
int result = NDBT_OK;
const NdbDictionary::Table *table= ctx->getTab();
HugoOperations hugoOp1(*table);
HugoOperations hugoOp2(*table);
Ndb* pNdb = GETNDB(step);
int loops = ctx->getNumLoops();
bool inserted= false;
while (loops--)
{
if (!inserted)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_Commit(pNdb) == 0);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
inserted= 1;
}
// Use TC hint to hit the same node in both transactions.
Uint32 key_val= 0;
const char *key= (const char *)(&key_val);
CHECK(hugoOp1.startTransaction(pNdb, table, key, 4) == 0);
CHECK(hugoOp2.startTransaction(pNdb, table, key, 4) == 0);
// First take 2*read lock on the tuple in transaction 1.
for (Uint32 i= 0; i < 2; i++)
{
CHECK(hugoOp1.pkReadRecord(pNdb, 1, 1, NdbOperation::LM_Read) == 0);
CHECK(hugoOp1.pkReadRecord(pNdb, 1, 1, NdbOperation::LM_Read) == 0);
}
CHECK(hugoOp1.execute_NoCommit(pNdb) == 0);
// Now send ops in two transactions, one batch.
// First 2*read in transaction 2.
for (Uint32 i= 0; i < 2; i++)
{
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1, NdbOperation::LM_Read) == 0);
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1, NdbOperation::LM_Read) == 0);
}
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::NoCommit) == 0);
// Second op an update in transaction 1.
CHECK(hugoOp1.pkUpdateRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
// Transaction 1 will now hang waiting on transaction 2 to commit before it
// can upgrade its read lock to a write lock.
// With the bug, we get a node failure due to watchdog timeout here.
CHECK(hugoOp2.wait_async(pNdb) == 0);
// Now commit transaction 2, we should see transaction 1 finish with the
// update.
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
CHECK(hugoOp2.wait_async(pNdb) == 0);
// No error check, as transaction 1 may have terminated already.
hugoOp1.wait_async(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
CHECK(hugoOp2.closeTransaction(pNdb) == 0);
}
return result;
}
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
......@@ -1542,6 +1610,10 @@ TESTCASE("Bug25090",
"Verify what happens when we fill the db" ){
STEP(runBug25090);
}
TESTCASE("Bug28073",
"Infinite loop in lock queue" ){
STEP(runBug28073);
}
NDBT_TESTSUITE_END(testBasic);
#if 0
......
......@@ -235,6 +235,10 @@ max-time: 500
cmd: testBasic
args: -n Bug25090 T1
max-time: 500
cmd: testBasic
args: -n Bug28073
max-time: 500
cmd: testIndex
args: -n Bug25059 -r 3000 T1
......
......@@ -15,13 +15,15 @@
#include <HugoOperations.hpp>
int HugoOperations::startTransaction(Ndb* pNdb){
int HugoOperations::startTransaction(Ndb* pNdb,
const NdbDictionary::Table *table,
const char *keyData, Uint32 keyLen){
if (pTrans != NULL){
ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
return NDBT_FAILED;
}
pTrans = pNdb->startTransaction();
pTrans = pNdb->startTransaction(table, keyData, keyLen);
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
ERR(err);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment