ndb - bug#21535

  send new fragdistkeys to all replicas during node recovery
  to make sure that not 3rd or 4th replicas fragDistKey becomes out of sync
parent f21394be
......@@ -127,6 +127,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/* 68 unused */
/* 69 unused */
/* 70 unused */
#define GSN_UPDATE_FRAG_DIST_KEY_ORD 70
#define GSN_ACC_ABORTREQ 71
#define GSN_ACC_CHECK_SCAN 72
#define GSN_ACC_COMMITCONF 73
......
......@@ -30,7 +30,8 @@ class CopyFragReq {
*/
friend class Dblqh;
public:
STATIC_CONST( SignalLength = 7 );
STATIC_CONST( SignalLength = 8
);
private:
Uint32 userPtr;
......@@ -40,6 +41,8 @@ private:
Uint32 nodeId;
Uint32 schemaVersion;
Uint32 distributionKey;
Uint32 nodeCount;
Uint32 nodeList[1];
};
class CopyFragConf {
......@@ -84,4 +87,13 @@ private:
Uint32 errorCode;
};
struct UpdateFragDistKeyOrd
{
Uint32 tableId;
Uint32 fragId;
Uint32 fragDistributionKey;
STATIC_CONST( SignalLength = 3 );
};
#endif
......@@ -62,5 +62,7 @@ char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
#define NDBD_DICT_LOCK_VERSION_5 MAKE_VERSION(5,0,23)
#define NDBD_UPDATE_FRAG_DIST_KEY_50 MAKE_VERSION(5,0,26)
#define NDBD_UPDATE_FRAG_DIST_KEY_51 MAKE_VERSION(5,1,12)
#endif
......@@ -636,6 +636,7 @@ const GsnName SignalNames [] = {
,{ GSN_DICT_LOCK_CONF, "DICT_LOCK_CONF" }
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
......@@ -3078,7 +3078,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
copyFragReq->nodeList);
sendSignal(ref, GSN_COPY_FRAGREQ, signal,
CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
jam();
......
......@@ -2161,6 +2161,7 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
void execCOPY_STATEREQ(Signal* signal);
void execLQH_TRANSREQ(Signal* signal);
......
......@@ -335,7 +335,9 @@ Dblqh::Dblqh(const class Configuration & conf):
addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF);
addRecSignal(GSN_READ_PSUEDO_REQ, &Dblqh::execREAD_PSUEDO_REQ);
addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
&Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
initData();
#ifdef VM_TRACE
......
......@@ -9109,7 +9109,12 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
ndbrequire(cfirstfreeTcConrec != RNIL);
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
Uint32 nodeCount = copyFragReq->nodeCount;
Uint32 nodeList[MAX_REPLICAS];
ndbrequire(nodeCount <= MAX_REPLICAS);
memcpy(nodeList, copyFragReq->nodeList, 4*nodeCount);
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
......@@ -9184,9 +9189,56 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->savePointId = tcConnectptr.p->savePointId;
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
AccScanReq::SignalLength, JBB);
Uint32 i;
NdbNodeBitmask mask;
for (i = 0; i<nodeCount; i++)
{
ndbout_c("nodeList: %d", nodeList[i]);
mask.set(nodeList[i]);
}
ndbrequire(mask.get(getOwnNodeId()));
ndbrequire(mask.get(nodeId)); // cpy dest
if (!mask.isclear())
{
UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtrSend();
ord->tableId = tabptr.i;
ord->fragId = fragId;
ord->fragDistributionKey = key;
i = 0;
while ((i = mask.find(i+1)) != NdbNodeBitmask::NotFound)
{
#ifdef NDB_VERSION >= MAKE_VERSION(5,1,0)
Uint32 checkversion = NDBD_UPDATE_FRAG_DIST_KEY_51;
#elif NDB_VERSION >= MAKE_VERSION(5,0,0)
Uint32 checkversion = NDBD_UPDATE_FRAG_DIST_KEY_50;
#endif
checkversion = NDB_VERSION;
if (getNodeInfo(i).m_version >= checkversion)
sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD,
signal, UpdateFragDistKeyOrd::SignalLength, JBB);
}
}
return;
}//Dblqh::execCOPY_FRAGREQ()
void
Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal)
{
jamEntry();
UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr();
tabptr.i = ord->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
ndbrequire(getFragmentrec(signal, ord->fragId));
fragptr.p->fragDistributionKey = ord->fragDistributionKey;
ndbout_c("UpdateFragDistKeyOrd tab: %d frag: %d key: %d",
tabptr.i,
ord->fragId,
ord->fragDistributionKey);
}
void Dblqh::accScanConfCopyLab(Signal* signal)
{
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
......@@ -18437,6 +18489,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
infoEvent("Table %d Status: %d Usage: %d",
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
{
FragrecordPtr fragPtr;
if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)
{
ptrCheckGuard(fragPtr, cfragrecFileSize, fragrecord);
infoEvent(" frag: %d distKey: %u",
tabPtr.p->fragid[j],
fragPtr.p->fragDistributionKey);
}
}
}
}
return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment