Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
3d7649ca
Commit
3d7649ca
authored
Dec 20, 2004
by
pekka@mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ndb: index tests
parent
6463a6d7
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
961 additions
and
265 deletions
+961
-265
mysql-test/ndb/ndb_range_bounds.pl
mysql-test/ndb/ndb_range_bounds.pl
+4
-2
mysql-test/r/ndb_index_ordered.result
mysql-test/r/ndb_index_ordered.result
+1
-0
mysql-test/t/ndb_index_ordered.test
mysql-test/t/ndb_index_ordered.test
+2
-0
ndb/test/ndbapi/testOIBasic.cpp
ndb/test/ndbapi/testOIBasic.cpp
+954
-263
No files found.
mysql-test/ndb/ndb_range_bounds.pl
View file @
3d7649ca
#
# test range scan bounds
# give option --all to test all cases
# set MYSQL_HOME to installation top
#
use
strict
;
...
...
@@ -14,8 +15,9 @@ my $opt_verbose = 0;
GetOptions
("
all
"
=>
\
$opt_all
,
"
cnt=i
"
=>
\
$opt_cnt
,
"
verbose
"
=>
\
$opt_verbose
)
or
die
"
options are: --all --cnt=N --verbose
";
my
$mysql_top
=
$ENV
{
MYSQL_TOP
};
my
$dsn
=
"
dbi:mysql:database=test;host=localhost;mysql_read_default_file=
$mysql_top
/.target/var/my.cnf
";
my
$mysql_home
=
$ENV
{
MYSQL_HOME
};
defined
(
$mysql_home
)
or
die
"
no MYSQL_HOME
";
my
$dsn
=
"
dbi:mysql:database=test;host=localhost;mysql_read_default_file=
$mysql_home
/var/my.cnf
";
my
$opts
=
{
RaiseError
=>
0
,
PrintError
=>
0
,
AutoCommit
=>
1
,
};
my
$dbh
;
...
...
mysql-test/r/ndb_index_ordered.result
View file @
3d7649ca
...
...
@@ -383,6 +383,7 @@ b c
select min(b), max(b) from t1;
min(b) max(b)
1 5000000
drop table t1;
CREATE TABLE test1 (
SubscrID int(11) NOT NULL auto_increment,
UsrID int(11) NOT NULL default '0',
...
...
mysql-test/t/ndb_index_ordered.test
View file @
3d7649ca
...
...
@@ -175,6 +175,8 @@ select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c;
select
b
,
c
from
t1
where
1000
<=
b
and
b
<=
100000
and
c
<
'j'
order
by
b
desc
,
c
desc
;
#
select
min
(
b
),
max
(
b
)
from
t1
;
#
drop
table
t1
;
#
# Bug #6435
...
...
ndb/test/ndbapi/testOIBasic.cpp
View file @
3d7649ca
...
...
@@ -72,7 +72,7 @@ struct Opt {
m_die
(
0
),
m_dups
(
false
),
m_fragtype
(
NdbDictionary
::
Object
::
FragUndefined
),
m_subsubloop
(
4
),
m_subsubloop
(
2
),
m_index
(
0
),
m_loop
(
1
),
m_msglock
(
true
),
...
...
@@ -257,6 +257,9 @@ struct Par : public Opt {
bool
m_verify
;
// deadlock possible
bool
m_deadlock
;
// ordered range scan
bool
m_ordered
;
bool
m_descending
;
// timer location
Par
(
const
Opt
&
opt
)
:
Opt
(
opt
),
...
...
@@ -274,7 +277,9 @@ struct Par : public Opt {
m_bdir
(
0
),
m_randomkey
(
false
),
m_verify
(
false
),
m_deadlock
(
false
)
{
m_deadlock
(
false
),
m_ordered
(
false
),
m_descending
(
false
)
{
}
};
...
...
@@ -444,6 +449,9 @@ struct Chs {
~
Chs
();
};
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Chs
&
chs
);
Chs
::
Chs
(
CHARSET_INFO
*
cs
)
:
m_cs
(
cs
)
{
...
...
@@ -455,10 +463,14 @@ Chs::Chs(CHARSET_INFO* cs) :
unsigned
i
=
0
;
unsigned
miss1
=
0
;
unsigned
miss2
=
0
;
unsigned
miss3
=
0
;
unsigned
miss4
=
0
;
while
(
i
<
maxcharcount
)
{
unsigned
char
*
bytes
=
m_chr
[
i
].
m_bytes
;
unsigned
char
*
xbytes
=
m_chr
[
i
].
m_xbytes
;
unsigned
size
=
m_cs
->
mbminlen
+
urandom
(
m_cs
->
mbmaxlen
-
m_cs
->
mbminlen
+
1
);
unsigned
&
size
=
m_chr
[
i
].
m_size
;
bool
ok
;
size
=
m_cs
->
mbminlen
+
urandom
(
m_cs
->
mbmaxlen
-
m_cs
->
mbminlen
+
1
);
assert
(
m_cs
->
mbminlen
<=
size
&&
size
<=
m_cs
->
mbmaxlen
);
// prefer longer chars
if
(
size
==
m_cs
->
mbminlen
&&
m_cs
->
mbminlen
<
m_cs
->
mbmaxlen
&&
urandom
(
5
)
!=
0
)
...
...
@@ -466,33 +478,57 @@ Chs::Chs(CHARSET_INFO* cs) :
for
(
unsigned
j
=
0
;
j
<
size
;
j
++
)
{
bytes
[
j
]
=
urandom
(
256
);
}
// check wellformed
const
char
*
sbytes
=
(
const
char
*
)
bytes
;
if
((
*
cs
->
cset
->
well_formed_len
)(
cs
,
sbytes
,
sbytes
+
size
,
1
)
!=
size
)
{
miss1
++
;
continue
;
}
// do not trust well_formed_len currently
// check no proper prefix wellformed
ok
=
true
;
for
(
unsigned
j
=
1
;
j
<
size
;
j
++
)
{
if
((
*
cs
->
cset
->
well_formed_len
)(
cs
,
sbytes
,
sbytes
+
j
,
1
)
==
j
)
{
ok
=
false
;
break
;
}
}
if
(
!
ok
)
{
miss2
++
;
continue
;
}
// normalize
memset
(
xbytes
,
0
,
sizeof
(
xbytes
));
// currently returns buffer size always
int
xlen
=
(
*
cs
->
coll
->
strnxfrm
)(
cs
,
xbytes
,
m_xmul
*
size
,
bytes
,
size
);
// check we got something
bool
x
ok
=
false
;
ok
=
false
;
for
(
unsigned
j
=
0
;
j
<
xlen
;
j
++
)
{
if
(
xbytes
[
j
]
!=
0
)
{
x
ok
=
true
;
ok
=
true
;
break
;
}
}
if
(
!
xok
)
{
miss2
++
;
if
(
!
ok
)
{
miss3
++
;
continue
;
}
// check for duplicate (before normalize)
ok
=
true
;
for
(
unsigned
j
=
0
;
j
<
i
;
j
++
)
{
const
Chr
&
chr
=
m_chr
[
j
];
if
(
chr
.
m_size
==
size
&&
memcmp
(
chr
.
m_bytes
,
bytes
,
size
)
==
0
)
{
ok
=
false
;
break
;
}
}
if
(
!
ok
)
{
miss4
++
;
continue
;
}
// occasional duplicate char is ok
m_chr
[
i
].
m_size
=
size
;
i
++
;
}
bool
disorder
=
true
;
unsigned
bubb
el
s
=
0
;
unsigned
bubb
le
s
=
0
;
while
(
disorder
)
{
disorder
=
false
;
for
(
unsigned
i
=
1
;
i
<
maxcharcount
;
i
++
)
{
...
...
@@ -502,11 +538,11 @@ Chs::Chs(CHARSET_INFO* cs) :
m_chr
[
i
]
=
m_chr
[
i
-
1
];
m_chr
[
i
-
1
]
=
chr
;
disorder
=
true
;
bubb
el
s
++
;
bubb
le
s
++
;
}
}
}
LL3
(
"inited charset "
<<
cs
->
name
<<
" miss1="
<<
miss1
<<
" miss2="
<<
miss2
<<
" bubbels="
<<
bubbel
s
);
LL3
(
"inited charset "
<<
*
this
<<
" miss="
<<
miss1
<<
","
<<
miss2
<<
","
<<
miss3
<<
","
<<
miss4
<<
" bubbles="
<<
bubble
s
);
}
Chs
::~
Chs
()
...
...
@@ -514,6 +550,14 @@ Chs::~Chs()
delete
[]
m_chr
;
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Chs
&
chs
)
{
CHARSET_INFO
*
cs
=
chs
.
m_cs
;
out
<<
cs
->
name
<<
"["
<<
cs
->
mbminlen
<<
"-"
<<
cs
->
mbmaxlen
<<
","
<<
chs
.
m_xmul
<<
"]"
;
return
out
;
}
static
Chs
*
cslist
[
maxcsnumber
];
static
void
...
...
@@ -552,22 +596,26 @@ getcs(Par par)
// Col - table column
struct
Col
{
enum
Type
{
Unsigned
=
NdbDictionary
::
Column
::
Unsigned
,
Char
=
NdbDictionary
::
Column
::
Char
};
const
class
Tab
&
m_tab
;
unsigned
m_num
;
const
char
*
m_name
;
bool
m_pk
;
NdbDictionary
::
Column
::
Type
m_type
;
Type
m_type
;
unsigned
m_length
;
unsigned
m_bytelength
;
bool
m_nullable
;
const
Chs
*
m_chs
;
Col
(
const
class
Tab
&
tab
,
unsigned
num
,
const
char
*
name
,
bool
pk
,
NdbDictionary
::
Column
::
Type
type
,
unsigned
length
,
bool
nullable
,
const
Chs
*
chs
);
Col
(
const
class
Tab
&
tab
,
unsigned
num
,
const
char
*
name
,
bool
pk
,
Type
type
,
unsigned
length
,
bool
nullable
,
const
Chs
*
chs
);
~
Col
();
bool
equal
(
const
Col
&
col2
)
const
;
void
verify
(
const
void
*
addr
)
const
;
};
Col
::
Col
(
const
class
Tab
&
tab
,
unsigned
num
,
const
char
*
name
,
bool
pk
,
NdbDictionary
::
Column
::
Type
type
,
unsigned
length
,
bool
nullable
,
const
Chs
*
chs
)
:
Col
::
Col
(
const
class
Tab
&
tab
,
unsigned
num
,
const
char
*
name
,
bool
pk
,
Type
type
,
unsigned
length
,
bool
nullable
,
const
Chs
*
chs
)
:
m_tab
(
tab
),
m_num
(
num
),
m_name
(
strcpy
(
new
char
[
strlen
(
name
)
+
1
],
name
)),
...
...
@@ -595,9 +643,9 @@ void
Col
::
verify
(
const
void
*
addr
)
const
{
switch
(
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
CHARSET_INFO
*
cs
=
m_chs
->
m_cs
;
const
char
*
src
=
(
const
char
*
)
addr
;
...
...
@@ -616,10 +664,10 @@ operator<<(NdbOut& out, const Col& col)
{
out
<<
"col["
<<
col
.
m_num
<<
"] "
<<
col
.
m_name
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
out
<<
" unsigned"
;
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
CHARSET_INFO
*
cs
=
col
.
m_chs
->
m_cs
;
out
<<
" char("
<<
col
.
m_length
<<
"*"
<<
cs
->
mbmaxlen
<<
";"
<<
cs
->
name
<<
")"
;
...
...
@@ -656,25 +704,41 @@ ICol::~ICol()
{
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
ICol
&
icol
)
{
out
<<
"icol["
<<
icol
.
m_num
<<
"] "
<<
icol
.
m_col
;
return
out
;
}
// ITab - index
struct
ITab
{
enum
Type
{
OrderedIndex
=
NdbDictionary
::
Index
::
OrderedIndex
,
UniqueHashIndex
=
NdbDictionary
::
Index
::
UniqueHashIndex
};
const
class
Tab
&
m_tab
;
const
char
*
m_name
;
Type
m_type
;
unsigned
m_icols
;
const
ICol
**
m_icol
;
ITab
(
const
class
Tab
&
tab
,
const
char
*
name
,
unsigned
icols
);
unsigned
m_colmask
;
ITab
(
const
class
Tab
&
tab
,
const
char
*
name
,
Type
type
,
unsigned
icols
);
~
ITab
();
void
icoladd
(
unsigned
k
,
const
ICol
*
icolptr
);
};
ITab
::
ITab
(
const
class
Tab
&
tab
,
const
char
*
name
,
unsigned
icols
)
:
ITab
::
ITab
(
const
class
Tab
&
tab
,
const
char
*
name
,
Type
type
,
unsigned
icols
)
:
m_tab
(
tab
),
m_name
(
strcpy
(
new
char
[
strlen
(
name
)
+
1
],
name
)),
m_type
(
type
),
m_icols
(
icols
),
m_icol
(
new
const
ICol
*
[
icols
+
1
])
m_icol
(
new
const
ICol
*
[
icols
+
1
]),
m_colmask
(
0
)
{
for
(
unsigned
i
=
0
;
i
<=
m_icols
;
i
++
)
m_icol
[
0
]
=
0
;
for
(
unsigned
k
=
0
;
k
<=
m_icols
;
k
++
)
m_icol
[
k
]
=
0
;
}
ITab
::~
ITab
()
...
...
@@ -685,13 +749,21 @@ ITab::~ITab()
delete
[]
m_icol
;
}
void
ITab
::
icoladd
(
unsigned
k
,
const
ICol
*
icolptr
)
{
assert
(
k
==
icolptr
->
m_num
&&
k
<
m_icols
&&
m_icol
[
k
]
==
0
);
m_icol
[
k
]
=
icolptr
;
m_colmask
|=
(
1
<<
icolptr
->
m_col
.
m_num
);
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
ITab
&
itab
)
{
out
<<
"itab "
<<
itab
.
m_name
<<
" icols="
<<
itab
.
m_icols
;
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
out
<<
endl
;
out
<<
"icol["
<<
k
<<
"] "
<<
itab
.
m_icol
[
k
]
->
m_
col
;
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
]
;
out
<<
endl
<<
i
col
;
}
return
out
;
}
...
...
@@ -706,6 +778,8 @@ struct Tab {
const
ITab
**
m_itab
;
// pk must contain an Unsigned column
unsigned
m_keycol
;
void
coladd
(
unsigned
k
,
Col
*
colptr
);
void
itabadd
(
unsigned
j
,
ITab
*
itab
);
Tab
(
const
char
*
name
,
unsigned
cols
,
unsigned
itabs
,
unsigned
keycol
);
~
Tab
();
};
...
...
@@ -718,10 +792,10 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_itab
(
new
const
ITab
*
[
itabs
+
1
]),
m_keycol
(
keycol
)
{
for
(
unsigned
i
=
0
;
i
<=
cols
;
i
++
)
m_col
[
i
]
=
0
;
for
(
unsigned
i
=
0
;
i
<=
itabs
;
i
++
)
m_itab
[
i
]
=
0
;
for
(
unsigned
k
=
0
;
k
<=
cols
;
k
++
)
m_col
[
k
]
=
0
;
for
(
unsigned
j
=
0
;
j
<=
itabs
;
j
++
)
m_itab
[
j
]
=
0
;
}
Tab
::~
Tab
()
...
...
@@ -735,19 +809,33 @@ Tab::~Tab()
delete
[]
m_itab
;
}
void
Tab
::
coladd
(
unsigned
k
,
Col
*
colptr
)
{
assert
(
k
==
colptr
->
m_num
&&
k
<
m_cols
&&
m_col
[
k
]
==
0
);
m_col
[
k
]
=
colptr
;
}
void
Tab
::
itabadd
(
unsigned
j
,
ITab
*
itabptr
)
{
assert
(
j
<
m_itabs
&&
m_itab
[
j
]
==
0
);
m_itab
[
j
]
=
itabptr
;
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Tab
&
tab
)
{
out
<<
"tab "
<<
tab
.
m_name
<<
" cols="
<<
tab
.
m_cols
;
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
out
<<
endl
;
out
<<
*
tab
.
m_col
[
k
]
;
const
Col
&
col
=
*
tab
.
m_col
[
k
]
;
out
<<
endl
<<
col
;
}
for
(
unsigned
i
=
0
;
i
<
tab
.
m_itabs
;
i
++
)
{
if
(
tab
.
m_itab
[
i
]
==
0
)
continue
;
out
<<
endl
;
out
<<
*
tab
.
m_itab
[
i
]
;
const
ITab
&
itab
=
*
tab
.
m_itab
[
i
]
;
out
<<
endl
<<
itab
;
}
return
out
;
}
...
...
@@ -774,7 +862,7 @@ verifytables()
{
assert
(
t
->
m_keycol
<
t
->
m_cols
);
const
Col
*
c
=
t
->
m_col
[
t
->
m_keycol
];
assert
(
c
->
m_pk
&&
c
->
m_type
==
NdbDictionary
::
Column
::
Unsigned
);
assert
(
c
->
m_pk
&&
c
->
m_type
==
Col
::
Unsigned
);
}
assert
(
t
->
m_itabs
!=
0
&&
t
->
m_itab
!=
0
);
for
(
unsigned
i
=
0
;
i
<
t
->
m_itabs
;
i
++
)
{
...
...
@@ -785,6 +873,9 @@ verifytables()
for
(
unsigned
k
=
0
;
k
<
x
->
m_icols
;
k
++
)
{
const
ICol
*
c
=
x
->
m_icol
[
k
];
assert
(
c
!=
0
&&
c
->
m_num
==
k
&&
c
->
m_col
.
m_num
<
t
->
m_cols
);
if
(
x
->
m_type
==
ITab
::
UniqueHashIndex
)
{
assert
(
!
c
->
m_col
.
m_nullable
);
}
}
}
assert
(
t
->
m_itab
[
t
->
m_itabs
]
==
0
);
...
...
@@ -810,127 +901,186 @@ makebuiltintables(Par par)
}
// ti0 - basic
if
(
usetable
(
par
,
0
))
{
const
Tab
*
t
=
new
Tab
(
"ti0"
,
5
,
5
,
0
);
Tab
*
t
=
new
Tab
(
"ti0"
,
5
,
7
,
0
);
// name - pk - type - length - nullable - cs
t
->
m_col
[
0
]
=
new
Col
(
*
t
,
0
,
"a"
,
1
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
0
,
0
);
t
->
m_col
[
1
]
=
new
Col
(
*
t
,
1
,
"b"
,
0
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
1
,
0
);
t
->
m_col
[
2
]
=
new
Col
(
*
t
,
2
,
"c"
,
0
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
1
,
0
);
t
->
m_col
[
3
]
=
new
Col
(
*
t
,
3
,
"d"
,
0
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
1
,
0
);
t
->
m_col
[
4
]
=
new
Col
(
*
t
,
4
,
"e"
,
0
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
1
,
0
);
t
->
coladd
(
0
,
new
Col
(
*
t
,
0
,
"a"
,
1
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
t
->
coladd
(
1
,
new
Col
(
*
t
,
1
,
"b"
,
0
,
Col
::
Unsigned
,
1
,
1
,
0
)
);
t
->
coladd
(
2
,
new
Col
(
*
t
,
2
,
"c"
,
0
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
t
->
coladd
(
3
,
new
Col
(
*
t
,
3
,
"d"
,
0
,
Col
::
Unsigned
,
1
,
1
,
0
)
);
t
->
coladd
(
4
,
new
Col
(
*
t
,
4
,
"e"
,
0
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
if
(
useindex
(
par
,
0
))
{
// a
const
ITab
*
x
=
t
->
m_itab
[
0
]
=
new
ITab
(
*
t
,
"ti0x0"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti0x0"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
t
->
itabadd
(
0
,
x
);
}
if
(
useindex
(
par
,
1
))
{
// b
const
ITab
*
x
=
t
->
m_itab
[
1
]
=
new
ITab
(
*
t
,
"ti0x1"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti0x1"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]));
t
->
itabadd
(
1
,
x
);
}
if
(
useindex
(
par
,
2
))
{
// b, c
const
ITab
*
x
=
t
->
m_itab
[
2
]
=
new
ITab
(
*
t
,
"ti0x2"
,
2
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti0x2"
,
ITab
::
OrderedIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
t
->
itabadd
(
2
,
x
);
}
if
(
useindex
(
par
,
3
))
{
// d, c, b
const
ITab
*
x
=
t
->
m_itab
[
3
]
=
new
ITab
(
*
t
,
"ti0x3"
,
3
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
3
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
2
]
=
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
1
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti0x3"
,
ITab
::
OrderedIndex
,
3
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
3
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
1
]));
t
->
itabadd
(
3
,
x
);
}
if
(
useindex
(
par
,
4
))
{
// b, e, c, d
const
ITab
*
x
=
t
->
m_itab
[
4
]
=
new
ITab
(
*
t
,
"ti0x4"
,
4
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
4
]);
x
->
m_icol
[
2
]
=
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
3
]
=
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
3
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti0x4"
,
ITab
::
OrderedIndex
,
4
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
4
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
3
,
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
3
]));
t
->
itabadd
(
4
,
x
);
}
if
(
useindex
(
par
,
5
))
{
// a, c
ITab
*
x
=
new
ITab
(
*
t
,
"ti0z5"
,
ITab
::
UniqueHashIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
t
->
itabadd
(
5
,
x
);
}
if
(
useindex
(
par
,
6
))
{
// a, e
ITab
*
x
=
new
ITab
(
*
t
,
"ti0z6"
,
ITab
::
UniqueHashIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
4
]));
t
->
itabadd
(
6
,
x
);
}
tablist
[
0
]
=
t
;
}
// ti1 - simple char fields
if
(
usetable
(
par
,
1
))
{
const
Tab
*
t
=
new
Tab
(
"ti1"
,
5
,
5
,
1
);
Tab
*
t
=
new
Tab
(
"ti1"
,
5
,
7
,
1
);
// name - pk - type - length - nullable - cs
t
->
m_col
[
0
]
=
new
Col
(
*
t
,
0
,
"a"
,
0
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
1
,
0
);
t
->
m_col
[
1
]
=
new
Col
(
*
t
,
1
,
"b"
,
1
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
0
,
0
);
t
->
m_col
[
2
]
=
new
Col
(
*
t
,
2
,
"c"
,
0
,
NdbDictionary
::
Column
::
Char
,
20
,
1
,
getcs
(
par
));
t
->
m_col
[
3
]
=
new
Col
(
*
t
,
3
,
"d"
,
0
,
NdbDictionary
::
Column
::
Char
,
5
,
1
,
getcs
(
par
));
t
->
m_col
[
4
]
=
new
Col
(
*
t
,
4
,
"e"
,
0
,
NdbDictionary
::
Column
::
Char
,
5
,
1
,
getcs
(
par
));
t
->
coladd
(
0
,
new
Col
(
*
t
,
0
,
"a"
,
0
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
t
->
coladd
(
1
,
new
Col
(
*
t
,
1
,
"b"
,
1
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
t
->
coladd
(
2
,
new
Col
(
*
t
,
2
,
"c"
,
0
,
Col
::
Char
,
20
,
1
,
getcs
(
par
)
));
t
->
coladd
(
3
,
new
Col
(
*
t
,
3
,
"d"
,
0
,
Col
::
Char
,
5
,
0
,
getcs
(
par
)
));
t
->
coladd
(
4
,
new
Col
(
*
t
,
4
,
"e"
,
0
,
Col
::
Char
,
5
,
1
,
getcs
(
par
)
));
if
(
useindex
(
par
,
0
))
{
// b
const
ITab
*
x
=
t
->
m_itab
[
0
]
=
new
ITab
(
*
t
,
"ti1x0"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]
);
ITab
*
x
=
new
ITab
(
*
t
,
"ti1x0"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
])
);
}
if
(
useindex
(
par
,
1
))
{
// a, c
const
ITab
*
x
=
t
->
m_itab
[
1
]
=
new
ITab
(
*
t
,
"ti1x1"
,
2
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti1x1"
,
ITab
::
OrderedIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
t
->
itabadd
(
1
,
x
);
}
if
(
useindex
(
par
,
2
))
{
// c, a
const
ITab
*
x
=
t
->
m_itab
[
2
]
=
new
ITab
(
*
t
,
"ti1x2"
,
2
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
0
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti1x2"
,
ITab
::
OrderedIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
0
]));
t
->
itabadd
(
2
,
x
);
}
if
(
useindex
(
par
,
3
))
{
// e
const
ITab
*
x
=
t
->
m_itab
[
3
]
=
new
ITab
(
*
t
,
"ti1x3"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti1x3"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]));
t
->
itabadd
(
3
,
x
);
}
if
(
useindex
(
par
,
4
))
{
// e, d, c, b
const
ITab
*
x
=
t
->
m_itab
[
4
]
=
new
ITab
(
*
t
,
"ti1x4"
,
4
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
3
]);
x
->
m_icol
[
2
]
=
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
3
]
=
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
1
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti1x4"
,
ITab
::
OrderedIndex
,
4
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
3
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
3
,
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
1
]));
t
->
itabadd
(
4
,
x
);
}
if
(
useindex
(
par
,
5
))
{
// a, b
ITab
*
x
=
new
ITab
(
*
t
,
"ti1z5"
,
ITab
::
UniqueHashIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
1
]));
t
->
itabadd
(
5
,
x
);
}
if
(
useindex
(
par
,
6
))
{
// a, b, d
ITab
*
x
=
new
ITab
(
*
t
,
"ti1z6"
,
ITab
::
UniqueHashIndex
,
3
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
1
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
3
]));
t
->
itabadd
(
6
,
x
);
}
tablist
[
1
]
=
t
;
}
// ti2 - complex char fields
if
(
usetable
(
par
,
2
))
{
const
Tab
*
t
=
new
Tab
(
"ti2"
,
5
,
5
,
2
);
Tab
*
t
=
new
Tab
(
"ti2"
,
5
,
7
,
2
);
// name - pk - type - length - nullable - cs
t
->
m_col
[
0
]
=
new
Col
(
*
t
,
0
,
"a"
,
1
,
NdbDictionary
::
Column
::
Char
,
101
,
0
,
getcs
(
par
));
t
->
m_col
[
1
]
=
new
Col
(
*
t
,
1
,
"b"
,
0
,
NdbDictionary
::
Column
::
Char
,
4
,
1
,
getcs
(
par
));
t
->
m_col
[
2
]
=
new
Col
(
*
t
,
2
,
"c"
,
1
,
NdbDictionary
::
Column
::
Unsigned
,
1
,
0
,
0
);
t
->
m_col
[
3
]
=
new
Col
(
*
t
,
3
,
"d"
,
1
,
NdbDictionary
::
Column
::
Char
,
3
,
0
,
getcs
(
par
));
t
->
m_col
[
4
]
=
new
Col
(
*
t
,
4
,
"e"
,
0
,
NdbDictionary
::
Column
::
Char
,
101
,
0
,
getcs
(
par
));
t
->
coladd
(
0
,
new
Col
(
*
t
,
0
,
"a"
,
1
,
Col
::
Char
,
31
,
0
,
getcs
(
par
)
));
t
->
coladd
(
1
,
new
Col
(
*
t
,
1
,
"b"
,
0
,
Col
::
Char
,
4
,
1
,
getcs
(
par
)
));
t
->
coladd
(
2
,
new
Col
(
*
t
,
2
,
"c"
,
1
,
Col
::
Unsigned
,
1
,
0
,
0
)
);
t
->
coladd
(
3
,
new
Col
(
*
t
,
3
,
"d"
,
1
,
Col
::
Char
,
3
,
0
,
getcs
(
par
)
));
t
->
coladd
(
4
,
new
Col
(
*
t
,
4
,
"e"
,
0
,
Col
::
Char
,
17
,
0
,
getcs
(
par
)
));
if
(
useindex
(
par
,
0
))
{
// a, c, d
const
ITab
*
x
=
t
->
m_itab
[
0
]
=
new
ITab
(
*
t
,
"ti2x0"
,
3
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
2
]
=
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
3
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti2x0"
,
ITab
::
OrderedIndex
,
3
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
3
]));
t
->
itabadd
(
0
,
x
);
}
if
(
useindex
(
par
,
1
))
{
// e, d, c, b, a
const
ITab
*
x
=
t
->
m_itab
[
1
]
=
new
ITab
(
*
t
,
"ti2x1"
,
5
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
3
]);
x
->
m_icol
[
2
]
=
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]);
x
->
m_icol
[
3
]
=
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
1
]);
x
->
m_icol
[
4
]
=
new
ICol
(
*
x
,
4
,
*
t
->
m_col
[
0
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti2x1"
,
ITab
::
OrderedIndex
,
5
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
4
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
3
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
3
,
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
1
]));
x
->
icoladd
(
4
,
new
ICol
(
*
x
,
4
,
*
t
->
m_col
[
0
]));
t
->
itabadd
(
1
,
x
);
}
if
(
useindex
(
par
,
2
))
{
// d
const
ITab
*
x
=
t
->
m_itab
[
2
]
=
new
ITab
(
*
t
,
"ti2x2"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
3
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti2x2"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
3
]));
t
->
itabadd
(
2
,
x
);
}
if
(
useindex
(
par
,
3
))
{
// b
const
ITab
*
x
=
t
->
m_itab
[
3
]
=
new
ITab
(
*
t
,
"ti2x3"
,
1
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti2x3"
,
ITab
::
OrderedIndex
,
1
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
1
]));
t
->
itabadd
(
3
,
x
);
}
if
(
useindex
(
par
,
4
))
{
// a, e
const
ITab
*
x
=
t
->
m_itab
[
4
]
=
new
ITab
(
*
t
,
"ti2x4"
,
2
);
x
->
m_icol
[
0
]
=
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]);
x
->
m_icol
[
1
]
=
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
4
]);
ITab
*
x
=
new
ITab
(
*
t
,
"ti2x4"
,
ITab
::
OrderedIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
4
]));
t
->
itabadd
(
4
,
x
);
}
if
(
useindex
(
par
,
5
))
{
// a, c
ITab
*
x
=
new
ITab
(
*
t
,
"ti2z5"
,
ITab
::
UniqueHashIndex
,
2
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
t
->
itabadd
(
5
,
x
);
}
if
(
useindex
(
par
,
6
))
{
// a, c, d, e
ITab
*
x
=
new
ITab
(
*
t
,
"ti2z6"
,
ITab
::
UniqueHashIndex
,
4
);
x
->
icoladd
(
0
,
new
ICol
(
*
x
,
0
,
*
t
->
m_col
[
0
]));
x
->
icoladd
(
1
,
new
ICol
(
*
x
,
1
,
*
t
->
m_col
[
2
]));
x
->
icoladd
(
2
,
new
ICol
(
*
x
,
2
,
*
t
->
m_col
[
3
]));
x
->
icoladd
(
3
,
new
ICol
(
*
x
,
3
,
*
t
->
m_col
[
4
]));
t
->
itabadd
(
6
,
x
);
}
tablist
[
2
]
=
t
;
}
...
...
@@ -944,6 +1094,7 @@ struct Con {
NdbDictionary
::
Dictionary
*
m_dic
;
NdbConnection
*
m_tx
;
NdbOperation
*
m_op
;
NdbIndexOperation
*
m_indexop
;
NdbScanOperation
*
m_scanop
;
NdbIndexScanOperation
*
m_indexscanop
;
enum
ScanMode
{
ScanNo
=
0
,
Committed
,
Latest
,
Exclusive
};
...
...
@@ -951,7 +1102,7 @@ struct Con {
enum
ErrType
{
ErrNone
=
0
,
ErrDeadlock
,
ErrOther
};
ErrType
m_errtype
;
Con
()
:
m_ndb
(
0
),
m_dic
(
0
),
m_tx
(
0
),
m_op
(
0
),
m_ndb
(
0
),
m_dic
(
0
),
m_tx
(
0
),
m_op
(
0
),
m_indexop
(
0
),
m_scanop
(
0
),
m_indexscanop
(
0
),
m_scanmode
(
ScanNo
),
m_errtype
(
ErrNone
)
{}
~
Con
()
{
if
(
m_tx
!=
0
)
...
...
@@ -962,6 +1113,7 @@ struct Con {
void
disconnect
();
int
startTransaction
();
int
getNdbOperation
(
const
Tab
&
tab
);
int
getNdbIndexOperation
(
const
ITab
&
itab
,
const
Tab
&
tab
);
int
getNdbScanOperation
(
const
Tab
&
tab
);
int
getNdbScanOperation
(
const
ITab
&
itab
,
const
Tab
&
tab
);
int
equal
(
int
num
,
const
char
*
addr
);
...
...
@@ -972,6 +1124,8 @@ struct Con {
int
execute
(
ExecType
t
,
bool
&
deadlock
);
int
openScanRead
(
unsigned
scanbat
,
unsigned
scanpar
);
int
openScanExclusive
(
unsigned
scanbat
,
unsigned
scanpar
);
int
openScanOrdered
(
unsigned
scanbat
,
unsigned
scanpar
,
bool
descending
);
int
openScanOrderedExclusive
(
unsigned
scanbat
,
unsigned
scanpar
,
bool
descending
);
int
executeScan
();
int
nextScanResult
(
bool
fetchAllowed
);
int
nextScanResult
(
bool
fetchAllowed
,
bool
&
deadlock
);
...
...
@@ -1025,6 +1179,14 @@ Con::getNdbOperation(const Tab& tab)
return
0
;
}
int
Con
::
getNdbIndexOperation
(
const
ITab
&
itab
,
const
Tab
&
tab
)
{
assert
(
m_tx
!=
0
);
CHKCON
((
m_op
=
m_indexop
=
m_tx
->
getNdbIndexOperation
(
itab
.
m_name
,
tab
.
m_name
))
!=
0
,
*
this
);
return
0
;
}
int
Con
::
getNdbScanOperation
(
const
Tab
&
tab
)
{
...
...
@@ -1115,6 +1277,25 @@ Con::openScanExclusive(unsigned scanbat, unsigned scanpar)
return
0
;
}
int
Con
::
openScanOrdered
(
unsigned
scanbat
,
unsigned
scanpar
,
bool
descending
)
{
assert
(
m_tx
!=
0
&&
m_indexscanop
!=
0
);
NdbOperation
::
LockMode
lm
=
NdbOperation
::
LM_Read
;
CHKCON
(
m_indexscanop
->
readTuples
(
lm
,
scanbat
,
scanpar
,
true
,
descending
)
==
0
,
*
this
);
return
0
;
}
int
Con
::
openScanOrderedExclusive
(
unsigned
scanbat
,
unsigned
scanpar
,
bool
descending
)
{
assert
(
m_tx
!=
0
&&
m_indexscanop
!=
0
);
NdbOperation
::
LockMode
lm
=
NdbOperation
::
LM_Exclusive
;
CHKCON
(
m_indexscanop
->
readTuples
(
lm
,
scanbat
,
scanpar
,
true
,
descending
)
==
0
,
*
this
);
return
0
;
}
int
Con
::
executeScan
()
{
...
...
@@ -1202,6 +1383,7 @@ Con::printerror(NdbOut& out)
if
((
code
=
m_tx
->
getNdbError
().
code
)
!=
0
)
{
LL0
(
++
any
<<
" con: error "
<<
m_tx
->
getNdbError
());
die
+=
(
code
==
g_opt
.
m_die
);
// 631 is new, occurs only on 4 db nodes, needs to be checked out
if
(
code
==
266
||
code
==
274
||
code
==
296
||
code
==
297
||
code
==
499
||
code
==
631
)
m_errtype
=
ErrDeadlock
;
}
...
...
@@ -1290,7 +1472,7 @@ createtable(Par par)
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Col
&
col
=
*
tab
.
m_col
[
k
];
NdbDictionary
::
Column
c
(
col
.
m_name
);
c
.
setType
(
col
.
m_type
);
c
.
setType
(
(
NdbDictionary
::
Column
::
Type
)
col
.
m_type
);
c
.
setLength
(
col
.
m_bytelength
);
// NDB API uses length in bytes
c
.
setPrimaryKey
(
col
.
m_pk
);
c
.
setNullable
(
col
.
m_nullable
);
...
...
@@ -1343,8 +1525,10 @@ createindex(Par par, const ITab& itab)
LL4
(
itab
);
NdbDictionary
::
Index
x
(
itab
.
m_name
);
x
.
setTable
(
tab
.
m_name
);
x
.
setType
(
NdbDictionary
::
Index
::
OrderedIndex
);
x
.
setLogging
(
false
);
x
.
setType
((
NdbDictionary
::
Index
::
Type
)
itab
.
m_type
);
if
(
par
.
m_nologging
||
itab
.
m_type
==
ITab
::
OrderedIndex
)
{
x
.
setLogging
(
false
);
}
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
];
const
Col
&
col
=
icol
.
m_col
;
...
...
@@ -1385,6 +1569,8 @@ struct Val {
void
copy
(
const
void
*
addr
);
const
void
*
dataaddr
()
const
;
bool
m_null
;
int
equal
(
Par
par
)
const
;
int
equal
(
Par
par
,
const
ICol
&
icol
)
const
;
int
setval
(
Par
par
)
const
;
void
calc
(
Par
par
,
unsigned
i
);
void
calckey
(
Par
par
,
unsigned
i
);
...
...
@@ -1402,9 +1588,9 @@ Val::Val(const Col& col) :
m_col
(
col
)
{
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
m_char
=
new
unsigned
char
[
col
.
m_bytelength
];
break
;
default:
...
...
@@ -1417,9 +1603,9 @@ Val::~Val()
{
const
Col
&
col
=
m_col
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
delete
[]
m_char
;
break
;
default:
...
...
@@ -1446,10 +1632,10 @@ Val::copy(const void* addr)
{
const
Col
&
col
=
m_col
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
m_uint32
=
*
(
const
Uint32
*
)
addr
;
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
memcpy
(
m_char
,
addr
,
col
.
m_bytelength
);
break
;
default:
...
...
@@ -1464,9 +1650,9 @@ Val::dataaddr() const
{
const
Col
&
col
=
m_col
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
return
&
m_uint32
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
return
m_char
;
default:
break
;
...
...
@@ -1476,18 +1662,37 @@ Val::dataaddr() const
}
int
Val
::
setv
al
(
Par
par
)
const
Val
::
equ
al
(
Par
par
)
const
{
Con
&
con
=
par
.
con
();
const
Col
&
col
=
m_col
;
assert
(
col
.
m_pk
&&
!
m_null
);
const
char
*
addr
=
(
const
char
*
)
dataaddr
();
if
(
m_null
)
addr
=
0
;
LL5
(
"setval ["
<<
m_col
<<
"] "
<<
*
this
);
if
(
col
.
m_pk
)
CHK
(
con
.
equal
(
col
.
m_num
,
addr
)
==
0
);
else
CHK
(
con
.
setValue
(
col
.
m_num
,
addr
)
==
0
);
LL5
(
"equal ["
<<
col
<<
"] "
<<
*
this
);
CHK
(
con
.
equal
(
col
.
m_num
,
addr
)
==
0
);
return
0
;
}
int
Val
::
equal
(
Par
par
,
const
ICol
&
icol
)
const
{
Con
&
con
=
par
.
con
();
assert
(
!
m_null
);
const
char
*
addr
=
(
const
char
*
)
dataaddr
();
LL5
(
"equal ["
<<
icol
<<
"] "
<<
*
this
);
CHK
(
con
.
equal
(
icol
.
m_num
,
addr
)
==
0
);
return
0
;
}
int
Val
::
setval
(
Par
par
)
const
{
Con
&
con
=
par
.
con
();
const
Col
&
col
=
m_col
;
assert
(
!
col
.
m_pk
);
const
char
*
addr
=
!
m_null
?
(
const
char
*
)
dataaddr
()
:
0
;
LL5
(
"setval ["
<<
col
<<
"] "
<<
*
this
);
CHK
(
con
.
setValue
(
col
.
m_num
,
addr
)
==
0
);
return
0
;
}
...
...
@@ -1506,10 +1711,10 @@ Val::calckey(Par par, unsigned i)
const
Col
&
col
=
m_col
;
m_null
=
false
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
m_uint32
=
i
;
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
const
Chs
*
chs
=
col
.
m_chs
;
CHARSET_INFO
*
cs
=
chs
->
m_cs
;
...
...
@@ -1549,10 +1754,10 @@ Val::calcnokey(Par par)
}
unsigned
v
=
par
.
m_range
+
r
;
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
m_uint32
=
v
;
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
const
Chs
*
chs
=
col
.
m_chs
;
CHARSET_INFO
*
cs
=
chs
->
m_cs
;
...
...
@@ -1609,7 +1814,7 @@ Val::cmp(const Val& val2) const
col
.
verify
(
val2
.
dataaddr
());
// compare
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
{
if
(
m_uint32
<
val2
.
m_uint32
)
return
-
1
;
...
...
@@ -1618,7 +1823,7 @@ Val::cmp(const Val& val2) const
return
0
;
}
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
const
Chs
*
chs
=
col
.
m_chs
;
CHARSET_INFO
*
cs
=
chs
->
m_cs
;
...
...
@@ -1657,10 +1862,10 @@ operator<<(NdbOut& out, const Val& val)
return
out
;
}
switch
(
col
.
m_type
)
{
case
NdbDictionary
:
:
Column
::
Unsigned
:
case
Col
:
:
Unsigned
:
out
<<
val
.
m_uint32
;
break
;
case
NdbDictionary
:
:
Column
::
Char
:
case
Col
:
:
Char
:
{
char
buf
[
4
*
8000
];
char
*
p
=
buf
;
...
...
@@ -1697,19 +1902,25 @@ struct Row {
const
Tab
&
m_tab
;
Val
**
m_val
;
bool
m_exist
;
enum
Op
{
NoOp
=
0
,
ReadOp
,
InsOp
,
UpdOp
,
DelOp
};
enum
Op
{
NoOp
=
0
,
ReadOp
=
1
,
InsOp
=
2
,
UpdOp
=
4
,
DelOp
=
8
,
AnyOp
=
15
};
Op
m_pending
;
Row
*
m_dbrow
;
// copy of db row before update
Row
(
const
Tab
&
tab
);
~
Row
();
void
copy
(
const
Row
&
row2
);
void
calc
(
Par
par
,
unsigned
i
);
void
calc
(
Par
par
,
unsigned
i
,
unsigned
mask
=
0
);
const
Row
&
dbrow
()
const
;
int
verify
(
const
Row
&
row2
)
const
;
int
insrow
(
Par
par
);
int
updrow
(
Par
par
);
int
updrow
(
Par
par
,
const
ITab
&
itab
);
int
delrow
(
Par
par
);
int
delrow
(
Par
par
,
const
ITab
&
itab
);
int
selrow
(
Par
par
);
int
selrow
(
Par
par
,
const
ITab
&
itab
);
int
setrow
(
Par
par
);
int
cmp
(
const
Row
&
row2
)
const
;
int
cmp
(
const
Row
&
row2
,
const
ITab
&
itab
)
const
;
private:
Row
&
operator
=
(
const
Row
&
row2
);
};
...
...
@@ -1724,6 +1935,7 @@ Row::Row(const Tab& tab) :
}
m_exist
=
false
;
m_pending
=
NoOp
;
m_dbrow
=
0
;
}
Row
::~
Row
()
...
...
@@ -1733,6 +1945,7 @@ Row::~Row()
delete
m_val
[
k
];
}
delete
[]
m_val
;
delete
m_dbrow
;
}
void
...
...
@@ -1745,27 +1958,49 @@ Row::copy(const Row& row2)
const
Val
&
val2
=
*
row2
.
m_val
[
k
];
val
.
copy
(
val2
);
}
m_exist
=
row2
.
m_exist
;
m_pending
=
row2
.
m_pending
;
if
(
row2
.
m_dbrow
==
0
)
{
m_dbrow
=
0
;
}
else
{
assert
(
row2
.
m_dbrow
->
m_dbrow
==
0
);
if
(
m_dbrow
==
0
)
m_dbrow
=
new
Row
(
tab
);
m_dbrow
->
copy
(
*
row2
.
m_dbrow
);
}
}
void
Row
::
calc
(
Par
par
,
unsigned
i
)
Row
::
calc
(
Par
par
,
unsigned
i
,
unsigned
mask
)
{
const
Tab
&
tab
=
m_tab
;
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
Val
&
val
=
*
m_val
[
k
];
val
.
calc
(
par
,
i
);
if
(
!
(
mask
&
(
1
<<
k
)))
{
Val
&
val
=
*
m_val
[
k
];
val
.
calc
(
par
,
i
);
}
}
}
const
Row
&
Row
::
dbrow
()
const
{
if
(
m_dbrow
==
0
)
return
*
this
;
assert
(
m_pending
==
Row
::
UpdOp
||
m_pending
==
Row
::
DelOp
);
return
*
m_dbrow
;
}
int
Row
::
verify
(
const
Row
&
row2
)
const
{
const
Tab
&
tab
=
m_tab
;
assert
(
&
tab
==
&
row2
.
m_tab
&&
m_exist
&&
row2
.
m_exist
);
const
Row
&
row1
=
*
this
;
assert
(
&
row1
.
m_tab
==
&
row2
.
m_tab
&&
row1
.
m_exist
&&
row2
.
m_exist
);
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
const
Val
&
val
1
=
*
row1
.
m_val
[
k
];
const
Val
&
val2
=
*
row2
.
m_val
[
k
];
CHK
(
val
.
verify
(
val2
)
==
0
);
CHK
(
val
1
.
verify
(
val2
)
==
0
);
}
return
0
;
}
...
...
@@ -1780,7 +2015,15 @@ Row::insrow(Par par)
CHKCON
(
con
.
m_op
->
insertTuple
()
==
0
,
con
);
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
CHK
(
val
.
setval
(
par
)
==
0
);
const
Col
&
col
=
val
.
m_col
;
if
(
col
.
m_pk
)
CHK
(
val
.
equal
(
par
)
==
0
);
}
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
!
col
.
m_pk
)
CHK
(
val
.
setval
(
par
)
==
0
);
}
m_pending
=
InsOp
;
return
0
;
...
...
@@ -1794,19 +2037,43 @@ Row::updrow(Par par)
assert
(
m_exist
);
CHK
(
con
.
getNdbOperation
(
tab
)
==
0
);
CHKCON
(
con
.
m_op
->
updateTuple
()
==
0
,
con
);
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
col
.
m_pk
)
CHK
(
val
.
equal
(
par
)
==
0
);
}
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
!
col
.
m_pk
)
continue
;
CHK
(
val
.
setval
(
par
)
==
0
);
CHK
(
val
.
setval
(
par
)
==
0
);
}
m_pending
=
UpdOp
;
return
0
;
}
int
Row
::
updrow
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
m_tab
;
assert
(
itab
.
m_type
==
ITab
::
UniqueHashIndex
&&
&
itab
.
m_tab
==
&
tab
);
assert
(
m_exist
);
CHK
(
con
.
getNdbIndexOperation
(
itab
,
tab
)
==
0
);
CHKCON
(
con
.
m_op
->
updateTuple
()
==
0
,
con
);
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
];
const
Col
&
col
=
icol
.
m_col
;
unsigned
m
=
col
.
m_num
;
const
Val
&
val
=
*
m_val
[
m
];
CHK
(
val
.
equal
(
par
,
icol
)
==
0
);
}
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
col
.
m_pk
)
continue
;
CHK
(
val
.
setval
(
par
)
==
0
);
if
(
!
col
.
m_pk
)
CHK
(
val
.
setval
(
par
)
==
0
);
}
m_pending
=
UpdOp
;
return
0
;
...
...
@@ -1824,7 +2091,27 @@ Row::delrow(Par par)
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
col
.
m_pk
)
CHK
(
val
.
setval
(
par
)
==
0
);
CHK
(
val
.
equal
(
par
)
==
0
);
}
m_pending
=
DelOp
;
return
0
;
}
int
Row
::
delrow
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
m_tab
;
assert
(
itab
.
m_type
==
ITab
::
UniqueHashIndex
&&
&
itab
.
m_tab
==
&
tab
);
assert
(
m_exist
);
CHK
(
con
.
getNdbIndexOperation
(
itab
,
tab
)
==
0
);
CHKCON
(
con
.
m_op
->
deleteTuple
()
==
0
,
con
);
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
];
const
Col
&
col
=
icol
.
m_col
;
unsigned
m
=
col
.
m_num
;
const
Val
&
val
=
*
m_val
[
m
];
CHK
(
val
.
equal
(
par
,
icol
)
==
0
);
}
m_pending
=
DelOp
;
return
0
;
...
...
@@ -1841,7 +2128,25 @@ Row::selrow(Par par)
const
Val
&
val
=
*
m_val
[
k
];
const
Col
&
col
=
val
.
m_col
;
if
(
col
.
m_pk
)
CHK
(
val
.
setval
(
par
)
==
0
);
CHK
(
val
.
equal
(
par
)
==
0
);
}
return
0
;
}
int
Row
::
selrow
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
m_tab
;
assert
(
itab
.
m_type
==
ITab
::
UniqueHashIndex
&&
&
itab
.
m_tab
==
&
tab
);
CHK
(
con
.
getNdbIndexOperation
(
itab
,
tab
)
==
0
);
CHKCON
(
con
.
m_op
->
readTuple
()
==
0
,
con
);
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
];
const
Col
&
col
=
icol
.
m_col
;
unsigned
m
=
col
.
m_num
;
const
Val
&
val
=
*
m_val
[
m
];
CHK
(
val
.
equal
(
par
,
icol
)
==
0
);
}
return
0
;
}
...
...
@@ -1876,6 +2181,40 @@ Row::cmp(const Row& row2) const
return
c
;
}
int
Row
::
cmp
(
const
Row
&
row2
,
const
ITab
&
itab
)
const
{
const
Tab
&
tab
=
m_tab
;
int
c
=
0
;
for
(
unsigned
i
=
0
;
i
<
itab
.
m_icols
;
i
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
i
];
const
Col
&
col
=
icol
.
m_col
;
unsigned
k
=
col
.
m_num
;
assert
(
k
<
tab
.
m_cols
);
const
Val
&
val
=
*
m_val
[
k
];
const
Val
&
val2
=
*
row2
.
m_val
[
k
];
if
((
c
=
val
.
cmp
(
val2
))
!=
0
)
break
;
}
return
c
;
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Row
::
Op
op
)
{
if
(
op
==
Row
::
NoOp
)
out
<<
"NoOp"
;
else
if
(
op
==
Row
::
InsOp
)
out
<<
"InsOp"
;
else
if
(
op
==
Row
::
UpdOp
)
out
<<
"UpdOp"
;
else
if
(
op
==
Row
::
DelOp
)
out
<<
"DelOp"
;
else
out
<<
op
;
return
out
;
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Row
&
row
)
{
...
...
@@ -1885,10 +2224,21 @@ operator<<(NdbOut& out, const Row& row)
out
<<
" "
;
out
<<
*
row
.
m_val
[
i
];
}
out
<<
"
[
exist="
<<
row
.
m_exist
;
out
<<
" exist="
<<
row
.
m_exist
;
if
(
row
.
m_pending
)
out
<<
" pending="
<<
row
.
m_pending
;
out
<<
"]"
;
if
(
row
.
m_dbrow
!=
0
)
out
<<
" [dbrow="
<<
*
row
.
m_dbrow
<<
"]"
;
return
out
;
}
static
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Row
*
rowptr
)
{
if
(
rowptr
==
0
)
out
<<
"null"
;
else
out
<<
*
rowptr
;
return
out
;
}
...
...
@@ -1898,38 +2248,47 @@ struct Set {
const
Tab
&
m_tab
;
unsigned
m_rows
;
Row
**
m_row
;
Row
**
m_saverow
;
unsigned
*
m_rowkey
;
// maps row number (from 0) in scan to tuple key
Row
*
m_keyrow
;
NdbRecAttr
**
m_rec
;
Set
(
const
Tab
&
tab
,
unsigned
rows
);
~
Set
();
void
reset
();
unsigned
count
()
const
;
//
row method
s
//
old and new value
s
bool
exist
(
unsigned
i
)
const
;
Row
::
Op
pending
(
unsigned
i
)
const
;
void
dbsave
(
unsigned
i
);
void
calc
(
Par
par
,
unsigned
i
,
unsigned
mask
=
0
);
bool
pending
(
unsigned
i
,
unsigned
mask
)
const
;
void
notpending
(
unsigned
i
);
void
notpending
(
const
Lst
&
lst
);
void
calc
(
Par
par
,
unsigned
i
);
void
dbdiscard
(
unsigned
i
);
void
dbdiscard
(
const
Lst
&
lst
);
const
Row
&
dbrow
(
unsigned
i
)
const
;
// operations
int
insrow
(
Par
par
,
unsigned
i
);
int
updrow
(
Par
par
,
unsigned
i
);
int
updrow
(
Par
par
,
const
ITab
&
itab
,
unsigned
i
);
int
delrow
(
Par
par
,
unsigned
i
);
int
selrow
(
Par
par
,
unsigned
i
);
int
delrow
(
Par
par
,
const
ITab
&
itab
,
unsigned
i
);
int
selrow
(
Par
par
,
const
Row
&
keyrow
);
int
selrow
(
Par
par
,
const
ITab
&
itab
,
const
Row
&
keyrow
);
// set and get
void
setkey
(
Par
par
,
const
Row
&
keyrow
);
void
setkey
(
Par
par
,
const
ITab
&
itab
,
const
Row
&
keyrow
);
int
setrow
(
Par
par
,
unsigned
i
);
int
getval
(
Par
par
);
int
getkey
(
Par
par
,
unsigned
*
i
);
int
putval
(
unsigned
i
,
bool
force
);
//
set methods
int
putval
(
unsigned
i
,
bool
force
,
unsigned
n
=
~
0
);
//
verify
int
verify
(
const
Set
&
set2
)
const
;
void
savepoint
();
void
commit
();
void
rollback
();
int
verifyorder
(
const
ITab
&
itab
,
bool
descending
)
const
;
// protect structure
NdbMutex
*
m_mutex
;
void
lock
()
{
void
lock
()
const
{
NdbMutex_Lock
(
m_mutex
);
}
void
unlock
()
{
void
unlock
()
const
{
NdbMutex_Unlock
(
m_mutex
);
}
private:
...
...
@@ -1945,7 +2304,11 @@ Set::Set(const Tab& tab, unsigned rows) :
// allocate on need to save space
m_row
[
i
]
=
0
;
}
m_saverow
=
0
;
m_rowkey
=
new
unsigned
[
m_rows
];
for
(
unsigned
n
=
0
;
n
<
m_rows
;
n
++
)
{
// initialize to null
m_rowkey
[
n
]
=
~
0
;
}
m_keyrow
=
new
Row
(
tab
);
m_rec
=
new
NdbRecAttr
*
[
tab
.
m_cols
];
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
...
...
@@ -1959,11 +2322,9 @@ Set::~Set()
{
for
(
unsigned
i
=
0
;
i
<
m_rows
;
i
++
)
{
delete
m_row
[
i
];
if
(
m_saverow
!=
0
)
delete
m_saverow
[
i
];
}
delete
[]
m_row
;
delete
[]
m_
saverow
;
delete
[]
m_
rowkey
;
delete
m_keyrow
;
delete
[]
m_rec
;
NdbMutex_Destroy
(
m_mutex
);
...
...
@@ -1994,6 +2355,8 @@ Set::count() const
return
count
;
}
// old and new values
bool
Set
::
exist
(
unsigned
i
)
const
{
...
...
@@ -2003,13 +2366,37 @@ Set::exist(unsigned i) const
return
m_row
[
i
]
->
m_exist
;
}
Row
::
Op
Set
::
pending
(
unsigned
i
)
const
void
Set
::
dbsave
(
unsigned
i
)
{
const
Tab
&
tab
=
m_tab
;
assert
(
i
<
m_rows
&&
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
LL5
(
"dbsave "
<<
i
<<
": "
<<
row
);
assert
(
row
.
m_exist
&&
!
row
.
m_pending
&&
row
.
m_dbrow
==
0
);
// could swap pointers but making copy is safer
Row
*
rowptr
=
new
Row
(
tab
);
rowptr
->
copy
(
row
);
row
.
m_dbrow
=
rowptr
;
}
void
Set
::
calc
(
Par
par
,
unsigned
i
,
unsigned
mask
)
{
const
Tab
&
tab
=
m_tab
;
if
(
m_row
[
i
]
==
0
)
m_row
[
i
]
=
new
Row
(
tab
);
Row
&
row
=
*
m_row
[
i
];
row
.
calc
(
par
,
i
,
mask
);
}
bool
Set
::
pending
(
unsigned
i
,
unsigned
mask
)
const
{
assert
(
i
<
m_rows
);
if
(
m_row
[
i
]
==
0
)
// not allocated => not pending
return
Row
::
NoOp
;
return
m_row
[
i
]
->
m_pending
;
return
m_row
[
i
]
->
m_pending
&
mask
;
}
void
...
...
@@ -2017,10 +2404,13 @@ Set::notpending(unsigned i)
{
assert
(
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
if
(
row
.
m_pending
==
Row
::
InsOp
)
if
(
row
.
m_pending
==
Row
::
InsOp
)
{
row
.
m_exist
=
true
;
if
(
row
.
m_pending
==
Row
::
DelOp
)
}
else
if
(
row
.
m_pending
==
Row
::
UpdOp
)
{
;
}
else
if
(
row
.
m_pending
==
Row
::
DelOp
)
{
row
.
m_exist
=
false
;
}
row
.
m_pending
=
Row
::
NoOp
;
}
...
...
@@ -2034,15 +2424,35 @@ Set::notpending(const Lst& lst)
}
void
Set
::
calc
(
Par
par
,
unsigned
i
)
Set
::
dbdiscard
(
unsigned
i
)
{
const
Tab
&
tab
=
m_tab
;
if
(
m_row
[
i
]
==
0
)
m_row
[
i
]
=
new
Row
(
tab
);
assert
(
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
LL5
(
"dbdiscard "
<<
i
<<
": "
<<
row
);
assert
(
row
.
m_dbrow
!=
0
);
delete
row
.
m_dbrow
;
row
.
m_dbrow
=
0
;
}
const
Row
&
Set
::
dbrow
(
unsigned
i
)
const
{
assert
(
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
row
.
calc
(
par
,
i
);
return
row
.
dbrow
();
}
void
Set
::
dbdiscard
(
const
Lst
&
lst
)
{
for
(
unsigned
j
=
0
;
j
<
lst
.
m_cnt
;
j
++
)
{
unsigned
i
=
lst
.
m_arr
[
j
];
dbdiscard
(
i
);
}
}
// operations
int
Set
::
insrow
(
Par
par
,
unsigned
i
)
{
...
...
@@ -2061,6 +2471,15 @@ Set::updrow(Par par, unsigned i)
return
0
;
}
int
Set
::
updrow
(
Par
par
,
const
ITab
&
itab
,
unsigned
i
)
{
assert
(
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
CHK
(
row
.
updrow
(
par
,
itab
)
==
0
);
return
0
;
}
int
Set
::
delrow
(
Par
par
,
unsigned
i
)
{
...
...
@@ -2071,15 +2490,67 @@ Set::delrow(Par par, unsigned i)
}
int
Set
::
selrow
(
Par
par
,
unsigned
i
)
Set
::
delrow
(
Par
par
,
const
ITab
&
itab
,
unsigned
i
)
{
assert
(
m_row
[
i
]
!=
0
);
Row
&
row
=
*
m_row
[
i
];
CHK
(
row
.
delrow
(
par
,
itab
)
==
0
);
return
0
;
}
int
Set
::
selrow
(
Par
par
,
const
Row
&
keyrow
)
{
Con
&
con
=
par
.
con
();
m_keyrow
->
calc
(
par
,
i
);
const
Tab
&
tab
=
par
.
tab
();
setkey
(
par
,
keyrow
);
LL5
(
"selrow "
<<
tab
.
m_name
<<
": keyrow: "
<<
keyrow
);
CHK
(
m_keyrow
->
selrow
(
par
)
==
0
);
CHK
(
getval
(
par
)
==
0
);
return
0
;
}
int
Set
::
selrow
(
Par
par
,
const
ITab
&
itab
,
const
Row
&
keyrow
)
{
Con
&
con
=
par
.
con
();
setkey
(
par
,
itab
,
keyrow
);
LL5
(
"selrow "
<<
itab
.
m_name
<<
": keyrow: "
<<
keyrow
);
CHK
(
m_keyrow
->
selrow
(
par
,
itab
)
==
0
);
CHK
(
getval
(
par
)
==
0
);
return
0
;
}
// set and get
void
Set
::
setkey
(
Par
par
,
const
Row
&
keyrow
)
{
const
Tab
&
tab
=
m_tab
;
for
(
unsigned
k
=
0
;
k
<
tab
.
m_cols
;
k
++
)
{
const
Col
&
col
=
*
tab
.
m_col
[
k
];
if
(
col
.
m_pk
)
{
Val
&
val1
=
*
m_keyrow
->
m_val
[
k
];
const
Val
&
val2
=
*
keyrow
.
dbrow
().
m_val
[
k
];
val1
.
copy
(
val2
);
}
}
}
void
Set
::
setkey
(
Par
par
,
const
ITab
&
itab
,
const
Row
&
keyrow
)
{
const
Tab
&
tab
=
m_tab
;
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
const
ICol
&
icol
=
*
itab
.
m_icol
[
k
];
const
Col
&
col
=
icol
.
m_col
;
unsigned
m
=
col
.
m_num
;
Val
&
val1
=
*
m_keyrow
->
m_val
[
m
];
const
Val
&
val2
=
*
keyrow
.
dbrow
().
m_val
[
m
];
val1
.
copy
(
val2
);
}
}
int
Set
::
setrow
(
Par
par
,
unsigned
i
)
{
...
...
@@ -2114,7 +2585,7 @@ Set::getkey(Par par, unsigned* i)
}
int
Set
::
putval
(
unsigned
i
,
bool
force
)
Set
::
putval
(
unsigned
i
,
bool
force
,
unsigned
n
)
{
const
Tab
&
tab
=
m_tab
;
if
(
m_row
[
i
]
==
0
)
...
...
@@ -2135,55 +2606,55 @@ Set::putval(unsigned i, bool force)
}
if
(
!
row
.
m_exist
)
row
.
m_exist
=
true
;
if
(
n
!=
~
0
)
m_rowkey
[
n
]
=
i
;
return
0
;
}
// verify
int
Set
::
verify
(
const
Set
&
set2
)
const
{
const
Tab
&
tab
=
m_tab
;
assert
(
&
tab
==
&
set2
.
m_tab
&&
m_rows
==
set2
.
m_rows
);
LL3
(
"verify set1 count="
<<
count
()
<<
" vs set2 count="
<<
set2
.
count
());
assert
(
&
m_tab
==
&
set2
.
m_tab
&&
m_rows
==
set2
.
m_rows
);
LL4
(
"verify set1 count="
<<
count
()
<<
" vs set2 count="
<<
set2
.
count
());
for
(
unsigned
i
=
0
;
i
<
m_rows
;
i
++
)
{
CHK
(
exist
(
i
)
==
set2
.
exist
(
i
));
if
(
!
exist
(
i
))
continue
;
Row
&
row
=
*
m_row
[
i
];
Row
&
row2
=
*
set2
.
m_row
[
i
];
CHK
(
row
.
verify
(
row2
)
==
0
);
bool
ok
=
true
;
if
(
exist
(
i
)
!=
set2
.
exist
(
i
))
{
ok
=
false
;
}
else
if
(
exist
(
i
))
{
if
(
dbrow
(
i
).
verify
(
set2
.
dbrow
(
i
))
!=
0
)
ok
=
false
;
}
if
(
!
ok
)
{
LL1
(
"verify failed: key="
<<
i
<<
" row1="
<<
m_row
[
i
]
<<
" row2="
<<
set2
.
m_row
[
i
]);
CHK
(
0
==
1
);
}
}
return
0
;
}
void
Set
::
savepoint
()
int
Set
::
verifyorder
(
const
ITab
&
itab
,
bool
descending
)
const
{
const
Tab
&
tab
=
m_tab
;
assert
(
m_saverow
==
0
);
m_saverow
=
new
Row
*
[
m_rows
];
for
(
unsigned
i
=
0
;
i
<
m_rows
;
i
++
)
{
if
(
m_row
[
i
]
==
0
)
m_saverow
[
i
]
=
0
;
else
{
m_saverow
[
i
]
=
new
Row
(
tab
);
m_saverow
[
i
]
->
copy
(
*
m_row
[
i
]);
}
for
(
unsigned
n
=
0
;
n
<
m_rows
;
n
++
)
{
unsigned
i2
=
m_rowkey
[
n
];
if
(
i2
==
~
0
)
break
;
if
(
n
==
0
)
continue
;
unsigned
i1
=
m_rowkey
[
n
-
1
];
assert
(
i1
<
m_rows
&&
i2
<
m_rows
);
const
Row
&
row1
=
*
m_row
[
i1
];
const
Row
&
row2
=
*
m_row
[
i2
];
assert
(
row1
.
m_exist
&&
row2
.
m_exist
);
if
(
!
descending
)
CHK
(
row1
.
cmp
(
row2
,
itab
)
<=
0
);
else
CHK
(
row1
.
cmp
(
row2
,
itab
)
>=
0
);
}
}
void
Set
::
commit
()
{
delete
[]
m_saverow
;
m_saverow
=
0
;
}
void
Set
::
rollback
()
{
assert
(
m_saverow
!=
0
);
m_row
=
m_saverow
;
m_saverow
=
0
;
return
0
;
}
static
NdbOut
&
...
...
@@ -2384,7 +2855,9 @@ BSet::filter(const Set& set, Set& set2) const
for
(
unsigned
i
=
0
;
i
<
set
.
m_rows
;
i
++
)
{
if
(
!
set
.
exist
(
i
))
continue
;
const
Row
&
row
=
*
set
.
m_row
[
i
];
set
.
lock
();
const
Row
&
row
=
set
.
dbrow
(
i
);
set
.
unlock
();
if
(
!
g_store_null_key
)
{
bool
ok1
=
false
;
for
(
unsigned
k
=
0
;
k
<
itab
.
m_icols
;
k
++
)
{
...
...
@@ -2430,7 +2903,6 @@ BSet::filter(const Set& set, Set& set2) const
Row
&
row2
=
*
set2
.
m_row
[
i
];
assert
(
!
row2
.
m_exist
);
row2
.
copy
(
row
);
row2
.
m_exist
=
true
;
}
}
...
...
@@ -2451,15 +2923,16 @@ static int
pkinsert
(
Par
par
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
par
.
tab
();
Set
&
set
=
par
.
set
();
LL3
(
"pkinsert
"
);
LL3
(
"pkinsert
"
<<
tab
.
m_name
);
CHK
(
con
.
startTransaction
()
==
0
);
Lst
lst
;
for
(
unsigned
j
=
0
;
j
<
par
.
m_rows
;
j
++
)
{
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
if
(
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
set
.
unlock
();
continue
;
}
...
...
@@ -2473,7 +2946,7 @@ pkinsert(Par par)
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
con
.
closeTransaction
();
if
(
deadlock
)
{
LL1
(
"pkinsert: stop on deadlock"
);
LL1
(
"pkinsert: stop on deadlock
[at 1]
"
);
return
0
;
}
set
.
lock
();
...
...
@@ -2488,7 +2961,7 @@ pkinsert(Par par)
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
con
.
closeTransaction
();
if
(
deadlock
)
{
LL1
(
"pkinsert: stop on deadlock"
);
LL1
(
"pkinsert: stop on deadlock
[at 2]
"
);
return
0
;
}
set
.
lock
();
...
...
@@ -2504,8 +2977,9 @@ static int
pkupdate
(
Par
par
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
par
.
tab
();
Set
&
set
=
par
.
set
();
LL3
(
"pkupdate
"
);
LL3
(
"pkupdate
"
<<
tab
.
m_name
);
CHK
(
con
.
startTransaction
()
==
0
);
Lst
lst
;
bool
deadlock
=
false
;
...
...
@@ -2513,10 +2987,11 @@ pkupdate(Par par)
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
set
.
unlock
();
continue
;
}
set
.
dbsave
(
i
);
set
.
calc
(
par
,
i
);
CHK
(
set
.
updrow
(
par
,
i
)
==
0
);
set
.
unlock
();
...
...
@@ -2526,12 +3001,13 @@ pkupdate(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"pkupdate: stop on deadlock"
);
LL1
(
"pkupdate: stop on deadlock
[at 1]
"
);
break
;
}
con
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
lst
.
reset
();
CHK
(
con
.
startTransaction
()
==
0
);
...
...
@@ -2541,10 +3017,11 @@ pkupdate(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"pkupdate: stop on deadlock"
);
LL1
(
"pkupdate: stop on deadlock
[at 1]
"
);
}
else
{
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
}
}
...
...
@@ -2556,8 +3033,9 @@ static int
pkdelete
(
Par
par
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
par
.
tab
();
Set
&
set
=
par
.
set
();
LL3
(
"pkdelete
"
);
LL3
(
"pkdelete
"
<<
tab
.
m_name
);
CHK
(
con
.
startTransaction
()
==
0
);
Lst
lst
;
bool
deadlock
=
false
;
...
...
@@ -2565,7 +3043,7 @@ pkdelete(Par par)
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
set
.
unlock
();
continue
;
}
...
...
@@ -2577,7 +3055,7 @@ pkdelete(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"pkdelete: stop on deadlock"
);
LL1
(
"pkdelete: stop on deadlock
[at 1]
"
);
break
;
}
con
.
closeTransaction
();
...
...
@@ -2592,7 +3070,7 @@ pkdelete(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"pkdelete: stop on deadlock"
);
LL1
(
"pkdelete: stop on deadlock
[at 2]
"
);
}
else
{
set
.
lock
();
set
.
notpending
(
lst
);
...
...
@@ -2609,19 +3087,19 @@ pkread(Par par)
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
par
.
tab
();
Set
&
set
=
par
.
set
();
LL3
(
(
par
.
m_verify
?
"pkverify "
:
"pkread "
)
<<
tab
.
m_name
);
LL3
(
"pkread "
<<
tab
.
m_name
<<
" verify="
<<
par
.
m_verify
);
// expected
const
Set
&
set1
=
set
;
Set
set2
(
tab
,
set
.
m_rows
);
for
(
unsigned
i
=
0
;
i
<
set
.
m_rows
;
i
++
)
{
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
)
)
{
if
(
!
set
.
exist
(
i
))
{
set
.
unlock
();
continue
;
}
set
.
unlock
();
CHK
(
con
.
startTransaction
()
==
0
);
CHK
(
set2
.
selrow
(
par
,
i
)
==
0
);
CHK
(
set2
.
selrow
(
par
,
*
set1
.
m_row
[
i
]
)
==
0
);
CHK
(
con
.
execute
(
Commit
)
==
0
);
unsigned
i2
=
(
unsigned
)
-
1
;
CHK
(
set2
.
getkey
(
par
,
&
i2
)
==
0
&&
i
==
i2
);
...
...
@@ -2659,6 +3137,146 @@ pkreadfast(Par par, unsigned count)
return
0
;
}
// hash index operations
static
int
hashindexupdate
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
Set
&
set
=
par
.
set
();
LL3
(
"hashindexupdate "
<<
itab
.
m_name
);
CHK
(
con
.
startTransaction
()
==
0
);
Lst
lst
;
bool
deadlock
=
false
;
for
(
unsigned
j
=
0
;
j
<
par
.
m_rows
;
j
++
)
{
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
set
.
unlock
();
continue
;
}
set
.
dbsave
(
i
);
// index key columns are not re-calculated
set
.
calc
(
par
,
i
,
itab
.
m_colmask
);
CHK
(
set
.
updrow
(
par
,
itab
,
i
)
==
0
);
set
.
unlock
();
LL4
(
"hashindexupdate "
<<
i
<<
": "
<<
*
set
.
m_row
[
i
]);
lst
.
push
(
i
);
if
(
lst
.
cnt
()
==
par
.
m_batch
)
{
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"hashindexupdate: stop on deadlock [at 1]"
);
break
;
}
con
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
lst
.
reset
();
CHK
(
con
.
startTransaction
()
==
0
);
}
}
if
(
!
deadlock
&&
lst
.
cnt
()
!=
0
)
{
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"hashindexupdate: stop on deadlock [at 1]"
);
}
else
{
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
}
}
con
.
closeTransaction
();
return
0
;
};
static
int
hashindexdelete
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
Set
&
set
=
par
.
set
();
LL3
(
"hashindexdelete "
<<
itab
.
m_name
);
CHK
(
con
.
startTransaction
()
==
0
);
Lst
lst
;
bool
deadlock
=
false
;
for
(
unsigned
j
=
0
;
j
<
par
.
m_rows
;
j
++
)
{
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
set
.
unlock
();
continue
;
}
CHK
(
set
.
delrow
(
par
,
itab
,
i
)
==
0
);
set
.
unlock
();
LL4
(
"hashindexdelete "
<<
i
<<
": "
<<
*
set
.
m_row
[
i
]);
lst
.
push
(
i
);
if
(
lst
.
cnt
()
==
par
.
m_batch
)
{
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"hashindexdelete: stop on deadlock [at 1]"
);
break
;
}
con
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
unlock
();
lst
.
reset
();
CHK
(
con
.
startTransaction
()
==
0
);
}
}
if
(
!
deadlock
&&
lst
.
cnt
()
!=
0
)
{
deadlock
=
par
.
m_deadlock
;
CHK
(
con
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"hashindexdelete: stop on deadlock [at 2]"
);
}
else
{
set
.
lock
();
set
.
notpending
(
lst
);
set
.
unlock
();
}
}
con
.
closeTransaction
();
return
0
;
};
static
int
hashindexread
(
Par
par
,
const
ITab
&
itab
)
{
Con
&
con
=
par
.
con
();
const
Tab
&
tab
=
par
.
tab
();
Set
&
set
=
par
.
set
();
LL3
(
"hashindexread "
<<
itab
.
m_name
<<
" verify="
<<
par
.
m_verify
);
// expected
const
Set
&
set1
=
set
;
Set
set2
(
tab
,
set
.
m_rows
);
for
(
unsigned
i
=
0
;
i
<
set
.
m_rows
;
i
++
)
{
set
.
lock
();
if
(
!
set
.
exist
(
i
))
{
set
.
unlock
();
continue
;
}
set
.
unlock
();
CHK
(
con
.
startTransaction
()
==
0
);
CHK
(
set2
.
selrow
(
par
,
itab
,
*
set1
.
m_row
[
i
])
==
0
);
CHK
(
con
.
execute
(
Commit
)
==
0
);
unsigned
i2
=
(
unsigned
)
-
1
;
CHK
(
set2
.
getkey
(
par
,
&
i2
)
==
0
&&
i
==
i2
);
CHK
(
set2
.
putval
(
i
,
false
)
==
0
);
LL4
(
"row "
<<
set2
.
count
()
<<
": "
<<
*
set2
.
m_row
[
i
]);
con
.
closeTransaction
();
}
if
(
par
.
m_verify
)
CHK
(
set1
.
verify
(
set2
)
==
0
);
return
0
;
}
// scan read
static
int
...
...
@@ -2691,14 +3309,14 @@ scanreadtable(Par par)
}
unsigned
i
=
(
unsigned
)
-
1
;
CHK
(
set2
.
getkey
(
par
,
&
i
)
==
0
);
CHK
(
set2
.
putval
(
i
,
false
)
==
0
);
CHK
(
set2
.
putval
(
i
,
false
,
n
)
==
0
);
LL4
(
"row "
<<
n
<<
": "
<<
*
set2
.
m_row
[
i
]);
n
++
;
}
con
.
closeTransaction
();
if
(
par
.
m_verify
)
CHK
(
set1
.
verify
(
set2
)
==
0
);
LL3
(
"scanread "
<<
tab
.
m_name
<<
" rows="
<<
n
);
LL3
(
"scanread "
<<
tab
.
m_name
<<
"
done
rows="
<<
n
);
return
0
;
}
...
...
@@ -2745,19 +3363,22 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
// prefer proper subset
if
(
0
<
n
&&
n
<
set
.
m_rows
)
break
;
if
(
urandom
(
5
)
==
0
)
if
(
urandom
(
3
)
==
0
)
break
;
set1
.
reset
();
}
}
else
{
bset
.
filter
(
set
,
set1
);
}
LL3
(
"scanread "
<<
itab
.
m_name
<<
" bounds="
<<
bset
.
m_bvals
<<
" verify="
<<
par
.
m_verify
);
LL3
(
"scanread "
<<
itab
.
m_name
<<
" bounds="
<<
bset
<<
" verify="
<<
par
.
m_verify
<<
" ordered="
<<
par
.
m_ordered
<<
" descending="
<<
par
.
m_descending
);
LL4
(
"expect "
<<
set1
.
count
()
<<
" rows"
);
Set
set2
(
tab
,
set
.
m_rows
);
CHK
(
con
.
startTransaction
()
==
0
);
CHK
(
con
.
getNdbScanOperation
(
itab
,
tab
)
==
0
);
CHK
(
con
.
openScanRead
(
par
.
m_scanbat
,
par
.
m_scanpar
)
==
0
);
if
(
!
par
.
m_ordered
)
CHK
(
con
.
openScanRead
(
par
.
m_scanbat
,
par
.
m_scanpar
)
==
0
);
else
CHK
(
con
.
openScanOrdered
(
par
.
m_scanbat
,
par
.
m_scanpar
,
par
.
m_descending
)
==
0
);
CHK
(
bset
.
setbnd
(
par
)
==
0
);
set2
.
getval
(
par
);
CHK
(
con
.
executeScan
()
==
0
);
...
...
@@ -2775,15 +3396,17 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
unsigned
i
=
(
unsigned
)
-
1
;
CHK
(
set2
.
getkey
(
par
,
&
i
)
==
0
);
LL4
(
"key "
<<
i
);
CHK
(
set2
.
putval
(
i
,
par
.
m_dups
)
==
0
);
LL4
(
"row "
<<
n
<<
": "
<<
*
set2
.
m_row
[
i
]);
CHK
(
set2
.
putval
(
i
,
par
.
m_dups
,
n
)
==
0
);
LL4
(
"key "
<<
i
<<
" row "
<<
n
<<
": "
<<
*
set2
.
m_row
[
i
]);
n
++
;
}
con
.
closeTransaction
();
if
(
par
.
m_verify
)
if
(
par
.
m_verify
)
{
CHK
(
set1
.
verify
(
set2
)
==
0
);
LL3
(
"scanread "
<<
itab
.
m_name
<<
" rows="
<<
n
);
if
(
par
.
m_ordered
)
CHK
(
set2
.
verifyorder
(
itab
,
par
.
m_descending
)
==
0
);
}
LL3
(
"scanread "
<<
itab
.
m_name
<<
" done rows="
<<
n
);
return
0
;
}
...
...
@@ -2821,8 +3444,10 @@ scanreadindex(Par par, const ITab& itab)
{
const
Tab
&
tab
=
par
.
tab
();
for
(
unsigned
i
=
0
;
i
<
par
.
m_subsubloop
;
i
++
)
{
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
CHK
(
scanreadindex
(
par
,
itab
,
bset
,
true
)
==
0
);
if
(
itab
.
m_type
==
ITab
::
OrderedIndex
)
{
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
CHK
(
scanreadindex
(
par
,
itab
,
bset
,
true
)
==
0
);
}
}
return
0
;
}
...
...
@@ -2835,7 +3460,11 @@ scanreadindex(Par par)
if
(
tab
.
m_itab
[
i
]
==
0
)
continue
;
const
ITab
&
itab
=
*
tab
.
m_itab
[
i
];
CHK
(
scanreadindex
(
par
,
itab
)
==
0
);
if
(
itab
.
m_type
==
ITab
::
OrderedIndex
)
{
CHK
(
scanreadindex
(
par
,
itab
)
==
0
);
}
else
{
CHK
(
hashindexread
(
par
,
itab
)
==
0
);
}
}
return
0
;
}
...
...
@@ -2932,7 +3561,7 @@ scanupdatetable(Par par)
if
(
ret
==
1
)
break
;
if
(
deadlock
)
{
LL1
(
"scanupdatetable: stop on deadlock"
);
LL1
(
"scanupdatetable: stop on deadlock
[at 1]
"
);
break
;
}
if
(
par
.
m_scanstop
!=
0
&&
urandom
(
par
.
m_scanstop
)
==
0
)
{
...
...
@@ -2944,13 +3573,14 @@ scanupdatetable(Par par)
CHK
(
set2
.
getkey
(
par
,
&
i
)
==
0
);
const
Row
&
row
=
*
set
.
m_row
[
i
];
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
LL4
(
"scan update "
<<
tab
.
m_name
<<
": skip: "
<<
row
);
}
else
{
CHKTRY
(
set2
.
putval
(
i
,
false
)
==
0
,
set
.
unlock
());
CHKTRY
(
con
.
updateScanTuple
(
con2
)
==
0
,
set
.
unlock
());
Par
par2
=
par
;
par2
.
m_con
=
&
con2
;
set
.
dbsave
(
i
);
set
.
calc
(
par
,
i
);
CHKTRY
(
set
.
setrow
(
par2
,
i
)
==
0
,
set
.
unlock
());
LL4
(
"scan update "
<<
tab
.
m_name
<<
": "
<<
row
);
...
...
@@ -2961,12 +3591,13 @@ scanupdatetable(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con2
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"scanupdate
index: stop on deadlock
"
);
LL1
(
"scanupdate
table: stop on deadlock [at 2]
"
);
goto
out
;
}
con2
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
count
+=
lst
.
cnt
();
lst
.
reset
();
...
...
@@ -2977,12 +3608,13 @@ scanupdatetable(Par par)
deadlock
=
par
.
m_deadlock
;
CHK
(
con2
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"scanupdate
index: stop on deadlock
"
);
LL1
(
"scanupdate
table: stop on deadlock [at 3]
"
);
goto
out
;
}
con2
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
count
+=
lst
.
cnt
();
lst
.
reset
();
...
...
@@ -3009,7 +3641,10 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Set
set2
(
tab
,
set
.
m_rows
);
CHK
(
con
.
startTransaction
()
==
0
);
CHK
(
con
.
getNdbScanOperation
(
itab
,
tab
)
==
0
);
CHK
(
con
.
openScanExclusive
(
par
.
m_scanbat
,
par
.
m_scanpar
)
==
0
);
if
(
!
par
.
m_ordered
)
CHK
(
con
.
openScanExclusive
(
par
.
m_scanbat
,
par
.
m_scanpar
)
==
0
);
else
CHK
(
con
.
openScanOrderedExclusive
(
par
.
m_scanbat
,
par
.
m_scanpar
,
par
.
m_descending
)
==
0
);
CHK
(
bset
.
setbnd
(
par
)
==
0
);
set2
.
getval
(
par
);
CHK
(
con
.
executeScan
()
==
0
);
...
...
@@ -3027,7 +3662,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
if
(
ret
==
1
)
break
;
if
(
deadlock
)
{
LL1
(
"scanupdateindex: stop on deadlock"
);
LL1
(
"scanupdateindex: stop on deadlock
[at 1]
"
);
break
;
}
if
(
par
.
m_scanstop
!=
0
&&
urandom
(
par
.
m_scanstop
)
==
0
)
{
...
...
@@ -3039,13 +3674,14 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
CHK
(
set2
.
getkey
(
par
,
&
i
)
==
0
);
const
Row
&
row
=
*
set
.
m_row
[
i
];
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
,
Row
::
AnyOp
))
{
LL4
(
"scan update "
<<
itab
.
m_name
<<
": skip: "
<<
row
);
}
else
{
CHKTRY
(
set2
.
putval
(
i
,
par
.
m_dups
)
==
0
,
set
.
unlock
());
CHKTRY
(
con
.
updateScanTuple
(
con2
)
==
0
,
set
.
unlock
());
Par
par2
=
par
;
par2
.
m_con
=
&
con2
;
set
.
dbsave
(
i
);
set
.
calc
(
par
,
i
);
CHKTRY
(
set
.
setrow
(
par2
,
i
)
==
0
,
set
.
unlock
());
LL4
(
"scan update "
<<
itab
.
m_name
<<
": "
<<
row
);
...
...
@@ -3056,12 +3692,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock
=
par
.
m_deadlock
;
CHK
(
con2
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"scanupdateindex: stop on deadlock"
);
LL1
(
"scanupdateindex: stop on deadlock
[at 2]
"
);
goto
out
;
}
con2
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
count
+=
lst
.
cnt
();
lst
.
reset
();
...
...
@@ -3072,12 +3709,13 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
deadlock
=
par
.
m_deadlock
;
CHK
(
con2
.
execute
(
Commit
,
deadlock
)
==
0
);
if
(
deadlock
)
{
LL1
(
"scanupdateindex: stop on deadlock"
);
LL1
(
"scanupdateindex: stop on deadlock
[at 3]
"
);
goto
out
;
}
con2
.
closeTransaction
();
set
.
lock
();
set
.
notpending
(
lst
);
set
.
dbdiscard
(
lst
);
set
.
unlock
();
count
+=
lst
.
cnt
();
lst
.
reset
();
...
...
@@ -3097,9 +3735,13 @@ scanupdateindex(Par par, const ITab& itab)
{
const
Tab
&
tab
=
par
.
tab
();
for
(
unsigned
i
=
0
;
i
<
par
.
m_subsubloop
;
i
++
)
{
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
bset
.
calc
(
par
);
CHK
(
scanupdateindex
(
par
,
itab
,
bset
)
==
0
);
if
(
itab
.
m_type
==
ITab
::
OrderedIndex
)
{
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
bset
.
calc
(
par
);
CHK
(
scanupdateindex
(
par
,
itab
,
bset
)
==
0
);
}
else
{
CHK
(
hashindexupdate
(
par
,
itab
)
==
0
);
}
}
return
0
;
}
...
...
@@ -3151,8 +3793,12 @@ readverifyfull(Par par)
unsigned
i
=
par
.
m_no
-
1
;
if
(
i
<
tab
.
m_itabs
&&
tab
.
m_itab
[
i
]
!=
0
)
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
i
];
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
CHK
(
scanreadindex
(
par
,
itab
,
bset
,
false
)
==
0
);
if
(
itab
.
m_type
==
ITab
::
OrderedIndex
)
{
BSet
bset
(
tab
,
itab
,
par
.
m_rows
);
CHK
(
scanreadindex
(
par
,
itab
,
bset
,
false
)
==
0
);
}
else
{
CHK
(
hashindexread
(
par
,
itab
)
==
0
);
}
}
}
return
0
;
...
...
@@ -3162,6 +3808,11 @@ static int
readverifyindex
(
Par
par
)
{
par
.
m_verify
=
true
;
unsigned
sel
=
urandom
(
10
);
if
(
sel
<
9
)
{
par
.
m_ordered
=
true
;
par
.
m_descending
=
(
sel
<
5
);
}
CHK
(
scanreadindex
(
par
)
==
0
);
return
0
;
}
...
...
@@ -3169,26 +3820,56 @@ readverifyindex(Par par)
static
int
pkops
(
Par
par
)
{
const
Tab
&
tab
=
par
.
tab
();
par
.
m_randomkey
=
true
;
for
(
unsigned
i
=
0
;
i
<
par
.
m_subsubloop
;
i
++
)
{
unsigned
j
=
0
;
while
(
j
<
tab
.
m_itabs
)
{
if
(
tab
.
m_itab
[
j
]
!=
0
)
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
j
];
if
(
itab
.
m_type
==
ITab
::
UniqueHashIndex
&&
urandom
(
5
)
==
0
)
break
;
}
j
++
;
}
unsigned
sel
=
urandom
(
10
);
if
(
par
.
m_slno
%
2
==
0
)
{
// favor insert
if
(
sel
<
8
)
{
CHK
(
pkinsert
(
par
)
==
0
);
}
else
if
(
sel
<
9
)
{
CHK
(
pkupdate
(
par
)
==
0
);
if
(
j
==
tab
.
m_itabs
)
CHK
(
pkupdate
(
par
)
==
0
);
else
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
j
];
CHK
(
hashindexupdate
(
par
,
itab
)
==
0
);
}
}
else
{
CHK
(
pkdelete
(
par
)
==
0
);
if
(
j
==
tab
.
m_itabs
)
CHK
(
pkdelete
(
par
)
==
0
);
else
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
j
];
CHK
(
hashindexdelete
(
par
,
itab
)
==
0
);
}
}
}
else
{
// favor delete
if
(
sel
<
1
)
{
CHK
(
pkinsert
(
par
)
==
0
);
}
else
if
(
sel
<
2
)
{
CHK
(
pkupdate
(
par
)
==
0
);
if
(
j
==
tab
.
m_itabs
)
CHK
(
pkupdate
(
par
)
==
0
);
else
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
j
];
CHK
(
hashindexupdate
(
par
,
itab
)
==
0
);
}
}
else
{
CHK
(
pkdelete
(
par
)
==
0
);
if
(
j
==
tab
.
m_itabs
)
CHK
(
pkdelete
(
par
)
==
0
);
else
{
const
ITab
&
itab
=
*
tab
.
m_itab
[
j
];
CHK
(
hashindexdelete
(
par
,
itab
)
==
0
);
}
}
}
}
...
...
@@ -3208,6 +3889,10 @@ pkupdatescanread(Par par)
CHK
(
scanreadtable
(
par
)
==
0
);
}
else
{
par
.
m_verify
=
false
;
if
(
sel
<
8
)
{
par
.
m_ordered
=
true
;
par
.
m_descending
=
(
sel
<
7
);
}
CHK
(
scanreadindex
(
par
)
==
0
);
}
return
0
;
...
...
@@ -3227,6 +3912,10 @@ mixedoperations(Par par)
}
else
if
(
sel
<
6
)
{
CHK
(
scanupdatetable
(
par
)
==
0
);
}
else
{
if
(
sel
<
8
)
{
par
.
m_ordered
=
true
;
par
.
m_descending
=
(
sel
<
7
);
}
CHK
(
scanupdateindex
(
par
)
==
0
);
}
return
0
;
...
...
@@ -3720,7 +4409,7 @@ printtables()
{
Par
par
(
g_opt
);
makebuiltintables
(
par
);
ndbout
<<
"builtin tables (
index x0 is on table pk
):"
<<
endl
;
ndbout
<<
"builtin tables (
x0 on pk, x=ordered z=hash
):"
<<
endl
;
for
(
unsigned
j
=
0
;
j
<
tabcount
;
j
++
)
{
if
(
tablist
[
j
]
==
0
)
continue
;
...
...
@@ -3744,6 +4433,7 @@ runtest(Par par)
LL1
(
"random seed: "
<<
seed
);
srandom
((
unsigned
)
seed
);
}
else
if
(
par
.
m_seed
!=
0
)
LL1
(
"random seed: "
<<
par
.
m_seed
);
srandom
(
par
.
m_seed
);
// cs
assert
(
par
.
m_csname
!=
0
);
...
...
@@ -3953,7 +4643,8 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
if
(
strcmp
(
arg
,
"-threads"
)
==
0
)
{
if
(
++
argv
,
--
argc
>
0
)
{
g_opt
.
m_threads
=
atoi
(
argv
[
0
]);
continue
;
if
(
1
<=
g_opt
.
m_threads
)
continue
;
}
}
if
(
strcmp
(
arg
,
"-v"
)
==
0
)
{
...
...
@@ -3970,7 +4661,7 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
printhelp
();
goto
wrongargs
;
}
ndbout
<<
"testOIBasic: unknown option "
<<
arg
;
ndbout
<<
"testOIBasic:
bad or
unknown option "
<<
arg
;
goto
usage
;
}
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment