Commit e4353cf0 authored by pekka@orca.ndb.mysql.com's avatar pekka@orca.ndb.mysql.com

Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-ndb

into  orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug20446
parents ca4e9be9 2fc65dba
...@@ -121,41 +121,17 @@ private: ...@@ -121,41 +121,17 @@ private:
// forward declarations // forward declarations
struct DescEnt; struct DescEnt;
/* // Pointer to array of Uint32 represents attribute data and bounds
* Pointer to array of Uint32.
*/
struct Data {
private:
Uint32* m_data;
public:
Data();
Data(Uint32* data);
Data& operator=(Uint32* data);
operator Uint32*() const;
Data& operator+=(size_t n);
AttributeHeader& ah() const;
};
friend class Data;
/* typedef Uint32 *Data;
* Pointer to array of constant Uint32. inline AttributeHeader& ah(Data data) {
*/ return *reinterpret_cast<AttributeHeader*>(data);
struct ConstData; }
friend struct ConstData;
struct ConstData { typedef const Uint32* ConstData;
private: inline const AttributeHeader& ah(ConstData data) {
const Uint32* m_data; return *reinterpret_cast<const AttributeHeader*>(data);
public: }
ConstData();
ConstData(const Uint32* data);
ConstData& operator=(const Uint32* data);
operator const Uint32*() const;
ConstData& operator+=(size_t n);
const AttributeHeader& ah() const;
// non-const pointer can be cast to const pointer
ConstData(Data data);
ConstData& operator=(Data data);
};
// AttributeHeader size is assumed to be 1 word // AttributeHeader size is assumed to be 1 word
STATIC_CONST( AttributeHeaderSize = 1 ); STATIC_CONST( AttributeHeaderSize = 1 );
...@@ -212,6 +188,7 @@ private: ...@@ -212,6 +188,7 @@ private:
unsigned m_fragBit : 1; // which duplicated table fragment unsigned m_fragBit : 1; // which duplicated table fragment
TreeEnt(); TreeEnt();
// methods // methods
bool eqtuple(const TreeEnt ent) const;
bool eq(const TreeEnt ent) const; bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const; int cmp(const TreeEnt ent) const;
}; };
...@@ -289,8 +266,7 @@ private: ...@@ -289,8 +266,7 @@ private:
struct TreePos { struct TreePos {
TupLoc m_loc; // physical node address TupLoc m_loc; // physical node address
Uint16 m_pos; // position 0 to m_occup Uint16 m_pos; // position 0 to m_occup
Uint8 m_match; // at an existing entry Uint8 m_dir; // see scanNext
Uint8 m_dir; // see scanNext()
TreePos(); TreePos();
}; };
...@@ -381,12 +357,13 @@ private: ...@@ -381,12 +357,13 @@ private:
enum { enum {
Undef = 0, Undef = 0,
First = 1, // before first entry First = 1, // before first entry
Current = 2, // at current before locking Current = 2, // at some entry
Blocked = 3, // at current waiting for ACC lock Found = 3, // return current as next scan result
Locked = 4, // at current and locked or no lock needed Blocked = 4, // found and waiting for ACC lock
Next = 5, // looking for next extry Locked = 5, // found and locked or no lock needed
Last = 6, // after last entry Next = 6, // looking for next extry
Aborting = 7, // lock wait at scan close Last = 7, // after last entry
Aborting = 8, // lock wait at scan close
Invalid = 9 // cannot return REF to LQH currently Invalid = 9 // cannot return REF to LQH currently
}; };
Uint16 m_state; Uint16 m_state;
...@@ -563,6 +540,7 @@ private: ...@@ -563,6 +540,7 @@ private:
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData); void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize); void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
void unpackBound(const ScanBound& bound, Data data);
/* /*
* DbtuxMeta.cpp * DbtuxMeta.cpp
...@@ -637,7 +615,9 @@ private: ...@@ -637,7 +615,9 @@ private:
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanFind(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr, bool fromMaintReq); void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -647,8 +627,8 @@ private: ...@@ -647,8 +627,8 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
...@@ -737,99 +717,6 @@ private: ...@@ -737,99 +717,6 @@ private:
static unsigned max(unsigned x, unsigned y); static unsigned max(unsigned x, unsigned y);
}; };
// Dbtux::Data
inline
Dbtux::Data::Data() :
m_data(0)
{
}
inline
Dbtux::Data::Data(Uint32* data) :
m_data(data)
{
}
inline Dbtux::Data&
Dbtux::Data::operator=(Uint32* data)
{
m_data = data;
return *this;
}
inline
Dbtux::Data::operator Uint32*() const
{
return m_data;
}
inline Dbtux::Data&
Dbtux::Data::operator+=(size_t n)
{
m_data += n;
return *this;
}
inline AttributeHeader&
Dbtux::Data::ah() const
{
return *reinterpret_cast<AttributeHeader*>(m_data);
}
// Dbtux::ConstData
inline
Dbtux::ConstData::ConstData() :
m_data(0)
{
}
inline
Dbtux::ConstData::ConstData(const Uint32* data) :
m_data(data)
{
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator=(const Uint32* data)
{
m_data = data;
return *this;
}
inline
Dbtux::ConstData::operator const Uint32*() const
{
return m_data;
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator+=(size_t n)
{
m_data += n;
return *this;
}
inline const AttributeHeader&
Dbtux::ConstData::ah() const
{
return *reinterpret_cast<const AttributeHeader*>(m_data);
}
inline
Dbtux::ConstData::ConstData(Data data) :
m_data(static_cast<Uint32*>(data))
{
}
inline Dbtux::ConstData&
Dbtux::ConstData::operator=(Data data)
{
m_data = static_cast<Uint32*>(data);
return *this;
}
// Dbtux::TupLoc // Dbtux::TupLoc
inline inline
...@@ -898,6 +785,14 @@ Dbtux::TreeEnt::TreeEnt() : ...@@ -898,6 +785,14 @@ Dbtux::TreeEnt::TreeEnt() :
{ {
} }
inline bool
Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const
{
return
m_tupLoc == ent.m_tupLoc &&
m_fragBit == ent.m_fragBit;
}
inline bool inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const Dbtux::TreeEnt::eq(const TreeEnt ent) const
{ {
...@@ -910,6 +805,11 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const ...@@ -910,6 +805,11 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const
inline int inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{ {
// compare frag first to improve cacheing in 5.0
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId()) if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId())
return -1; return -1;
if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId()) if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId())
...@@ -918,14 +818,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const ...@@ -918,14 +818,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1; return -1;
if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset()) if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset())
return +1; return +1;
if (m_tupVersion < ent.m_tupVersion) /*
* Guess if one tuple version has wrapped around. This is well
* defined ordering on existing versions since versions are assigned
* consecutively and different versions exists only on uncommitted
* tuple. Assuming max 2**14 uncommitted ops on same tuple.
*/
const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1));
if (m_tupVersion < ent.m_tupVersion) {
if (ent.m_tupVersion - m_tupVersion < version_wrap_limit)
return -1; return -1;
if (m_tupVersion > ent.m_tupVersion) else
return +1; return +1;
if (m_fragBit < ent.m_fragBit) }
return -1; if (m_tupVersion > ent.m_tupVersion) {
if (m_fragBit > ent.m_fragBit) if (m_tupVersion - ent.m_tupVersion < version_wrap_limit)
return +1; return +1;
else
return -1;
}
return 0; return 0;
} }
...@@ -992,7 +903,6 @@ inline ...@@ -992,7 +903,6 @@ inline
Dbtux::TreePos::TreePos() : Dbtux::TreePos::TreePos() :
m_loc(), m_loc(),
m_pos(ZNIL), m_pos(ZNIL),
m_match(false),
m_dir(255) m_dir(255)
{ {
} }
......
...@@ -34,7 +34,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -34,7 +34,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
// skip to right position in search key only // skip to right position in search key only
for (unsigned i = 0; i < start; i++) { for (unsigned i = 0; i < start; i++) {
jam(); jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize(); searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
} }
// number of words of entry data left // number of words of entry data left
unsigned len2 = maxlen; unsigned len2 = maxlen;
...@@ -46,16 +46,16 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -46,16 +46,16 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break; break;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
if (! searchKey.ah().isNULL()) { if (! ah(searchKey).isNULL()) {
if (! entryData.ah().isNULL()) { if (! ah(entryData).isNULL()) {
jam(); jam();
// verify attribute id // verify attribute id
const DescAttr& descAttr = descEnt.m_descAttr[start]; const DescAttr& descAttr = descEnt.m_descAttr[start];
ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes // sizes
const unsigned size1 = searchKey.ah().getDataSize(); const unsigned size1 = ah(searchKey).getDataSize();
const unsigned size2 = min(entryData.ah().getDataSize(), len2); const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2; len2 -= size2;
// compare // compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start]; NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
...@@ -74,15 +74,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -74,15 +74,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break; break;
} }
} else { } else {
if (! entryData.ah().isNULL()) { if (! ah(entryData).isNULL()) {
jam(); jam();
// NULL < not NULL // NULL < not NULL
ret = -1; ret = -1;
break; break;
} }
} }
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize(); searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize(); entryData += AttributeHeaderSize + ah(entryData).getDataSize();
start++; start++;
} }
return ret; return ret;
...@@ -130,17 +130,17 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign ...@@ -130,17 +130,17 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
// get and skip bound type (it is used after the loop) // get and skip bound type (it is used after the loop)
type = boundInfo[0]; type = boundInfo[0];
boundInfo += 1; boundInfo += 1;
if (! boundInfo.ah().isNULL()) { if (! ah(boundInfo).isNULL()) {
if (! entryData.ah().isNULL()) { if (! ah(entryData).isNULL()) {
jam(); jam();
// verify attribute id // verify attribute id
const Uint32 index = boundInfo.ah().getAttributeId(); const Uint32 index = ah(boundInfo).getAttributeId();
ndbrequire(index < frag.m_numAttrs); ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index]; const DescAttr& descAttr = descEnt.m_descAttr[index];
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes // sizes
const unsigned size1 = boundInfo.ah().getDataSize(); const unsigned size1 = ah(boundInfo).getDataSize();
const unsigned size2 = min(entryData.ah().getDataSize(), len2); const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2; len2 -= size2;
// compare // compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index]; NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
...@@ -159,14 +159,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign ...@@ -159,14 +159,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
} }
} else { } else {
jam(); jam();
if (! entryData.ah().isNULL()) { if (! ah(entryData).isNULL()) {
jam(); jam();
// NULL < not NULL // NULL < not NULL
return -1; return -1;
} }
} }
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize(); boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize(); entryData += AttributeHeaderSize + ah(entryData).getDataSize();
boundCount -= 1; boundCount -= 1;
} }
// all attributes were equal // all attributes were equal
......
...@@ -311,7 +311,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos) ...@@ -311,7 +311,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos)
out << "[TreePos " << hex << &pos; out << "[TreePos " << hex << &pos;
out << " [loc " << pos.m_loc << "]"; out << " [loc " << pos.m_loc << "]";
out << " [pos " << dec << pos.m_pos << "]"; out << " [pos " << dec << pos.m_pos << "]";
out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]"; out << " [dir " << dec << pos.m_dir << "]";
out << "]"; out << "]";
return out; return out;
......
...@@ -221,7 +221,7 @@ Dbtux::setKeyAttrs(const Frag& frag) ...@@ -221,7 +221,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
const DescAttr& descAttr = descEnt.m_descAttr[i]; const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size // set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size); ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1; keyAttrs += 1;
// set comparison method pointer // set comparison method pointer
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId); const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
...@@ -251,8 +251,8 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData) ...@@ -251,8 +251,8 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
ConstData data = keyData; ConstData data = keyData;
Uint32 totalSize = 0; Uint32 totalSize = 0;
for (Uint32 i = start; i < frag.m_numAttrs; i++) { for (Uint32 i = start; i < frag.m_numAttrs; i++) {
Uint32 attrId = data.ah().getAttributeId(); Uint32 attrId = ah(data).getAttributeId();
Uint32 dataSize = data.ah().getDataSize(); Uint32 dataSize = ah(data).getDataSize();
debugOut << i << " attrId=" << attrId << " size=" << dataSize; debugOut << i << " attrId=" << attrId << " size=" << dataSize;
data += 1; data += 1;
for (Uint32 j = 0; j < dataSize; j++) { for (Uint32 j = 0; j < dataSize; j++) {
...@@ -290,7 +290,7 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 ...@@ -290,7 +290,7 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
unsigned len2 = maxlen2; unsigned len2 = maxlen2;
while (n != 0) { while (n != 0) {
jam(); jam();
const unsigned dataSize = data1.ah().getDataSize(); const unsigned dataSize = ah(data1).getDataSize();
// copy header // copy header
if (len2 == 0) if (len2 == 0)
return; return;
...@@ -314,4 +314,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 ...@@ -314,4 +314,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
#endif #endif
} }
void
Dbtux::unpackBound(const ScanBound& bound, Data dest)
{
ScanBoundIterator iter;
bound.first(iter);
const unsigned n = bound.getSize();
unsigned j;
for (j = 0; j < n; j++) {
dest[j] = *iter.data;
bound.next(iter);
}
}
BLOCK_FUNCTIONS(Dbtux) BLOCK_FUNCTIONS(Dbtux)
...@@ -113,16 +113,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -113,16 +113,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
// do the operation // do the operation
req->errorCode = 0; req->errorCode = 0;
TreePos treePos; TreePos treePos;
bool ok;
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(frag, c_searchKey, ent, treePos); ok = searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! ok ? " - error" : "") << endl;
} }
#endif #endif
if (treePos.m_match) { if (! ok) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) { if (indexPtr.p->m_state == Index::Online) {
...@@ -152,13 +153,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -152,13 +153,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(frag, c_searchKey, ent, treePos); ok = searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! ok ? " - error" : "") << endl;
} }
#endif #endif
if (! treePos.m_match) { if (! ok) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) { if (indexPtr.p->m_state == Index::Online) {
......
...@@ -421,21 +421,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -421,21 +421,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
}
#endif
} }
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Current ||
scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(scanPtr, false); scanFind(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Found or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
unsigned pkSize = 0; // indicates not yet done unsigned pkSize = 0; // indicates not yet done
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Found) {
// found an entry to return // found an entry to return
jam(); jam();
ndbrequire(scan.m_accLockOp == RNIL); ndbrequire(scan.m_accLockOp == RNIL);
...@@ -509,8 +505,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -509,8 +505,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
// max ops should depend on max scans (assert only) // max ops should depend on max scans (assert only)
ndbassert(false); ndbassert(false);
// stay in Current state // stay in Found state
scan.m_state = ScanOp::Current; scan.m_state = ScanOp::Found;
signal->theData[0] = scan.m_userPtr; signal->theData[0] = scan.m_userPtr;
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
...@@ -697,44 +693,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -697,44 +693,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
} }
/* /*
* Find start position for single range scan. If it exists, sets state * Find start position for single range scan.
* to Next and links the scan to the node. The first entry is returned
* by scanNext.
*/ */
void void
Dbtux::scanFirst(ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
}
#endif
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// scan direction 0, 1 // scan direction 0, 1
const unsigned idir = scan.m_descending; const unsigned idir = scan.m_descending;
// unpack start key into c_dataBuffer unpackBound(*scan.m_bound[idir], c_dataBuffer);
const ScanBound& bound = *scan.m_bound[idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
jam();
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
TreePos treePos; TreePos treePos;
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc != NullTupLoc) {
// empty result set
jam();
scan.m_state = ScanOp::Last;
return;
}
// set position and state
scan.m_scanPos = treePos; scan.m_scanPos = treePos;
scan.m_state = ScanOp::Next;
// link the scan to node found // link the scan to node found
NodeHandle node(frag); NodeHandle node(frag);
selectNode(node, treePos.m_loc); selectNode(node, treePos.m_loc);
linkScan(node, scanPtr); linkScan(node, scanPtr);
if (treePos.m_dir == 3) {
jam();
// check upper bound
TreeEnt ent = node.getEnt(treePos.m_pos);
if (scanCheck(scanPtr, ent))
scan.m_state = ScanOp::Current;
else
scan.m_state = ScanOp::Last;
} else {
scan.m_state = ScanOp::Next;
}
} else {
jam();
scan.m_state = ScanOp::Last;
}
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
}
#endif
}
/*
* Look for entry to return as scan result.
*/
void
Dbtux::scanFind(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
}
#endif
ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
while (1) {
jam();
if (scan.m_state == ScanOp::Next)
scanNext(scanPtr, false);
if (scan.m_state == ScanOp::Current) {
jam();
const TreePos pos = scan.m_scanPos;
NodeHandle node(frag);
selectNode(node, pos.m_loc);
const TreeEnt ent = node.getEnt(pos.m_pos);
if (scanVisible(scanPtr, ent)) {
jam();
scan.m_state = ScanOp::Found;
scan.m_scanEnt = ent;
break;
}
} else {
jam();
break;
}
scan.m_state = ScanOp::Next;
}
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
}
#endif
} }
/* /*
...@@ -752,6 +799,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -752,6 +799,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* *
* If an entry was found, scan direction is 3. Therefore tree * If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction. * re-organizations need not worry about scan direction.
*
* This method is also used to move a scan when its entry is removed
* (see moveScanList). If the scan is Blocked, we check if it remains
* Blocked on a different version of the tuple. Otherwise the tuple is
* lost and state becomes Current.
*/ */
void void
Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
...@@ -759,8 +811,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -759,8 +811,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
// cannot be moved away from tuple we have locked // cannot be moved away from tuple we have locked
...@@ -770,15 +822,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -770,15 +822,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// scan direction // scan direction
const unsigned idir = scan.m_descending; // 0, 1 const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1 const int jdir = 1 - 2 * (int)idir; // 1, -1
// unpack end key into c_dataBuffer unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
const ScanBound& bound = *scan.m_bound[1 - idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
jam();
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// use copy of position // use copy of position
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
...@@ -792,15 +836,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -792,15 +836,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
while (true) { while (true) {
jam(); jam();
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Scan next pos " << pos << " " << node << endl; debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
} }
#endif #endif
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
// coming up from root ends the scan // coming up from root ends the scan
jam(); jam();
pos.m_loc = NullTupLoc; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break; break;
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
...@@ -832,41 +875,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -832,41 +875,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
if (pos.m_dir == idir) { if (pos.m_dir == idir) {
// coming up from left child scan current node // coming up from left child scan current node
jam(); jam();
pos.m_pos = idir == 0 ? 0 : occup - 1; pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
pos.m_match = false;
pos.m_dir = 3; pos.m_dir = 3;
} }
if (pos.m_dir == 3) { if (pos.m_dir == 3) {
// within node // before or within node
jam(); jam();
// advance position // advance position - becomes ZNIL (> occup) if 0 and descending
if (! pos.m_match)
pos.m_match = true;
else
// becomes ZNIL (which is > occup) if 0 and scan descending
pos.m_pos += jdir; pos.m_pos += jdir;
if (pos.m_pos < occup) { if (pos.m_pos < occup) {
jam(); jam();
ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// read and compare all attributes ent = node.getEnt(pos.m_pos);
readKeyAttrs(frag, ent, 0, c_entryKey); if (! scanCheck(scanPtr, ent)) {
int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (jdir * ret < 0) {
jam(); jam();
// hit upper bound of single range scan
pos.m_loc = NullTupLoc; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break;
} }
// can we see it
if (! scanVisible(scanPtr, ent)) {
jam();
continue;
}
// found entry
scan.m_state = ScanOp::Current;
break; break;
} }
// after node proceed to right child // after node proceed to right child
...@@ -892,30 +916,63 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -892,30 +916,63 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// copy back position // copy back position
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (pos.m_loc != NullTupLoc) {
ndbrequire(pos.m_match == true && pos.m_dir == 3); ndbrequire(pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
unlinkScan(origNode, scanPtr); unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
// copy found entry if (scan.m_state != ScanOp::Blocked) {
scan.m_scanEnt = ent; scan.m_state = ScanOp::Current;
} else if (scan.m_state == ScanOp::Last) { } else {
jam(); jam();
ndbrequire(pos.m_loc == NullTupLoc); ndbrequire(fromMaintReq);
unlinkScan(origNode, scanPtr); TreeEnt& scanEnt = scan.m_scanEnt;
ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
if (scanEnt.eqtuple(ent)) {
// remains blocked on another version
scanEnt = ent;
} else { } else {
ndbrequire(false); jam();
scanEnt.m_tupLoc = NullTupLoc;
scan.m_state = ScanOp::Current;
}
}
} else {
jam();
unlinkScan(origNode, scanPtr);
scan.m_state = ScanOp::Last;
} }
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Next out scan " << scanPtr.i << " " << scan << endl; debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
} }
/*
* Check end key. Return true if scan is still within range.
*/
bool
Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
const unsigned idir = scan.m_descending;
const int jdir = 1 - 2 * (int)idir;
unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
unsigned boundCnt = scan.m_boundCnt[1 - idir];
readKeyAttrs(frag, ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (jdir * ret > 0)
return true;
// hit upper bound of single range scan
return false;
}
/* /*
* Check if an entry is visible to the scan. * Check if an entry is visible to the scan.
* *
......
...@@ -21,22 +21,18 @@ ...@@ -21,22 +21,18 @@
* Search for entry to add. * Search for entry to add.
* *
* Similar to searchToRemove (see below). * Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/ */
void bool
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = false;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
return; return true;
} }
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
/* /*
...@@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
// failed // entry found - error
treePos.m_match = true; return false;
return;
} }
break; break;
} }
...@@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
// binary search // binary search
int lo = -1; int lo = -1;
unsigned hi = currNode.getOccup(); int hi = currNode.getOccup();
int ret; int ret;
while (1) { while (1) {
jam(); jam();
...@@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
lo = j; lo = j;
else { else {
treePos.m_pos = j; treePos.m_pos = j;
// failed // entry found - error
treePos.m_match = true; return false;
return;
} }
if (hi - lo == 1) if (hi - lo == 1)
break; break;
...@@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
if (ret < 0) { if (ret < 0) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
if (hi < currNode.getOccup()) { if (hi < currNode.getOccup()) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
if (bottomNode.isNull()) { if (bottomNode.isNull()) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
jam(); jam();
// backwards compatible for now // backwards compatible for now
treePos.m_loc = bottomNode.m_loc; treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
return true;
} }
/* /*
...@@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
* then the saved node is the g.l.b of the final node and we move back * then the saved node is the g.l.b of the final node and we move back
* to it. * to it.
*/ */
void bool
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = true;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree - failed
jam(); jam();
// failed return false;
treePos.m_match = false;
return;
} }
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
...@@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo ...@@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
return; return true;
} }
break; break;
} }
...@@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo ...@@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
if (searchEnt.eq(currNode.getEnt(j))) { if (searchEnt.eq(currNode.getEnt(j))) {
jam(); jam();
treePos.m_pos = j; treePos.m_pos = j;
return; return true;
} }
} }
treePos.m_pos = currNode.getOccup(); treePos.m_pos = currNode.getOccup();
// failed // not found - failed
treePos.m_match = false; return false;
} }
/* /*
...@@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) { while (true) {
jam(); jam();
selectNode(currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
...@@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
treePos.m_dir = 3; treePos.m_dir = 3;
return; return;
} }
} else if (ret > 0) { } else {
// bound is at or right of this node // bound is at or right of this node
jam(); jam();
const TupLoc loc = currNode.getLink(1); const TupLoc loc = currNode.getLink(1);
...@@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = loc; currNode.m_loc = loc;
continue; continue;
} }
} else {
ndbrequire(false);
} }
break; break;
} }
...@@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) { while (true) {
jam(); jam();
selectNode(currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
...@@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
// empty result set // empty result set
return; return;
} }
} else if (ret > 0) { } else {
// bound is at or right of this node // bound is at or right of this node
jam(); jam();
const TupLoc loc = currNode.getLink(1); const TupLoc loc = currNode.getLink(1);
...@@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = loc; currNode.m_loc = loc;
continue; continue;
} }
} else {
ndbrequire(false);
} }
break; break;
} }
......
...@@ -47,7 +47,6 @@ struct Opt { ...@@ -47,7 +47,6 @@ struct Opt {
int m_die; int m_die;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_subsubloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_msglock; bool m_msglock;
...@@ -56,6 +55,7 @@ struct Opt { ...@@ -56,6 +55,7 @@ struct Opt {
unsigned m_pctnull; unsigned m_pctnull;
unsigned m_rows; unsigned m_rows;
unsigned m_samples; unsigned m_samples;
unsigned m_scanbatch;
unsigned m_scanpar; unsigned m_scanpar;
unsigned m_scanstop; unsigned m_scanstop;
int m_seed; int m_seed;
...@@ -74,7 +74,6 @@ struct Opt { ...@@ -74,7 +74,6 @@ struct Opt {
m_die(0), m_die(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_subsubloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_msglock(true), m_msglock(true),
...@@ -83,6 +82,7 @@ struct Opt { ...@@ -83,6 +82,7 @@ struct Opt {
m_pctnull(10), m_pctnull(10),
m_rows(1000), m_rows(1000),
m_samples(0), m_samples(0),
m_scanbatch(0),
m_scanpar(0), m_scanpar(0),
m_scanstop(0), m_scanstop(0),
m_seed(-1), m_seed(-1),
...@@ -120,9 +120,10 @@ printhelp() ...@@ -120,9 +120,10 @@ printhelp()
<< " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl << " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl << " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl << " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
<< " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
<< " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl << " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
<< " -subloop N subtest loop count [" << d.m_subloop << "]" << endl << " -subloop N subtest (and subsubtest) loop count [" << d.m_subloop << "]" << endl
<< " -table xyz only given table numbers (digits 0-9)" << endl << " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl << " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl << " -vN verbosity [" << d.m_v << "]" << endl
...@@ -294,6 +295,7 @@ struct Par : public Opt { ...@@ -294,6 +295,7 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; } Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
char m_currcase[2];
unsigned m_lno; unsigned m_lno;
unsigned m_slno; unsigned m_slno;
unsigned m_totrows; unsigned m_totrows;
...@@ -302,6 +304,7 @@ struct Par : public Opt { ...@@ -302,6 +304,7 @@ struct Par : public Opt {
unsigned m_pctrange; unsigned m_pctrange;
unsigned m_pctbrange; unsigned m_pctbrange;
int m_bdir; int m_bdir;
bool m_noindexkeyupdate;
// choice of key // choice of key
bool m_randomkey; bool m_randomkey;
// do verify after read // do verify after read
...@@ -330,6 +333,7 @@ struct Par : public Opt { ...@@ -330,6 +333,7 @@ struct Par : public Opt {
m_pctrange(40), m_pctrange(40),
m_pctbrange(80), m_pctbrange(80),
m_bdir(0), m_bdir(0),
m_noindexkeyupdate(false),
m_randomkey(false), m_randomkey(false),
m_verify(false), m_verify(false),
m_deadlock(false), m_deadlock(false),
...@@ -337,7 +341,9 @@ struct Par : public Opt { ...@@ -337,7 +341,9 @@ struct Par : public Opt {
m_lockmode(NdbOperation::LM_Read), m_lockmode(NdbOperation::LM_Read),
m_tupscan(false), m_tupscan(false),
m_ordered(false), m_ordered(false),
m_descending(false) { m_descending(false)
{
m_currcase[0] = 0;
} }
}; };
...@@ -892,6 +898,8 @@ struct Tab { ...@@ -892,6 +898,8 @@ struct Tab {
const Col** m_col; const Col** m_col;
unsigned m_itabs; unsigned m_itabs;
const ITab** m_itab; const ITab** m_itab;
unsigned m_orderedindexes;
unsigned m_hashindexes;
// pk must contain an Unsigned column // pk must contain an Unsigned column
unsigned m_keycol; unsigned m_keycol;
void coladd(unsigned k, Col* colptr); void coladd(unsigned k, Col* colptr);
...@@ -906,6 +914,8 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) : ...@@ -906,6 +914,8 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_col(new const Col* [cols + 1]), m_col(new const Col* [cols + 1]),
m_itabs(itabs), m_itabs(itabs),
m_itab(new const ITab* [itabs + 1]), m_itab(new const ITab* [itabs + 1]),
m_orderedindexes(0),
m_hashindexes(0),
m_keycol(keycol) m_keycol(keycol)
{ {
for (unsigned k = 0; k <= cols; k++) for (unsigned k = 0; k <= cols; k++)
...@@ -935,8 +945,12 @@ Tab::coladd(unsigned k, Col* colptr) ...@@ -935,8 +945,12 @@ Tab::coladd(unsigned k, Col* colptr)
void void
Tab::itabadd(unsigned j, ITab* itabptr) Tab::itabadd(unsigned j, ITab* itabptr)
{ {
assert(j < m_itabs && m_itab[j] == 0); assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
m_itab[j] = itabptr; m_itab[j] = itabptr;
if (itabptr->m_type == ITab::OrderedIndex)
m_orderedindexes++;
else
m_hashindexes++;
} }
static NdbOut& static NdbOut&
...@@ -1434,7 +1448,7 @@ Con::readTuples(Par par) ...@@ -1434,7 +1448,7 @@ Con::readTuples(Par par)
int scan_flags = 0; int scan_flags = 0;
if (par.m_tupscan) if (par.m_tupscan)
scan_flags |= NdbScanOperation::SF_TupScan; scan_flags |= NdbScanOperation::SF_TupScan;
CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar) == 0, *this); CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0; return 0;
} }
...@@ -1442,7 +1456,12 @@ int ...@@ -1442,7 +1456,12 @@ int
Con::readIndexTuples(Par par) Con::readIndexTuples(Par par)
{ {
assert(m_tx != 0 && m_indexscanop != 0); assert(m_tx != 0 && m_indexscanop != 0);
CHKCON(m_indexscanop->readTuples(par.m_lockmode, 0, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this); int scan_flags = 0;
if (par.m_ordered)
scan_flags |= NdbScanOperation::SF_OrderBy;
if (par.m_descending)
scan_flags |= NdbScanOperation::SF_Descending;
CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0; return 0;
} }
...@@ -2193,7 +2212,7 @@ struct Row { ...@@ -2193,7 +2212,7 @@ struct Row {
void copy(const Row& row2); void copy(const Row& row2);
void calc(Par par, unsigned i, unsigned mask = 0); void calc(Par par, unsigned i, unsigned mask = 0);
const Row& dbrow() const; const Row& dbrow() const;
int verify(Par par, const Row& row2) const; int verify(Par par, const Row& row2, bool pkonly) const;
int insrow(Par par); int insrow(Par par);
int updrow(Par par); int updrow(Par par);
int updrow(Par par, const ITab& itab); int updrow(Par par, const ITab& itab);
...@@ -2275,16 +2294,19 @@ Row::dbrow() const ...@@ -2275,16 +2294,19 @@ Row::dbrow() const
} }
int int
Row::verify(Par par, const Row& row2) const Row::verify(Par par, const Row& row2, bool pkonly) const
{ {
const Tab& tab = m_tab; const Tab& tab = m_tab;
const Row& row1 = *this; const Row& row1 = *this;
assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist); assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) { for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = row1.m_val[k]->m_col;
if (! pkonly || col.m_pk) {
const Val& val1 = *row1.m_val[k]; const Val& val1 = *row1.m_val[k];
const Val& val2 = *row2.m_val[k]; const Val& val2 = *row2.m_val[k];
CHK(val1.verify(par, val2) == 0); CHK(val1.verify(par, val2) == 0);
} }
}
return 0; return 0;
} }
...@@ -2585,8 +2607,11 @@ struct Set { ...@@ -2585,8 +2607,11 @@ struct Set {
int getval(Par par); int getval(Par par);
int getkey(Par par, unsigned* i); int getkey(Par par, unsigned* i);
int putval(unsigned i, bool force, unsigned n = ~0); int putval(unsigned i, bool force, unsigned n = ~0);
// sort rows in-place according to ordered index
void sort(Par par, const ITab& itab);
void sort(Par par, const ITab& itab, unsigned lo, unsigned hi);
// verify // verify
int verify(Par par, const Set& set2) const; int verify(Par par, const Set& set2, bool pkonly) const;
int verifyorder(Par par, const ITab& itab, bool descending) const; int verifyorder(Par par, const ITab& itab, bool descending) const;
// protect structure // protect structure
NdbMutex* m_mutex; NdbMutex* m_mutex;
...@@ -2890,6 +2915,7 @@ Set::getkey(Par par, unsigned* i) ...@@ -2890,6 +2915,7 @@ Set::getkey(Par par, unsigned* i)
assert(m_rec[k] != 0); assert(m_rec[k] != 0);
const char* aRef = m_rec[k]->aRef(); const char* aRef = m_rec[k]->aRef();
Uint32 key = *(const Uint32*)aRef; Uint32 key = *(const Uint32*)aRef;
LL5("getkey: " << key);
CHK(key < m_rows); CHK(key < m_rows);
*i = key; *i = key;
return 0; return 0;
...@@ -2922,8 +2948,43 @@ Set::putval(unsigned i, bool force, unsigned n) ...@@ -2922,8 +2948,43 @@ Set::putval(unsigned i, bool force, unsigned n)
return 0; return 0;
} }
void
Set::sort(Par par, const ITab& itab)
{
if (m_rows != 0)
sort(par, itab, 0, m_rows - 1);
}
void
Set::sort(Par par, const ITab& itab, unsigned lo, unsigned hi)
{
assert(lo < m_rows && hi < m_rows && lo <= hi);
Row* const p = m_row[lo];
unsigned i = lo;
unsigned j = hi;
while (i < j) {
while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
j--;
if (i < j) {
m_row[i] = m_row[j];
i++;
}
while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
i++;
if (i < j) {
m_row[j] = m_row[i];
j--;
}
}
m_row[i] = p;
if (lo < i)
sort(par, itab, lo, i - 1);
if (hi > i)
sort(par, itab, i + 1, hi);
}
int int
Set::verify(Par par, const Set& set2) const Set::verify(Par par, const Set& set2, bool pkonly) const
{ {
assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows); assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count()); LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
...@@ -2932,7 +2993,7 @@ Set::verify(Par par, const Set& set2) const ...@@ -2932,7 +2993,7 @@ Set::verify(Par par, const Set& set2) const
if (exist(i) != set2.exist(i)) { if (exist(i) != set2.exist(i)) {
ok = false; ok = false;
} else if (exist(i)) { } else if (exist(i)) {
if (dbrow(i).verify(par, set2.dbrow(i)) != 0) if (dbrow(i).verify(par, set2.dbrow(i), pkonly) != 0)
ok = false; ok = false;
} }
if (! ok) { if (! ok) {
...@@ -3490,7 +3551,7 @@ pkread(Par par) ...@@ -3490,7 +3551,7 @@ pkread(Par par)
con.closeTransaction(); con.closeTransaction();
} }
if (par.m_verify) if (par.m_verify)
CHK(set1.verify(par, set2) == 0); CHK(set1.verify(par, set2, false) == 0);
return 0; return 0;
} }
...@@ -3657,7 +3718,7 @@ hashindexread(Par par, const ITab& itab) ...@@ -3657,7 +3718,7 @@ hashindexread(Par par, const ITab& itab)
con.closeTransaction(); con.closeTransaction();
} }
if (par.m_verify) if (par.m_verify)
CHK(set1.verify(par, set2) == 0); CHK(set1.verify(par, set2, false) == 0);
return 0; return 0;
} }
...@@ -3698,7 +3759,7 @@ scanreadtable(Par par) ...@@ -3698,7 +3759,7 @@ scanreadtable(Par par)
} }
con.closeTransaction(); con.closeTransaction();
if (par.m_verify) if (par.m_verify)
CHK(set1.verify(par, set2) == 0); CHK(set1.verify(par, set2, false) == 0);
LL3("scanread " << tab.m_name << " done rows=" << n); LL3("scanread " << tab.m_name << " done rows=" << n);
return 0; return 0;
} }
...@@ -3730,14 +3791,10 @@ scanreadtablefast(Par par, unsigned countcheck) ...@@ -3730,14 +3791,10 @@ scanreadtablefast(Par par, unsigned countcheck)
return 0; return 0;
} }
static int // try to get interesting bounds
scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) static void
calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
{ {
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
while (true) { while (true) {
bset.calc(par); bset.calc(par);
bset.filter(par, set, set1); bset.filter(par, set, set1);
...@@ -3745,14 +3802,25 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) ...@@ -3745,14 +3802,25 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
// prefer proper subset // prefer proper subset
if (0 < n && n < set.m_rows) if (0 < n && n < set.m_rows)
break; break;
if (urandom(3) == 0) if (urandom(5) == 0)
break; break;
set1.reset(); set1.reset();
} }
}
static int
scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
Con& con = par.con();
const Tab& tab = par.tab();
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
calcscanbounds(par, itab, bset, set, set1);
} else { } else {
bset.filter(par, set, set1); bset.filter(par, set, set1);
} }
LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending); LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows); Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
CHK(con.getNdbIndexScanOperation(itab, tab) == 0); CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
...@@ -3780,7 +3848,7 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc) ...@@ -3780,7 +3848,7 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
} }
con.closeTransaction(); con.closeTransaction();
if (par.m_verify) { if (par.m_verify) {
CHK(set1.verify(par, set2) == 0); CHK(set1.verify(par, set2, false) == 0);
if (par.m_ordered) if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0); CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
} }
...@@ -3825,17 +3893,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc) ...@@ -3825,17 +3893,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
const Set& set = par.set(); const Set& set = par.set();
Set set1(tab, set.m_rows); Set set1(tab, set.m_rows);
if (calc) { if (calc) {
while (true) { calcscanbounds(par, itab, bset, set, set1);
bset.calc(par);
bset.filter(par, set, set1);
unsigned n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
if (urandom(3) == 0)
break;
set1.reset();
}
} else { } else {
bset.filter(par, set, set1); bset.filter(par, set, set1);
} }
...@@ -3867,7 +3925,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc) ...@@ -3867,7 +3925,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
} }
con.closeTransaction(); con.closeTransaction();
if (par.m_verify) { if (par.m_verify) {
CHK(set1.verify(par, set2) == 0); CHK(set1.verify(par, set2, false) == 0);
} }
LL3("scanfilter " << itab.m_name << " done rows=" << n); LL3("scanfilter " << itab.m_name << " done rows=" << n);
return 0; return 0;
...@@ -3877,7 +3935,7 @@ static int ...@@ -3877,7 +3935,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) { if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
CHK(scanreadfilter(par, itab, bset, true) == 0); CHK(scanreadfilter(par, itab, bset, true) == 0);
...@@ -4068,12 +4126,19 @@ out: ...@@ -4068,12 +4126,19 @@ out:
} }
static int static int
scanupdateindex(Par par, const ITab& itab, const BSet& bset) scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
{ {
Con& con = par.con(); Con& con = par.con();
const Tab& tab = par.tab(); const Tab& tab = par.tab();
Set& set = par.set(); Set& set = par.set();
LL3("scan update " << itab.m_name); // expected
Set set1(tab, set.m_rows);
if (calc) {
calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
LL3("scan update " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows); Set set2(tab, set.m_rows);
par.m_lockmode = NdbOperation::LM_Exclusive; par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
...@@ -4117,7 +4182,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -4117,7 +4182,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Par par2 = par; Par par2 = par;
par2.m_con = &con2; par2.m_con = &con2;
set.dbsave(i); set.dbsave(i);
set.calc(par, i); set.calc(par, i, ! par.m_noindexkeyupdate ? 0 : itab.m_colmask);
CHKTRY(set.setrow(par2, i) == 0, set.unlock()); CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row); LL4("scan update " << itab.m_name << ": " << row);
lst.push(i); lst.push(i);
...@@ -4131,6 +4196,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -4131,6 +4196,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out; goto out;
} }
con2.closeTransaction(); con2.closeTransaction();
LL4("scanupdateindex: committed batch [at 1]");
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst);
set.dbdiscard(lst); set.dbdiscard(lst);
...@@ -4148,6 +4214,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -4148,6 +4214,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out; goto out;
} }
con2.closeTransaction(); con2.closeTransaction();
LL4("scanupdateindex: committed batch [at 2]");
set.lock(); set.lock();
set.notpending(lst); set.notpending(lst);
set.dbdiscard(lst); set.dbdiscard(lst);
...@@ -4160,6 +4227,11 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset) ...@@ -4160,6 +4227,11 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
} }
out: out:
con2.closeTransaction(); con2.closeTransaction();
if (par.m_verify) {
CHK(set1.verify(par, set2, true) == 0);
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
LL3("scan update " << itab.m_name << " rows updated=" << count); LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction(); con.closeTransaction();
return 0; return 0;
...@@ -4169,11 +4241,10 @@ static int ...@@ -4169,11 +4241,10 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_subsubloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) { if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); CHK(scanupdateindex(par, itab, bset, true) == 0);
CHK(scanupdateindex(par, itab, bset) == 0);
} else { } else {
CHK(hashindexupdate(par, itab) == 0); CHK(hashindexupdate(par, itab) == 0);
} }
...@@ -4204,22 +4275,6 @@ scanupdateall(Par par) ...@@ -4204,22 +4275,6 @@ scanupdateall(Par par)
// medium level routines // medium level routines
static int
readverify(Par par)
{
if (par.m_noverify)
return 0;
par.m_verify = true;
if (par.m_abortpct != 0) {
LL2("skip verify in this version"); // implement in 5.0 version
par.m_verify = false;
}
par.m_lockmode = NdbOperation::LM_CommittedRead;
CHK(pkread(par) == 0);
CHK(scanreadall(par) == 0);
return 0;
}
static int static int
readverifyfull(Par par) readverifyfull(Par par)
{ {
...@@ -4237,7 +4292,6 @@ readverifyfull(Par par) ...@@ -4237,7 +4292,6 @@ readverifyfull(Par par)
CHK(scanreadtable(par) == 0); CHK(scanreadtable(par) == 0);
// once more via tup scan // once more via tup scan
par.m_tupscan = true; par.m_tupscan = true;
if (NDB_VERSION < MAKE_VERSION(5, 1, 0)) //TODO
CHK(scanreadtable(par) == 0); CHK(scanreadtable(par) == 0);
} }
// each thread scans different indexes // each thread scans different indexes
...@@ -4278,7 +4332,7 @@ pkops(Par par) ...@@ -4278,7 +4332,7 @@ pkops(Par par)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
par.m_randomkey = true; par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) { for (unsigned i = 0; i < par.m_subloop; i++) {
unsigned j = 0; unsigned j = 0;
while (j < tab.m_itabs) { while (j < tab.m_itabs) {
if (tab.m_itab[j] != 0) { if (tab.m_itab[j] != 0) {
...@@ -4377,6 +4431,33 @@ mixedoperations(Par par) ...@@ -4377,6 +4431,33 @@ mixedoperations(Par par)
return 0; return 0;
} }
static int
parallelorderedupdate(Par par)
{
const Tab& tab = par.tab();
unsigned k = 0;
for (unsigned i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
if (itab.m_type != ITab::OrderedIndex)
continue;
// cannot sync threads yet except via subloop
if (k++ == par.m_slno % tab.m_orderedindexes) {
LL3("parallelorderedupdate: " << itab.m_name);
par.m_noindexkeyupdate = true;
par.m_ordered = true;
par.m_descending = (par.m_slno != 0);
par.m_verify = true;
BSet bset(tab, itab, par.m_rows); // empty bounds
// prefer empty bounds
unsigned sel = urandom(10);
CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
}
}
return 0;
}
static int static int
pkupdateindexbuild(Par par) pkupdateindexbuild(Par par)
{ {
...@@ -4579,7 +4660,7 @@ getthrno() ...@@ -4579,7 +4660,7 @@ getthrno()
static int static int
runstep(Par par, const char* fname, TFunc func, unsigned mode) runstep(Par par, const char* fname, TFunc func, unsigned mode)
{ {
LL2(fname); LL2("step: " << fname);
const int threads = (mode & ST ? 1 : par.m_threads); const int threads = (mode & ST ? 1 : par.m_threads);
int n; int n;
for (n = 0; n < threads; n++) { for (n = 0; n < threads; n++) {
...@@ -4605,7 +4686,12 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -4605,7 +4686,12 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
return 0; return 0;
} }
#define RUNSTEP(par, func, mode) CHK(runstep(par, #func, func, mode) == 0) #define RUNSTEP(par, func, mode) \
CHK(runstep(par, #func, func, mode) == 0)
#define SUBLOOP(par) \
"subloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
par.m_tab->m_name << "/" << par.m_slno
static int static int
tbuild(Par par) tbuild(Par par)
...@@ -4614,21 +4700,31 @@ tbuild(Par par) ...@@ -4614,21 +4700,31 @@ tbuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (par.m_slno % 2 == 0) { LL1(SUBLOOP(par));
if (par.m_slno % 3 == 0) {
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, pkupdate, MT);
} else if (par.m_slno % 3 == 1) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkupdate, MT);
} else { } else {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
} }
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, readverifyfull, MT);
// leave last one
if (par.m_slno + 1 < par.m_subloop) {
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
}
return 0; return 0;
} }
...@@ -4643,7 +4739,7 @@ tindexscan(Par par) ...@@ -4643,7 +4739,7 @@ tindexscan(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL4("subloop " << par.m_slno); LL1(SUBLOOP(par));
RUNSTEP(par, readverifyindex, MT); RUNSTEP(par, readverifyindex, MT);
} }
return 0; return 0;
...@@ -4659,6 +4755,7 @@ tpkops(Par par) ...@@ -4659,6 +4755,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkops, MT); RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count()); LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT); RUNSTEP(par, readverifyfull, MT);
...@@ -4675,13 +4772,14 @@ tpkopsread(Par par) ...@@ -4675,13 +4772,14 @@ tpkopsread(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
} }
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
return 0; return 0;
} }
...@@ -4694,10 +4792,11 @@ tmixedops(Par par) ...@@ -4694,10 +4792,11 @@ tmixedops(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
} }
return 0; return 0;
} }
...@@ -4710,9 +4809,10 @@ tbusybuild(Par par) ...@@ -4710,9 +4809,10 @@ tbusybuild(Par par)
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -4728,10 +4828,29 @@ trollback(Par par) ...@@ -4728,10 +4828,29 @@ trollback(Par par)
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tparupdate(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, parallelorderedupdate, MT);
RUNSTEP(par, readverifyfull, MT);
} }
return 0; return 0;
} }
...@@ -4744,6 +4863,7 @@ ttimebuild(Par par) ...@@ -4744,6 +4863,7 @@ ttimebuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
...@@ -4763,6 +4883,7 @@ ttimemaint(Par par) ...@@ -4763,6 +4883,7 @@ ttimemaint(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
...@@ -4792,6 +4913,7 @@ ttimescan(Par par) ...@@ -4792,6 +4913,7 @@ ttimescan(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -4818,6 +4940,7 @@ ttimepkread(Par par) ...@@ -4818,6 +4940,7 @@ ttimepkread(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -4859,6 +4982,7 @@ tcaselist[] = { ...@@ -4859,6 +4982,7 @@ tcaselist[] = {
TCase("e", tmixedops, "pk operations and scan operations"), TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"), TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"), TCase("g", trollback, "operations with random rollbacks"),
TCase("h", tparupdate, "parallel ordered update (bug20446)"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
...@@ -4916,17 +5040,16 @@ printtables() ...@@ -4916,17 +5040,16 @@ printtables()
static int static int
runtest(Par par) runtest(Par par)
{ {
LL1("start");
if (par.m_seed == -1) { if (par.m_seed == -1) {
// good enough for daily run // good enough for daily run
unsigned short seed = (getpid() ^ time(0)); unsigned short seed = (unsigned short)getpid();
LL1("random seed: " << seed); LL0("random seed: " << seed);
srandom((unsigned)seed); srandom((unsigned)seed);
} else if (par.m_seed != 0) { } else if (par.m_seed != 0) {
LL1("random seed: " << par.m_seed); LL0("random seed: " << par.m_seed);
srandom(par.m_seed); srandom(par.m_seed);
} else { } else {
LL1("random seed: loop number"); LL0("random seed: loop number");
} }
// cs // cs
assert(par.m_csname != 0); assert(par.m_csname != 0);
...@@ -4951,22 +5074,25 @@ runtest(Par par) ...@@ -4951,22 +5074,25 @@ runtest(Par par)
assert(thr.m_thread != 0); assert(thr.m_thread != 0);
} }
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) { for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << par.m_lno); LL1("loop: " << par.m_lno);
if (par.m_seed == 0) if (par.m_seed == 0) {
LL1("random seed: " << par.m_lno);
srandom(par.m_lno); srandom(par.m_lno);
}
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
continue; continue;
sprintf(par.m_currcase, "%c", tcase.m_name[0]);
makebuiltintables(par); makebuiltintables(par);
LL1("case " << tcase.m_name << " - " << tcase.m_desc); LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
for (unsigned j = 0; j < tabcount; j++) { for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0) if (tablist[j] == 0)
continue; continue;
const Tab& tab = *tablist[j]; const Tab& tab = *tablist[j];
par.m_tab = &tab; par.m_tab = &tab;
par.m_set = new Set(tab, par.m_totrows); par.m_set = new Set(tab, par.m_totrows);
LL1("table " << tab.m_name); LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
CHK(tcase.m_func(par) == 0); CHK(tcase.m_func(par) == 0);
delete par.m_set; delete par.m_set;
par.m_set = 0; par.m_set = 0;
...@@ -4985,14 +5111,20 @@ runtest(Par par) ...@@ -4985,14 +5111,20 @@ runtest(Par par)
delete [] g_thrlist; delete [] g_thrlist;
g_thrlist = 0; g_thrlist = 0;
con.disconnect(); con.disconnect();
LL1("done");
return 0; return 0;
} }
NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) static const char* g_progname = "testOIBasic";
int
main(int argc, char** argv)
{ {
ndb_init(); ndb_init();
if (ndbout_mutex == NULL) unsigned i;
ndbout << g_progname;
for (i = 1; i < argc; i++)
ndbout << " " << argv[i];
ndbout << endl;
ndbout_mutex = NdbMutex_Create(); ndbout_mutex = NdbMutex_Create();
while (++argv, --argc > 0) { while (++argv, --argc > 0) {
const char* arg = argv[0]; const char* arg = argv[0];
...@@ -5103,6 +5235,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) ...@@ -5103,6 +5235,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue; continue;
} }
} }
if (strcmp(arg, "-scanbatch") == 0) {
if (++argv, --argc > 0) {
g_opt.m_scanbatch = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-scanpar") == 0) { if (strcmp(arg, "-scanpar") == 0) {
if (++argv, --argc > 0) { if (++argv, --argc > 0) {
g_opt.m_scanpar = atoi(argv[0]); g_opt.m_scanpar = atoi(argv[0]);
......
...@@ -607,7 +607,15 @@ args: ...@@ -607,7 +607,15 @@ args:
max-time: 5000 max-time: 5000
cmd: testOIBasic cmd: testOIBasic
args: args: -case abcdefz
max-time: 2000
cmd: testOIBasic
args: -case gz
max-time: 2000
cmd: testOIBasic
args: -case hz
max-time: 2500 max-time: 2500
cmd: testBitfield cmd: testBitfield
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment