Commit dfad106c authored by unknown's avatar unknown

Merge mysql.com:/home/jonas/src/mysql-4.1-ndb

into mysql.com:/home/jonas/src/wl2025


ndb/src/ndbapi/NdbOperationInt.cpp:
  Auto merged
ndb/src/ndbapi/NdbScanOperation.cpp:
  Auto merged
parents 629768d5 042d6209
......@@ -40,8 +40,9 @@ public:
* Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words.
* There are 2 special return values to check first. All values fit
* into a signed char.
* If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value.
*/
typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
......@@ -49,8 +50,7 @@ public:
CmpLess = -1,
CmpEqual = 0,
CmpGreater = 1,
CmpUnknown = 126, // insufficient partial data
CmpError = 127 // bad data format or unimplemented comparison
CmpUnknown = 2 // insufficient partial data
};
/**
......@@ -82,19 +82,13 @@ public:
Text // Text blob
};
Enum m_typeId;
Cmp* m_cmp; // set to NULL if cmp not implemented
Cmp* m_cmp; // comparison method
};
/**
* Get type by id. Can return the Undefined type.
*/
static const Type& type(Uint32 typeId);
/**
* Inline comparison method. Most or all real methods Type::m_cmp are
* implemented via this (trusting dead code elimination).
*/
static int cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
static const Type& getType(Uint32 typeId);
private:
/**
......@@ -127,323 +121,4 @@ private:
static Cmp cmpText;
};
inline int
NdbSqlUtil::cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
// XXX require size >= 1
if (size > full)
return CmpError;
switch ((Type::Enum)typeId) {
case Type::Undefined:
break;
case Type::Tinyint:
{
if (size >= 1) {
union { Uint32 p[1]; Int8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Tinyunsigned:
{
if (size >= 1) {
union { Uint32 p[1]; Uint8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Smallint:
{
if (size >= 1) {
union { Uint32 p[1]; Int16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Smallunsigned:
{
if (size >= 1) {
union { Uint32 p[1]; Uint16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Mediumint:
{
if (size >= 1) {
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Int32 v1 = sint3korr(u1.v);
Int32 v2 = sint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Mediumunsigned:
{
if (size >= 1) {
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Uint32 v1 = uint3korr(u1.v);
Uint32 v2 = uint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Int:
{
if (size >= 1) {
union { Uint32 p[1]; Int32 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Unsigned:
{
if (size >= 1) {
union { Uint32 p[1]; Uint32 v; } u1, u2;
u1.v = p1[0];
u2.v = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Bigint:
{
if (size >= 2) {
union { Uint32 p[2]; Int64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Bigunsigned:
{
if (size >= 2) {
union { Uint32 p[2]; Uint64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Float:
{
if (size >= 1) {
union { Uint32 p[1]; float v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Double:
{
if (size >= 2) {
union { Uint32 p[2]; double v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
case Type::Decimal:
// XXX not used by MySQL or NDB
break;
case Type::Char:
{
/*
* Char is blank-padded to length and null-padded to word size.
* There is no terminator so we must compare the full values.
*/
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
int k = memcmp(u1.v, u2.v, size << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
case Type::Varchar:
{
/*
* Varchar is not allowed to contain a null byte and the stored
* value is null-padded. Therefore comparison does not need to
* use the length.
*/
if (size >= 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// length in first 2 bytes
int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
case Type::Binary:
{
// compare byte wise
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
int k = memcmp(u1.v, u2.v, size << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
case Type::Varbinary:
{
// assume correctly padded and compare byte wise
if (size >= 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// length in first 2 bytes
int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
case Type::Datetime:
{
/*
* Datetime is CC YY MM DD hh mm ss \0
*/
if (size >= 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
}
return CmpUnknown;
}
case Type::Timespec:
{
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
*/
if (size >= 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
if (k != 0)
return k < 0 ? -1 : +1;
Uint32 n1 = *(const Uint32*)(u1.v + 8);
Uint32 n2 = *(const Uint32*)(u2.v + 8);
if (n1 < n2)
return -1;
if (n2 > n1)
return +1;
return 0;
}
}
return CmpUnknown;
}
case Type::Blob:
{
// skip blob head, the rest is binary
const unsigned skip = NDB_BLOB_HEAD_SIZE;
if (size >= skip + 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1 + skip;
u2.p = p2 + skip;
int k = memcmp(u1.v, u2.v, (size - 1) << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
case Type::Text:
{
// skip blob head, the rest is char
const unsigned skip = NDB_BLOB_HEAD_SIZE;
if (size >= skip + 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1 + skip;
u2.p = p2 + skip;
int k = memcmp(u1.v, u2.v, (size - 1) << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
}
return CmpError;
}
#endif
......@@ -167,7 +167,7 @@ NdbSqlUtil::m_typeList[] = {
};
const NdbSqlUtil::Type&
NdbSqlUtil::type(Uint32 typeId)
NdbSqlUtil::getType(Uint32 typeId)
{
if (typeId < sizeof(m_typeList) / sizeof(m_typeList[0]) &&
m_typeList[typeId].m_typeId != Type::Undefined) {
......@@ -181,127 +181,352 @@ NdbSqlUtil::type(Uint32 typeId)
int
NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Tinyint, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Int8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Tinyunsigned, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Smallint, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Int16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Smallunsigned, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Mediumint, p1, p2, full, size);
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Int32 v1 = sint3korr(u1.v);
Int32 v2 = sint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
int
NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Mediumunsigned, p1, p2, full, size);
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Uint32 v1 = uint3korr(u1.v);
Uint32 v2 = uint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
int
NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Int, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Int32 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Unsigned, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint32 v; } u1, u2;
u1.v = p1[0];
u2.v = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Bigint, p1, p2, full, size);
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; Int64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Bigunsigned, p1, p2, full, size);
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; Uint64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Float, p1, p2, full, size);
assert(full >= size && size > 0);
union { Uint32 p[1]; float v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
// no format check
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Double, p1, p2, full, size);
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; double v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
// no format check
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Decimal, p1, p2, full, size);
assert(full >= size && size > 0);
// not used by MySQL or NDB
assert(false);
return 0;
}
int
NdbSqlUtil::cmpChar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Char, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Char is blank-padded to length and null-padded to word size. There
* is no terminator so we compare the full values.
*/
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
int k = memcmp(u1.v, u2.v, size << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Varchar, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Varchar is not allowed to contain a null byte and the value is
* null-padded. Therefore comparison does not need to use the length.
*/
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip length in first 2 bytes
int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Binary, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Binary data of full length. Compare bytewise.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
int k = memcmp(u1.v, u2.v, size << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Varbinary, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Binary data of variable length padded with nulls. The comparison
* does not need to use the length.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip length in first 2 bytes
int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Datetime, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Datetime is CC YY MM DD hh mm ss \0
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// no format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Timespec, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// no format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 3) {
Uint32 n1 = *(const Uint32*)(u1.v + 8);
Uint32 n2 = *(const Uint32*)(u2.v + 8);
if (n1 < n2)
return -1;
if (n2 > n1)
return +1;
return 0;
}
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Blob, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Blob comparison is on the inline bytes. Except for larger header
* the format is like Varbinary.
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
if (size >= head + 1) {
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1 + head;
u2.p = p2 + head;
int k = memcmp(u1.v, u2.v, (size - head) << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
int
NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Text, p1, p2, full, size);
assert(full >= size && size > 0);
/*
* Text comparison is on the inline bytes. Except for larger header
* the format is like Varchar.
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
if (size >= head + 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1 + head;
u2.p = p2 + head;
int k = memcmp(u1.v, u2.v, (size - head) << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
}
#ifdef NDB_SQL_UTIL_TEST
......
......@@ -35,7 +35,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
searchKey += start;
int ret = 0;
while (start < numAttrs) {
if (len2 < AttributeHeaderSize) {
if (len2 <= AttributeHeaderSize) {
jam();
ret = NdbSqlUtil::CmpUnknown;
break;
......@@ -46,7 +46,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId;
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
......@@ -55,7 +56,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
// compare
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
ret = (*type.m_cmp)(p1, p2, size1, size2);
if (ret != 0) {
jam();
break;
......@@ -78,8 +79,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++;
}
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
return ret;
}
......@@ -103,13 +102,14 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId;
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = *entryKey;
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
break;
......@@ -132,8 +132,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl
entryKey += 1;
start++;
}
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
return ret;
}
......@@ -172,7 +170,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
*/
unsigned type = 4;
while (boundCount != 0) {
if (len2 < AttributeHeaderSize) {
if (len2 <= AttributeHeaderSize) {
jam();
return NdbSqlUtil::CmpUnknown;
}
......@@ -186,7 +184,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
const unsigned size1 = boundInfo.ah().getDataSize();
......@@ -196,9 +195,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
int ret = (*type.m_cmp)(p1, p2, size1, size2);
if (ret != 0) {
jam();
return ret;
......@@ -269,15 +266,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey;
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
int ret = (*type.m_cmp)(p1, p2, size1, size1);
if (ret != 0) {
jam();
return ret;
......
......@@ -184,7 +184,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
}
#endif
// check if type is valid and has a comparison method
const NdbSqlUtil::Type& type = NdbSqlUtil::type(descAttr.m_typeId);
const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
type.m_cmp == 0) {
jam();
......
......@@ -28,6 +28,8 @@ d
shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown)
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
......@@ -75,11 +77,19 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
[ what happened to PK read performance? ]
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
[ case d: what happened to PK read performance? ]
optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/b 46 ms 81 ms 73 pct
mc02/c 9 ms 13 ms 37 pct
mc02/d 242 ms 285 ms 17 pct
[ case b: do long keys suffer from many subroutine calls? ]
vim: set et:
......@@ -732,6 +732,9 @@ int
NdbOperation::read_attr(const NdbColumnImpl* anAttrObject, Uint32 RegDest)
{
INT_DEBUG(("read_attr %d %u", anAttrObject->m_attrId, RegDest));
if (initial_interpreterCheck() == -1)
return -1;
int tAttrId = read_attrCheck(anAttrObject);
if (tAttrId == -1)
goto read_attr_error1;
......
......@@ -1220,10 +1220,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType;
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
if(!r1_null){
char r = NdbSqlUtil::cmp(type, d1, d2, size, size);
const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type);
int r = (*t.m_cmp)(d1, d2, size, size);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
assert(r != NdbSqlUtil::CmpError);
return r;
}
}
......
......@@ -30,7 +30,7 @@ testSystemRestart \
testTimeout \
testTransactions \
testDeadlock \
test_event
test_event ndbapi_slow_select
#flexTimedAsynch
#testBlobs
......@@ -66,6 +66,7 @@ testTimeout_SOURCES = testTimeout.cpp
testTransactions_SOURCES = testTransactions.cpp
testDeadlock_SOURCES = testDeadlock.cpp
test_event_SOURCES = test_event.cpp
ndbapi_slow_select_SOURCES = slow_select.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
......
#include <NdbApi.hpp>
#include <NdbOut.hpp>
#include <NdbTick.h>
struct
S_Scan {
const char * m_table;
const char * m_index;
NdbIndexScanOperation * m_scan;
NdbResultSet * m_result;
Uint32 metaid;
Uint32 match_count;
Uint32 row_count;
};
static S_Scan g_scans[] = {
{ "affiliatestometa", "ind_affiliatestometa", 0, 0, 0, 0, 0 },
{ "media", "metaid", 0, 0, 0, 0, 0 },
{ "meta", "PRIMARY", 0, 0, 0, 0, 0 },
{ "artiststometamap", "PRIMARY", 0, 0, 0, 0, 0 },
{ "subgenrestometamap", "metaid", 0, 0, 0, 0, 0 }
};
#define require(x) if(!(x)) { ndbout << "LINE: " << __LINE__ << endl;abort(); }
#define require2(o, x) if(!(x)) { ndbout << o->getNdbError() << endl; abort(); }
Uint32 g_affiliateid = 2;
Uint32 g_formatids[] = { 8, 31, 76 };
Uint64 start;
Uint32 g_artistid = 0;
Uint32 g_subgenreid = 0;
NdbConnection* g_trans = 0;
static void lookup();
int
main(void){
Ndb g_ndb("test");
g_ndb.init(1024);
require(g_ndb.waitUntilReady() == 0);
g_trans = g_ndb.startTransaction();
require(g_trans);
size_t i, j;
const size_t cnt = sizeof(g_scans)/sizeof(g_scans[0]);
start = NdbTick_CurrentMillisecond();
for(i = 0; i<cnt; i++){
ndbout_c("starting scan on: %s %s",
g_scans[i].m_table, g_scans[i].m_index);
g_scans[i].m_scan = g_trans->getNdbIndexScanOperation(g_scans[i].m_index,
g_scans[i].m_table);
NdbIndexScanOperation* scan = g_scans[i].m_scan;
require(scan);
g_scans[i].m_result = scan->readTuples(NdbScanOperation::LM_CommittedRead,
0, 0, true);
require(g_scans[i].m_result);
}
require(!g_scans[0].m_scan->setBound((Uint32)0,
NdbIndexScanOperation::BoundEQ,
&g_affiliateid,
sizeof(g_affiliateid)));
#if 0
require(!g_scans[1].m_scan->setBound((Uint32)0,
NdbIndexScanOperation::BoundLE,
&g_formatids[0],
sizeof(g_formatids[0])));
#endif
NdbScanFilter sf(g_scans[1].m_scan);
sf.begin(NdbScanFilter::OR);
sf.eq(2, g_formatids[0]);
sf.eq(2, g_formatids[1]);
sf.eq(2, g_formatids[2]);
sf.end();
// affiliatestometa
require(g_scans[0].m_scan->getValue("uniquekey"));
require(g_scans[0].m_scan->getValue("xml"));
// media
require(g_scans[1].m_scan->getValue("path"));
require(g_scans[1].m_scan->getValue("mediaid"));
require(g_scans[1].m_scan->getValue("formatid"));
// meta
require(g_scans[2].m_scan->getValue("name"));
require(g_scans[2].m_scan->getValue("xml"));
// artiststometamap
require(g_scans[3].m_scan->getValue("artistid", (char*)&g_artistid));
// subgenrestometamap
require(g_scans[4].m_scan->getValue("subgenreid", (char*)&g_subgenreid));
for(i = 0; i<cnt; i++){
g_scans[i].m_scan->getValue("metaid", (char*)&g_scans[i].metaid);
}
g_trans->execute(NoCommit, AbortOnError, 1);
Uint32 max_val = 0;
Uint32 match_val = 0;
S_Scan * F [5], * Q [5], * nextF [5];
Uint32 F_sz = 0, Q_sz = 0;
for(i = 0; i<cnt; i++){
F_sz++;
F[i] = &g_scans[i];
}
Uint32 match_count = 0;
while(F_sz > 0){
Uint32 prev_F_sz = F_sz;
F_sz = 0;
bool found = false;
//for(i = 0; i<cnt; i++)
//ndbout_c("%s - %d", g_scans[i].m_table, g_scans[i].metaid);
for(i = 0; i<prev_F_sz; i++){
int res = F[i]->m_result->nextResult();
if(res == -1)
abort();
if(res == 1){
continue;
}
Uint32 metaid = F[i]->metaid;
F[i]->row_count++;
if(metaid == match_val){
//ndbout_c("flera");
nextF[F_sz++] = F[i];
require(F_sz >= 0 && F_sz <= cnt);
F[i]->match_count++;
Uint32 comb = 1;
for(j = 0; j<cnt; j++){
comb *= (&g_scans[j] == F[i] ? 1 : g_scans[j].match_count);
}
match_count += comb;
found = true;
continue;
}
if(metaid < max_val){
nextF[F_sz++] = F[i];
require(F_sz >= 0 && F_sz <= cnt);
continue;
}
if(metaid > max_val){
for(j = 0; j<Q_sz; j++)
nextF[F_sz++] = Q[j];
require(F_sz >= 0 && F_sz <= cnt);
Q_sz = 0;
max_val = metaid;
}
Q[Q_sz++] = F[i];
require(Q_sz >= 0 && Q_sz <= cnt);
}
if(F_sz == 0 && Q_sz > 0){
match_val = max_val;
for(j = 0; j<Q_sz; j++){
nextF[F_sz++] = Q[j];
Q[j]->match_count = 1;
}
require(F_sz >= 0 && F_sz <= cnt);
require(Q_sz >= 0 && Q_sz <= cnt);
Q_sz = 0;
match_count++;
lookup();
} else if(!found && F_sz + Q_sz < cnt){
F_sz = 0;
}
require(F_sz >= 0 && F_sz <= cnt);
for(i = 0; i<F_sz; i++)
F[i] = nextF[i];
}
start = NdbTick_CurrentMillisecond() - start;
ndbout_c("Elapsed: %lldms", start);
ndbout_c("rows: %d", match_count);
for(i = 0; i<cnt; i++){
ndbout_c("%s : %d", g_scans[i].m_table, g_scans[i].row_count);
}
}
static
void
lookup(){
{
NdbOperation* op = g_trans->getNdbOperation("artists");
require2(g_trans, op);
require2(op, op->readTuple() == 0);
require2(op, op->equal("artistid", g_artistid) == 0);
require2(op, op->getValue("name"));
}
{
NdbOperation* op = g_trans->getNdbOperation("subgenres");
require2(g_trans, op);
require2(op, op->readTuple() == 0);
require2(op, op->equal("subgenreid", g_subgenreid) == 0);
require2(op, op->getValue("name"));
}
static int loop = 0;
if(loop++ >= 16){
loop = 0;
require(g_trans->execute(NoCommit) == 0);
}
//require(g_trans->restart() == 0);
}
......@@ -34,6 +34,7 @@
struct Opt {
// common options
unsigned m_batch;
const char* m_bound;
const char* m_case;
bool m_core;
bool m_dups;
......@@ -43,6 +44,7 @@ struct Opt {
unsigned m_loop;
bool m_nologging;
bool m_msglock;
unsigned m_pctnull;
unsigned m_rows;
unsigned m_samples;
unsigned m_scanrd;
......@@ -54,6 +56,7 @@ struct Opt {
unsigned m_v;
Opt() :
m_batch(32),
m_bound("01234"),
m_case(0),
m_core(false),
m_dups(false),
......@@ -63,6 +66,7 @@ struct Opt {
m_loop(1),
m_nologging(false),
m_msglock(true),
m_pctnull(10),
m_rows(1000),
m_samples(0),
m_scanrd(240),
......@@ -87,6 +91,7 @@ printhelp()
ndbout
<< "usage: testOIbasic [options]" << endl
<< " -batch N pk operations in batch [" << d.m_batch << "]" << endl
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
......@@ -94,6 +99,7 @@ printhelp()
<< " -index xyz only given index numbers (digits 1-9)" << endl
<< " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl
<< " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
<< " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl
......@@ -198,7 +204,6 @@ struct Par : public Opt {
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_totrows;
// value calculation
unsigned m_pctnull;
unsigned m_range;
unsigned m_pctrange;
// do verify after read
......@@ -214,7 +219,6 @@ struct Par : public Opt {
m_set(0),
m_tmr(0),
m_totrows(m_threads * m_rows),
m_pctnull(10),
m_range(m_rows),
m_pctrange(0),
m_verify(false),
......@@ -1622,7 +1626,6 @@ Set::calc(Par par, unsigned i)
m_row[i] = new Row(tab);
Row& row = *m_row[i];
// value generation parameters
par.m_pctnull = 10;
par.m_pctrange = 40;
row.calc(par, i);
}
......@@ -1898,8 +1901,11 @@ BSet::calc(Par par)
BVal& bval = *new BVal(icol);
m_bval[m_bvals++] = &bval;
bval.m_null = false;
unsigned sel;
do {
// equality bound only on i==0
unsigned sel = urandom(5 - i);
sel = urandom(5 - i);
} while (strchr(par.m_bound, '0' + sel) == 0);
if (sel < 2)
bval.m_type = 0 | (1 << i);
else if (sel < 4)
......@@ -3207,6 +3213,15 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue;
}
}
if (strcmp(arg, "-bound") == 0) {
if (++argv, --argc > 0) {
const char* p = argv[0];
if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) {
g_opt.m_bound = strdup(p);
continue;
}
}
}
if (strcmp(arg, "-case") == 0) {
if (++argv, --argc > 0) {
g_opt.m_case = strdup(argv[0]);
......@@ -3257,6 +3272,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
g_opt.m_nologging = true;
continue;
}
if (strcmp(arg, "-pctnull") == 0) {
if (++argv, --argc > 0) {
g_opt.m_pctnull = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-rows") == 0) {
if (++argv, --argc > 0) {
g_opt.m_rows = atoi(argv[0]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment