Commit be608984 authored by mskold@mysql.com's avatar mskold@mysql.com

Merge mysql.com:/home/marty/MySQL/mysql-5.0

into  mysql.com:/home/marty/MySQL/mysql-5.1
parents a94ad6a2 d17c3d3c
DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t1, t2;
CREATE TABLE t2 ( CREATE TABLE t1 (
a bigint unsigned NOT NULL PRIMARY KEY, a bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,
b int unsigned not null, b int unsigned not null,
c int unsigned c int unsigned
) engine=ndbcluster; ) engine=ndbcluster;
select count(*) from t2; select count(*) from t1;
count(*) count(*)
5000 5000
truncate table t2; select * from t1 order by a limit 2;
select count(*) from t2; a b c
1 509 2500
2 510 7
truncate table t1;
select count(*) from t1;
count(*) count(*)
0 0
drop table t2; insert into t1 values(NULL,1,1),(NULL,2,2);
select * from t1 order by a;
a b c
1 1 1
2 2 2
drop table t1;
...@@ -69,6 +69,119 @@ insert into t1 values (1,1,1); ...@@ -69,6 +69,119 @@ insert into t1 values (1,1,1);
drop table t1; drop table t1;
# Lock for update
create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb;
insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3);
# PK access
connection con1;
begin;
select * from t1 where x = 1 for update;
connection con2;
begin;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# table scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
connection con2;
begin;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
# Bug #20390 SELECT FOR UPDATE does not release locks of untouched rows in full table scans
#select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# index scan
connection con1;
begin;
select * from t1 where z > 1 and z < 3 for update;
connection con2;
begin;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 1 for update;
--error 1205
select * from t1 where x = 2 for update;
rollback;
connection con1;
commit;
# share locking
# PK access
connection con1;
begin;
select * from t1 where x = 1 lock in share mode;
connection con2;
begin;
select * from t1 where x = 1 lock in share mode;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# table scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
connection con2;
begin;
select * from t1 where y = 'one' lock in share mode;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
# Bug #20390 SELECT FOR UPDATE does not release locks of untouched rows in full table scans
#select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# index scan
connection con1;
begin;
select * from t1 where z > 1 and z < 3 lock in share mode;
connection con2;
begin;
select * from t1 where z = 1 lock in share mode;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 1 for update;
--error 1205
select * from t1 where x = 2 for update;
rollback;
connection con1;
commit;
drop table t1;
# End of 4.1 tests # End of 4.1 tests
# #
......
...@@ -2,12 +2,11 @@ ...@@ -2,12 +2,11 @@
-- source include/not_embedded.inc -- source include/not_embedded.inc
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t1, t2;
--enable_warnings --enable_warnings
CREATE TABLE t1 (
CREATE TABLE t2 ( a bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,
a bigint unsigned NOT NULL PRIMARY KEY,
b int unsigned not null, b int unsigned not null,
c int unsigned c int unsigned
) engine=ndbcluster; ) engine=ndbcluster;
...@@ -20,17 +19,23 @@ let $1=500; ...@@ -20,17 +19,23 @@ let $1=500;
disable_query_log; disable_query_log;
while ($1) while ($1)
{ {
eval insert into t2 values($1*10, $1+9, 5*$1), ($1*10+1, $1+10, 7),($1*10+2, $1+10, 7*$1), ($1*10+3, $1+10, 10+$1), ($1*10+4, $1+10, 70*$1), ($1*10+5, $1+10, 7), ($1*10+6, $1+10, 9), ($1*10+7, $1+299, 899), ($1*10+8, $1+10, 12), ($1*10+9, $1+10, 14*$1); eval insert into t1 values(NULL, $1+9, 5*$1), (NULL, $1+10, 7),(NULL, $1+10, 7*$1), (NULL, $1+10, 10+$1), (NULL, $1+10, 70*$1), (NULL, $1+10, 7), (NULL, $1+10, 9), (NULL, $1+299, 899), (NULL, $1+10, 12), (NULL, $1+10, 14*$1);
dec $1; dec $1;
} }
enable_query_log; enable_query_log;
select count(*) from t2; select count(*) from t1;
select * from t1 order by a limit 2;
truncate table t1;
select count(*) from t1;
truncate table t2; insert into t1 values(NULL,1,1),(NULL,2,2);
select count(*) from t2; select * from t1 order by a;
drop table t2; drop table t1;
# End of 4.1 tests # End of 4.1 tests
...@@ -628,6 +628,7 @@ class ha_ndbcluster: public handler ...@@ -628,6 +628,7 @@ class ha_ndbcluster: public handler
int extra_opt(enum ha_extra_function operation, ulong cache_size); int extra_opt(enum ha_extra_function operation, ulong cache_size);
int reset(); int reset();
int external_lock(THD *thd, int lock_type); int external_lock(THD *thd, int lock_type);
void unlock_row();
int start_stmt(THD *thd, thr_lock_type lock_type); int start_stmt(THD *thd, thr_lock_type lock_type);
void print_error(int error, myf errflag); void print_error(int error, myf errflag);
const char * table_type() const; const char * table_type() const;
...@@ -855,6 +856,7 @@ private: ...@@ -855,6 +856,7 @@ private:
char m_tabname[FN_HEADLEN]; char m_tabname[FN_HEADLEN];
ulong m_table_flags; ulong m_table_flags;
THR_LOCK_DATA m_lock; THR_LOCK_DATA m_lock;
bool m_lock_tuple;
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY]; NDB_INDEX_DATA m_index[MAX_KEY];
THD_NDB_SHARE *m_thd_ndb_share; THD_NDB_SHARE *m_thd_ndb_share;
......
...@@ -62,11 +62,14 @@ public: ...@@ -62,11 +62,14 @@ public:
Uint32 parallel, Uint32 parallel,
bool order_by, bool order_by,
bool order_desc = false, bool order_desc = false,
bool read_range_no = false) { bool read_range_no = false,
bool keyinfo = false) {
Uint32 scan_flags = Uint32 scan_flags =
(SF_OrderBy & -(Int32)order_by) | (SF_OrderBy & -(Int32)order_by) |
(SF_Descending & -(Int32)order_desc) | (SF_Descending & -(Int32)order_desc) |
(SF_ReadRangeNo & -(Int32)read_range_no); (SF_ReadRangeNo & -(Int32)read_range_no) |
(SF_KeyInfo & -(Int32)keyinfo);
return readTuples(lock_mode, scan_flags, parallel); return readTuples(lock_mode, scan_flags, parallel);
} }
#endif #endif
......
...@@ -45,7 +45,8 @@ public: ...@@ -45,7 +45,8 @@ public:
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead
SF_OrderBy = (1 << 24), // index scan in order SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order SF_Descending = (2 << 24), // index scan in descending order
SF_ReadRangeNo = (4 << 24) // enable @ref get_range_no SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
SF_KeyInfo = 1 // request KeyInfo to be sent back
}; };
/** /**
...@@ -62,15 +63,14 @@ public: ...@@ -62,15 +63,14 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
/** /**
* readTuples * readTuples
*
* @param lock_mode Lock mode * @param lock_mode Lock mode
* @param batch No of rows to fetch from each fragment at a time * @param batch No of rows to fetch from each fragment at a time
* @param parallel No of fragments to scan in parallell * @param parallel No of fragments to scan in parallell
* @note specifying 0 for batch and parallall means max performance * @note specifying 0 for batch and parallell means max performance
*/ */
#ifdef ndb_readtuples_impossible_overload #ifdef ndb_readtuples_impossible_overload
int readTuples(LockMode lock_mode = LM_Read, int readTuples(LockMode lock_mode = LM_Read,
Uint32 batch = 0, Uint32 parallel = 0); Uint32 batch = 0, Uint32 parallel = 0, bool keyinfo = false);
#endif #endif
inline int readTuples(int parallell){ inline int readTuples(int parallell){
...@@ -142,6 +142,20 @@ public: ...@@ -142,6 +142,20 @@ public:
*/ */
void close(bool forceSend = false, bool releaseOp = false); void close(bool forceSend = false, bool releaseOp = false);
/**
* Lock current tuple
*
* @return an NdbOperation or NULL.
*/
NdbOperation* lockCurrentTuple();
/**
* Lock current tuple
*
* @param lockTrans Transaction that should perform the lock
*
* @return an NdbOperation or NULL.
*/
NdbOperation* lockCurrentTuple(NdbTransaction* lockTrans);
/** /**
* Update current tuple * Update current tuple
* *
...@@ -251,6 +265,19 @@ protected: ...@@ -251,6 +265,19 @@ protected:
bool m_executed; // Marker if operation should be released at close bool m_executed; // Marker if operation should be released at close
}; };
inline
NdbOperation*
NdbScanOperation::lockCurrentTuple(){
return lockCurrentTuple(m_transConnection);
}
inline
NdbOperation*
NdbScanOperation::lockCurrentTuple(NdbTransaction* takeOverTrans){
return takeOverScanOp(NdbOperation::ReadRequest,
takeOverTrans);
}
inline inline
NdbOperation* NdbOperation*
NdbScanOperation::updateCurrentTuple(){ NdbScanOperation::updateCurrentTuple(){
......
...@@ -581,7 +581,7 @@ public: ...@@ -581,7 +581,7 @@ public:
int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab); int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
int dropIndex(const char * indexName, int dropIndex(const char * indexName,
const char * tableName); const char * tableName);
int dropIndex(NdbIndexImpl &, const char * tableName); int dropIndex(NdbIndexImpl &);
NdbTableImpl * getIndexTable(NdbIndexImpl * index, NdbTableImpl * getIndexTable(NdbIndexImpl * index,
NdbTableImpl * table); NdbTableImpl * table);
......
...@@ -162,7 +162,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -162,7 +162,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return -1; return -1;
} }
m_keyInfo = lockExcl ? 1 : 0; m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
bool tupScan = (scan_flags & SF_TupScan); bool tupScan = (scan_flags & SF_TupScan);
#if 1 // XXX temp for testing #if 1 // XXX temp for testing
...@@ -942,6 +942,12 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans) ...@@ -942,6 +942,12 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
if (newOp == NULL){ if (newOp == NULL){
return NULL; return NULL;
} }
if (!m_keyInfo)
{
// Cannot take over lock if no keyinfo was requested
setErrorCodeAbort(4604);
return NULL;
}
pTrans->theSimpleState = 0; pTrans->theSimpleState = 0;
assert(tRecAttr->get_size_in_bytes() > 0); assert(tRecAttr->get_size_in_bytes() > 0);
...@@ -950,12 +956,16 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans) ...@@ -950,12 +956,16 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
newOp->theTupKeyLen = len; newOp->theTupKeyLen = len;
newOp->theOperationType = opType; newOp->theOperationType = opType;
if (opType == DeleteRequest) { switch (opType) {
case (ReadRequest):
newOp->theLockMode = theLockMode;
// Fall through
case (DeleteRequest):
newOp->theStatus = GetValue; newOp->theStatus = GetValue;
} else { break;
default:
newOp->theStatus = SetValue; newOp->theStatus = SetValue;
} }
const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 * src = (Uint32*)tRecAttr->aRef();
const Uint32 tScanInfo = src[len] & 0x3FFFF; const Uint32 tScanInfo = src[len] & 0x3FFFF;
const Uint32 tTakeOverFragment = src[len] >> 20; const Uint32 tTakeOverFragment = src[len] >> 20;
......
...@@ -308,7 +308,7 @@ ErrorBundle ErrorCodes[] = { ...@@ -308,7 +308,7 @@ ErrorBundle ErrorCodes[] = {
{ 4601, DMEC, AE, "Transaction is not started"}, { 4601, DMEC, AE, "Transaction is not started"},
{ 4602, DMEC, AE, "You must call getNdbOperation before executeScan" }, { 4602, DMEC, AE, "You must call getNdbOperation before executeScan" },
{ 4603, DMEC, AE, "There can only be ONE operation in a scan transaction" }, { 4603, DMEC, AE, "There can only be ONE operation in a scan transaction" },
{ 4604, DMEC, AE, "takeOverScanOp, opType must be UpdateRequest or DeleteRequest" }, { 4604, DMEC, AE, "takeOverScanOp, to take over a scanned row one must explicitly request keyinfo on readTuples call" },
{ 4605, DMEC, AE, "You may only call openScanRead or openScanExclusive once for each operation"}, { 4605, DMEC, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
{ 4607, DMEC, AE, "There may only be one operation in a scan transaction"}, { 4607, DMEC, AE, "There may only be one operation in a scan transaction"},
{ 4608, DMEC, AE, "You can not takeOverScan unless you have used openScanExclusive"}, { 4608, DMEC, AE, "You can not takeOverScan unless you have used openScanExclusive"},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment