Commit 474f23a4 authored by joreland@mysql.com's avatar joreland@mysql.com

ndb - fixed long overdue problems with unique indexes and null values

parent 74bfe72d
......@@ -522,3 +522,39 @@ uid gid rid cid
1 1 2 3
1 1 2 4
drop table t1,t2,t3,t4,t5,t6,t7;
CREATE TABLE t1 (
a int unsigned NOT NULL PRIMARY KEY,
b int unsigned,
c int unsigned,
UNIQUE bc(b,c) ) engine = ndb;
insert into t1 values(1,1,1),(2,NULL,2),(3,NULL,NULL),(4,4,NULL);
select * from t1 where b=1 and c=1;
a b c
1 1 1
select * from t1 where b is null and c is null;
a b c
3 NULL NULL
select * from t1 where b is null and c = 2;
a b c
2 NULL 2
select * from t1 where b = 4 and c is null;
a b c
4 4 NULL
create table t8 as
select * from t1 where (b = 1 and c = 1)
or (b is null and c is null)
or (b is null and c = 2)
or (b = 4 and c is null);
select * from t8 order by a;
a b c
1 1 1
2 NULL 2
3 NULL NULL
4 4 NULL
select * from t1 order by a;
a b c
1 1 1
2 NULL 2
3 NULL NULL
4 4 NULL
drop table t1, t8;
......@@ -231,4 +231,24 @@ select * from t4 where rid = 2 order by cid;
drop table t1,t2,t3,t4,t5,t6,t7;
# test null in indexes
CREATE TABLE t1 (
a int unsigned NOT NULL PRIMARY KEY,
b int unsigned,
c int unsigned,
UNIQUE bc(b,c) ) engine = ndb;
insert into t1 values(1,1,1),(2,NULL,2),(3,NULL,NULL),(4,4,NULL);
select * from t1 where b=1 and c=1;
select * from t1 where b is null and c is null;
select * from t1 where b is null and c = 2;
select * from t1 where b = 4 and c is null;
create table t8 as
select * from t1 where (b = 1 and c = 1)
or (b is null and c is null)
or (b is null and c = 2)
or (b = 4 and c is null);
select * from t8 order by a;
select * from t1 order by a;
drop table t1, t8;
......@@ -1156,9 +1156,10 @@ int ha_ndbcluster::unique_index_read(const byte *key,
for (i= 0; key_part != end; key_part++, i++)
{
if (set_ndb_key(op, key_part->field, i, key_ptr))
if (set_ndb_key(op, key_part->field, i,
key_part->null_bit ? key_ptr + 1 : key_ptr))
ERR_RETURN(trans->getNdbError());
key_ptr+= key_part->length;
key_ptr+= key_part->store_length;
}
// Get non-index attribute(s)
......@@ -2243,6 +2244,28 @@ int ha_ndbcluster::index_end()
DBUG_RETURN(close_scan());
}
/**
* Check if key contains null
*/
static
int
check_null_in_key(const KEY* key_info, const byte *key, uint key_len)
{
KEY_PART_INFO *curr_part, *end_part;
const byte* end_ptr = key + key_len;
curr_part= key_info->key_part;
end_part= curr_part + key_info->key_parts;
for (; curr_part != end_part && key < end_ptr; curr_part++)
{
if(curr_part->null_bit && *key)
return 1;
key += curr_part->store_length;
}
return 0;
}
int ha_ndbcluster::index_read(byte *buf,
const byte *key, uint key_len,
......@@ -2260,6 +2283,8 @@ int ha_ndbcluster::index_read(byte *buf,
case PRIMARY_KEY_INDEX:
if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
{
if(m_active_cursor && (error= close_scan()))
DBUG_RETURN(error);
DBUG_RETURN(pk_read(key, key_len, buf));
}
else if (type == PRIMARY_KEY_INDEX)
......@@ -2269,8 +2294,11 @@ int ha_ndbcluster::index_read(byte *buf,
break;
case UNIQUE_ORDERED_INDEX:
case UNIQUE_INDEX:
if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len &&
!check_null_in_key(key_info, key, key_len))
{
if(m_active_cursor && (error= close_scan()))
DBUG_RETURN(error);
DBUG_RETURN(unique_index_read(key, key_len, buf));
}
else if (type == UNIQUE_INDEX)
......@@ -2374,6 +2402,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
start_key->length == key_info->key_length &&
start_key->flag == HA_READ_KEY_EXACT)
{
if(m_active_cursor && (error= close_scan()))
DBUG_RETURN(error);
error= pk_read(start_key->key, start_key->length, buf);
DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
}
......@@ -2381,10 +2411,12 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
case UNIQUE_ORDERED_INDEX:
case UNIQUE_INDEX:
key_info= table->key_info + active_index;
if (start_key &&
start_key->length == key_info->key_length &&
start_key->flag == HA_READ_KEY_EXACT)
if (start_key && start_key->length == key_info->key_length &&
start_key->flag == HA_READ_KEY_EXACT &&
!check_null_in_key(key_info, start_key->key, start_key->length))
{
if(m_active_cursor && (error= close_scan()))
DBUG_RETURN(error);
error= unique_index_read(start_key->key, start_key->length, buf);
DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment