Commit d3625970, authored Aug 19, 2012 by Sergey Petrunya
MDEV-431: Cassandra storage engine
- Descriptive error messages
- Unpack PK column on range scans
parent 9cdf5eee
Showing 6 changed files with 147 additions and 31 deletions:
mysql-test/r/cassandra.result       +15  -4
mysql-test/t/cassandra.test          +8  -2
storage/cassandra/cassandra_se.cc   +31  -3
storage/cassandra/cassandra_se.h     +2  -0
storage/cassandra/ha_cassandra.cc   +87  -21
storage/cassandra/ha_cassandra.h     +4  -1
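As a reading aid before the per-file hunks: the "Unpack PK column on range scans" part of the change makes the Cassandra layer remember the key of the record it is currently returning and exposes it through the new get_read_rowkey() call, which the handler then feeds through a converter into the PRIMARY KEY column. The following is a minimal standalone sketch of that flow, not MariaDB code; CassandraSe and RowkeyConverter are simplified stand-ins for Cassandra_se_impl and the ColumnDataConverter attached to the PK field.

// Minimal standalone sketch (simplified stand-ins, not the real MariaDB classes)
// of the rowkey round-trip added by this commit.
#include <iostream>
#include <string>

// Stand-in for Cassandra_se_impl: remembers the key of the record being returned.
class CassandraSe {
  std::string rowkey;                               // key of the record we're returning now
public:
  void set_current_row(const std::string &key) { rowkey= key; }  // e.g. rowkey= key_slice_it->key
  // Mirrors get_read_rowkey(char **value, int *value_len) from the diff.
  void get_read_rowkey(char **value, int *value_len) {
    *value= (char*)rowkey.c_str();
    *value_len= (int)rowkey.length();
  }
};

// Stand-in for the converter attached to the PK field.
struct RowkeyConverter {
  std::string field_buf;                            // plays the role of the MariaDB field
  void cassandra_to_mariadb(const char *data, int len) { field_buf.assign(data, len); }
};

int main() {
  CassandraSe se;
  RowkeyConverter rowkey_converter;

  // During a range scan each fetched key slice records its key...
  se.set_current_row("rowkey11");

  // ...and read_cassandra_columns(true) unpacks it into the PK column:
  char *val;
  int val_len;
  se.get_read_rowkey(&val, &val_len);
  rowkey_converter.cassandra_to_mariadb(val, val_len);

  std::cout << "PK column now holds: " << rowkey_converter.field_buf << "\n";  // prints rowkey11
  return 0;
}

In the actual patch the same converter (rowkey_converter) is also driven in the opposite direction, via mariadb_to_cassandra(), to build the lookup key in index_read_map() and the insert key in write_row().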
mysql-test/r/cassandra.result
@@ -13,15 +13,26 @@ thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam';
 ERROR HY000: Unable to connect to foreign data source: Default TException. [Keyspace no_such_keyspace does not exist]
 create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
   thrift_host='localhost' keyspace='no_such_keyspace';
-ERROR HY000: Can't create table 'test.t1' (errno: 140)
-create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
+ERROR HY000: Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s
+create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
   thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
 select * from t1;
 rowkey  data1  data2
 insert into t1 values ('rowkey10', 'data1-value', 123456);
 insert into t1 values ('rowkey11', 'data1-value2', 34543);
 insert into t1 values ('rowkey12', 'data1-value3', 454);
 select * from t1;
 rowkey  data1  data2
-data1-value  123456
-data1-value2  34543
 rowkey12  data1-value3  454
+rowkey10  data1-value  123456
+rowkey11  data1-value2  34543
+explain
+select * from t1 where rowkey='rowkey11';
+id  select_type  table  type  possible_keys  key  key_len  ref  rows  Extra
+1  SIMPLE  t1  const  PRIMARY  PRIMARY  38  const  1
+select * from t1 where rowkey='rowkey11';
+rowkey  data1  data2
+rowkey11  data1-value2  34543
 delete from t1;
 select * from t1;
 rowkey  data1  data2
mysql-test/t/cassandra.test
@@ -25,7 +25,7 @@ create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
   thrift_host='localhost' keyspace='no_such_keyspace' column_family='colfam';
 
 # No column family specified
---error ER_CANT_CREATE_TABLE
+--error ER_CONNECT_TO_FOREIGN_DATA_SOURCE
 create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
   thrift_host='localhost' keyspace='no_such_keyspace';

@@ -49,13 +49,19 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint);
 ############################################################################
 # Now, create a table for real and insert data
-create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
+create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
   thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
 
 select * from t1;
 
 insert into t1 values ('rowkey10', 'data1-value', 123456);
 insert into t1 values ('rowkey11', 'data1-value2', 34543);
 insert into t1 values ('rowkey12', 'data1-value3', 454);
 select * from t1;
+explain
+select * from t1 where rowkey='rowkey11';
+select * from t1 where rowkey='rowkey11';
+# Check if deletion works
+delete from t1;
+select * from t1;
storage/cassandra/cassandra_se.cc
@@ -65,6 +65,8 @@ class Cassandra_se_impl: public Cassandra_se_interface
   std::vector<KeySlice> key_slice_vec;
   std::vector<KeySlice>::iterator key_slice_it;
+
+  std::string rowkey; /* key of the record we're returning now */
   SlicePredicate slice_pred;
 public:
   Cassandra_se_impl() : cass(NULL) {}

@@ -77,6 +79,7 @@ public:
   bool setup_ddl_checks();
   void first_ddl_column();
   bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
+  void get_rowkey_type(char **name, char **type);
 
   /* Writes */
   void start_prepare_insert(const char *key, int key_len);

@@ -86,6 +89,7 @@ public:
   /* Reads, point lookups */
   bool get_slice(char *key, size_t key_len, bool *found);
   bool get_next_read_column(char **name, char **value, int *value_len);
+  void get_read_rowkey(char **value, int *value_len);
 
   /* Reads, multi-row scans */
   bool get_range_slices();

@@ -193,6 +197,21 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
   return false;
 }
 
+
+void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
+{
+  if (cf_def.__isset.key_validation_class)
+    *type= (char*)cf_def.key_validation_class.c_str();
+  else
+    *type= NULL;
+
+  if (cf_def.__isset.key_alias)
+    *name= (char*)cf_def.key_alias.c_str();
+  else
+    *name= NULL;
+}
+
 /////////////////////////////////////////////////////////////////////////////
 // Data writes
 /////////////////////////////////////////////////////////////////////////////

@@ -269,8 +288,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
   ColumnParent cparent;
   cparent.column_family= column_family;
 
-  std::string rowkey_str;
-  rowkey_str.assign(key, key_len);
+  rowkey.assign(key, key_len);
 
   SlicePredicate slice_pred;
   SliceRange sr;

@@ -279,7 +297,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
   slice_pred.__set_slice_range(sr);
 
   try {
-    cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred,
+    cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
                     cur_consistency_level);
 
     if (column_data_vec.size() == 0)

@@ -333,6 +351,15 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
 }
 
 
+/* Return the rowkey for the record that was read */
+void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
+{
+  *value= (char*)rowkey.c_str();
+  *value_len= rowkey.length();
+}
+
+
 bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters
 {
   bool res= true;

@@ -375,6 +402,7 @@ bool Cassandra_se_impl::get_next_range_slice_row()
     return true;
 
   column_data_vec= key_slice_it->columns;
+  rowkey= key_slice_it->key;
   column_data_it= column_data_vec.begin();
   key_slice_it++;
   return false;
storage/cassandra/cassandra_se.h
@@ -25,6 +25,7 @@ public:
   virtual void first_ddl_column()=0;
   virtual bool next_ddl_column(char **name, int *name_len, char **value, int *value_len)=0;
+  virtual void get_rowkey_type(char **name, char **type)=0;
 
   /* Writes */
   virtual void start_prepare_insert(const char *key, int key_len)=0;

@@ -35,6 +36,7 @@ public:
   /* Reads */
   virtual bool get_slice(char *key, size_t key_len, bool *found)=0;
   virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
+  virtual void get_read_rowkey(char **value, int *value_len)=0;
 
   /* Reads, multi-row scans */
   virtual bool get_range_slices()=0;
storage/cassandra/ha_cassandra.cc
@@ -212,7 +212,7 @@ static handler* cassandra_create_handler(handlerton *hton,
 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-   se(NULL), field_converters(NULL)
+   se(NULL), field_converters(NULL), rowkey_converter(NULL)
 {}

@@ -251,7 +251,6 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
 
   if (setup_field_converters(table->field, table->s->fields))
   {
-    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }

@@ -342,7 +341,11 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
 #endif
   DBUG_ASSERT(!se);
+  if (!options->host || !options->keyspace || !options->column_family)
+  {
+    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
+             "thrift_host, keyspace, and column_family table options must be specified");
+    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+  }
   se= get_cassandra_se();
   se->set_column_family(options->column_family);
   if (se->connect(options->host, options->keyspace))
@@ -515,6 +518,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
     case MYSQL_TYPE_VAR_STRING:
     case MYSQL_TYPE_VARCHAR:
+      //case MYSQL_TYPE_STRING: <-- todo: should we allow end-padded 'CHAR(N)'?
       if (!strcmp(validator_name, validator_blob) ||
           !strcmp(validator_name, validator_ascii) ||
           !strcmp(validator_name, validator_text))

@@ -560,7 +564,12 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
       n_mapped++;
       ColumnDataConverter **conv= field_converters + (*field)->field_index;
       if (!(*conv= map_field_to_validator(*field, col_type)))
+      {
+        se->print_error("Failed to map column %s to datatype %s",
+                        (*field)->field_name, col_type);
+        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
         return true;
+      }
       (*conv)->field= *field;
     }
   }
@@ -569,12 +578,37 @@ bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
   if (n_mapped != n_fields - 1)
     return true;
+
+  /*
+    Setup type conversion for row_key. It may also have a name, but we ignore
+    it currently
+  */
+  se->get_rowkey_type(&col_name, &col_type);
+  if (col_type != NULL)
+  {
+    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
+    {
+      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
+      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+      return true;
+    }
+    rowkey_converter->field= *field_arg;
+  }
+  else
+  {
+    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+    return true;
+  }
+
   return false;
 }
 
 
 void ha_cassandra::free_field_converters()
 {
+  delete rowkey_converter;
+  rowkey_converter= NULL;
+
   if (field_converters)
   {
     for (uint i= 0; i < n_field_converters; i++)
@@ -597,19 +631,26 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
   if (find_flag != HA_READ_KEY_EXACT)
     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 
-  // todo: decode the search key.
   uint key_len= calculate_key_len(table, active_index, key, keypart_map);
   store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
 
+#if 0
   char buff[256];
   String tmp(buff,sizeof(buff), &my_charset_bin);
   tmp.length(0);
   String *str;
   str= table->field[0]->val_str(&tmp);
+#endif
+  char *cass_key;
+  int cass_key_len;
+  rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
 
   bool found;
-  if (se->get_slice((char*)str->ptr(), str->length(), &found))
+  if (se->get_slice(cass_key, cass_key_len, &found))
+  {
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
     rc= HA_ERR_INTERNAL_ERROR;
+  }
 
   /* TODO: what if we're not reading all columns?? */
   if (!found)
@@ -618,14 +659,14 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
   }
   else
   {
-    read_cassandra_columns();
+    read_cassandra_columns(false);
   }
 
   DBUG_RETURN(rc);
 }
 
 
-void ha_cassandra::read_cassandra_columns()
+void ha_cassandra::read_cassandra_columns(bool unpack_pk)
 {
   char *cass_name;
   char *cass_value;

@@ -660,6 +701,15 @@ void ha_cassandra::read_cassandra_columns()
       }
     }
   }
 
+  if (unpack_pk)
+  {
+    /* Unpack rowkey to primary key */
+    field= table->field;
+    (*field)->set_notnull();
+    se->get_read_rowkey(&cass_value, &cass_value_len);
+    rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len);
+  }
+
   dbug_tmp_restore_column_map(table->write_set, old_map);
 }
@@ -667,12 +717,13 @@ void ha_cassandra::read_cassandra_columns()
 int ha_cassandra::write_row(uchar *buf)
 {
   my_bitmap_map *old_map;
-  char buff[512];
+  //  char buff[512];
   DBUG_ENTER("ha_cassandra::write_row");
 
   old_map= dbug_tmp_use_all_columns(table, table->read_set);
 
   /* Convert the key (todo: unify with the rest of the processing) */
+#if 0
   {
     Field *pk_col= table->field[0];
     String tmp(buff,sizeof(buff), &my_charset_bin);

@@ -682,6 +733,11 @@ int ha_cassandra::write_row(uchar *buf)
     se->start_prepare_insert(str->ptr(), str->length());
   }
+#endif
+  char *cass_key;
+  int cass_key_len;
+  rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
+  se->start_prepare_insert(cass_key, cass_key_len);
 
   /* Convert other fields */
   for (uint i= 1; i < table->s->fields; i++)

@@ -697,6 +753,9 @@ int ha_cassandra::write_row(uchar *buf)
 
   bool res= se->do_insert();
 
+  if (res)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
+
   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
 }
@@ -713,6 +772,8 @@ int ha_cassandra::rnd_init(bool scan)
     se->add_read_column(table->field[i]->field_name);
 
   bres= se->get_range_slices();
+  if (bres)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 
   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
 }

@@ -739,7 +800,7 @@ int ha_cassandra::rnd_next(uchar *buf)
   }
   else
   {
-    read_cassandra_columns();
+    read_cassandra_columns(true);
     rc= 0;
   }

@@ -754,10 +815,21 @@ int ha_cassandra::delete_all_rows()
   bres= se->truncate();
+
+  if (bres)
+    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
 
   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
 }
 
 
+int ha_cassandra::delete_row(const uchar *buf)
+{
+  DBUG_ENTER("ha_cassandra::delete_row");
+  // todo: delete the row we've just read.
+  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+}
+
+
 /////////////////////////////////////////////////////////////////////////////
 // Dummy implementations start
 /////////////////////////////////////////////////////////////////////////////
@@ -815,7 +887,8 @@ ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
                                        key_range *max_key)
 {
   DBUG_ENTER("ha_cassandra::records_in_range");
-  DBUG_RETURN(10); // low number to force index usage
+  //DBUG_RETURN(10); // low number to force index usage
+  DBUG_RETURN(HA_POS_ERROR);
 }

@@ -866,13 +939,6 @@ int ha_cassandra::delete_table(const char *name)
 }
 
 
-int ha_cassandra::delete_row(const uchar *buf)
-{
-  DBUG_ENTER("ha_cassandra::delete_row");
-  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
-}
-
-
 /**
   check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
   if new and old definition are compatible
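The "descriptive error messages" half of the change follows a single pattern in ha_cassandra.cc: the Cassandra layer formats a message with print_error(), and the handler then surfaces it through my_error(ER_INTERNAL_ERROR, ...) using error_str(). Below is a rough standalone sketch of that pattern only; ErrorBuffer is a hypothetical stand-in for the buffer kept behind Cassandra_se_interface, and printf stands in for my_error().

// Standalone sketch of the error-reporting pattern used throughout this commit.
// ErrorBuffer is a hypothetical stand-in, not the real Cassandra_se_interface.
#include <cstdarg>
#include <cstdio>

class ErrorBuffer {
  char buf[512];
public:
  ErrorBuffer() { buf[0]= '\0'; }
  // Mirrors se->print_error("Failed to map column %s to datatype %s", ...).
  void print_error(const char *format, ...) {
    va_list ap;
    va_start(ap, format);
    std::vsnprintf(buf, sizeof(buf), format, ap);
    va_end(ap);
  }
  // Mirrors se->error_str(): returns the last formatted message.
  const char *error_str() const { return buf; }
};

int main() {
  ErrorBuffer se;
  // A failed column mapping records what exactly went wrong...
  se.print_error("Failed to map column %s to datatype %s", "data1", "LongType");
  // ...and the handler forwards it to the client (my_error(ER_INTERNAL_ERROR, ...) in the real code).
  std::printf("ERROR: %s\n", se.error_str());
  return 0;
}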
storage/cassandra/ha_cassandra.h
@@ -38,10 +38,13 @@ class ha_cassandra: public handler
   ColumnDataConverter **field_converters;
   uint n_field_converters;
+  ColumnDataConverter *rowkey_converter;
+
   bool setup_field_converters(Field **field, uint n_fields);
   void free_field_converters();
 
-  void read_cassandra_columns();
+  void read_cassandra_columns(bool unpack_pk);
+
 public:
   ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_cassandra()