Commit bce2e668 authored by Sergey Petrunya

Cassandra SE

- Add support for Cassandra's 'varint' datatype, mappable to VARBINARY.
parent c59faf95
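
For context: Cassandra's varint validator (org.apache.cassandra.db.marshal.IntegerType) serializes values as variable-length, big-endian two's-complement byte strings, and the engine copies those bytes into the VARBINARY column unchanged, which is what the hex() values in the tests below show. A minimal decode sketch under that assumption follows; decode_varint is a hypothetical helper for illustration only, not part of this patch.

#include <cstdint>
#include <cstddef>

/* Hypothetical helper (illustration only): interpret a Cassandra varint,
   i.e. big-endian two's-complement bytes, as a signed 64-bit value. */
static int64_t decode_varint(const unsigned char *data, size_t len)
{
  uint64_t val= (len && (data[0] & 0x80)) ? ~0ULL : 0;  /* sign-extend */
  for (size_t i= 0; i < len; i++)
    val= (val << 8) | data[i];
  return (int64_t) val;
}

/* Example: decode_varint((const unsigned char*)"\x12\x34\x56", 3) == 1193046,
   matching the 'val-0x123456' row the test inserts (hex(varint_col) = 123456). */
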
......@@ -326,3 +326,20 @@ set cassandra_write_consistency='ANY';
set cassandra_write_consistency='TWO';
set cassandra_write_consistency='THREE';
set cassandra_write_consistency=@tmp;
#
# varint datatype support
#
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
select rowkey, hex(varint_col) from t2;
rowkey hex(varint_col)
val-01 01
val-0x123456 123456
val-0x12345678 12345678
drop table t2;
# now, let's check what happens when MariaDB's column is not wide enough:
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
select rowkey, hex(varint_col) from t2;
ERROR HY000: Internal error: 'Unable to convert value for field `varint_col` from Cassandra's data format. Source data is 4 bytes, 0x12345678'
drop table t2;
......@@ -68,6 +68,11 @@ create columnfamily cf8 (rowkey varchar primary key, countercol counter);
update cf8 set countercol=countercol+1 where rowkey='cnt1';
update cf8 set countercol=countercol+100 where rowkey='cnt2';
create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
insert into cf9 (rowkey, varint_col) values ('val-01', 1);
insert into cf9 (rowkey, varint_col) values ('val-0x123456', 1193046);
insert into cf9 (rowkey, varint_col) values ('val-0x12345678', 305419896);
EOF
--error 0,1,2
--system cqlsh -3 -f $MYSQLTEST_VARDIR/cassandra_test_init.cql
......@@ -413,6 +418,25 @@ set cassandra_write_consistency='THREE';
set cassandra_write_consistency=@tmp;
--echo #
--echo # varint datatype support
--echo #
# create columnfamily cf9 (rowkey varchar primary key, varint_col varint);
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(32)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
--sorted_result
select rowkey, hex(varint_col) from t2;
drop table t2;
--echo # now, let's check what happens when MariaDB's column is not wide enough:
CREATE TABLE t2 (rowkey varchar(32) PRIMARY KEY, varint_col varbinary(2)) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf9';
--sorted_result
--error ER_INTERNAL_ERROR
select rowkey, hex(varint_col) from t2;
drop table t2;
############################################################################
## Cassandra cleanup
############################################################################
......
......@@ -531,7 +531,7 @@ public:
Field *field;
/* Save Cassandra's data in the Field; returns 0 on success, non-zero on conversion failure */
virtual void cassandra_to_mariadb(const char *cass_data,
virtual int cassandra_to_mariadb(const char *cass_data,
int cass_data_len)=0;
/*
......@@ -552,11 +552,12 @@ class DoubleDataConverter : public ColumnDataConverter
{
double buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(double));
double *pdata= (double*) cass_data;
field->store(*pdata);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -574,11 +575,12 @@ class FloatDataConverter : public ColumnDataConverter
{
float buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(float));
float *pdata= (float*) cass_data;
field->store(*pdata);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -608,7 +610,7 @@ class BigintDataConverter : public ColumnDataConverter
longlong buf;
bool flip; /* is false when reading counter columns */
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
longlong tmp;
DBUG_ASSERT(cass_data_len == sizeof(longlong));
......@@ -617,6 +619,7 @@ public:
else
memcpy(&tmp, cass_data, sizeof(longlong));
field->store(tmp);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -647,10 +650,11 @@ class TinyintDataConverter : public ColumnDataConverter
{
char buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == 1);
field->store(cass_data[0]);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -668,12 +672,13 @@ class Int32DataConverter : public ColumnDataConverter
{
int32_t buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
int32_t tmp;
DBUG_ASSERT(cass_data_len == sizeof(int32_t));
flip32(cass_data, (char*)&tmp);
field->store(tmp);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -691,10 +696,14 @@ public:
class StringCopyConverter : public ColumnDataConverter
{
String buf;
size_t max_length;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
if ((size_t)cass_data_len > max_length)
return 1;
field->store(cass_data, cass_data_len,field->charset());
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -704,6 +713,7 @@ public:
*cass_data_len= pstr->length();
return false;
}
StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
~StringCopyConverter(){}
};
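
A standalone sketch of the bounded-copy behaviour added to StringCopyConverter above, with simplified stand-in names (BoundedCopySketch, out, out_size are illustrative, not engine types): the copy succeeds only while the Cassandra value fits the MariaDB column, otherwise a non-zero code is returned for the handler to turn into an error.

#include <cstring>
#include <cstddef>

struct BoundedCopySketch
{
  char   *out;
  size_t  out_size;      /* plays the role of max_length */

  /* Returns 0 on success, 1 when the source value would not fit. */
  int copy(const char *cass_data, size_t cass_data_len)
  {
    if (cass_data_len > out_size)
      return 1;          /* caller reports a conversion error */
    memcpy(out, cass_data, cass_data_len);
    return 0;
  }
};
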
......@@ -712,7 +722,7 @@ class TimestampDataConverter : public ColumnDataConverter
{
int64_t buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
/* Cassandra data is milliseconds-since-epoch in network byte order */
int64_t tmp;
......@@ -724,6 +734,7 @@ public:
- microsecond fraction of a second.
*/
((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -768,7 +779,7 @@ class UuidDataConverter : public ColumnDataConverter
char buf[16]; /* Binary UUID representation */
String str_buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len==16);
char str[37];
......@@ -783,6 +794,7 @@ public:
}
*ptr= 0;
field->store(str, 36,field->charset());
return 0;
}
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
......@@ -836,6 +848,11 @@ const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
/*
VARINTs are stored as variable-length, big-endian two's-complement numbers.
*/
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
......@@ -885,14 +902,20 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
/* fall through: */
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
{
bool is_varint= false;
if (!strcmp(validator_name, validator_blob) ||
!strcmp(validator_name, validator_ascii) ||
!strcmp(validator_name, validator_text))
!strcmp(validator_name, validator_text) ||
(is_varint= !strcmp(validator_name, validator_varint)))
{
res= new StringCopyConverter;
size_t max_size= (size_t)-1;
if (is_varint)
max_size= field->field_length;
res= new StringCopyConverter(max_size);
}
break;
}
case MYSQL_TYPE_LONG:
if (!strcmp(validator_name, validator_int))
res= new Int32DataConverter;
......@@ -1041,24 +1064,43 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
/* TODO: what if we're not reading all columns?? */
if (!found)
{
rc= HA_ERR_KEY_NOT_FOUND;
}
else
rc= read_cassandra_columns(false);
DBUG_RETURN(rc);
}
void ha_cassandra::print_conversion_error(const char *field_name,
char *cass_value,
int cass_value_len)
{
char buf[32];
char *p= cass_value;
size_t i= 0;
/* Hex-dump as much of the source value as fits in buf (two chars per byte) */
for (; (i + 2 < sizeof(buf)) && (p < cass_value + cass_value_len); p++)
{
buf[i++]= map2number[(*p >> 4) & 0xF];
buf[i++]= map2number[*p & 0xF];
}
buf[i]= 0;
se->print_error("Unable to convert value for field `%s` from Cassandra's data"
" format. Source data is %d bytes, 0x%s%s",
field_name, cass_value_len, buf,
(p < cass_value + cass_value_len)? "..." : "");
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
}
void ha_cassandra::read_cassandra_columns(bool unpack_pk)
int ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
char *cass_name;
char *cass_value;
int cass_value_len;
Field **field;
int res= 0;
/*
cassandra_to_mariadb() calls will use field->store(...) methods, which
......@@ -1082,7 +1124,14 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
{
int fieldnr= (*field)->field_index;
(*field)->set_notnull();
field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
cass_value_len))
{
print_conversion_error((*field)->field_name, cass_value,
cass_value_len);
res=1;
goto err;
}
break;
}
}
......@@ -1094,10 +1143,17 @@ void ha_cassandra::read_cassandra_columns(bool unpack_pk)
field= table->field;
(*field)->set_notnull();
se->get_read_rowkey(&cass_value, &cass_value_len);
rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len);
if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
{
print_conversion_error((*field)->field_name, cass_value, cass_value_len);
res=1;
goto err;
}
}
err:
dbug_tmp_restore_column_map(table->write_set, old_map);
return res;
}
......@@ -1234,10 +1290,7 @@ int ha_cassandra::rnd_next(uchar *buf)
if (reached_eof)
rc= HA_ERR_END_OF_FILE;
else
{
read_cassandra_columns(true);
rc= 0;
}
rc= read_cassandra_columns(true);
}
DBUG_RETURN(rc);
......@@ -1422,8 +1475,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{
if (!se->get_next_multiget_row())
{
read_cassandra_columns(true);
res= 0;
res= read_cassandra_columns(true);
break;
}
else
......
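
Taken together, the ha_cassandra.cc changes give the read path one error-propagation shape: converters report failure through their int return value, read_cassandra_columns() forwards the first failure, and rnd_next()/index_read_map()/multi_range_read_next() pass it on instead of assuming conversion cannot fail. A compressed sketch with illustrative stand-in names (ConverterSketch and read_one_column are not real engine symbols):

struct ConverterSketch
{
  virtual int cassandra_to_mariadb(const char *data, int len)= 0;
  virtual ~ConverterSketch() {}
};

static int read_one_column(ConverterSketch *conv, const char *data, int len)
{
  /* A non-zero return is surfaced to the client as ER_INTERNAL_ERROR
     via print_conversion_error(). */
  return conv->cassandra_to_mariadb(data, len) ? 1 : 0;
}
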
......@@ -58,7 +58,7 @@ class ha_cassandra: public handler
bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters();
void read_cassandra_columns(bool unpack_pk);
int read_cassandra_columns(bool unpack_pk);
int check_table_options(struct ha_table_option_struct* options);
bool doing_insert_batch;
......@@ -66,6 +66,8 @@ class ha_cassandra: public handler
/* Used to produce 'wrong column %s at row %lu' warnings */
ha_rows insert_lineno;
void print_conversion_error(const char *field_name,
char *cass_value, int cass_value_len);
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()
......