Commit 345e1dc4 authored by mskold@mysql.com's avatar mskold@mysql.com

Fixed merge problems, optimized bulk insert

parent 156bfa48
......@@ -41,7 +41,10 @@ static const int parallelism= 240;
// Default value for max number of transactions
// createable against NDB from this handler
static const int max_transactions = 256;
static const int max_transactions= 256;
// Default value for prefetch of autoincrement values
static const ha_rows autoincrement_prefetch= 32;
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
......@@ -286,7 +289,7 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
}
// Blob type
NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr);
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
if (field->is_null())
......@@ -832,7 +835,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
ERR_RETURN(trans->getNdbError());
int res;
if (res= set_primary_key_from_old_data(op, old_data))
if ((res= set_primary_key_from_old_data(op, old_data)))
ERR_RETURN(trans->getNdbError());
// Read all unreferenced non-key field(s)
......@@ -950,7 +953,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
If this an update or delete, call nextResult with false
to process any records already cached in NdbApi
*/
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
......@@ -1328,7 +1331,8 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
if ((rows_inserted == rows_to_insert) ||
bulk_insert_not_flushed= true;
if ((rows_to_insert == 1) ||
((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0)
{
......@@ -1336,6 +1340,7 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
(int)rows_inserted, (int)bulk_insert_rows));
bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
}
......@@ -1398,38 +1403,34 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data)))
{
DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
int read_res, insert_res, delete_res;
DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
// Get all old fields, since we optimize away fields not in query
int read_res= complemented_pk_read(old_data, new_data);
read_res= complemented_pk_read(old_data, new_data);
if (read_res)
{
DBUG_PRINT("info", ("pk read failed"));
DBUG_RETURN(read_res);
}
// Insert new row
int insert_res= write_row(new_data);
if (!insert_res)
{
// Delete old row
DBUG_PRINT("info", ("insert succeded"));
int delete_res= delete_row(old_data);
if (!delete_res)
{
DBUG_PRINT("info", ("insert+delete succeeded"));
DBUG_RETURN(0);
}
else
{
DBUG_PRINT("info", ("delete failed"));
DBUG_RETURN(delete_row(new_data));
}
}
else
insert_res= write_row(new_data);
if (insert_res)
{
DBUG_PRINT("info", ("insert failed"));
DBUG_RETURN(insert_res);
}
// Delete old row
DBUG_PRINT("info", ("insert succeded"));
delete_res= delete_row(old_data);
if (delete_res)
{
DBUG_PRINT("info", ("delete failed"));
// Undo write_row(new_data)
DBUG_RETURN(delete_row(new_data));
}
DBUG_PRINT("info", ("insert+delete succeeded"));
DBUG_RETURN(0);
}
if (cursor)
......@@ -1833,7 +1834,7 @@ int ha_ndbcluster::index_next(byte *buf)
{
DBUG_ENTER("index_next");
int error = 1;
int error= 1;
statistic_increment(ha_read_next_count,&LOCK_status);
DBUG_RETURN(next_result(buf));
}
......@@ -2208,7 +2209,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
degrade if too many bytes are inserted, thus it's limited by this
calculation.
*/
const int bytesperbatch = 8192;
const int bytesperbatch= 8192;
bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
batch= bytesperbatch/bytes;
batch= batch == 0 ? 1 : batch;
......@@ -2223,10 +2224,25 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
*/
int ha_ndbcluster::end_bulk_insert()
{
int error= 0;
DBUG_ENTER("end_bulk_insert");
// Check if last inserts need to be flushed
if (bulk_insert_not_flushed)
{
NdbConnection *trans= m_active_trans;
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows));
bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0)
error= ndb_err(trans);
}
rows_inserted= 0;
rows_to_insert= 1;
DBUG_RETURN(0);
DBUG_RETURN(error);
}
......@@ -2247,7 +2263,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
{ static const char *ext[1] = { NullS }; return ext; }
{ static const char *ext[1]= { NullS }; return ext; }
/*
......@@ -2751,7 +2767,7 @@ int ha_ndbcluster::create(const char *name,
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(),
field->pack_length()));
if (my_errno= create_ndb_column(col, field, info))
if ((my_errno= create_ndb_column(col, field, info)))
DBUG_RETURN(my_errno);
tab.addColumn(col);
}
......@@ -3001,7 +3017,10 @@ longlong ha_ndbcluster::get_auto_increment()
{
DBUG_ENTER("get_auto_increment");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
int cache_size = rows_to_insert ? rows_to_insert : 32;
int cache_size=
(rows_to_insert > autoincrement_prefetch) ?
rows_to_insert
: autoincrement_prefetch;
Uint64 auto_value=
m_ndb->getAutoIncrementValue(m_tabname, cache_size);
DBUG_RETURN((longlong)auto_value);
......@@ -3026,6 +3045,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
rows_to_insert(1),
rows_inserted(0),
bulk_insert_rows(1024),
bulk_insert_not_flushed(false),
ops_pending(0),
blobs_buffer(0),
blobs_buffer_size(0)
......@@ -3378,7 +3398,7 @@ void ha_ndbcluster::set_tabname(const char *path_name)
ptr= m_tabname;
while (*ptr != '\0') {
*ptr = tolower(*ptr);
*ptr= tolower(*ptr);
ptr++;
}
#endif
......@@ -3394,17 +3414,17 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
char *end, *ptr;
/* Scan name from the end */
end = strend(path_name)-1;
ptr = end;
end= strend(path_name)-1;
ptr= end;
while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
ptr--;
}
uint name_len = end - ptr;
uint name_len= end - ptr;
memcpy(tabname, ptr + 1, end - ptr);
tabname[name_len] = '\0';
tabname[name_len]= '\0';
#ifdef __WIN__
/* Put to lower case */
ptr = tabname;
ptr= tabname;
while (*ptr != '\0') {
*ptr= tolower(*ptr);
......@@ -3567,7 +3587,7 @@ static int packfrm(const void *data, uint len,
DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
error= 1;
org_len = len;
org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
......@@ -3587,9 +3607,9 @@ static int packfrm(const void *data, uint len,
// Copy frm data into blob, already in machine independent format
memcpy(blob->data, data, org_len);
*pack_data = blob;
*pack_len = blob_len;
error = 0;
*pack_data= blob;
*pack_len= blob_len;
error= 0;
DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
err:
......@@ -3601,7 +3621,7 @@ err:
static int unpackfrm(const void **unpack_data, uint *unpack_len,
const void *pack_data)
{
const frm_blob_struct *blob = (frm_blob_struct*)pack_data;
const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
......@@ -3617,7 +3637,7 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
if (ver != 1)
DBUG_RETURN(1);
if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME))))
if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
DBUG_RETURN(2);
memcpy(data, blob->data, complen);
......@@ -3627,8 +3647,8 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
DBUG_RETURN(3);
}
*unpack_data = data;
*unpack_len = complen;
*unpack_data= data;
*unpack_len= complen;
DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
......
......@@ -221,6 +221,7 @@ class ha_ndbcluster: public handler
ha_rows rows_to_insert;
ha_rows rows_inserted;
ha_rows bulk_insert_rows;
bool bulk_insert_not_flushed;
ha_rows ops_pending;
bool blobs_pending;
// memory for blobs in one tuple
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment