Commit dc78a772 authored by joreland@mysql.com's avatar joreland@mysql.com

bug#11166 - ndb

  Fix potential inconsistency when running ndb_restore due to faulty parsing
    of backup log wrt inserts
parent e6dfe6aa
......@@ -1676,13 +1676,30 @@ Backup::execWAIT_GCP_CONF(Signal* signal){
ptr.p->masterData.sendCounter= 0;
ptr.p->masterData.gsn = GSN_BACKUP_FRAGMENT_REQ;
nextFragment(signal, ptr);
return;
} else {
jam();
CRASH_INSERTION((10009));
ptr.p->stopGCP = gcp;
sendDropTrig(signal, ptr); // regular dropping of triggers
}//if
if(gcp >= ptr.p->startGCP + 3)
{
CRASH_INSERTION((10009));
ptr.p->stopGCP = gcp;
sendDropTrig(signal, ptr); // regular dropping of triggers
return;
}//if
/**
* Make sure that we got entire stopGCP
*/
WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = ptr.i;
req->requestType = WaitGCPReq::CompleteForceStart;
sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
WaitGCPReq::SignalLength,JBB);
return;
}
}
/*****************************************************************************
*
* Master functionallity - Backup fragment
......
......@@ -765,6 +765,7 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md)
setLogFile(md, 0);
m_count = 0;
m_last_gci = 0;
}
const LogEntry *
......@@ -772,7 +773,6 @@ RestoreLogIterator::getNextLogEntry(int & res) {
// Read record length
typedef BackupFormat::LogFile::LogEntry LogE;
Uint32 gcp= 0;
LogE * logE= 0;
Uint32 len= ~0;
const Uint32 stopGCP = m_metaData.getStopGCP();
......@@ -802,10 +802,10 @@ RestoreLogIterator::getNextLogEntry(int & res) {
if(hasGcp){
len--;
gcp = ntohl(logE->Data[len-2]);
m_last_gci = ntohl(logE->Data[len-2]);
}
} while(gcp > stopGCP + 1);
} while(m_last_gci > stopGCP + 1);
m_logEntry.m_table = m_metaData.getTable(logE->TableId);
switch(logE->TriggerEvent){
case TriggerEvent::TE_INSERT:
......
......@@ -361,6 +361,7 @@ private:
const RestoreMetaData & m_metaData;
Uint32 m_count;
Uint32 m_last_gci;
LogEntry m_logEntry;
public:
RestoreLogIterator(const RestoreMetaData &);
......
......@@ -512,7 +512,14 @@ BackupRestore::logEntry(const LogEntry & tup)
<< " Exiting...";
exit(-1);
}
if (check != 0)
{
err << "Error defining op: " << trans->getNdbError() << endl;
exit(-1);
} // if
Bitmask<4096> keys;
for (Uint32 i= 0; i < tup.size(); i++)
{
const AttributeS * attr = tup[i];
......@@ -525,9 +532,21 @@ BackupRestore::logEntry(const LogEntry & tup)
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length);
{
if(!keys.get(attr->Desc->attrId))
{
keys.set(attr->Desc->attrId);
check= op->equal(attr->Desc->attrId, dataPtr, length);
}
}
else
op->setValue(attr->Desc->attrId, dataPtr, length);
check= op->setValue(attr->Desc->attrId, dataPtr, length);
if (check != 0)
{
err << "Error defining op: " << trans->getNdbError() << endl;
exit(-1);
} // if
}
const int ret = trans->execute(Commit);
......@@ -536,18 +555,25 @@ BackupRestore::logEntry(const LogEntry & tup)
// Both insert update and delete can fail during log running
// and it's ok
// TODO: check that the error is either tuple exists or tuple does not exist?
bool ok= false;
NdbError errobj= trans->getNdbError();
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
if(errobj.status == NdbError::PermanentError &&
errobj.classification == NdbError::ConstraintViolation)
ok= true;
break;
case LogEntry::LE_UPDATE:
break;
case LogEntry::LE_DELETE:
if(errobj.status == NdbError::PermanentError &&
errobj.classification == NdbError::NoDataFound)
ok= true;
break;
}
if (false)
if (!ok)
{
err << "execute failed: " << trans->getNdbError() << endl;
err << "execute failed: " << errobj << endl;
exit(-1);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment