Commit e4f0614c authored by unknown's avatar unknown

NDB bug-6018 support writeTuple with blobs


mysql-test/r/ndb_blob.result:
  bug-6018
mysql-test/t/ndb_blob.test:
  bug-6018
ndb/include/ndbapi/NdbBlob.hpp:
  bug-6018
ndb/include/ndbapi/NdbConnection.hpp:
  bug-6018
ndb/include/ndbapi/NdbIndexOperation.hpp:
  bug-6018
ndb/include/ndbapi/NdbOperation.hpp:
  bug-6018
ndb/src/ndbapi/NdbBlob.cpp:
  bug-6018
ndb/src/ndbapi/NdbConnection.cpp:
  bug-6018
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  bug-6018
ndb/src/ndbapi/NdbIndexOperation.cpp:
  bug-6018
ndb/src/ndbapi/NdbOperation.cpp:
  bug-6018
ndb/src/ndbapi/NdbOperationExec.cpp:
  bug-6018
ndb/test/ndbapi/testBlobs.cpp:
  bug-6018
parent 2e7b3801
drop table if exists t1; drop table if exists t1;
drop database if exists mysqltest; drop database if exists test2;
create table t1 (
a int not null primary key,
b tinytext
) engine=ndbcluster;
insert into t1 values(1, 'x');
update t1 set b = 'y';
select * from t1;
a b
1 y
delete from t1;
drop table t1;
create table t1 (
a int not null primary key,
b text not null
) engine=ndbcluster;
insert into t1 values(1, '');
select * from t1;
a b
1
drop table t1;
set autocommit=0; set autocommit=0;
create table t1 ( create table t1 (
a int not null primary key, a int not null primary key,
...@@ -102,6 +82,53 @@ commit; ...@@ -102,6 +82,53 @@ commit;
select count(*) from t1; select count(*) from t1;
count(*) count(*)
0 0
replace t1 set a=1,b=@b1,c=111,d=@d1;
replace t1 set a=2,b=@b2,c=222,d=@d2;
commit;
explain select * from t1 where a = 1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
replace t1 set a=1,b=@b2,c=111,d=@d2;
replace t1 set a=2,b=@b1,c=222,d=@d1;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
replace t1 set a=1,b=concat(@b2,@b2),c=111,d=concat(@d2,@d2);
replace t1 set a=2,b=concat(@b1,@b1),c=222,d=concat(@d1,@d1);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 40000 b2 60000 dd2
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3)
2 4512 b1 6000 dd1
replace t1 set a=1,b='xyz',c=111,d=null;
commit;
select a,b from t1 where d is null;
a b
1 xyz
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1); insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2); insert into t1 values(2,@b2,222,@d2);
commit; commit;
...@@ -241,90 +268,6 @@ a b c d ...@@ -241,90 +268,6 @@ a b c d
7 7xb7 777 7xdd7 7 7xb7 777 7xdd7
8 8xb8 888 8xdd8 8 8xb8 888 8xdd8
9 9xb9 999 9xdd9 9 9xb9 999 9xdd9
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
alter table t1 add x int;
select * from t1 order by a;
a b c d x
1 1xb1 111 1xdd1 NULL
2 2xb2 222 2xdd2 NULL
3 3xb3 333 3xdd3 NULL
4 4xb4 444 4xdd4 NULL
5 5xb5 555 5xdd5 NULL
6 6xb6 666 6xdd6 NULL
7 7xb7 777 7xdd7 NULL
8 8xb8 888 8xdd8 NULL
9 9xb9 999 9xdd9 NULL
alter table t1 drop x;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
create database mysqltest;
use mysqltest;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
a b c d a b c
1 1xb1 111 1xdd1 1 1 1
2 2xb2 222 2xdd2 2 2 2
drop table t2;
use test;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
alter table t1 add x int;
select * from t1 order by a;
a b c d x
1 1xb1 111 1xdd1 NULL
2 2xb2 222 2xdd2 NULL
3 3xb3 333 3xdd3 NULL
4 4xb4 444 4xdd4 NULL
5 5xb5 555 5xdd5 NULL
6 6xb6 666 6xdd6 NULL
7 7xb7 777 7xdd7 NULL
8 8xb8 888 8xdd8 NULL
9 9xb9 999 9xdd9 NULL
alter table t1 drop x;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1 where c >= 100; delete from t1 where c >= 100;
commit; commit;
select count(*) from t1; select count(*) from t1;
...@@ -375,8 +318,122 @@ rollback; ...@@ -375,8 +318,122 @@ rollback;
select count(*) from t1; select count(*) from t1;
count(*) count(*)
0 0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
alter table t1 add x int;
select * from t1 order by a;
a b c d x
1 b1 111 dd1 NULL
2 b2 222 dd2 NULL
3 b3 333 dd3 NULL
4 b4 444 dd4 NULL
5 b5 555 dd5 NULL
6 b6 666 dd6 NULL
7 b7 777 dd7 NULL
8 b8 888 dd8 NULL
9 b9 999 dd9 NULL
alter table t1 drop x;
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
a b c d a b c
1 b1 111 dd1 1 1 1
2 b2 222 dd2 2 2 2
drop table t2;
use test;
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
alter table t1 add x int;
select * from t1 order by a;
a b c d x
1 b1 111 dd1 NULL
2 b2 222 dd2 NULL
3 b3 333 dd3 NULL
4 b4 444 dd4 NULL
5 b5 555 dd5 NULL
6 b6 666 dd6 NULL
7 b7 777 dd7 NULL
8 b8 888 dd8 NULL
9 b9 999 dd9 NULL
alter table t1 drop x;
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
drop table t1;
drop database test2;
create table t1 (
a int not null primary key,
b tinytext
) engine=ndbcluster;
insert into t1 values(1, 'x');
update t1 set b = 'y';
select * from t1;
a b
1 x
delete from t1;
drop table t1;
create table t1 (
a int not null primary key,
b text not null
) engine=ndbcluster;
insert into t1 values(1, '');
select * from t1;
a b
1
drop table t1; drop table t1;
drop database mysqltest;
set autocommit=1; set autocommit=1;
use test; use test;
CREATE TABLE t1 ( CREATE TABLE t1 (
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
--disable_warnings --disable_warnings
drop table if exists t1; drop table if exists t1;
drop database if exists mysqltest; drop database if exists test2;
--enable_warnings --enable_warnings
# #
...@@ -12,31 +12,7 @@ drop database if exists mysqltest; ...@@ -12,31 +12,7 @@ drop database if exists mysqltest;
# A prerequisite for this handler test is that "testBlobs" succeeds. # A prerequisite for this handler test is that "testBlobs" succeeds.
# #
# -- bug-5252 tinytext crashes -- # -- general test starts --
create table t1 (
a int not null primary key,
b tinytext
) engine=ndbcluster;
insert into t1 values(1, 'x');
update t1 set b = 'y';
select * from t1;
delete from t1;
drop table t1;
# -- bug-5013 insert empty string to text --
create table t1 (
a int not null primary key,
b text not null
) engine=ndbcluster;
insert into t1 values(1, '');
select * from t1;
drop table t1;
-- general test starts --
# make test harder with autocommit off # make test harder with autocommit off
set autocommit=0; set autocommit=0;
...@@ -117,7 +93,6 @@ from t1 where a=2; ...@@ -117,7 +93,6 @@ from t1 where a=2;
# pk update to null # pk update to null
update t1 set d=null where a=1; update t1 set d=null where a=1;
commit; commit;
# FIXME now fails at random due to weird mixup between the 2 rows
select a from t1 where d is null; select a from t1 where d is null;
# pk delete # pk delete
...@@ -126,6 +101,49 @@ delete from t1 where a=2; ...@@ -126,6 +101,49 @@ delete from t1 where a=2;
commit; commit;
select count(*) from t1; select count(*) from t1;
# -- replace ( bug-6018 ) --
# insert
replace t1 set a=1,b=@b1,c=111,d=@d1;
replace t1 set a=2,b=@b2,c=222,d=@d2;
commit;
explain select * from t1 where a = 1;
# pk read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
# update
replace t1 set a=1,b=@b2,c=111,d=@d2;
replace t1 set a=2,b=@b1,c=222,d=@d1;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
# update
replace t1 set a=1,b=concat(@b2,@b2),c=111,d=concat(@d2,@d2);
replace t1 set a=2,b=concat(@b1,@b1),c=222,d=concat(@d1,@d1);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
# update to null
replace t1 set a=1,b='xyz',c=111,d=null;
commit;
select a,b from t1 where d is null;
# pk delete
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
# -- hash index ops -- # -- hash index ops --
insert into t1 values(1,@b1,111,@d1); insert into t1 values(1,@b1,111,@d1);
...@@ -231,39 +249,6 @@ where c >= 100; ...@@ -231,39 +249,6 @@ where c >= 100;
commit; commit;
select * from t1 where c >= 100 order by a; select * from t1 where c >= 100 order by a;
# alter table
select * from t1 order by a;
alter table t1 add x int;
select * from t1 order by a;
alter table t1 drop x;
select * from t1 order by a;
# multi db
create database mysqltest;
use mysqltest;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
drop table t2;
use test;
# alter table
select * from t1 order by a;
alter table t1 add x int;
select * from t1 order by a;
alter table t1 drop x;
select * from t1 order by a;
# range scan delete # range scan delete
delete from t1 where c >= 100; delete from t1 where c >= 100;
commit; commit;
...@@ -306,10 +291,77 @@ select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3) ...@@ -306,10 +291,77 @@ select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a; from t1 order by a;
rollback; rollback;
select count(*) from t1; select count(*) from t1;
# -- alter table and multi db --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
select * from t1 order by a;
alter table t1 add x int;
select * from t1 order by a;
alter table t1 drop x;
select * from t1 order by a;
create database test2;
use test2;
CREATE TABLE t2 (
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null,
c int unsigned
) engine=ndbcluster;
insert into t2 values (1,1,1),(2,2,2);
select * from test.t1,t2 where test.t1.a = t2.a order by test.t1.a;
drop table t2;
use test;
select * from t1 order by a;
alter table t1 add x int;
select * from t1 order by a;
alter table t1 drop x;
select * from t1 order by a;
# -- end general test --
drop table t1;
drop database test2;
# -- bug-5252 tinytext crashes --
create table t1 (
a int not null primary key,
b tinytext
) engine=ndbcluster;
insert into t1 values(1, 'x');
update t1 set b = 'y';
select * from t1;
delete from t1;
drop table t1;
# -- bug-5013 insert empty string to text --
create table t1 (
a int not null primary key,
b text not null
) engine=ndbcluster;
insert into t1 values(1, '');
select * from t1;
drop table t1; drop table t1;
drop database mysqltest;
# bug #5349 # -- bug #5349 --
set autocommit=1; set autocommit=1;
use test; use test;
CREATE TABLE t1 ( CREATE TABLE t1 (
...@@ -327,7 +379,7 @@ select * from t1 order by a; ...@@ -327,7 +379,7 @@ select * from t1 order by a;
alter table t1 engine=ndb; alter table t1 engine=ndb;
select * from t1 order by a; select * from t1 order by a;
# bug #5872 # -- bug #5872 --
alter table t1 engine=myisam; alter table t1 engine=myisam;
select * from t1 order by a; select * from t1 order by a;
drop table t1; drop table t1;
...@@ -36,7 +36,7 @@ class NdbColumnImpl; ...@@ -36,7 +36,7 @@ class NdbColumnImpl;
* Blob data is stored in 2 places: * Blob data is stored in 2 places:
* *
* - "header" and "inline bytes" stored in the blob attribute * - "header" and "inline bytes" stored in the blob attribute
* - "blob parts" stored in a separate table NDB$BLOB_<t>_<v>_<c> * - "blob parts" stored in a separate table NDB$BLOB_<tid>_<cid>
* *
* Inline and part sizes can be set via NdbDictionary::Column methods * Inline and part sizes can be set via NdbDictionary::Column methods
* when the table is created. * when the table is created.
...@@ -74,23 +74,21 @@ class NdbColumnImpl; ...@@ -74,23 +74,21 @@ class NdbColumnImpl;
* NdbBlob methods return -1 on error and 0 on success, and use output * NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary. * parameters when necessary.
* *
* Notes: * Operation types:
* - table and its blob part tables are not created atomically * - insertTuple must use setValue if blob column is non-nullable
* - scan must use the "new" interface NdbScanOperation * - readTuple with exclusive lock can also update existing value
* - to update a blob in a read op requires exclusive tuple lock * - updateTuple can overwrite with setValue or update existing value
* - update op in scan must do its own getBlobHandle * - writeTuple always overwrites and must use setValue if non-nullable
* - delete creates implicit, not-accessible blob handles * - deleteTuple creates implicit non-accessible blob handles
* - NdbOperation::writeTuple does not support blobs * - scan with exclusive lock can also update existing value
* - there is no support for an asynchronous interface * - scan "lock takeover" update op must do its own getBlobHandle
* *
* Bugs / limitations: * Bugs / limitations:
* - scan must use exclusive locking for now * - lock mode upgrade should be handled automatically
* * - lock mode vs allowed operation is not checked
* Todo: * - too many pending blob ops can blow up i/o buffers
* - add scan method hold-read-lock + return-keyinfo * - table and its blob part tables are not created atomically
* - check keyinfo length when setting keys * - there is no support for an asynchronous interface
* - check allowed blob ops vs locking mode
* - overload control (too many pending ops)
*/ */
class NdbBlob { class NdbBlob {
public: public:
...@@ -172,19 +170,11 @@ public: ...@@ -172,19 +170,11 @@ public:
* read in the in/out bytes parameter. * read in the in/out bytes parameter.
*/ */
int readData(void* data, Uint32& bytes); int readData(void* data, Uint32& bytes);
/**
* Read at given position. Does not use or update current position.
*/
int readData(Uint64 pos, void* data, Uint32& bytes);
/** /**
* Write at current position and set new position to first byte after * Write at current position and set new position to first byte after
* the data written. A write past blob end extends the blob value. * the data written. A write past blob end extends the blob value.
*/ */
int writeData(const void* data, Uint32 bytes); int writeData(const void* data, Uint32 bytes);
/**
* Write at given position. Does not use or update current position.
*/
int writeData(Uint64 pos, const void* data, Uint32 bytes);
/** /**
* Return the blob column. * Return the blob column.
*/ */
...@@ -266,14 +256,17 @@ private: ...@@ -266,14 +256,17 @@ private:
Buf(); Buf();
~Buf(); ~Buf();
void alloc(unsigned n); void alloc(unsigned n);
void copyfrom(const Buf& src);
}; };
Buf theKeyBuf; Buf theKeyBuf;
Buf theAccessKeyBuf; Buf theAccessKeyBuf;
Buf theHeadInlineBuf; Buf theHeadInlineBuf;
Buf theHeadInlineCopyBuf; // for writeTuple
Buf thePartBuf; Buf thePartBuf;
Head* theHead; Head* theHead;
char* theInlineData; char* theInlineData;
NdbRecAttr* theHeadInlineRecAttr; NdbRecAttr* theHeadInlineRecAttr;
NdbOperation* theHeadInlineReadOp;
bool theHeadInlineUpdateFlag; bool theHeadInlineUpdateFlag;
// length and read/write position // length and read/write position
int theNullFlag; int theNullFlag;
...@@ -294,6 +287,7 @@ private: ...@@ -294,6 +287,7 @@ private:
bool isReadOp(); bool isReadOp();
bool isInsertOp(); bool isInsertOp();
bool isUpdateOp(); bool isUpdateOp();
bool isWriteOp();
bool isDeleteOp(); bool isDeleteOp();
bool isScanOp(); bool isScanOp();
// computations // computations
...@@ -309,12 +303,13 @@ private: ...@@ -309,12 +303,13 @@ private:
void getHeadFromRecAttr(); void getHeadFromRecAttr();
int setHeadInlineValue(NdbOperation* anOp); int setHeadInlineValue(NdbOperation* anOp);
// data operations // data operations
int readDataPrivate(Uint64 pos, char* buf, Uint32& bytes); int readDataPrivate(char* buf, Uint32& bytes);
int writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes); int writeDataPrivate(const char* buf, Uint32 bytes);
int readParts(char* buf, Uint32 part, Uint32 count); int readParts(char* buf, Uint32 part, Uint32 count);
int insertParts(const char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count);
int updateParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count);
int deleteParts(Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count);
int deletePartsUnknown(Uint32 part);
// pending ops // pending ops
int executePendingBlobReads(); int executePendingBlobReads();
int executePendingBlobWrites(); int executePendingBlobWrites();
......
...@@ -526,7 +526,7 @@ private: ...@@ -526,7 +526,7 @@ private:
int sendCOMMIT(); // Send a TC_COMMITREQ signal; int sendCOMMIT(); // Send a TC_COMMITREQ signal;
void setGCI(int GCI); // Set the global checkpoint identity void setGCI(int GCI); // Set the global checkpoint identity
int OpCompleteFailure(Uint8 abortoption); int OpCompleteFailure(Uint8 abortoption, bool setFailure = true);
int OpCompleteSuccess(); int OpCompleteSuccess();
void CompletedOperations(); // Move active ops to list of completed void CompletedOperations(); // Move active ops to list of completed
...@@ -552,7 +552,7 @@ private: ...@@ -552,7 +552,7 @@ private:
void setOperationErrorCode(int anErrorCode); void setOperationErrorCode(int anErrorCode);
// Indicate something went wrong in the definition phase // Indicate something went wrong in the definition phase
void setOperationErrorCodeAbort(int anErrorCode); void setOperationErrorCodeAbort(int anErrorCode, int abortOption = -1);
int checkMagicNumber(); // Verify correct object int checkMagicNumber(); // Verify correct object
NdbOperation* getNdbOperation(const class NdbTableImpl* aTable, NdbOperation* getNdbOperation(const class NdbTableImpl* aTable,
......
...@@ -49,6 +49,9 @@ public: ...@@ -49,6 +49,9 @@ public:
* @{ * @{
*/ */
/** insert is not allowed */
int insertTuple();
/** /**
* Define the NdbIndexOperation to be a standard operation of type readTuple. * Define the NdbIndexOperation to be a standard operation of type readTuple.
* When calling NdbConnection::execute, this operation * When calling NdbConnection::execute, this operation
...@@ -193,6 +196,7 @@ private: ...@@ -193,6 +196,7 @@ private:
// Private attributes // Private attributes
const NdbIndexImpl* m_theIndex; const NdbIndexImpl* m_theIndex;
const NdbTableImpl* m_thePrimaryTable;
Uint32 m_theIndexDefined[NDB_MAX_ATTRIBUTES_IN_INDEX][3]; Uint32 m_theIndexDefined[NDB_MAX_ATTRIBUTES_IN_INDEX][3];
Uint32 m_theIndexLen; // Length of the index in words Uint32 m_theIndexLen; // Length of the index in words
Uint32 m_theNoOfIndexDefined; // The number of index attributes Uint32 m_theNoOfIndexDefined; // The number of index attributes
......
...@@ -918,6 +918,13 @@ protected: ...@@ -918,6 +918,13 @@ protected:
// Blobs in this operation // Blobs in this operation
NdbBlob* theBlobList; NdbBlob* theBlobList;
/*
* Abort option per operation, used by blobs. Default -1. If set,
* overrides abort option on connection level. If set to IgnoreError,
* does not cause execute() to return failure. This is different from
* IgnoreError on connection level.
*/
Int8 m_abortOption;
}; };
#ifdef NDB_NO_DROPPED_SIGNAL #ifdef NDB_NO_DROPPED_SIGNAL
...@@ -1160,5 +1167,3 @@ NdbOperation::setValue(Uint32 anAttrId, double aPar) ...@@ -1160,5 +1167,3 @@ NdbOperation::setValue(Uint32 anAttrId, double aPar)
} }
#endif #endif
...@@ -33,21 +33,24 @@ ...@@ -33,21 +33,24 @@
ndbout << prefix << " " << hex << (void*)this << " " << cname; \ ndbout << prefix << " " << hex << (void*)this << " " << cname; \
ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \ ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \
} while (0) } while (0)
#else
#define DBG(x)
#endif
static char* static char*
ndb_blob_debug(const Uint32* data, unsigned size) ndb_blob_debug(const Uint32* data, unsigned size)
{ {
static char buf[128 + 1]; // MT irrelevant static char buf[200]; // MT irrelevant
buf[0] = 0; buf[0] = 0;
for (unsigned i = 0; i < size && i < 128 / 4; i++) { for (unsigned i = 0; i < size; i++) {
sprintf(buf + strlen(buf), "%*s%08x", i != 0, "", data[i]); unsigned n = strlen(buf);
if (n + 10 < sizeof(buf))
sprintf(buf + n, "%*s%08x", i != 0, "", data[i]);
} }
return buf; return buf;
} }
#else
#define DBG(x)
#endif
/* /*
* Reading index table directly (as a table) is faster but there are * Reading index table directly (as a table) is faster but there are
* bugs or limitations. Keep the code and make possible to choose. * bugs or limitations. Keep the code and make possible to choose.
...@@ -162,6 +165,7 @@ NdbBlob::init() ...@@ -162,6 +165,7 @@ NdbBlob::init()
theHead = NULL; theHead = NULL;
theInlineData = NULL; theInlineData = NULL;
theHeadInlineRecAttr = NULL; theHeadInlineRecAttr = NULL;
theHeadInlineReadOp = NULL;
theHeadInlineUpdateFlag = false; theHeadInlineUpdateFlag = false;
theNullFlag = -1; theNullFlag = -1;
theLength = 0; theLength = 0;
...@@ -206,6 +210,13 @@ NdbBlob::Buf::alloc(unsigned n) ...@@ -206,6 +210,13 @@ NdbBlob::Buf::alloc(unsigned n)
#endif #endif
} }
void
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
{
assert(size == src.size);
memcpy(data, src.data, size);
}
// classify operations (inline) // classify operations (inline)
inline bool inline bool
...@@ -226,6 +237,7 @@ NdbBlob::isKeyOp() ...@@ -226,6 +237,7 @@ NdbBlob::isKeyOp()
return return
theNdbOp->theOperationType == NdbOperation::InsertRequest || theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest || theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
theNdbOp->theOperationType == NdbOperation::WriteRequest ||
theNdbOp->theOperationType == NdbOperation::ReadRequest || theNdbOp->theOperationType == NdbOperation::ReadRequest ||
theNdbOp->theOperationType == NdbOperation::ReadExclusive || theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
theNdbOp->theOperationType == NdbOperation::DeleteRequest; theNdbOp->theOperationType == NdbOperation::DeleteRequest;
...@@ -253,6 +265,13 @@ NdbBlob::isUpdateOp() ...@@ -253,6 +265,13 @@ NdbBlob::isUpdateOp()
theNdbOp->theOperationType == NdbOperation::UpdateRequest; theNdbOp->theOperationType == NdbOperation::UpdateRequest;
} }
inline bool
NdbBlob::isWriteOp()
{
return
theNdbOp->theOperationType == NdbOperation::WriteRequest;
}
inline bool inline bool
NdbBlob::isDeleteOp() NdbBlob::isDeleteOp()
{ {
...@@ -289,7 +308,7 @@ inline Uint32 ...@@ -289,7 +308,7 @@ inline Uint32
NdbBlob::getDistKey(Uint32 part) NdbBlob::getDistKey(Uint32 part)
{ {
assert(theStripeSize != 0); assert(theStripeSize != 0);
return (part / theStripeSize) % theStripeSize; return part / theStripeSize;
} }
// getters and setters // getters and setters
...@@ -401,7 +420,7 @@ NdbBlob::getHeadFromRecAttr() ...@@ -401,7 +420,7 @@ NdbBlob::getHeadFromRecAttr()
theNullFlag = theHeadInlineRecAttr->isNULL(); theNullFlag = theHeadInlineRecAttr->isNULL();
assert(theNullFlag != -1); assert(theNullFlag != -1);
theLength = ! theNullFlag ? theHead->length : 0; theLength = ! theNullFlag ? theHead->length : 0;
DBG("getHeadFromRecAttr out"); DBG("getHeadFromRecAttr [out]");
} }
int int
...@@ -453,7 +472,7 @@ NdbBlob::setValue(const void* data, Uint32 bytes) ...@@ -453,7 +472,7 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
setErrorCode(ErrState); setErrorCode(ErrState);
return -1; return -1;
} }
if (! isInsertOp() && ! isUpdateOp()) { if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) {
setErrorCode(ErrUsage); setErrorCode(ErrUsage);
return -1; return -1;
} }
...@@ -466,11 +485,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes) ...@@ -466,11 +485,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
theGetSetBytes = bytes; theGetSetBytes = bytes;
if (isInsertOp()) { if (isInsertOp()) {
// write inline part now // write inline part now
if (theSetBuf != 0) { if (theSetBuf != NULL) {
unsigned n = theGetSetBytes; Uint32 n = theGetSetBytes;
if (n > theInlineSize) if (n > theInlineSize)
n = theInlineSize; n = theInlineSize;
if (writeDataPrivate(0, theSetBuf, n) == -1) assert(thePos == 0);
if (writeDataPrivate(theSetBuf, n) == -1)
return -1; return -1;
} else { } else {
theNullFlag = true; theNullFlag = true;
...@@ -555,7 +575,7 @@ NdbBlob::getLength(Uint64& len) ...@@ -555,7 +575,7 @@ NdbBlob::getLength(Uint64& len)
int int
NdbBlob::truncate(Uint64 length) NdbBlob::truncate(Uint64 length)
{ {
DBG("truncate length=" << length); DBG("truncate [in] length=" << length);
if (theNullFlag == -1) { if (theNullFlag == -1) {
setErrorCode(ErrState); setErrorCode(ErrState);
return -1; return -1;
...@@ -573,7 +593,10 @@ NdbBlob::truncate(Uint64 length) ...@@ -573,7 +593,10 @@ NdbBlob::truncate(Uint64 length)
} }
theLength = length; theLength = length;
theHeadInlineUpdateFlag = true; theHeadInlineUpdateFlag = true;
if (thePos > length)
thePos = length;
} }
DBG("truncate [out]");
return 0; return 0;
} }
...@@ -609,33 +632,21 @@ NdbBlob::setPos(Uint64 pos) ...@@ -609,33 +632,21 @@ NdbBlob::setPos(Uint64 pos)
int int
NdbBlob::readData(void* data, Uint32& bytes) NdbBlob::readData(void* data, Uint32& bytes)
{
if (readData(thePos, data, bytes) == -1)
return -1;
thePos += bytes;
assert(thePos <= theLength);
return 0;
}
int
NdbBlob::readData(Uint64 pos, void* data, Uint32& bytes)
{ {
if (theState != Active) { if (theState != Active) {
setErrorCode(ErrState); setErrorCode(ErrState);
return -1; return -1;
} }
char* buf = static_cast<char*>(data); char* buf = static_cast<char*>(data);
return readDataPrivate(pos, buf, bytes); return readDataPrivate(buf, bytes);
} }
int int
NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
{ {
DBG("readData pos=" << pos << " bytes=" << bytes); DBG("readData [in] bytes=" << bytes);
if (pos > theLength) { assert(thePos <= theLength);
setErrorCode(ErrSeek); Uint64 pos = thePos;
return -1;
}
if (bytes > theLength - pos) if (bytes > theLength - pos)
bytes = theLength - pos; bytes = theLength - pos;
Uint32 len = bytes; Uint32 len = bytes;
...@@ -709,38 +720,29 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) ...@@ -709,38 +720,29 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
len -= n; len -= n;
} }
assert(len == 0); assert(len == 0);
return 0; thePos = pos;
}
int
NdbBlob::writeData(const void* data, Uint32 bytes)
{
if (writeData(thePos, data, bytes) == -1)
return -1;
thePos += bytes;
assert(thePos <= theLength); assert(thePos <= theLength);
DBG("readData [out]");
return 0; return 0;
} }
int int
NdbBlob::writeData(Uint64 pos, const void* data, Uint32 bytes) NdbBlob::writeData(const void* data, Uint32 bytes)
{ {
if (theState != Active) { if (theState != Active) {
setErrorCode(ErrState); setErrorCode(ErrState);
return -1; return -1;
} }
const char* buf = static_cast<const char*>(data); const char* buf = static_cast<const char*>(data);
return writeDataPrivate(pos, buf, bytes); return writeDataPrivate(buf, bytes);
} }
int int
NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
{ {
DBG("writeData pos=" << pos << " bytes=" << bytes); DBG("writeData [in] bytes=" << bytes);
if (pos > theLength) { assert(thePos <= theLength);
setErrorCode(ErrSeek); Uint64 pos = thePos;
return -1;
}
Uint32 len = bytes; Uint32 len = bytes;
// any write makes blob not NULL // any write makes blob not NULL
if (theNullFlag) { if (theNullFlag) {
...@@ -778,7 +780,7 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) ...@@ -778,7 +780,7 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
if (readParts(thePartBuf.data, part, 1) == -1) if (readParts(thePartBuf.data, part, 1) == -1)
return -1; return -1;
// need result now // need result now
DBG("execute pending part reafs"); DBG("execute pending part reads");
if (executePendingBlobReads() == -1) if (executePendingBlobReads() == -1)
return -1; return -1;
Uint32 n = thePartSize - off; Uint32 n = thePartSize - off;
...@@ -855,14 +857,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) ...@@ -855,14 +857,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
theLength = pos; theLength = pos;
theHeadInlineUpdateFlag = true; theHeadInlineUpdateFlag = true;
} }
DBG("writeData out"); thePos = pos;
assert(thePos <= theLength);
DBG("writeData [out]");
return 0; return 0;
} }
int int
NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
{ {
DBG("readParts part=" << part << " count=" << count); DBG("readParts [in] part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
...@@ -873,6 +877,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) ...@@ -873,6 +877,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
tOp->m_abortOption = AbortOnError;
buf += thePartSize; buf += thePartSize;
n++; n++;
thePendingBlobOps |= (1 << NdbOperation::ReadRequest); thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
...@@ -884,7 +889,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) ...@@ -884,7 +889,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
int int
NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
{ {
DBG("insertParts part=" << part << " count=" << count); DBG("insertParts [in] part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
...@@ -895,6 +900,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) ...@@ -895,6 +900,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
tOp->m_abortOption = AbortOnError;
buf += thePartSize; buf += thePartSize;
n++; n++;
thePendingBlobOps |= (1 << NdbOperation::InsertRequest); thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
...@@ -906,7 +912,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) ...@@ -906,7 +912,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
int int
NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
{ {
DBG("updateParts part=" << part << " count=" << count); DBG("updateParts [in] part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
...@@ -917,6 +923,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) ...@@ -917,6 +923,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
tOp->m_abortOption = AbortOnError;
buf += thePartSize; buf += thePartSize;
n++; n++;
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
...@@ -928,7 +935,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) ...@@ -928,7 +935,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
int int
NdbBlob::deleteParts(Uint32 part, Uint32 count) NdbBlob::deleteParts(Uint32 part, Uint32 count)
{ {
DBG("deleteParts part=" << part << " count=" << count); DBG("deleteParts [in] part=" << part << " count=" << count);
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
...@@ -938,6 +945,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) ...@@ -938,6 +945,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
tOp->m_abortOption = AbortOnError;
n++; n++;
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
...@@ -945,6 +953,57 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) ...@@ -945,6 +953,57 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
return 0; return 0;
} }
/*
* Number of blob parts not known. Used to check for race condition
* when writeTuple is used for insert. Deletes all parts found.
*/
int
NdbBlob::deletePartsUnknown(Uint32 part)
{
DBG("deletePartsUnknown [in] part=" << part << " count=all");
static const unsigned maxbat = 256;
static const unsigned minbat = 1;
unsigned bat = minbat;
NdbOperation* tOpList[maxbat];
Uint32 count = 0;
while (true) {
Uint32 n;
n = 0;
while (n < bat) {
NdbOperation*& tOp = tOpList[n]; // ref
tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL ||
tOp->deleteTuple() == -1 ||
setPartKeyValue(tOp, part + count + n) == -1) {
setErrorCode(tOp);
return -1;
}
tOp->m_abortOption = IgnoreError;
n++;
if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1;
}
n = 0;
while (n < bat) {
NdbOperation* tOp = tOpList[n];
if (tOp->theError.code != 0) {
if (tOp->theError.code != 626) {
setErrorCode(tOp);
return -1;
}
// first non-existent part
DBG("deletePartsUnknown [out] count=" << count);
return 0;
}
n++;
count++;
}
bat *= 4;
if (bat > maxbat)
bat = maxbat;
}
}
// pending ops // pending ops
int int
...@@ -1007,7 +1066,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1007,7 +1066,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theTable = anOp->m_currentTable; theTable = anOp->m_currentTable;
theAccessTable = anOp->m_accessTable; theAccessTable = anOp->m_accessTable;
theColumn = aColumn; theColumn = aColumn;
DBG("atPrepare"); DBG("atPrepare [in]");
NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined; NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
switch (theColumn->getType()) { switch (theColumn->getType()) {
case NdbDictionary::Column::Blob: case NdbDictionary::Column::Blob:
...@@ -1046,6 +1105,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1046,6 +1105,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2);
theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2); theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
thePartBuf.alloc(thePartSize); thePartBuf.alloc(thePartSize);
theHead = (Head*)theHeadInlineBuf.data; theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head); theInlineData = theHeadInlineBuf.data + sizeof(Head);
...@@ -1080,6 +1140,12 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1080,6 +1140,12 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theNullFlag = true; theNullFlag = true;
theLength = 0; theLength = 0;
} }
if (isWriteOp()) {
// becomes NULL unless set before execute
theNullFlag = true;
theLength = 0;
theHeadInlineUpdateFlag = true;
}
supportedOp = true; supportedOp = true;
} }
if (isScanOp()) { if (isScanOp()) {
...@@ -1093,19 +1159,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* ...@@ -1093,19 +1159,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
return -1; return -1;
} }
setState(Prepared); setState(Prepared);
DBG("atPrepare out"); DBG("atPrepare [out]");
return 0; return 0;
} }
/* /*
* Before execute of prepared operation. May add new operations before * Before execute of prepared operation. May add new operations before
* this one. May ask that this operation and all before it (a "batch") * this one. May ask that this operation and all before it (a "batch")
* is executed immediately in no-commit mode. * is executed immediately in no-commit mode. In this case remaining
* prepared operations are saved in a separate list. They are added
* back after postExecute.
*/ */
int int
NdbBlob::preExecute(ExecType anExecType, bool& batch) NdbBlob::preExecute(ExecType anExecType, bool& batch)
{ {
DBG("preExecute"); DBG("preExecute [in]");
if (theState == Invalid) if (theState == Invalid)
return -1; return -1;
assert(theState == Prepared); assert(theState == Prepared);
...@@ -1120,11 +1188,11 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1120,11 +1188,11 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
if (isInsertOp()) { if (isInsertOp()) {
if (theSetFlag && theGetSetBytes > theInlineSize) { if (theSetFlag && theGetSetBytes > theInlineSize) {
// add ops to write rest of a setValue // add ops to write rest of a setValue
assert(theSetBuf != 0); assert(theSetBuf != NULL);
Uint64 pos = theInlineSize;
const char* buf = theSetBuf + theInlineSize; const char* buf = theSetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize; Uint32 bytes = theGetSetBytes - theInlineSize;
if (writeDataPrivate(pos, buf, bytes) == -1) assert(thePos == theInlineSize);
if (writeDataPrivate(buf, bytes) == -1)
return -1; return -1;
if (theHeadInlineUpdateFlag) { if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline // add an operation to update head+inline
...@@ -1136,11 +1204,12 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1136,11 +1204,12 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(ErrAbort); setErrorCode(ErrAbort);
return -1; return -1;
} }
DBG("add op to update head+inline");
} }
} }
} }
if (isTableOp()) { if (isTableOp()) {
if (isUpdateOp() || isDeleteOp()) { if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
// add operation before this one to read head+inline // add operation before this one to read head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp); NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
if (tOp == NULL || if (tOp == NULL ||
...@@ -1150,8 +1219,13 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1150,8 +1219,13 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
if (isWriteOp()) {
tOp->m_abortOption = IgnoreError;
}
theHeadInlineReadOp = tOp;
// execute immediately // execute immediately
batch = true; batch = true;
DBG("add op before to read head+inline");
} }
} }
if (isIndexOp()) { if (isIndexOp()) {
...@@ -1180,6 +1254,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1180,6 +1254,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
} }
} }
} }
DBG("added op before to read table key");
if (isUpdateOp() || isDeleteOp()) { if (isUpdateOp() || isDeleteOp()) {
// add op before this one to read head+inline via index // add op before this one to read head+inline via index
NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
...@@ -1190,15 +1265,43 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1190,15 +1265,43 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(tOp); setErrorCode(tOp);
return -1; return -1;
} }
if (isWriteOp()) {
tOp->m_abortOption = IgnoreError;
}
theHeadInlineReadOp = tOp;
// execute immediately // execute immediately
batch = true; batch = true;
DBG("added index op before to read head+inline");
}
if (isWriteOp()) {
// XXX until IgnoreError fixed for index op
batch = true;
}
}
if (isWriteOp()) {
if (theSetFlag) {
// write head+inline now
theNullFlag = true;
theLength = 0;
if (theSetBuf != NULL) {
Uint32 n = theGetSetBytes;
if (n > theInlineSize)
n = theInlineSize;
assert(thePos == 0);
if (writeDataPrivate(theSetBuf, n) == -1)
return -1;
}
if (setHeadInlineValue(theNdbOp) == -1)
return -1;
// the read op before us may overwrite
theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
} }
} }
if (theActiveHook != NULL) { if (theActiveHook != NULL) {
// need blob head for callback // need blob head for callback
batch = true; batch = true;
} }
DBG("preExecute out batch=" << batch); DBG("preExecute [out] batch=" << batch);
return 0; return 0;
} }
...@@ -1211,15 +1314,16 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1211,15 +1314,16 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
int int
NdbBlob::postExecute(ExecType anExecType) NdbBlob::postExecute(ExecType anExecType)
{ {
DBG("postExecute type=" << anExecType); DBG("postExecute [in] type=" << anExecType);
if (theState == Invalid) if (theState == Invalid)
return -1; return -1;
if (theState == Active) { if (theState == Active) {
setState(anExecType == NoCommit ? Active : Closed); setState(anExecType == NoCommit ? Active : Closed);
DBG("postExecute skip"); DBG("postExecute [skip]");
return 0; return 0;
} }
assert(theState == Prepared); assert(theState == Prepared);
setState(anExecType == NoCommit ? Active : Closed);
assert(isKeyOp()); assert(isKeyOp());
if (isIndexOp()) { if (isIndexOp()) {
NdbBlob* tFirstBlob = theNdbOp->theBlobList; NdbBlob* tFirstBlob = theNdbOp->theBlobList;
...@@ -1231,22 +1335,13 @@ NdbBlob::postExecute(ExecType anExecType) ...@@ -1231,22 +1335,13 @@ NdbBlob::postExecute(ExecType anExecType)
} }
if (isReadOp()) { if (isReadOp()) {
getHeadFromRecAttr(); getHeadFromRecAttr();
if (theGetFlag && theGetSetBytes > 0) { if (setPos(0) == -1)
// copy inline bytes to user buffer return -1;
assert(theGetBuf != NULL); if (theGetFlag) {
unsigned n = theGetSetBytes; assert(theGetSetBytes == 0 || theGetBuf != 0);
if (n > theInlineSize) assert(theGetSetBytes <= theInlineSize || anExecType == NoCommit);
n = theInlineSize; Uint32 bytes = theGetSetBytes;
memcpy(theGetBuf, theInlineData, n); if (readDataPrivate(theGetBuf, bytes) == -1)
}
if (theGetFlag && theGetSetBytes > theInlineSize) {
// add ops to read rest of a getValue
assert(anExecType == NoCommit);
assert(theGetBuf != 0);
Uint64 pos = theInlineSize;
char* buf = theGetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
if (readDataPrivate(pos, buf, bytes) == -1)
return -1; return -1;
} }
} }
...@@ -1255,10 +1350,11 @@ NdbBlob::postExecute(ExecType anExecType) ...@@ -1255,10 +1350,11 @@ NdbBlob::postExecute(ExecType anExecType)
getHeadFromRecAttr(); getHeadFromRecAttr();
if (theSetFlag) { if (theSetFlag) {
// setValue overwrites everything // setValue overwrites everything
if (theSetBuf != 0) { if (theSetBuf != NULL) {
if (truncate(0) == -1) if (truncate(0) == -1)
return -1; return -1;
if (writeDataPrivate(0, theSetBuf, theGetSetBytes) == -1) assert(thePos == 0);
if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
return -1; return -1;
} else { } else {
if (setNull() == -1) if (setNull() == -1)
...@@ -1266,6 +1362,57 @@ NdbBlob::postExecute(ExecType anExecType) ...@@ -1266,6 +1362,57 @@ NdbBlob::postExecute(ExecType anExecType)
} }
} }
} }
if (isWriteOp() && isTableOp()) {
assert(anExecType == NoCommit);
if (theHeadInlineReadOp->theError.code == 0) {
int tNullFlag = theNullFlag;
Uint64 tLength = theLength;
Uint64 tPos = thePos;
getHeadFromRecAttr();
DBG("tuple found");
if (truncate(0) == -1)
return -1;
// restore previous head+inline
theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
theNullFlag = tNullFlag;
theLength = tLength;
thePos = tPos;
} else {
if (theHeadInlineReadOp->theError.code != 626) {
setErrorCode(theHeadInlineReadOp);
return -1;
}
DBG("tuple not found");
/*
* Read found no tuple but it is possible that a tuple was
* created after the read by another transaction. Delete all
* blob parts which may exist.
*/
if (deletePartsUnknown(0) == -1)
return -1;
}
if (theSetFlag && theGetSetBytes > theInlineSize) {
assert(theSetBuf != NULL);
const char* buf = theSetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
assert(thePos == theInlineSize);
if (writeDataPrivate(buf, bytes) == -1)
return -1;
}
}
if (isWriteOp() && isIndexOp()) {
// XXX until IgnoreError fixed for index op
if (deletePartsUnknown(0) == -1)
return -1;
if (theSetFlag && theGetSetBytes > theInlineSize) {
assert(theSetBuf != NULL);
const char* buf = theSetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
assert(thePos == theInlineSize);
if (writeDataPrivate(buf, bytes) == -1)
return -1;
}
}
if (isDeleteOp()) { if (isDeleteOp()) {
assert(anExecType == NoCommit); assert(anExecType == NoCommit);
getHeadFromRecAttr(); getHeadFromRecAttr();
...@@ -1278,7 +1425,7 @@ NdbBlob::postExecute(ExecType anExecType) ...@@ -1278,7 +1425,7 @@ NdbBlob::postExecute(ExecType anExecType)
if (invokeActiveHook() == -1) if (invokeActiveHook() == -1)
return -1; return -1;
} }
DBG("postExecute out"); DBG("postExecute [out]");
return 0; return 0;
} }
...@@ -1289,12 +1436,12 @@ NdbBlob::postExecute(ExecType anExecType) ...@@ -1289,12 +1436,12 @@ NdbBlob::postExecute(ExecType anExecType)
int int
NdbBlob::preCommit() NdbBlob::preCommit()
{ {
DBG("preCommit"); DBG("preCommit [in]");
if (theState == Invalid) if (theState == Invalid)
return -1; return -1;
assert(theState == Active); assert(theState == Active);
assert(isKeyOp()); assert(isKeyOp());
if (isInsertOp() || isUpdateOp()) { if (isInsertOp() || isUpdateOp() || isWriteOp()) {
if (theHeadInlineUpdateFlag) { if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline // add an operation to update head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
...@@ -1305,9 +1452,11 @@ NdbBlob::preCommit() ...@@ -1305,9 +1452,11 @@ NdbBlob::preCommit()
setErrorCode(ErrAbort); setErrorCode(ErrAbort);
return -1; return -1;
} }
tOp->m_abortOption = AbortOnError;
DBG("added op to update head+inline");
} }
} }
DBG("preCommit out"); DBG("preCommit [out]");
return 0; return 0;
} }
...@@ -1317,13 +1466,10 @@ NdbBlob::preCommit() ...@@ -1317,13 +1466,10 @@ NdbBlob::preCommit()
int int
NdbBlob::atNextResult() NdbBlob::atNextResult()
{ {
DBG("atNextResult"); DBG("atNextResult [in]");
if (theState == Invalid) if (theState == Invalid)
return -1; return -1;
assert(isScanOp()); assert(isScanOp());
getHeadFromRecAttr();
// reset position
thePos = 0;
// get primary key // get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data; { Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords; unsigned size = theTable->m_sizeOfKeysInWords;
...@@ -1332,26 +1478,14 @@ NdbBlob::atNextResult() ...@@ -1332,26 +1478,14 @@ NdbBlob::atNextResult()
return -1; return -1;
} }
} }
if (! theNullFlag) { getHeadFromRecAttr();
if (theGetFlag && theGetSetBytes > 0) { if (setPos(0) == -1)
// copy inline bytes to user buffer return -1;
assert(theGetBuf != NULL); if (theGetFlag) {
unsigned n = theGetSetBytes; assert(theGetSetBytes == 0 || theGetBuf != 0);
if (n > theLength) Uint32 bytes = theGetSetBytes;
n = theLength; if (readDataPrivate(theGetBuf, bytes) == -1)
if (n > theInlineSize) return -1;
n = theInlineSize;
memcpy(theGetBuf, theInlineData, n);
}
if (theGetFlag && theGetSetBytes > theInlineSize && theLength > theInlineSize) {
// add ops to read rest of a getValue
assert(theGetBuf != 0);
Uint64 pos = theInlineSize;
char* buf = theGetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
if (readDataPrivate(pos, buf, bytes) == -1)
return -1;
}
} }
setState(Active); setState(Active);
// activation callback // activation callback
...@@ -1359,7 +1493,7 @@ NdbBlob::atNextResult() ...@@ -1359,7 +1493,7 @@ NdbBlob::atNextResult()
if (invokeActiveHook() == -1) if (invokeActiveHook() == -1)
return -1; return -1;
} }
DBG("atNextResult out"); DBG("atNextResult [out]");
return 0; return 0;
} }
...@@ -1444,7 +1578,8 @@ operator<<(NdbOut& out, const NdbBlob& blob) ...@@ -1444,7 +1578,8 @@ operator<<(NdbOut& out, const NdbBlob& blob)
ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " n=" << blob.theNullFlag;;
ndbout << dec << " l=" << blob.theLength; ndbout << dec << " l=" << blob.theLength;
ndbout << dec << " p=" << blob.thePos; ndbout << dec << " p=" << blob.thePos;
ndbout << dec << " u=" << (Uint32) blob.theHeadInlineUpdateFlag; ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag;
ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes;
return out; return out;
} }
#endif #endif
...@@ -170,12 +170,14 @@ Remark: Sets an error code on the connection object from an ...@@ -170,12 +170,14 @@ Remark: Sets an error code on the connection object from an
operation object. operation object.
*****************************************************************************/ *****************************************************************************/
void void
NdbConnection::setOperationErrorCodeAbort(int error) NdbConnection::setOperationErrorCodeAbort(int error, int abortOption)
{ {
DBUG_ENTER("NdbConnection::setOperationErrorCodeAbort"); DBUG_ENTER("NdbConnection::setOperationErrorCodeAbort");
if (abortOption == -1)
abortOption = m_abortOption;
if (theTransactionIsStarted == false) { if (theTransactionIsStarted == false) {
theCommitStatus = Aborted; theCommitStatus = Aborted;
} else if ((m_abortOption == AbortOnError) && } else if ((abortOption == AbortOnError) &&
(theCommitStatus != Committed) && (theCommitStatus != Committed) &&
(theCommitStatus != Aborted)) { (theCommitStatus != Aborted)) {
theCommitStatus = NeedAbort; theCommitStatus = NeedAbort;
...@@ -335,8 +337,11 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -335,8 +337,11 @@ NdbConnection::execute(ExecType aTypeOfExec,
tOp = tOp->next(); tOp = tOp->next();
} }
} }
if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
ret = -1; ret = -1;
assert(theFirstOpInList == NULL && theLastOpInList == NULL);
{ {
NdbOperation* tOp = theCompletedFirstOp; NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) { while (tOp != NULL) {
...@@ -360,6 +365,7 @@ NdbConnection::execute(ExecType aTypeOfExec, ...@@ -360,6 +365,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
theLastOpInList->next(tRestOp); theLastOpInList->next(tRestOp);
theLastOpInList = tLastOp; theLastOpInList = tLastOp;
} }
assert(theFirstOpInList == NULL || tExecType == NoCommit);
} while (theFirstOpInList != NULL || tExecType != aTypeOfExec); } while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -1806,11 +1812,12 @@ Parameters: aErrorCode: The error code. ...@@ -1806,11 +1812,12 @@ Parameters: aErrorCode: The error code.
Remark: An operation was completed with failure. Remark: An operation was completed with failure.
*******************************************************************************/ *******************************************************************************/
int int
NdbConnection::OpCompleteFailure(Uint8 abortOption) NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
{ {
Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent; Uint32 tNoSent = theNoOfOpSent;
theCompletionStatus = NdbConnection::CompletedFailure; if (setFailure)
theCompletionStatus = NdbConnection::CompletedFailure;
tNoComp++; tNoComp++;
theNoOfOpCompleted = tNoComp; theNoOfOpCompleted = tNoComp;
if (tNoComp == tNoSent) { if (tNoComp == tNoSent) {
......
...@@ -47,13 +47,15 @@ ...@@ -47,13 +47,15 @@
* Column * Column
*/ */
NdbColumnImpl::NdbColumnImpl() NdbColumnImpl::NdbColumnImpl()
: NdbDictionary::Column(* this), m_facade(this) : NdbDictionary::Column(* this), m_facade(this),
m_attrId(-1)
{ {
init(); init();
} }
NdbColumnImpl::NdbColumnImpl(NdbDictionary::Column & f) NdbColumnImpl::NdbColumnImpl(NdbDictionary::Column & f)
: NdbDictionary::Column(* this), m_facade(&f) : NdbDictionary::Column(* this), m_facade(&f),
m_attrId(-1)
{ {
init(); init();
} }
...@@ -93,8 +95,7 @@ NdbColumnImpl::init(Type t) ...@@ -93,8 +95,7 @@ NdbColumnImpl::init(Type t)
{ {
// do not use default_charset_info as it may not be initialized yet // do not use default_charset_info as it may not be initialized yet
// use binary collation until NDB tests can handle charsets // use binary collation until NDB tests can handle charsets
CHARSET_INFO* default_cs = &my_charset_latin1_bin; CHARSET_INFO* default_cs = &my_charset_bin;
m_attrId = -1;
m_type = t; m_type = t;
switch (m_type) { switch (m_type) {
case Tinyint: case Tinyint:
......
...@@ -71,6 +71,7 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, ...@@ -71,6 +71,7 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex,
return -1; return -1;
} }
m_theIndex = anIndex; m_theIndex = anIndex;
m_thePrimaryTable = aTable;
m_accessTable = anIndex->m_table; m_accessTable = anIndex->m_table;
m_theIndexLen = 0; m_theIndexLen = 0;
m_theNoOfIndexDefined = 0; m_theNoOfIndexDefined = 0;
...@@ -102,6 +103,12 @@ int NdbIndexOperation::readTuple(NdbOperation::LockMode lm) ...@@ -102,6 +103,12 @@ int NdbIndexOperation::readTuple(NdbOperation::LockMode lm)
}; };
} }
int NdbIndexOperation::insertTuple()
{
setErrorCode(4200);
return -1;
}
int NdbIndexOperation::readTuple() int NdbIndexOperation::readTuple()
{ {
// First check that index is unique // First check that index is unique
...@@ -341,12 +348,11 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -341,12 +348,11 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theDistrGroupIndicator = 1; theDistrGroupIndicator = 1;
}//if }//if
/************************************************************************** /**************************************************************************
* If the operation is an insert request and the attribute is stored then * If the operation is a write request and the attribute is stored then
* we also set the value in the stored part through putting the * we also set the value in the stored part through putting the
* information in the INDXATTRINFO signals. * information in the INDXATTRINFO signals.
*************************************************************************/ *************************************************************************/
if ((tOpType == InsertRequest) || if ((tOpType == WriteRequest)) {
(tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){ if (!tAttrInfo->m_indexOnly){
// invalid data can crash kernel // invalid data can crash kernel
if (cs != NULL && if (cs != NULL &&
...@@ -357,7 +363,13 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -357,7 +363,13 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
goto equal_error4; goto equal_error4;
Uint32 ahValue; Uint32 ahValue;
Uint32 sz = totalSizeInWords; Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz); /*
* XXX should be linked in metadata but cannot now because
* things can be defined in arbitrary order
*/
const NdbColumnImpl* primaryCol = m_thePrimaryTable->getColumn(tAttrInfo->m_name.c_str());
assert(primaryCol != NULL);
AttributeHeader::init(&ahValue, primaryCol->m_attrId, sz);
insertATTRINFO( ahValue ); insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords); insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) { if (bitsInLastWord != 0) {
...@@ -369,7 +381,6 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, ...@@ -369,7 +381,6 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
}//if }//if
}//if }//if
}//if }//if
/************************************************************************** /**************************************************************************
* Store the Key information in the TCINDXREQ and INDXKEYINFO signals. * Store the Key information in the TCINDXREQ and INDXKEYINFO signals.
*************************************************************************/ *************************************************************************/
...@@ -734,13 +745,10 @@ NdbIndexOperation::receiveTCINDXREF( NdbApiSignal* aSignal) ...@@ -734,13 +745,10 @@ NdbIndexOperation::receiveTCINDXREF( NdbApiSignal* aSignal)
}//if }//if
theStatus = Finished; theStatus = Finished;
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
Uint32 errorCode = tcIndxRef->errorCode; Uint32 errorCode = tcIndxRef->errorCode;
theError.code = errorCode; theError.code = errorCode;
theNdbCon->setOperationErrorCodeAbort(errorCode); theNdbCon->setOperationErrorCodeAbort(errorCode);
return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption); return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption);
}//NdbIndexOperation::receiveTCINDXREF() }//NdbIndexOperation::receiveTCINDXREF()
...@@ -78,7 +78,8 @@ NdbOperation::NdbOperation(Ndb* aNdb) : ...@@ -78,7 +78,8 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
m_tcReqGSN(GSN_TCKEYREQ), m_tcReqGSN(GSN_TCKEYREQ),
m_keyInfoGSN(GSN_KEYINFO), m_keyInfoGSN(GSN_KEYINFO),
m_attrInfoGSN(GSN_ATTRINFO), m_attrInfoGSN(GSN_ATTRINFO),
theBlobList(NULL) theBlobList(NULL),
m_abortOption(-1)
{ {
theReceiver.init(NdbReceiver::NDB_OPERATION, this); theReceiver.init(NdbReceiver::NDB_OPERATION, this);
theError.code = 0; theError.code = 0;
...@@ -167,6 +168,7 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ ...@@ -167,6 +168,7 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){
theTotalNrOfKeyWordInSignal = 8; theTotalNrOfKeyWordInSignal = 8;
theMagicNumber = 0xABCDEF01; theMagicNumber = 0xABCDEF01;
theBlobList = NULL; theBlobList = NULL;
m_abortOption = -1;
tSignal = theNdb->getSignal(); tSignal = theNdb->getSignal();
if (tSignal == NULL) if (tSignal == NULL)
......
...@@ -191,7 +191,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) ...@@ -191,7 +191,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
Uint8 tDirtyIndicator = theDirtyIndicator; Uint8 tDirtyIndicator = theDirtyIndicator;
OperationType tOperationType = theOperationType; OperationType tOperationType = theOperationType;
Uint32 tTupKeyLen = theTupKeyLen; Uint32 tTupKeyLen = theTupKeyLen;
Uint8 abortOption = theNdbCon->m_abortOption; Uint8 abortOption =
m_abortOption != -1 ? m_abortOption : theNdbCon->m_abortOption;
tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator); tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator);
tcKeyReq->setOperationType(tReqInfo, tOperationType); tcKeyReq->setOperationType(tReqInfo, tOperationType);
...@@ -541,17 +542,20 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) ...@@ -541,17 +542,20 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
return -1; return -1;
}//if }//if
AbortOption ao = (AbortOption)theNdbCon->m_abortOption; AbortOption ao = (AbortOption)
(m_abortOption != -1 ? m_abortOption : theNdbCon->m_abortOption);
theReceiver.m_received_result_length = ~0; theReceiver.m_received_result_length = ~0;
theStatus = Finished; theStatus = Finished;
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; // blobs want this
if (m_abortOption != IgnoreError)
theNdbCon->theReturnStatus = NdbConnection::ReturnFailure;
theError.code = aSignal->readData(4); theError.code = aSignal->readData(4);
theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4)); theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), ao);
if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read
return theNdbCon->OpCompleteFailure(ao); return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError);
/** /**
* If TCKEYCONF has arrived * If TCKEYCONF has arrived
......
...@@ -48,7 +48,7 @@ struct Opt { ...@@ -48,7 +48,7 @@ struct Opt {
unsigned m_rows; unsigned m_rows;
unsigned m_seed; unsigned m_seed;
const char* m_skip; const char* m_skip;
const char* m_style; const char* m_test;
// metadata // metadata
const char* m_tname; const char* m_tname;
const char* m_x1name; // hash index const char* m_x1name; // hash index
...@@ -71,8 +71,8 @@ struct Opt { ...@@ -71,8 +71,8 @@ struct Opt {
m_parts(10), m_parts(10),
m_rows(100), m_rows(100),
m_seed(0), m_seed(0),
m_skip(""), m_skip(0),
m_style("012"), m_test(0),
// metadata // metadata
m_tname("TBLOB1"), m_tname("TBLOB1"),
m_x1name("TBLOB1X1"), m_x1name("TBLOB1X1"),
...@@ -101,45 +101,44 @@ printusage() ...@@ -101,45 +101,44 @@ printusage()
<< " -dbg print debug" << endl << " -dbg print debug" << endl
<< " -dbgall print also NDB API debug (if compiled in)" << endl << " -dbgall print also NDB API debug (if compiled in)" << endl
<< " -full read/write only full blob values" << endl << " -full read/write only full blob values" << endl
<< " -inline read/write only blobs which fit inline" << endl
<< " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl << " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl
<< " -parts N max parts in blob value [" << d.m_parts << "]" << endl << " -parts N max parts in blob value [" << d.m_parts << "]" << endl
<< " -rows N number of rows [" << d.m_rows << "]" << endl << " -rows N number of rows [" << d.m_rows << "]" << endl
<< " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl << " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl
<< " -skip xxx skip these tests (see list) [" << d.m_skip << endl << " -skip xxx skip given tests (see list) [no tests]" << endl
<< " -style xxx access styles to test (see list) [" << d.m_style << "]" << endl << " -test xxx only given tests (see list) [all tests]" << endl
<< "metadata" << endl << "metadata" << endl
<< " -pk2len N length of PK2 [" << d.m_pk2len << "/" << g_max_pk2len <<"]" << endl << " -pk2len N length of PK2 [" << d.m_pk2len << "/" << g_max_pk2len <<"]" << endl
<< " -oneblob only 1 blob attribute [default 2]" << endl << " -oneblob only 1 blob attribute [default 2]" << endl
<< "testcases for -skip" << endl << "testcases for test/skip" << endl
<< " k primary key ops" << endl << " k primary key ops" << endl
<< " i hash index ops" << endl << " i hash index ops" << endl
<< " s table scans" << endl << " s table scans" << endl
<< " r ordered index scans" << endl << " r ordered index scans" << endl
<< " u update blob value" << endl << "additional flags for test/skip" << endl
<< "access styles for -style" << endl << " u update existing blob value" << endl
<< " n normal insert and update" << endl
<< " w insert and update using writeTuple" << endl
<< " 0 getValue / setValue" << endl << " 0 getValue / setValue" << endl
<< " 1 setActiveHook" << endl << " 1 setActiveHook" << endl
<< " 2 readData / writeData" << endl << " 2 readData / writeData" << endl
<< "bug tests (no blob test)" << endl << "bug tests (no blob test)" << endl
<< " -bug 4088 ndb api hang with mixed ops on index table" << endl << " -bug 4088 ndb api hang with mixed ops on index table" << endl
<< " -bug 2222 delete + write gives 626" << endl << " -bug nnnn delete + write gives 626" << endl
<< " -bug 3333 acc crash on delete and long key" << endl << " -bug nnnn acc crash on delete and long key" << endl
; ;
} }
static Opt g_opt; static Opt g_opt;
static bool static bool
skipcase(int x) testcase(char x)
{ {
return strchr(g_opt.m_skip, x) != 0; if (x < 10)
} x += '0';
return
static bool (g_opt.m_test == 0 || strchr(g_opt.m_test, x) != 0) &&
skipstyle(int x) (g_opt.m_skip == 0 || strchr(g_opt.m_skip, x) == 0);
{
return strchr(g_opt.m_style, '0' + x) == 0;
} }
static Ndb* g_ndb = 0; static Ndb* g_ndb = 0;
...@@ -435,7 +434,9 @@ getBlobLength(NdbBlob* h, unsigned& len) ...@@ -435,7 +434,9 @@ getBlobLength(NdbBlob* h, unsigned& len)
CHK(h->getLength(len2) == 0); CHK(h->getLength(len2) == 0);
len = (unsigned)len2; len = (unsigned)len2;
assert(len == len2); assert(len == len2);
DBG("getBlobLength " << h->getColumn()->getName() << " len=" << len); bool isNull;
CHK(h->getNull(isNull) == 0);
DBG("getBlobLength " << h->getColumn()->getName() << " len=" << len << " null=" << isNull);
return 0; return 0;
} }
...@@ -911,6 +912,41 @@ updatePk(int style) ...@@ -911,6 +912,41 @@ updatePk(int style)
return 0; return 0;
} }
static int
writePk(int style)
{
DBG("--- writePk " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("writePk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->writeTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
// non-nullable must be set
CHK(g_bh1->setValue("", 0) == 0);
CHK(setBlobWriteHook(tup) == 0);
} else {
// non-nullable must be set
CHK(g_bh1->setValue("", 0) == 0);
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_con = 0;
tup.m_exists = true;
}
return 0;
}
static int static int
deletePk() deletePk()
{ {
...@@ -995,6 +1031,35 @@ updateIdx(int style) ...@@ -995,6 +1031,35 @@ updateIdx(int style)
return 0; return 0;
} }
static int
writeIdx(int style)
{
DBG("--- writeIdx " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("writeIdx pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->writeTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opx) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobWriteHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opx = 0;
g_con = 0;
tup.m_exists = true;
}
return 0;
}
static int static int
deleteIdx() deleteIdx()
{ {
...@@ -1170,7 +1235,6 @@ deleteScan(bool idx) ...@@ -1170,7 +1235,6 @@ deleteScan(bool idx)
static int static int
testmain() testmain()
{ {
int style;
g_ndb = new Ndb("TEST_DB"); g_ndb = new Ndb("TEST_DB");
CHK(g_ndb->init() == 0); CHK(g_ndb->init() == 0);
CHK(g_ndb->waitUntilReady() == 0); CHK(g_ndb->waitUntilReady() == 0);
...@@ -1194,55 +1258,88 @@ testmain() ...@@ -1194,55 +1258,88 @@ testmain()
if (g_opt.m_seed != 0) if (g_opt.m_seed != 0)
srandom(g_opt.m_seed); srandom(g_opt.m_seed);
for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) { for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) {
int style;
DBG("=== loop " << g_loop << " ==="); DBG("=== loop " << g_loop << " ===");
if (g_opt.m_seed == 0) if (g_opt.m_seed == 0)
srandom(g_loop); srandom(g_loop);
// pk // pk
for (style = 0; style <= 2; style++) { for (style = 0; style <= 2; style++) {
if (skipcase('k') || skipstyle(style)) if (! testcase('k') || ! testcase(style))
continue; continue;
DBG("--- pk ops " << stylename[style] << " ---"); DBG("--- pk ops " << stylename[style] << " ---");
calcTups(false); if (testcase('n')) {
CHK(insertPk(style) == 0); calcTups(false);
CHK(verifyBlob() == 0); CHK(insertPk(style) == 0);
CHK(readPk(style) == 0); CHK(verifyBlob() == 0);
if (! skipcase('u')) { CHK(readPk(style) == 0);
calcTups(style); if (testcase('u')) {
CHK(updatePk(style) == 0); calcTups(style);
CHK(updatePk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readPk(style) == 0);
}
CHK(deletePk() == 0);
CHK(verifyBlob() == 0);
}
if (testcase('w')) {
calcTups(false);
CHK(writePk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readPk(style) == 0);
if (testcase('u')) {
calcTups(style);
CHK(writePk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readPk(style) == 0);
}
CHK(deletePk() == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
} }
CHK(readPk(style) == 0);
CHK(deletePk() == 0);
CHK(verifyBlob() == 0);
} }
// hash index // hash index
for (style = 0; style <= 2; style++) { for (style = 0; style <= 2; style++) {
if (skipcase('i') || skipstyle(style)) if (! testcase('i') || ! testcase(style))
continue; continue;
DBG("--- idx ops " << stylename[style] << " ---"); DBG("--- idx ops " << stylename[style] << " ---");
calcTups(false); if (testcase('n')) {
CHK(insertPk(style) == 0); calcTups(false);
CHK(verifyBlob() == 0); CHK(insertPk(style) == 0);
CHK(readIdx(style) == 0);
calcTups(style);
if (! skipcase('u')) {
CHK(updateIdx(style) == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
CHK(readIdx(style) == 0); CHK(readIdx(style) == 0);
if (testcase('u')) {
calcTups(style);
CHK(updateIdx(style) == 0);
CHK(verifyBlob() == 0);
CHK(readIdx(style) == 0);
}
CHK(deleteIdx() == 0);
CHK(verifyBlob() == 0);
}
if (testcase('w')) {
calcTups(false);
CHK(writePk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readIdx(style) == 0);
if (testcase('u')) {
calcTups(style);
CHK(writeIdx(style) == 0);
CHK(verifyBlob() == 0);
CHK(readIdx(style) == 0);
}
CHK(deleteIdx() == 0);
CHK(verifyBlob() == 0);
} }
CHK(deleteIdx() == 0);
CHK(verifyBlob() == 0);
} }
// scan table // scan table
for (style = 0; style <= 2; style++) { for (style = 0; style <= 2; style++) {
if (skipcase('s') || skipstyle(style)) if (! testcase('s') || ! testcase(style))
continue; continue;
DBG("--- table scan " << stylename[style] << " ---"); DBG("--- table scan " << stylename[style] << " ---");
calcTups(false); calcTups(false);
CHK(insertPk(style) == 0); CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
CHK(readScan(style, false) == 0); CHK(readScan(style, false) == 0);
if (! skipcase('u')) { if (testcase('u')) {
CHK(updateScan(style, false) == 0); CHK(updateScan(style, false) == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
} }
...@@ -1251,14 +1348,14 @@ testmain() ...@@ -1251,14 +1348,14 @@ testmain()
} }
// scan index // scan index
for (style = 0; style <= 2; style++) { for (style = 0; style <= 2; style++) {
if (skipcase('r') || skipstyle(style)) if (! testcase('r') || ! testcase(style))
continue; continue;
DBG("--- index scan " << stylename[style] << " ---"); DBG("--- index scan " << stylename[style] << " ---");
calcTups(false); calcTups(false);
CHK(insertPk(style) == 0); CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
CHK(readScan(style, true) == 0); CHK(readScan(style, true) == 0);
if (! skipcase('u')) { if (testcase('u')) {
CHK(updateScan(style, true) == 0); CHK(updateScan(style, true) == 0);
CHK(verifyBlob() == 0); CHK(verifyBlob() == 0);
} }
...@@ -1331,9 +1428,7 @@ static struct { ...@@ -1331,9 +1428,7 @@ static struct {
int m_bug; int m_bug;
int (*m_test)(); int (*m_test)();
} g_bugtest[] = { } g_bugtest[] = {
{ 4088, bugtest_4088 }, { 4088, bugtest_4088 }
{ 2222, bugtest_2222 },
{ 3333, bugtest_3333 }
}; };
NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
...@@ -1395,9 +1490,9 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1395,9 +1490,9 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
continue; continue;
} }
} }
if (strcmp(arg, "-style") == 0) { if (strcmp(arg, "-test") == 0) {
if (++argv, --argc > 0) { if (++argv, --argc > 0) {
g_opt.m_style = strdup(argv[0]); g_opt.m_test = strdup(argv[0]);
continue; continue;
} }
} }
...@@ -1433,7 +1528,9 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1433,7 +1528,9 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
} }
if (g_opt.m_pk2len == 0) { if (g_opt.m_pk2len == 0) {
char b[100]; char b[100];
strcpy(b, g_opt.m_skip); b[0] = 0;
if (g_opt.m_skip != 0)
strcpy(b, g_opt.m_skip);
strcat(b, "i"); strcat(b, "i");
strcat(b, "r"); strcat(b, "r");
g_opt.m_skip = strdup(b); g_opt.m_skip = strdup(b);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment