Commit 5efff1fe authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - bug#14509, part 1: move autoincr pre-fetch from Ndb to local dict cache

parent 91031fc2
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
CREATE TABLE t1 (
a INT NOT NULL,
......@@ -315,3 +315,24 @@ unique key tx1 (c002, c003, c004, c005)) engine=ndb;
create index tx2
on t1 (c010, c011, c012, c013);
drop table t1;
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
a b
3 103
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
a b
6 203
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
a b c
9 303 NULL
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
a b c
12 403 NULL
drop table t1;
......@@ -3,7 +3,7 @@
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1, t2;
drop database if exists mysqltest;
--enable_warnings
......@@ -326,5 +326,20 @@ on t1 (c010, c011, c012, c013);
drop table t1;
# simple test that auto incr is not lost at rename or alter
create table t1 (a int primary key auto_increment, b int) engine=ndb;
insert into t1 (b) values (101),(102),(103);
select * from t1 where a = 3;
alter table t1 rename t2;
insert into t2 (b) values (201),(202),(203);
select * from t2 where a = 6;
alter table t2 add c int;
insert into t2 (b) values (301),(302),(303);
select * from t2 where a = 9;
alter table t2 rename t1;
insert into t1 (b) values (401),(402),(403);
select * from t1 where a = 12;
drop table t1;
# End of 4.1 tests
......@@ -984,6 +984,7 @@ class BaseString;
class NdbEventOperation;
class NdbBlob;
class NdbReceiver;
class Ndb_local_table_info;
template <class T> struct Ndb_free_list_t;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
......@@ -1443,15 +1444,12 @@ public:
bool increase = false);
bool setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val,
bool increase = false);
Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId,
Uint32 cacheSize = 1000);
Uint64 readTupleIdFromNdb(Uint32 aTableId);
bool setTupleIdInNdb(const char* aTableName, Uint64 val,
bool increase);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op);
private:
Uint64 getTupleIdFromNdb(Ndb_local_table_info* info, Uint32 cacheSize);
Uint64 readTupleIdFromNdb(Ndb_local_table_info* info);
bool setTupleIdInNdb(Ndb_local_table_info* info, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 opValue, Uint32 op);
public:
/**
*/
......@@ -1650,11 +1648,6 @@ private:
Uint64 the_last_check_time;
Uint64 theFirstTransId;
// The tupleId is retreived from DB the
// tupleId is unique for each tableid.
Uint64 theFirstTupleId[2048];
Uint64 theLastTupleId[2048];
Uint32 theRestartGCI; // the Restart GCI used by DIHNDBTAMPER
......
......@@ -45,6 +45,8 @@ void Ndb_local_table_info::destroy(Ndb_local_table_info *info)
Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
m_table_impl= table_impl;
m_first_tuple_id = ~(Uint64)0;
m_last_tuple_id = ~(Uint64)0;
}
Ndb_local_table_info::~Ndb_local_table_info()
......
......@@ -33,6 +33,11 @@ public:
static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0);
static void destroy(Ndb_local_table_info *);
NdbTableImpl *m_table_impl;
// range of cached tuple ids per thread
Uint64 m_first_tuple_id;
Uint64 m_last_tuple_id;
Uint64 m_local_data[1]; // Must be last member. Used to access extra space.
private:
Ndb_local_table_info(NdbTableImpl *table_impl);
......
......@@ -768,11 +768,12 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0)
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl *table= info->m_table_impl;
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
}
Uint64 tupleId = getTupleIdFromNdb(info, cacheSize);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
......@@ -783,50 +784,54 @@ Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
DBUG_RETURN(tupleId);
}
const BaseString& internal_tabname = table->m_internalName;
Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return ~(Uint64)0;
return getTupleIdFromNdb(table->m_tableId, cacheSize);
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = getTupleIdFromNdb(info, cacheSize);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
Ndb::getTupleIdFromNdb(Ndb_local_table_info* info, Uint32 cacheSize)
{
DBUG_ENTER("getTupleIdFromNdb");
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
Uint64 tupleId;
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
theFirstTupleId[aTableId]++;
DBUG_PRINT("info", ("next cached value %ul",
(ulong) theFirstTupleId[aTableId]));
DBUG_RETURN(theFirstTupleId[aTableId]);
assert(info->m_first_tuple_id < info->m_last_tuple_id);
tupleId = ++info->m_first_tuple_id;
DBUG_PRINT("info", ("next cached value %llu", (ulonglong)tupleId));
}
else // theFirstTupleId == theLastTupleId
else
{
DBUG_PRINT("info",("reading %u values from database",
(cacheSize == 0) ? 1 : cacheSize));
DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
if (cacheSize == 0)
cacheSize = 1;
DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
tupleId = opTupleIdOnNdb(info, cacheSize, 0);
}
DBUG_RETURN(tupleId);
}
Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
{
DBUG_ENTER("readAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
Uint64 tupleId = readTupleIdFromNdb(info);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
......@@ -837,19 +842,34 @@ Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
const BaseString& internal_tabname = table->m_internalName;
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(~(Uint64)0);
}
Uint64 tupleId = readTupleIdFromNdb(info);
DBUG_PRINT("info", ("value %llu", (ulonglong)tupleId));
DBUG_RETURN(tupleId);
}
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
Ndb::readTupleIdFromNdb(Ndb_local_table_info* info)
{
if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
// Cache is empty, check next in database
return opTupleIdOnNdb(aTableId, 0, 3);
return theFirstTupleId[aTableId] + 1;
DBUG_ENTER("Ndb::readTupleIdFromNdb");
Uint64 tupleId;
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
assert(info->m_first_tuple_id < info->m_last_tuple_id);
tupleId = info->m_first_tuple_id + 1;
}
else // Cache is empty, check next in database
{
tupleId = opTupleIdOnNdb(info, 0, 3);
}
DBUG_RETURN(tupleId);
}
bool
......@@ -861,11 +881,10 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError= theDictionary->getNdbError();
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(false);
}
const NdbTableImpl* table= info->m_table_impl;
DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
DBUG_RETURN(setTupleIdInNdb(info, val, increase));
}
bool
......@@ -873,51 +892,49 @@ Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable, Uint64 val, bool
{
DBUG_ENTER("setAutoIncrementValue");
if (aTable == 0)
DBUG_RETURN(~(Uint64)0);
DBUG_RETURN(false);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
}
const BaseString& internal_tabname = table->m_internalName;
bool
Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
DBUG_ENTER("setTupleIdInNdb(const char*, ...)");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) {
theError= theDictionary->getNdbError();
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
if (info == 0) {
theError.code = theDictionary->getNdbError().code;
DBUG_RETURN(false);
}
DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
DBUG_RETURN(setTupleIdInNdb(info, val, increase));
}
bool
Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
Ndb::setTupleIdInNdb(Ndb_local_table_info* info, Uint64 val, bool increase)
{
DBUG_ENTER("setTupleIdInNdb(Uint32, ...)");
DBUG_ENTER("setTupleIdInNdb");
if (increase)
{
if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
if (info->m_first_tuple_id != info->m_last_tuple_id)
{
// We have a cache sequence
if (val <= theFirstTupleId[aTableId]+1)
assert(info->m_first_tuple_id < info->m_last_tuple_id);
if (val <= info->m_first_tuple_id + 1)
DBUG_RETURN(false);
if (val <= theLastTupleId[aTableId])
if (val <= info->m_last_tuple_id)
{
theFirstTupleId[aTableId] = val - 1;
info->m_first_tuple_id = val - 1;
DBUG_RETURN(true);
}
// else continue;
}
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 2) == val));
DBUG_RETURN((opTupleIdOnNdb(info, val, 2) == val));
}
else
DBUG_RETURN((opTupleIdOnNdb(aTableId, val, 1) == val));
DBUG_RETURN((opTupleIdOnNdb(info, val, 1) == val));
}
Uint64
Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 opValue, Uint32 op)
{
DBUG_ENTER("Ndb::opTupleIdOnNdb");
Uint32 aTableId = info->m_table_impl->m_tableId;
DBUG_PRINT("enter", ("table=%u value=%llu op=%u", aTableId, opValue, op));
NdbTransaction* tConnection;
......@@ -958,9 +975,9 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
tValue = tRecAttrResult->u_64_value();
theFirstTupleId[aTableId] = tValue - opValue;
theLastTupleId[aTableId] = tValue - 1;
ret = theFirstTupleId[aTableId];
info->m_first_tuple_id = tValue - opValue;
info->m_last_tuple_id = tValue - 1;
ret = info->m_first_tuple_id;
break;
case 1:
tOperation->updateTuple();
......@@ -970,8 +987,8 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
theFirstTupleId[aTableId] = ~(Uint64)0;
theLastTupleId[aTableId] = ~(Uint64)0;
info->m_first_tuple_id = ~(Uint64)0;
info->m_last_tuple_id = ~(Uint64)0;
ret = opValue;
break;
case 2:
......@@ -992,7 +1009,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
ret = ~(Uint64)0;
else
{
theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
ret = opValue;
}
break;
......
......@@ -759,10 +759,6 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(internalTableName.c_str(), info);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
return info;
}
......
......@@ -96,10 +96,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
for (i = 0; i < MAX_NDB_NODES ; i++) {
theConnectionArray[i] = NULL;
}//forg
for (i = 0; i < 2048 ; i++) {
theFirstTupleId[i] = 0;
theLastTupleId[i] = 0;
}//for
theImpl->m_dbname.assign(aDataBase);
theImpl->m_schemaname.assign(aSchema);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment