Commit c1c3e31f authored by mskold@mysql.com's avatar mskold@mysql.com

Fix for bug#4730

parent a9ab9d05
...@@ -1414,12 +1414,19 @@ public: ...@@ -1414,12 +1414,19 @@ public:
* *
* @return tuple id or 0 on error * @return tuple id or 0 on error
*/ */
Uint64 getAutoIncrementValue(const char* aTableName, Uint32 cacheSize = 1); Uint64 getAutoIncrementValue(const char* aTableName,
bool setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase = false); Uint32 cacheSize = 1);
Uint64 getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize = 1000 ); Uint64 readAutoIncrementValue(const char* aTableName);
Uint64 getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize = 1000 ); bool setAutoIncrementValue(const char* aTableName, Uint64 val,
bool setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase = false); bool increase = false);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase = false); Uint64 getTupleIdFromNdb(const char* aTableName,
Uint32 cacheSize = 1000);
Uint64 getTupleIdFromNdb(Uint32 aTableId,
Uint32 cacheSize = 1000);
Uint64 readTupleIdFromNdb(Uint32 aTableId);
bool setTupleIdInNdb(const char* aTableName, Uint64 val,
bool increase);
bool setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase);
Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op); Uint64 opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op);
#endif #endif
......
...@@ -714,9 +714,10 @@ Ndb::getNodeId() ...@@ -714,9 +714,10 @@ Ndb::getNodeId()
} }
/**************************************************************************** /****************************************************************************
Uint64 getTupleIdFromNdb( Uint32 aTableId ); Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );
Parameters: aTableId : The TableId. Parameters: aTableId : The TableId.
cacheSize: Prefetch this many values
Remark: Returns a new TupleId to the application. Remark: Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId. The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp. It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
...@@ -736,7 +737,7 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize) ...@@ -736,7 +737,7 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
} }
Uint64 Uint64
Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize ) Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{ {
const NdbTableImpl* table = theDictionary->getTable(aTableName); const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0) if (table == 0)
...@@ -745,7 +746,7 @@ Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize ) ...@@ -745,7 +746,7 @@ Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize )
} }
Uint64 Uint64
Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize ) Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
{ {
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] ) if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
{ {
...@@ -758,6 +759,27 @@ Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize ) ...@@ -758,6 +759,27 @@ Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize )
} }
} }
Uint64
Ndb::readAutoIncrementValue(const char* aTableName)
{
DEBUG_TRACE("readtAutoIncrementValue");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return ~0;
Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
return tupleId;
}
Uint64
Ndb::readTupleIdFromNdb(Uint32 aTableId)
{
if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
// Cache is empty, check next in database
return opTupleIdOnNdb(aTableId, 0, 3);
return theFirstTupleId[aTableId] + 1;
}
bool bool
Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase) Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{ {
...@@ -891,6 +913,14 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) ...@@ -891,6 +913,14 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
ret = opValue; ret = opValue;
} }
break; break;
case 3:
tOperation->readTuple();
tOperation->equal("SYSKEY_0", aTableId );
tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
ret = tRecAttrResult->u_64_value();
break;
default: default:
goto error_handler; goto error_handler;
} }
......
...@@ -1281,7 +1281,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1281,7 +1281,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
int ha_ndbcluster::write_row(byte *record) int ha_ndbcluster::write_row(byte *record)
{ {
bool has_auto_increment, auto_increment_field_not_null; bool has_auto_increment;
uint i; uint i;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbOperation *op; NdbOperation *op;
...@@ -1292,8 +1292,8 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1292,8 +1292,8 @@ int ha_ndbcluster::write_row(byte *record)
if (table->timestamp_default_now) if (table->timestamp_default_now)
update_timestamp(record+table->timestamp_default_now-1); update_timestamp(record+table->timestamp_default_now-1);
has_auto_increment= (table->next_number_field && record == table->record[0]); has_auto_increment= (table->next_number_field && record == table->record[0]);
auto_increment_field_not_null= table->auto_increment_field_not_null; skip_auto_increment= table->auto_increment_field_not_null;
if ((has_auto_increment) && (!auto_increment_field_not_null)) if ((has_auto_increment) && (!skip_auto_increment))
update_auto_increment(); update_auto_increment();
if (!(op= trans->getNdbOperation(m_tabname))) if (!(op= trans->getNdbOperation(m_tabname)))
...@@ -1347,7 +1347,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1347,7 +1347,7 @@ int ha_ndbcluster::write_row(byte *record)
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
if ((has_auto_increment) && (auto_increment_field_not_null)) if ((has_auto_increment) && (skip_auto_increment))
{ {
Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
DBUG_PRINT("info", DBUG_PRINT("info",
...@@ -1356,6 +1356,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1356,6 +1356,7 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_PRINT("info", DBUG_PRINT("info",
("Setting next auto increment value to %u", next_val)); ("Setting next auto increment value to %u", next_val));
} }
skip_auto_increment= true;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3049,7 +3050,9 @@ longlong ha_ndbcluster::get_auto_increment() ...@@ -3049,7 +3050,9 @@ longlong ha_ndbcluster::get_auto_increment()
rows_to_insert rows_to_insert
: autoincrement_prefetch; : autoincrement_prefetch;
Uint64 auto_value= Uint64 auto_value=
m_ndb->getAutoIncrementValue(m_tabname, cache_size); (skip_auto_increment) ?
m_ndb->readAutoIncrementValue(m_tabname)
: m_ndb->getAutoIncrementValue(m_tabname, cache_size);
DBUG_RETURN((longlong)auto_value); DBUG_RETURN((longlong)auto_value);
} }
...@@ -3074,6 +3077,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3074,6 +3077,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
bulk_insert_rows(1024), bulk_insert_rows(1024),
bulk_insert_not_flushed(false), bulk_insert_not_flushed(false),
ops_pending(0), ops_pending(0),
skip_auto_increment(true),
blobs_buffer(0), blobs_buffer(0),
blobs_buffer_size(0) blobs_buffer_size(0)
{ {
......
...@@ -223,6 +223,7 @@ class ha_ndbcluster: public handler ...@@ -223,6 +223,7 @@ class ha_ndbcluster: public handler
ha_rows bulk_insert_rows; ha_rows bulk_insert_rows;
bool bulk_insert_not_flushed; bool bulk_insert_not_flushed;
ha_rows ops_pending; ha_rows ops_pending;
bool skip_auto_increment;
bool blobs_pending; bool blobs_pending;
// memory for blobs in one tuple // memory for blobs in one tuple
char *blobs_buffer; char *blobs_buffer;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment