Commit 4a884235 authored by Rich Prohaska's avatar Rich Prohaska

#133 dont overlock key ranges for unique secondary keys

parent 3e2f220d
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
begin;
insert into t values (1);
begin;
insert into t values (2);
commit;
commit;
select * from t;
id
1
2
drop table if exists t;
create table t (id int not null, unique key(id));
begin;
insert into t values (1);
begin;
insert into t values (2);
commit;
commit;
select * from t;
id
1
2
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (5);
begin;
insert into t values (6);
commit;
commit;
select * from t;
id
5
6
10
100
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (5);
begin;
insert into t values (6);
commit;
commit;
select * from t;
id
5
6
10
100
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (50);
begin;
insert into t values (60);
commit;
commit;
select * from t;
id
10
50
60
100
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (50);
begin;
insert into t values (60);
commit;
commit;
select * from t;
id
10
50
60
100
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (500);
begin;
insert into t values (600);
commit;
commit;
select * from t;
id
10
100
500
600
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (500);
begin;
insert into t values (600);
commit;
commit;
select * from t;
id
10
100
500
600
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
connect(c1,localhost,root,,);
begin;
insert into t values (1);
connect(c2,localhost,root,,);
begin;
insert into t values (2);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
connect(c1,localhost,root,,);
begin;
insert into t values (1);
connect(c2,localhost,root,,);
begin;
insert into t values (2);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (5);
connect(c2,localhost,root,,);
begin;
insert into t values (6);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (5);
connect(c2,localhost,root,,);
begin;
insert into t values (6);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (50);
connect(c2,localhost,root,,);
begin;
insert into t values (60);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (50);
connect(c2,localhost,root,,);
begin;
insert into t values (60);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (500);
connect(c2,localhost,root,,);
begin;
insert into t values (600);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (500);
connect(c2,localhost,root,,);
begin;
insert into t values (600);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
...@@ -2724,7 +2724,8 @@ DBT* ha_tokudb::create_dbt_key_from_key( ...@@ -2724,7 +2724,8 @@ DBT* ha_tokudb::create_dbt_key_from_key(
const uchar * record, const uchar * record,
bool* has_null, bool* has_null,
bool dont_pack_pk, bool dont_pack_pk,
int key_length int key_length,
uint8_t inf_byte
) )
{ {
uint32_t size = 0; uint32_t size = 0;
...@@ -2739,7 +2740,7 @@ DBT* ha_tokudb::create_dbt_key_from_key( ...@@ -2739,7 +2740,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(
// from a row, there is no way that columns can be missing, so in practice, // from a row, there is no way that columns can be missing, so in practice,
// this will be meaningless. Might as well put in a value // this will be meaningless. Might as well put in a value
// //
*tmp_buff++ = COL_ZERO; *tmp_buff++ = inf_byte;
size++; size++;
size += place_key_into_dbt_buff( size += place_key_into_dbt_buff(
key_info, key_info,
...@@ -2805,7 +2806,7 @@ DBT *ha_tokudb::create_dbt_key_from_table( ...@@ -2805,7 +2806,7 @@ DBT *ha_tokudb::create_dbt_key_from_table(
*has_null = false; *has_null = false;
DBUG_RETURN(key); DBUG_RETURN(key);
} }
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, (keynr == primary_key), key_length)); DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, (keynr == primary_key), key_length, COL_ZERO));
} }
DBT* ha_tokudb::create_dbt_key_for_lookup( DBT* ha_tokudb::create_dbt_key_for_lookup(
...@@ -2818,13 +2819,12 @@ DBT* ha_tokudb::create_dbt_key_for_lookup( ...@@ -2818,13 +2819,12 @@ DBT* ha_tokudb::create_dbt_key_for_lookup(
) )
{ {
TOKUDB_HANDLER_DBUG_ENTER(""); TOKUDB_HANDLER_DBUG_ENTER("");
DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length);
// override the infinity byte, needed in case the pk is a string // override the infinity byte, needed in case the pk is a string
// to make sure that the cursor that uses this key properly positions // to make sure that the cursor that uses this key properly positions
// it at the right location. If the table stores "D", but we look up for "d", // it at the right location. If the table stores "D", but we look up for "d",
// and the infinity byte is 0, then we will skip the "D", because // and the infinity byte is 0, then we will skip the "D", because
// in bytes, "d" > "D". // in bytes, "d" > "D".
buff[0] = COL_NEG_INF; DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length, COL_NEG_INF);
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -3563,62 +3563,56 @@ cleanup: ...@@ -3563,62 +3563,56 @@ cleanup:
} }
int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) { int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) {
DBT key;
int error = 0; int error = 0;
bool has_null; bool has_null;
DBC* tmp_cursor = NULL; DBC* tmp_cursor = NULL;
struct index_read_info ir_info;
struct smart_dbt_info info;
memset((void *)&key, 0, sizeof(key));
info.ha = this;
info.buf = NULL;
info.keynr = dict_index;
ir_info.smart_dbt_info = info;
create_dbt_key_for_lookup(
&key,
key_info,
key_buff3,
record,
&has_null
);
ir_info.orig_key = &key;
DBT key; memset((void *)&key, 0, sizeof(key));
create_dbt_key_from_key(&key, key_info, key_buff2, record, &has_null, true, MAX_KEY_LENGTH, COL_NEG_INF);
if (has_null) { if (has_null) {
error = 0; error = 0;
*is_unique = true; *is_unique = true;
goto cleanup; goto cleanup;
} }
error = share->key_file[dict_index]->cursor( error = share->key_file[dict_index]->cursor(share->key_file[dict_index], txn, &tmp_cursor, DB_SERIALIZABLE | DB_RMW);
share->key_file[dict_index], if (error) {
txn, goto cleanup;
&tmp_cursor, } else {
DB_SERIALIZABLE // prelock (key,-inf),(key,+inf) so that the subsequent key lookup does not overlock
); uint flags = 0;
if (error) { goto cleanup; } DBT key_right; memset(&key_right, 0, sizeof key_right);
create_dbt_key_from_key(&key_right, key_info, key_buff3, record, &has_null, true, MAX_KEY_LENGTH, COL_POS_INF);
error = tmp_cursor->c_set_bounds(tmp_cursor, &key, &key_right, true, DB_NOTFOUND);
if (error == 0) {
flags = DB_PRELOCKED | DB_PRELOCKED_WRITE;
}
error = tmp_cursor->c_getf_set_range( // lookup key and check unique prefix
tmp_cursor, struct smart_dbt_info info;
0, info.ha = this;
&key, info.buf = NULL;
smart_dbt_callback_lookup, info.keynr = dict_index;
&ir_info
); struct index_read_info ir_info;
if (error == DB_NOTFOUND) { ir_info.orig_key = &key;
*is_unique = true; ir_info.smart_dbt_info = info;
error = 0;
goto cleanup; error = tmp_cursor->c_getf_set_range(tmp_cursor, flags, &key, smart_dbt_callback_lookup, &ir_info);
} if (error == DB_NOTFOUND) {
else if (error) { *is_unique = true;
goto cleanup; error = 0;
} goto cleanup;
if (ir_info.cmp) { }
*is_unique = true; else if (error) {
} goto cleanup;
else { }
*is_unique = false; if (ir_info.cmp) {
*is_unique = true;
}
else {
*is_unique = false;
}
} }
error = 0; error = 0;
......
...@@ -419,7 +419,7 @@ private: ...@@ -419,7 +419,7 @@ private:
uint32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data); uint32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data);
void unpack_key(uchar * record, DBT const *key, uint index); void unpack_key(uchar * record, DBT const *key, uint index);
uint32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length); uint32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, bool dont_pack_pk, int key_length = MAX_KEY_LENGTH); DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, bool dont_pack_pk, int key_length, uint8_t inf_byte);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH); DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_for_lookup(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH); DBT* create_dbt_key_for_lookup(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, int8_t inf_byte); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, int8_t inf_byte);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment