Commit 89b14034 authored by mskold@mysql.com

Fix for Bug #9675 Auto-increment not working with INSERT..SELECT and NDB storage

parent 934f4581
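Judging from the hunks below, the problem is that INSERT..SELECT calls start_bulk_insert() with rows == 0 (the row count is not known in advance), so m_rows_to_insert stayed at 0, the auto-increment cache size computed in get_auto_increment() came out as 0, and the NDB API was then asked to fetch a block of zero tuple ids. The patch guesses m_autoincrement_prefetch when the row count is unknown and makes getTupleIdFromNdb() fetch at least one value. The following is a minimal stand-alone sketch of that logic, not the actual handler code; AutoIncModel and its members are hypothetical stand-ins for the real m_* members and NDB calls.

#include <cstdint>
#include <cstdio>

struct AutoIncModel {
  static const uint32_t prefetch= 32;  // stands in for m_autoincrement_prefetch
  uint64_t next_in_db= 1;              // next value the "database" would hand out
  uint64_t first= 0, last= 0;          // locally cached range (first, last]
  uint64_t rows_to_insert= 1;
  uint64_t rows_inserted= 0;

  // ha_ndbcluster::start_bulk_insert(): with an unknown row count (rows == 0,
  // as for INSERT..SELECT), guess the prefetch size instead of keeping 0.
  void start_bulk_insert(uint64_t rows) {
    rows_inserted= 0;
    rows_to_insert= (rows == 0) ? prefetch : rows;
  }

  // ha_ndbcluster::get_auto_increment(): ask for at most 'prefetch' values,
  // but no more than we still expect to insert.
  uint64_t get_auto_increment() {
    uint64_t remaining= (rows_inserted < rows_to_insert)
                        ? rows_to_insert - rows_inserted : 1;
    uint32_t cache_size= (remaining < prefetch) ? (uint32_t)remaining : prefetch;
    rows_inserted++;                   // the real handler counts rows in write_row()
    return get_tuple_id(cache_size);
  }

  // Ndb::getTupleIdFromNdb(): serve from the cache if possible, otherwise
  // fetch a new block -- and never ask the database for 0 values.
  uint64_t get_tuple_id(uint32_t cache_size) {
    if (first != last)
      return ++first;                  // serve from the cached range
    uint32_t fetch= (cache_size == 0) ? 1 : cache_size;  // never fetch 0 values
    uint64_t value= next_in_db;        // simulates opTupleIdOnNdb() fetching a block
    first= value;
    last= value + fetch - 1;
    next_in_db+= fetch;
    return value;
  }
};

int main() {
  AutoIncModel m;
  m.start_bulk_insert(0);              // INSERT..SELECT: row count unknown
  for (int i= 0; i < 4; i++)
    std::printf("auto_increment value: %llu\n",
                (unsigned long long) m.get_auto_increment());
  return 0;
}

Built and run, the sketch prints the values 1 through 4 for an insert with an unknown row count; before the fix the equivalent path requested a zero-sized block of tuple ids and auto-increment assignment broke, which is what the new test case below exercises from two connections.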
@@ -607,3 +607,33 @@ primary key (a))
 engine=ndb
 max_rows=1;
 drop table t1;
+create table t1
+(counter int(64) NOT NULL auto_increment,
+datavalue char(40) default 'XXXX',
+primary key (counter)
+) ENGINE=ndbcluster;
+insert into t1 (datavalue) values ('newval');
+insert into t1 (datavalue) values ('newval');
+select * from t1 order by counter;
+counter datavalue
+1 newval
+2 newval
+insert into t1 (datavalue) select datavalue from t1 where counter < 100;
+select * from t1 order by counter;
+counter datavalue
+1 newval
+2 newval
+3 newval
+4 newval
+insert into t1 (datavalue) select datavalue from t1 where counter < 100;
+select * from t1 order by counter;
+counter datavalue
+1 newval
+2 newval
+3 newval
+4 newval
+35 newval
+36 newval
+37 newval
+38 newval
+drop table t1;
@@ -577,3 +577,28 @@ create table t1
 engine=ndb
 max_rows=1;
 drop table t1;
+#
+# Test auto_increment
+#
+connect (con1,localhost,,,test);
+connect (con2,localhost,,,test);
+create table t1
+(counter int(64) NOT NULL auto_increment,
+datavalue char(40) default 'XXXX',
+primary key (counter)
+) ENGINE=ndbcluster;
+connection con1;
+insert into t1 (datavalue) values ('newval');
+insert into t1 (datavalue) values ('newval');
+select * from t1 order by counter;
+insert into t1 (datavalue) select datavalue from t1 where counter < 100;
+select * from t1 order by counter;
+connection con2;
+insert into t1 (datavalue) select datavalue from t1 where counter < 100;
+select * from t1 order by counter;
+drop table t1;
@@ -722,26 +722,28 @@ Remark: Returns a new TupleId to the application.
 Uint64
 Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
 {
-  DEBUG_TRACE("getAutoIncrementValue");
+  DBUG_ENTER("getAutoIncrementValue");
   const char * internalTableName = internalizeTableName(aTableName);
   Ndb_local_table_info *info=
     theDictionary->get_local_table_info(internalTableName, false);
   if (info == 0)
-    return ~0;
+    DBUG_RETURN(~0);
   const NdbTableImpl *table= info->m_table_impl;
   Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
-  return tupleId;
+  DBUG_PRINT("info", ("value %u", tupleId));
+  DBUG_RETURN(tupleId);
 }
 
 Uint64
 Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable, Uint32 cacheSize)
 {
-  DEBUG_TRACE("getAutoIncrementValue");
+  DBUG_ENTER("getAutoIncrementValue");
   if (aTable == 0)
-    return ~0;
+    DBUG_RETURN(~0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
-  return tupleId;
+  DBUG_PRINT("info", ("value %u", tupleId));
+  DBUG_RETURN(tupleId);
 }
 
 Uint64
@@ -756,39 +758,45 @@ Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
 Uint64
 Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
 {
+  DBUG_ENTER("getTupleIdFromNdb");
   if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
   {
     theFirstTupleId[aTableId]++;
-    return theFirstTupleId[aTableId];
+    DBUG_PRINT("info", ("next cached value %u", theFirstTupleId[aTableId]));
+    DBUG_RETURN(theFirstTupleId[aTableId]);
   }
   else // theFirstTupleId == theLastTupleId
   {
-    return opTupleIdOnNdb(aTableId, cacheSize, 0);
+    DBUG_PRINT("info",("reading %u values from database",
+                       (cacheSize == 0) ? 1 : cacheSize));
+    DBUG_RETURN(opTupleIdOnNdb(aTableId, (cacheSize == 0) ? 1 : cacheSize, 0));
   }
 }
 
 Uint64
 Ndb::readAutoIncrementValue(const char* aTableName)
 {
-  DEBUG_TRACE("readtAutoIncrementValue");
+  DBUG_ENTER("readtAutoIncrementValue");
   const NdbTableImpl* table = theDictionary->getTable(aTableName);
   if (table == 0) {
     theError= theDictionary->getNdbError();
-    return ~0;
+    DBUG_RETURN(~0);
   }
   Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
-  return tupleId;
+  DBUG_PRINT("info", ("value %u", tupleId));
+  DBUG_RETURN(tupleId);
 }
 
 Uint64
 Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable)
 {
-  DEBUG_TRACE("readtAutoIncrementValue");
+  DBUG_ENTER("readtAutoIncrementValue");
   if (aTable == 0)
-    return ~0;
+    DBUG_RETURN(~0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
-  return tupleId;
+  DBUG_PRINT("info", ("value %u", tupleId));
+  DBUG_RETURN(tupleId);
 }
 
 Uint64
...
@@ -2920,6 +2920,10 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
   DBUG_PRINT("enter", ("rows: %d", (int)rows));
 
   m_rows_inserted= 0;
+  if (rows == 0)
+    /* We don't know how many will be inserted, guess */
+    m_rows_to_insert= m_autoincrement_prefetch;
+  else
   m_rows_to_insert= rows;
 
   /*
@@ -3929,6 +3933,7 @@ longlong ha_ndbcluster::get_auto_increment()
   DBUG_ENTER("get_auto_increment");
   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
   Ndb *ndb= get_ndb();
   int cache_size=
     (m_rows_to_insert - m_rows_inserted < m_autoincrement_prefetch) ?
     m_rows_to_insert - m_rows_inserted
...