Commit 597e1444 authored by mskold@mysql.com's avatar mskold@mysql.com

Fix for Bug #18184 SELECT ... FOR UPDATE does not work..: implemented...

Fix for Bug #18184  SELECT ... FOR UPDATE does not work..: implemented ha_ndblcuster::unlock_row() and explicitly lock all rows that are not being unlocked
parent 39b50ce0
......@@ -63,3 +63,62 @@ pk u o
5 5 5
insert into t1 values (1,1,1);
drop table t1;
create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
insert into t1 values (1,'one'), (2,'two'),(3,"three");
begin;
select * from t1 where x = 1 for update;
x y
1 one
begin;
select * from t1 where x = 2 for update;
x y
2 two
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
x y
3 three
1 one
begin;
select * from t1 where x = 2 for update;
x y
2 two
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where x = 1 lock in share mode;
x y
1 one
begin;
select * from t1 where x = 1 lock in share mode;
x y
1 one
select * from t1 where x = 2 for update;
x y
2 two
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
x y
3 three
1 one
begin;
select * from t1 where y = 'one' lock in share mode;
x y
1 one
select * from t1 where x = 2 for update;
x y
2 two
select * from t1 where x = 1 for update;
ERROR HY000: Lock wait timeout exceeded; try restarting transaction
rollback;
commit;
drop table t1;
......@@ -69,4 +69,80 @@ insert into t1 values (1,1,1);
drop table t1;
# Lock for update
create table t1 (x integer not null primary key, y varchar(32)) engine = ndb;
insert into t1 values (1,'one'), (2,'two'),(3,"three");
# PK access
connection con1;
begin;
select * from t1 where x = 1 for update;
connection con2;
begin;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' for update;
connection con2;
begin;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# share locking
# PK access
connection con1;
begin;
select * from t1 where x = 1 lock in share mode;
connection con2;
begin;
select * from t1 where x = 1 lock in share mode;
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
# scan
connection con1;
begin;
select * from t1 where y = 'one' or y = 'three' lock in share mode;
connection con2;
begin;
select * from t1 where y = 'one' lock in share mode;
# Have to check with pk access here since scans take locks on
# all rows and then release them in chunks
select * from t1 where x = 2 for update;
--error 1205
select * from t1 where x = 1 for update;
rollback;
connection con1;
commit;
drop table t1;
# End of 4.1 tests
......@@ -45,14 +45,15 @@ public:
NdbResultSet* readTuples(LockMode = LM_Read,
Uint32 batch = 0,
Uint32 parallel = 0,
bool order_by = false);
bool order_by = false,
bool keyinfo = false);
inline NdbResultSet* readTuples(int parallell){
return readTuples(LM_Read, 0, parallell, false);
return readTuples(LM_Read, 0, parallell);
}
inline NdbResultSet* readTuplesExclusive(int parallell = 0){
return readTuples(LM_Exclusive, 0, parallell, false);
return readTuples(LM_Exclusive, 0, parallell);
}
/**
......
......@@ -101,6 +101,27 @@ public:
*/
int restart(bool forceSend = false);
/**
* Lock current row by transfering scan operation to a locking transaction.
* Use this function
* when a scan has found a record that you want to lock.
* 1. Start a new transaction.
* 2. Call the function takeOverForUpdate using your new transaction
* as parameter, all the properties of the found record will be copied
* to the new transaction.
* 3. When you execute the new transaction, the lock held by the scan will
* be transferred to the new transaction(it's taken over).
*
* @note You must have started the scan with openScanExclusive
* or explictly have requested keyinfo to be able to lock
* the found tuple.
*
* @param lockingTrans the locking transaction connection.
* @return an NdbOperation or NULL.
*/
NdbOperation* lockTuple();
NdbOperation* lockTuple(NdbConnection* lockingTrans);
/**
* Transfer scan operation to an updating transaction. Use this function
* when a scan has found a record that you want to update.
......
......@@ -64,14 +64,16 @@ public:
* Tuples are not stored in NdbResultSet until execute(NoCommit)
* has been executed and nextResult has been called.
*
* @param keyinfo Return primary key, needed to be able to call lockTuple
* @param parallel Scan parallelism
* @param batch No of rows to fetch from each fragment at a time
* @param LockMode Scan lock handling
* @returns NdbResultSet.
* @note specifying 0 for batch and parallall means max performance
* @note specifying 0 for batch and parallell means max performance
*/
NdbResultSet* readTuples(LockMode = LM_Read,
Uint32 batch = 0, Uint32 parallel = 0);
Uint32 batch = 0, Uint32 parallel = 0,
bool keyinfo = false);
inline NdbResultSet* readTuples(int parallell){
return readTuples(LM_Read, 0, parallell);
......
......@@ -72,6 +72,17 @@ void NdbResultSet::close(bool forceSend)
m_operation->closeScan(forceSend, true);
}
NdbOperation*
NdbResultSet::lockTuple(){
return lockTuple(m_operation->m_transConnection);
}
NdbOperation*
NdbResultSet::lockTuple(NdbConnection* takeOverTrans){
return m_operation->takeOverScanOp(NdbOperation::ReadRequest,
takeOverTrans);
}
NdbOperation*
NdbResultSet::updateTuple(){
return updateTuple(m_operation->m_transConnection);
......
......@@ -126,7 +126,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 batch,
Uint32 parallel)
Uint32 parallel,
bool keyinfo)
{
m_ordered = 0;
......@@ -170,7 +171,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return 0;
}
m_keyInfo = lockExcl ? 1 : 0;
m_keyInfo = (keyinfo || lockExcl) ? 1 : 0;
bool range = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
......@@ -956,18 +957,28 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
if (newOp == NULL){
return NULL;
}
if (!m_keyInfo)
{
// Cannot take over lock if no keyinfo was requested
setErrorCodeAbort(4604);
return NULL;
}
pTrans->theSimpleState = 0;
const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
newOp->theTupKeyLen = len;
newOp->theOperationType = opType;
if (opType == DeleteRequest) {
switch (opType) {
case (ReadRequest):
newOp->theLockMode = theLockMode;
// Fall through
case (DeleteRequest):
newOp->theStatus = GetValue;
} else {
break;
default:
newOp->theStatus = SetValue;
}
const Uint32 * src = (Uint32*)tRecAttr->aRef();
const Uint32 tScanInfo = src[len] & 0x3FFFF;
const Uint32 tTakeOverNode = src[len] >> 20;
......@@ -1241,8 +1252,9 @@ NdbResultSet*
NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch,
Uint32 parallel,
bool order_by){
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
bool order_by,
bool keyinfo){
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0, keyinfo);
if(rs && order_by){
m_ordered = 1;
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
......
......@@ -281,7 +281,7 @@ ErrorBundle ErrorCodes[] = {
{ 4601, AE, "Transaction is not started"},
{ 4602, AE, "You must call getNdbOperation before executeScan" },
{ 4603, AE, "There can only be ONE operation in a scan transaction" },
{ 4604, AE, "takeOverScanOp, opType must be UpdateRequest or DeleteRequest" },
{ 4604, AE, "takeOverScanOp, to take over a scanned row one must explicitly request keyinfo in readTuples call" },
{ 4605, AE, "You may only call openScanRead or openScanExclusive once for each operation"},
{ 4607, AE, "There may only be one operation in a scan transaction"},
{ 4608, AE, "You can not takeOverScan unless you have used openScanExclusive"},
......
......@@ -1043,12 +1043,23 @@ void ha_ndbcluster::release_metadata()
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
DBUG_ENTER("ha_ndbcluster::get_ndb_lock_type");
if (type >= TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive;
else if (uses_blob_value(m_retrieve_all_fields))
return NdbOperation::LM_Read;
{
DBUG_PRINT("info", ("Using exclusive lock"));
DBUG_RETURN(NdbOperation::LM_Exclusive);
}
else if (type == TL_READ_WITH_SHARED_LOCKS ||
uses_blob_value(m_retrieve_all_fields))
{
DBUG_PRINT("info", ("Using read lock"));
DBUG_RETURN(NdbOperation::LM_Read);
}
else
return NdbOperation::LM_CommittedRead;
{
DBUG_PRINT("info", ("Using committed read"));
DBUG_RETURN(NdbOperation::LM_CommittedRead);
}
}
static const ulong index_type_flags[]=
......@@ -1385,11 +1396,34 @@ inline int ha_ndbcluster::next_result(byte *buf)
if (!cursor)
DBUG_RETURN(HA_ERR_END_OF_FILE);
if (m_lock_tuple)
{
/*
Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
(SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
LOCK WITH SHARE MODE) and row was not explictly unlocked
with unlock_row() call
*/
NdbConnection *trans= m_active_trans;
NdbOperation *op;
// Lock row
DBUG_PRINT("info", ("Keeping lock on scanned row"));
if (!(op= m_active_cursor->lockTuple()))
{
m_lock_tuple= false;
ERR_RETURN(trans->getNdbError());
}
m_ops_pending++;
}
m_lock_tuple= false;
/*
If this an update or delete, call nextResult with false
to process any records already cached in NdbApi
*/
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
m_lock.type != TL_READ_WITH_SHARED_LOCKS;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
......@@ -1410,6 +1444,13 @@ inline int ha_ndbcluster::next_result(byte *buf)
unpack_record(buf);
table->status= 0;
/*
Explicitly lock tuple if "select for update" or
"select lock in share mode"
*/
m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
||
m_lock.type == TL_READ_WITH_SHARED_LOCKS);
DBUG_RETURN(0);
}
else if (check == 1 || check == 2)
......@@ -1444,7 +1485,6 @@ inline int ha_ndbcluster::next_result(byte *buf)
contact_ndb= (check == 2);
}
} while (check == 2);
table->status= STATUS_NOT_FOUND;
if (check == -1)
DBUG_RETURN(ndb_err(trans));
......@@ -1679,10 +1719,11 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
restart= false;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
!(cursor= op->readTuples(lm, 0, parallelism, sorted, need_pk)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
} else {
......@@ -1817,8 +1858,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
!(cursor= op->readTuples(lm, 0, parallelism)))
!(cursor= op->readTuples(lm, 0, parallelism, need_pk)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
......@@ -2082,6 +2124,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++;
if (uses_blob_value(FALSE))
m_blobs_pending= TRUE;
......@@ -2157,6 +2200,7 @@ int ha_ndbcluster::delete_row(const byte *record)
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
if (cursor->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
m_lock_tuple= false;
m_ops_pending++;
no_uncommitted_rows_update(-1);
......@@ -2439,6 +2483,12 @@ int ha_ndbcluster::index_init(uint index)
{
DBUG_ENTER("index_init");
DBUG_PRINT("enter", ("index: %u", index));
/*
Locks are are explicitly released in scan
unless m_lock.type == TL_READ_HIGH_PRIORITY
and no sub-sequent call to unlock_row()
*/
m_lock_tuple= false;
DBUG_RETURN(handler::index_init(index));
}
......@@ -2683,7 +2733,7 @@ int ha_ndbcluster::close_scan()
if (!cursor)
DBUG_RETURN(1);
m_lock_tuple= false;
if (m_ops_pending)
{
/*
......@@ -3383,6 +3433,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
since ndb does not currently does not support table locking
*/
void ha_ndbcluster::unlock_row()
{
DBUG_ENTER("unlock_row");
DBUG_PRINT("info", ("Unlocking row"));
m_lock_tuple= false;
DBUG_VOID_RETURN;
}
int ha_ndbcluster::start_stmt(THD *thd)
{
int error=0;
......
......@@ -120,6 +120,7 @@ class ha_ndbcluster: public handler
int extra_opt(enum ha_extra_function operation, ulong cache_size);
int reset();
int external_lock(THD *thd, int lock_type);
void unlock_row();
int start_stmt(THD *thd);
const char * table_type() const;
const char ** bas_ext() const;
......@@ -223,6 +224,7 @@ class ha_ndbcluster: public handler
char m_tabname[FN_HEADLEN];
ulong m_table_flags;
THR_LOCK_DATA m_lock;
bool m_lock_tuple;
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment