Commit cb0ae166 authored by Rich Prohaska's avatar Rich Prohaska

#241 unique key check should avoid relocking keys if the table is already prelocked by the loader

parent 87d0748d
...@@ -3342,12 +3342,8 @@ int ha_tokudb::end_bulk_insert(bool abort) { ...@@ -3342,12 +3342,8 @@ int ha_tokudb::end_bulk_insert(bool abort) {
if (i == primary_key && !share->pk_has_string) { if (i == primary_key && !share->pk_has_string) {
continue; continue;
} }
error = is_index_unique( error = is_index_unique(&is_unique, transaction, share->key_file[i], &table->key_info[i],
&is_unique, DB_PRELOCKED_WRITE);
transaction,
share->key_file[i],
&table->key_info[i]
);
if (error) goto cleanup; if (error) goto cleanup;
if (!is_unique) { if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
...@@ -3394,7 +3390,7 @@ int ha_tokudb::end_bulk_insert() { ...@@ -3394,7 +3390,7 @@ int ha_tokudb::end_bulk_insert() {
return end_bulk_insert( false ); return end_bulk_insert( false );
} }
int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) { int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info, int lock_flags) {
int error; int error;
DBC* tmp_cursor1 = NULL; DBC* tmp_cursor1 = NULL;
DBC* tmp_cursor2 = NULL; DBC* tmp_cursor2 = NULL;
...@@ -3410,49 +3406,23 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in ...@@ -3410,49 +3406,23 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
memset(&packed_key2, 0, sizeof(packed_key2)); memset(&packed_key2, 0, sizeof(packed_key2));
*is_unique = true; *is_unique = true;
error = db->cursor( error = db->cursor(db, txn, &tmp_cursor1, DB_SERIALIZABLE);
db,
txn,
&tmp_cursor1,
DB_SERIALIZABLE
);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = db->cursor( error = db->cursor(db, txn, &tmp_cursor2, DB_SERIALIZABLE);
db,
txn,
&tmp_cursor2,
DB_SERIALIZABLE
);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
error = tmp_cursor1->c_get(
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
*is_unique = true; *is_unique = true;
error = 0; error = 0;
goto cleanup; goto cleanup;
} }
else if (error) { goto cleanup; } else if (error) { goto cleanup; }
error = tmp_cursor2->c_get( error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = tmp_cursor2->c_get( error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
*is_unique = true; *is_unique = true;
error = 0; error = 0;
...@@ -3464,59 +3434,25 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in ...@@ -3464,59 +3434,25 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
bool has_null1; bool has_null1;
bool has_null2; bool has_null2;
int cmp; int cmp;
place_key_into_mysql_buff( place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key1.data + 1);
key_info, place_key_into_mysql_buff(key_info, table->record[1], (uchar *) key2.data + 1);
table->record[0],
(uchar *) key1.data + 1
);
place_key_into_mysql_buff(
key_info,
table->record[1],
(uchar *) key2.data + 1
);
create_dbt_key_for_lookup( create_dbt_key_for_lookup(&packed_key1, key_info, key_buff, table->record[0], &has_null1);
&packed_key1, create_dbt_key_for_lookup(&packed_key2, key_info, key_buff2, table->record[1], &has_null2);
key_info,
key_buff,
table->record[0],
&has_null1
);
create_dbt_key_for_lookup(
&packed_key2,
key_info,
key_buff2,
table->record[1],
&has_null2
);
if (!has_null1 && !has_null2) { if (!has_null1 && !has_null2) {
cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2); cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
if (cmp == 0) { if (cmp == 0) {
memcpy(key_buff, key1.data, key1.size); memcpy(key_buff, key1.data, key1.size);
place_key_into_mysql_buff( place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key_buff + 1);
key_info,
table->record[0],
(uchar *) key_buff + 1
);
*is_unique = false; *is_unique = false;
break; break;
} }
} }
error = tmp_cursor1->c_get( error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = tmp_cursor2->c_get( error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error && (error != DB_NOTFOUND)) { goto cleanup; } if (error && (error != DB_NOTFOUND)) { goto cleanup; }
cnt++; cnt++;
...@@ -7766,7 +7702,8 @@ int ha_tokudb::tokudb_add_index( ...@@ -7766,7 +7702,8 @@ int ha_tokudb::tokudb_add_index(
num_processed++; num_processed++;
if ((num_processed % 1000) == 0) { if ((num_processed % 1000) == 0) {
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows); sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.",
num_processed, (long long unsigned) share->rows);
thd_proc_info(thd, status_msg); thd_proc_info(thd, status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS #ifdef HA_TOKUDB_HAS_THD_PROGRESS
...@@ -7798,12 +7735,8 @@ int ha_tokudb::tokudb_add_index( ...@@ -7798,12 +7735,8 @@ int ha_tokudb::tokudb_add_index(
for (uint i = 0; i < num_of_keys; i++, curr_index++) { for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_NOSAME) { if (key_info[i].flags & HA_NOSAME) {
bool is_unique; bool is_unique;
error = is_index_unique( error = is_index_unique(&is_unique, txn, share->key_file[curr_index], &key_info[i],
&is_unique, creating_hot_index ? 0 : DB_PRELOCKED_WRITE);
txn,
share->key_file[curr_index],
&key_info[i]
);
if (error) goto cleanup; if (error) goto cleanup;
if (!is_unique) { if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
......
...@@ -475,7 +475,7 @@ private: ...@@ -475,7 +475,7 @@ private:
); );
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, toku_compression_method compression_method); int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, toku_compression_method compression_method);
void trace_create_table_info(const char *name, TABLE * form); void trace_create_table_info(const char *name, TABLE * form);
int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info); int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info, int lock_flags);
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn); int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd); int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
void set_main_dict_put_flags(THD* thd, bool opt_eligible, uint32_t* put_flags); void set_main_dict_put_flags(THD* thd, bool opt_eligible, uint32_t* put_flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment