Commit 3d96ab94 authored by mskold/marty@linux.site's avatar mskold/marty@linux.site

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.1-ndb

into  mysql.com:/windows/Linux_space/MySQL/mysql-5.1-new-ndb
parents cade1966 174d5614
......@@ -33,7 +33,8 @@ private:
BUFFER_FULL_FRAG_COMPLETE = 3,
BUFFER_FULL_META = 4,
BACKUP_FRAGMENT_INFO = 5,
RESET_DISK_SPEED_COUNTER = 6
RESET_DISK_SPEED_COUNTER = 6,
ZDELAY_SCAN_NEXT = 7
};
};
......
......@@ -8,7 +8,7 @@ Next DBDICT 6007
Next DBDIH 7178
Next DBTC 8039
Next CMVMI 9000
Next BACKUP 10036
Next BACKUP 10038
Next DBUTIL 11002
Next DBTUX 12008
Next SUMA 13001
......@@ -425,6 +425,9 @@ Backup Stuff:
10034: define backup reply error
10035: Fail to allocate buffers
10036: Halt backup for table >= 2
10037: Resume backup (from 10036)
11001: Send UTIL_SEQUENCE_REF (in master)
5028: Crash when receiving LQHKEYREQ (in non-master)
......
......@@ -356,6 +356,25 @@ Backup::execCONTINUEB(Signal* signal)
GetTabInfoReq::SignalLength, JBB);
return;
}
case BackupContinueB::ZDELAY_SCAN_NEXT:
if (ERROR_INSERTED(10036))
{
jam();
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 300,
signal->getLength());
return;
}
else
{
jam();
CLEAR_ERROR_INSERT_VALUE;
ndbout_c("Resuming backup");
memmove(signal->theData, signal->theData + 1,
4*ScanFragNextReq::SignalLength);
sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
return ;
}
default:
ndbrequire(0);
}//switch
......@@ -3925,6 +3944,22 @@ Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
req->batch_size_rows= 16;
req->batch_size_bytes= 0;
if (ERROR_INSERTED(10036) &&
filePtr.p->tableId >= 2 &&
filePtr.p->operation.noOfRecords > 0)
{
ndbout_c("halting backup for table %d fragment: %d after %d records",
filePtr.p->tableId,
filePtr.p->fragmentNo,
filePtr.p->operation.noOfRecords);
memmove(signal->theData+1, signal->theData,
4*ScanFragNextReq::SignalLength);
signal->theData[0] = BackupContinueB::ZDELAY_SCAN_NEXT;
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal,
300, 1+ScanFragNextReq::SignalLength);
return;
}
if(ERROR_INSERTED(10032))
sendSignalWithDelay(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
100, ScanFragNextReq::SignalLength);
......
......@@ -152,10 +152,10 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
operator>(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
(key1.m_page_no == key2.m_page_no && key1.m_page_idx > key2.m_page_idx);
}
void
......@@ -187,7 +187,7 @@ Dbtup::dealloc_tuple(Signal* signal,
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = page->frag_page_id;
if (rowid >= scanpos)
if (rowid > scanpos)
{
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list;
......@@ -215,6 +215,7 @@ Dbtup::commit_operation(Signal* signal,
{
ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
Uint32 save= tuple_ptr->m_operation_ptr_i;
Uint32 bits= tuple_ptr->m_header_bits;
......@@ -264,7 +265,6 @@ Dbtup::commit_operation(Signal* signal,
Local_key key;
memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
PagePtr diskPagePtr = *(PagePtr*)&m_pgman.m_ptr;
ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
......@@ -273,19 +273,6 @@ Dbtup::commit_operation(Signal* signal,
if(copy_bits & Tuple_header::DISK_ALLOC)
{
disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
if(lcpScan_ptr_i != RNIL)
{
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = pagePtr.p->frag_page_id;
if(rowid >= scanpos)
{
copy_bits |= Tuple_header::LCP_SKIP;
}
}
}
if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
......@@ -312,6 +299,18 @@ Dbtup::commit_operation(Signal* signal,
copy_bits |= Tuple_header::DISK_PART;
}
if(lcpScan_ptr_i != RNIL)
{
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = pagePtr.p->frag_page_id;
if(rowid > scanpos)
{
copy_bits |= Tuple_header::LCP_SKIP;
}
}
Uint32 clear=
Tuple_header::ALLOC | Tuple_header::FREE |
......
......@@ -784,7 +784,7 @@ int Dbtup::updateAttributes(KeyReqStruct *req_struct,
req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::DISK_PART;
memcpy(req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
inBuffer+inBufIndex+1, sz << 2);
inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex += 1 + sz;
}
else
{
......
......@@ -54,8 +54,7 @@ Dbtup::execACC_SCANREQ(Signal* signal)
// flags
Uint32 bits = 0;
if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
tablePtr.p->m_no_of_disk_attributes == 0)
if (!AccScanReq::getLcpScanFlag(req->requestInfo))
{
// seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
......@@ -1052,24 +1051,21 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
tablePtr.i = req->tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
if(tablePtr.p->m_no_of_disk_attributes)
{
jam();
FragrecordPtr fragPtr;
Uint32 fragId = req->fragmentId;
fragPtr.i = RNIL;
getFragmentrec(fragPtr, fragId, tablePtr.p);
ndbrequire(fragPtr.i != RNIL);
Fragrecord& frag = *fragPtr.p;
ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
frag.m_lcp_scan_op = c_lcp_scan_op;
ScanOpPtr scanPtr;
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
scanPtr.p->m_fragPtrI = fragPtr.i;
scanFirst(signal, scanPtr);
scanPtr.p->m_state = ScanOp::First;
}
jam();
FragrecordPtr fragPtr;
Uint32 fragId = req->fragmentId;
fragPtr.i = RNIL;
getFragmentrec(fragPtr, fragId, tablePtr.p);
ndbrequire(fragPtr.i != RNIL);
Fragrecord& frag = *fragPtr.p;
ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
frag.m_lcp_scan_op = c_lcp_scan_op;
ScanOpPtr scanPtr;
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
scanPtr.p->m_fragPtrI = fragPtr.i;
scanFirst(signal, scanPtr);
scanPtr.p->m_state = ScanOp::First;
}
......@@ -1922,7 +1922,7 @@ MgmtSrvr::handleStatus(NodeId nodeId, bool alive, bool nfComplete)
m_started_nodes.push_back(nodeId);
rep->setEventType(NDB_LE_Connected);
} else {
rep->setEventType(NDB_LE_Connected);
rep->setEventType(NDB_LE_Disconnected);
if(nfComplete)
{
DBUG_VOID_RETURN;
......
......@@ -1162,6 +1162,64 @@ runBug21536(NDBT_Context* ctx, NDBT_Step* step)
return result;
}
int
runBug24664(NDBT_Context* ctx, NDBT_Step* step)
{
int result = NDBT_OK;
NdbRestarter restarter;
Ndb* pNdb = GETNDB(step);
const Uint32 nodeCount = restarter.getNumDbNodes();
int records = ctx->getNumRecords();
UtilTransactions utilTrans(*ctx->getTab());
HugoTransactions hugoTrans(*ctx->getTab());
int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
int dump[] = { DumpStateOrd::DihStartLcpImmediately };
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_CHECKPOINT, 0 };
NdbLogEventHandle handle =
ndb_mgm_create_logevent_handle(restarter.handle, filter);
struct ndb_logevent event;
do {
CHECK(restarter.dumpStateAllNodes(args, 1) == 0);
CHECK(restarter.dumpStateAllNodes(dump, 1) == 0);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointStarted);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointCompleted);
if (hugoTrans.loadTable(GETNDB(step), records) != 0){
return NDBT_FAILED;
}
restarter.insertErrorInAllNodes(10036); // Hang LCP
CHECK(restarter.dumpStateAllNodes(dump, 1) == 0);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointStarted);
NdbSleep_SecSleep(3);
CHECK(utilTrans.clearTable(pNdb, records) == 0);
if (hugoTrans.loadTable(GETNDB(step), records) != 0){
return NDBT_FAILED;
}
restarter.insertErrorInAllNodes(10037); // Resume LCP
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_LocalCheckpointCompleted);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_GlobalCheckpointCompleted);
while(ndb_logevent_get_next(handle, &event, 0) >= 0 &&
event.type != NDB_LE_GlobalCheckpointCompleted);
restarter.restartAll(false, false, true);
CHECK(restarter.waitClusterStarted() == 0);
} while(false);
return result;
}
NDBT_TESTSUITE(testSystemRestart);
TESTCASE("SR1",
"Basic system restart test. Focus on testing restart from REDO log.\n"
......@@ -1334,6 +1392,14 @@ TESTCASE("Bug21536",
STEP(runBug21536);
FINALIZER(runClearTable);
}
TESTCASE("Bug24664",
"Check handling of LCP skip/keep")
{
INITIALIZER(runWaitStarted);
INITIALIZER(runClearTable);
STEP(runBug24664);
FINALIZER(runClearTable);
}
NDBT_TESTSUITE_END(testSystemRestart);
int main(int argc, const char** argv){
......
......@@ -752,6 +752,10 @@ max-time: 300
cmd: testNodeRestart
args: -n Bug24543 T1
max-time: 1500
cmd: testSystemRestart
args: -n Bug24664
# OLD FLEX
max-time: 500
cmd: flexBench
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment