Commit 6cce5204 authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE

- add support for Cassandra's UUID datatype. We map it to CHAR(36).
parent 29e9406a
...@@ -226,3 +226,37 @@ inserts insert_batches ...@@ -226,3 +226,37 @@ inserts insert_batches
delete from t2; delete from t2;
drop table t2; drop table t2;
drop table t0; drop table t0;
#
# UUID datatype support
#
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
insert into t2 values(2,'not-an-uuid');
ERROR 22003: Out of range value for column 'uuidcol' at row 1
insert into t2 values(3,'9b5658dc-f32f-11e1=94cd-f46d046e9f09');
ERROR 22003: Out of range value for column 'uuidcol' at row 1
insert into t2 values(4,'9b5658dc-fzzf-11e1-94cd-f46d046e9f09');
ERROR 22003: Out of range value for column 'uuidcol' at row 1
insert into t2 values
(5,'9b5658dc-f11f-11e1-94cd-f46d046e9f09'),
(6,'9b5658dc-f11f011e1-94cd-f46d046e9f09');
ERROR 22003: Out of range value for column 'uuidcol' at row 2
select * from t2;
rowkey uuidcol
1 9b5658dc-f32f-11e1-94cd-f46d046e9f09
5 9b5658dc-f11f-11e1-94cd-f46d046e9f09
delete from t2;
drop table t2;
CREATE TABLE t2 (rowkey char(36) PRIMARY KEY, col1 int) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf6';
delete from t2;
insert into t2 values('9b5658dc-f32f-11e1-94cd-f46d046e9f09', 1234);
insert into t2 values('not-an-uuid', 563);
ERROR 22003: Out of range value for column 'rowkey' at row 1
select * from t2;
rowkey col1
9b5658dc-f32f-11e1-94cd-f46d046e9f09 1234
delete from t2;
drop table t2;
...@@ -49,6 +49,10 @@ create columnfamily cf3 (rowkey bigint primary key, intcol int); ...@@ -49,6 +49,10 @@ create columnfamily cf3 (rowkey bigint primary key, intcol int);
create columnfamily cf4 (rowkey bigint primary key, datecol timestamp); create columnfamily cf4 (rowkey bigint primary key, datecol timestamp);
create columnfamily cf5 (rowkey bigint primary key, uuidcol uuid);
create columnfamily cf6 (rowkey uuid primary key, col1 int);
./cassandra-cli ./cassandra-cli
CREATE COLUMN FAMILY cf10 CREATE COLUMN FAMILY cf10
...@@ -277,12 +281,57 @@ delete from t2; ...@@ -277,12 +281,57 @@ delete from t2;
drop table t2; drop table t2;
drop table t0; drop table t0;
--echo #
--echo # UUID datatype support
--echo #
#create columnfamily cf5 (rowkey bigint primary key, uuidcol uuid);
CREATE TABLE t2 (rowkey bigint PRIMARY KEY, uuidcol char(36)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf5';
delete from t2;
insert into t2 values(1,'9b5658dc-f32f-11e1-94cd-f46d046e9f09');
--error ER_WARN_DATA_OUT_OF_RANGE
insert into t2 values(2,'not-an-uuid');
--error ER_WARN_DATA_OUT_OF_RANGE
insert into t2 values(3,'9b5658dc-f32f-11e1=94cd-f46d046e9f09');
--error ER_WARN_DATA_OUT_OF_RANGE
insert into t2 values(4,'9b5658dc-fzzf-11e1-94cd-f46d046e9f09');
--error ER_WARN_DATA_OUT_OF_RANGE
insert into t2 values
(5,'9b5658dc-f11f-11e1-94cd-f46d046e9f09'),
(6,'9b5658dc-f11f011e1-94cd-f46d046e9f09');
select * from t2;
delete from t2;
drop table t2;
# create columnfamily cf6 (rowkey uuid primary key, col1 int);
CREATE TABLE t2 (rowkey char(36) PRIMARY KEY, col1 int) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf6';
delete from t2;
insert into t2 values('9b5658dc-f32f-11e1-94cd-f46d046e9f09', 1234);
--error ER_WARN_DATA_OUT_OF_RANGE
insert into t2 values('not-an-uuid', 563);
select * from t2;
delete from t2;
drop table t2;
############################################################################ ############################################################################
## Cassandra cleanup ## Cassandra cleanup
############################################################################ ############################################################################
--disable_parsing --disable_parsing
drop columnfamily cf1; drop columnfamily cf1;
drop columnfamily cf2; drop columnfamily cf2;
drop columnfamily cf3;
drop columnfamily cf4;
--enable_parsing --enable_parsing
############################################################################ ############################################################################
## Cassandra cleanup ends ## Cassandra cleanup ends
......
...@@ -416,9 +416,8 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) ...@@ -416,9 +416,8 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
cparent.column_family= column_family; cparent.column_family= column_family;
/* SlicePredicate can be used to limit columns we will retrieve */ /* SlicePredicate can be used to limit columns we will retrieve */
// Try passing nothing...
KeyRange key_range; // Try passing nothing, too. KeyRange key_range;
key_range.__isset.start_key= true; key_range.__isset.start_key= true;
key_range.__isset.end_key= true; key_range.__isset.end_key= true;
......
...@@ -302,6 +302,7 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked) ...@@ -302,6 +302,7 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
} }
info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
insert_lineno= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -407,6 +408,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, ...@@ -407,6 +408,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters"); my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
} }
insert_lineno= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -430,8 +432,13 @@ public: ...@@ -430,8 +432,13 @@ public:
/* /*
This will get data from the Field pointer, store Cassandra's form This will get data from the Field pointer, store Cassandra's form
in internal buffer, and return pointer/size. in internal buffer, and return pointer/size.
@return
false - OK
true - Failed to convert value (completely, there is no value to insert
at all).
*/ */
virtual void mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0; virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
virtual ~ColumnDataConverter() {}; virtual ~ColumnDataConverter() {};
}; };
...@@ -447,11 +454,12 @@ public: ...@@ -447,11 +454,12 @@ public:
field->store(*pdata); field->store(*pdata);
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
buf= field->val_real(); buf= field->val_real();
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len=sizeof(double); *cass_data_len=sizeof(double);
return false;
} }
~DoubleDataConverter(){} ~DoubleDataConverter(){}
}; };
...@@ -468,11 +476,12 @@ public: ...@@ -468,11 +476,12 @@ public:
field->store(*pdata); field->store(*pdata);
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
buf= field->val_real(); buf= field->val_real();
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len=sizeof(float); *cass_data_len=sizeof(float);
return false;
} }
~FloatDataConverter(){} ~FloatDataConverter(){}
}; };
...@@ -501,12 +510,13 @@ public: ...@@ -501,12 +510,13 @@ public:
field->store(tmp); field->store(tmp);
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
longlong tmp= field->val_int(); longlong tmp= field->val_int();
flip64((const char*)&tmp, (char*)&buf); flip64((const char*)&tmp, (char*)&buf);
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len=sizeof(longlong); *cass_data_len=sizeof(longlong);
return false;
} }
~BigintDataConverter(){} ~BigintDataConverter(){}
}; };
...@@ -531,12 +541,13 @@ public: ...@@ -531,12 +541,13 @@ public:
field->store(tmp); field->store(tmp);
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
int32_t tmp= field->val_int(); int32_t tmp= field->val_int();
flip32((const char*)&tmp, (char*)&buf); flip32((const char*)&tmp, (char*)&buf);
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len=sizeof(int32_t); *cass_data_len=sizeof(int32_t);
return false;
} }
~Int32DataConverter(){} ~Int32DataConverter(){}
}; };
...@@ -551,11 +562,12 @@ public: ...@@ -551,11 +562,12 @@ public:
field->store(cass_data, cass_data_len,field->charset()); field->store(cass_data, cass_data_len,field->charset());
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
String *pstr= field->val_str(&buf); String *pstr= field->val_str(&buf);
*cass_data= (char*)pstr->c_ptr(); *cass_data= (char*)pstr->c_ptr();
*cass_data_len= pstr->length(); *cass_data_len= pstr->length();
return false;
} }
~StringCopyConverter(){} ~StringCopyConverter(){}
}; };
...@@ -579,7 +591,7 @@ public: ...@@ -579,7 +591,7 @@ public:
((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000); ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
} }
void mariadb_to_cassandra(char **cass_data, int *cass_data_len) bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{ {
my_time_t ts_time; my_time_t ts_time;
ulong ts_microsec; ulong ts_microsec;
...@@ -592,10 +604,85 @@ public: ...@@ -592,10 +604,85 @@ public:
*cass_data= (char*)&buf; *cass_data= (char*)&buf;
*cass_data_len= 8; *cass_data_len= 8;
return false;
} }
~TimestampDataConverter(){} ~TimestampDataConverter(){}
}; };
static int convert_hex_digit(const char c)
{
int num;
if (c >= '0' && c <= '9')
num= c - '0';
else if (c >= 'A' && c <= 'F')
num= c - 'A' + 10;
else if (c >= 'a' && c <= 'f')
num= c - 'a' + 10;
else
return -1; /* Couldn't convert */
return num;
}
const char map2number[]="0123456789abcdef";
class UuidDataConverter : public ColumnDataConverter
{
char buf[16]; /* Binary UUID representation */
String str_buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
char *ptr= str;
/* UUID arrives as 16-byte number in network byte order */
for (uint i=0; i < 16; i++)
{
*(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
*(ptr++)= map2number[cass_data[i] & 0xF];
if (i == 3 || i == 5 || i == 7 || i == 9)
*(ptr++)= '-';
}
*ptr= 0;
field->store(str, 36,field->charset());
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
String *uuid_str= field->val_str(&str_buf);
char *pstr= (char*)uuid_str->c_ptr();
if (uuid_str->length() != 36)
return true;
int lower, upper;
for (uint i=0; i < 16; i++)
{
if ((upper= convert_hex_digit(pstr[0])) == -1 ||
(lower= convert_hex_digit(pstr[1])) == -1)
{
return true;
}
buf[i]= lower | (upper << 4);
pstr += 2;
if (i == 3 || i == 5 || i == 7 || i == 9)
{
if (pstr[0] != '-')
return true;
pstr++;
}
}
*cass_data= buf;
*cass_data_len= 16;
return false;
}
~UuidDataConverter(){}
};
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType"; const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type"; const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType"; const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";
...@@ -609,6 +696,8 @@ const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type" ...@@ -609,6 +696,8 @@ const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type"
const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType"; const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";
const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name) ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{ {
ColumnDataConverter *res= NULL; ColumnDataConverter *res= NULL;
...@@ -617,8 +706,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ ...@@ -617,8 +706,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
case MYSQL_TYPE_TINY: case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT: case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_LONGLONG:
if (!strcmp(validator_name, validator_bigint) || if (!strcmp(validator_name, validator_bigint))
0/*!strcmp(validator_name, validator_timestamp)*/)
res= new BigintDataConverter; res= new BigintDataConverter;
break; break;
...@@ -637,9 +725,18 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ ...@@ -637,9 +725,18 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
res= new TimestampDataConverter; res= new TimestampDataConverter;
break; break;
case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
if (!strcmp(validator_name, validator_uuid) &&
field->real_type() == MYSQL_TYPE_STRING &&
field->field_length == 36)
{
// UUID maps to CHAR(36), its text representation
res= new UuidDataConverter;
break;
}
/* fall through: */
case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_STRING: // these are space padded strings. case MYSQL_TYPE_VAR_STRING:
if (!strcmp(validator_name, validator_blob) || if (!strcmp(validator_name, validator_blob) ||
!strcmp(validator_name, validator_ascii) || !strcmp(validator_name, validator_ascii) ||
!strcmp(validator_name, validator_text)) !strcmp(validator_name, validator_text))
...@@ -698,7 +795,11 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields) ...@@ -698,7 +795,11 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
} }
if (n_mapped != n_fields - 1) if (n_mapped != n_fields - 1)
{
se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
return true; return true;
}
/* /*
Setup type conversion for row_key. Setup type conversion for row_key.
...@@ -775,7 +876,11 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, ...@@ -775,7 +876,11 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
old_map= dbug_tmp_use_all_columns(table, table->read_set); old_map= dbug_tmp_use_all_columns(table, table->read_set);
rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
{
/* We get here when making lookups like uuid_column='not-an-uuid' */
DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
}
dbug_tmp_restore_column_map(table->read_set, old_map); dbug_tmp_restore_column_map(table->read_set, old_map);
...@@ -858,10 +963,18 @@ int ha_cassandra::write_row(uchar *buf) ...@@ -858,10 +963,18 @@ int ha_cassandra::write_row(uchar *buf)
old_map= dbug_tmp_use_all_columns(table, table->read_set); old_map= dbug_tmp_use_all_columns(table, table->read_set);
insert_lineno++;
/* Convert the key */ /* Convert the key */
char *cass_key; char *cass_key;
int cass_key_len; int cass_key_len;
rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
{
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
rowkey_converter->field->field_name, insert_lineno);
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
}
se->start_row_insert(cass_key, cass_key_len); se->start_row_insert(cass_key, cass_key_len);
/* Convert other fields */ /* Convert other fields */
...@@ -869,7 +982,13 @@ int ha_cassandra::write_row(uchar *buf) ...@@ -869,7 +982,13 @@ int ha_cassandra::write_row(uchar *buf)
{ {
char *cass_data; char *cass_data;
int cass_data_len; int cass_data_len;
field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len); if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
{
my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
field_converters[i]->field->field_name, insert_lineno);
dbug_tmp_restore_column_map(table->read_set, old_map);
DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
}
se->add_insert_column(field_converters[i]->field->field_name, se->add_insert_column(field_converters[i]->field->field_name,
cass_data, cass_data_len); cass_data, cass_data_len);
} }
...@@ -892,7 +1011,7 @@ int ha_cassandra::write_row(uchar *buf) ...@@ -892,7 +1011,7 @@ int ha_cassandra::write_row(uchar *buf)
if (res) if (res)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
} }
...@@ -1060,6 +1179,7 @@ int ha_cassandra::rnd_pos(uchar *buf, uchar *pos) ...@@ -1060,6 +1179,7 @@ int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
int ha_cassandra::reset() int ha_cassandra::reset()
{ {
doing_insert_batch= false; doing_insert_batch= false;
insert_lineno= 0;
return 0; return 0;
} }
......
...@@ -48,6 +48,9 @@ class ha_cassandra: public handler ...@@ -48,6 +48,9 @@ class ha_cassandra: public handler
bool doing_insert_batch; bool doing_insert_batch;
ha_rows insert_rows_batched; ha_rows insert_rows_batched;
/* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno;
public: public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra() ~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment