Commit 413641de authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5254 cleanup upsert handler interface in tokudb

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@50503 c7de825b-a66e-492c-adef-691d508d4ae1
parent 18554201
......@@ -671,12 +671,12 @@ private:
#endif
#if TOKU_INCLUDE_UPSERT
private:
int fast_update(THD *thd, List<Item> &fields, List<Item> &values, Item *conds);
bool check_fast_update(THD *thd, List<Item> &fields, List<Item> &values, Item *conds);
int send_update_message(List<Item> &fields, List<Item> &values, Item *conds, DB_TXN *txn);
int upsert(THD *thd, uchar *record, List<Item> &update_fields, List<Item> &update_values);
int fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds);
bool check_fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds);
int send_update_message(List<Item> &update_fields, List<Item> &update_values, Item *conds, DB_TXN *txn);
int upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values);
bool check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values);
int send_upsert_message(THD *thd, uchar *record, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn);
int send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn);
#endif
};
......
......@@ -131,34 +131,27 @@ static uint32_t update_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_in
// Determine if an update operation can be offloaded to the storage engine.
// The update operation consists of a list of update expressions (fields[i] = values[i]), and a list
// of where conditions (conds). The function returns true is the update is handled in the storage engine.
// Otherwise, false is returned.
int ha_tokudb::fast_update(THD *thd, List<Item> &fields, List<Item> &values, Item *conds) {
// of where conditions (conds). The function returns 0 if the update is handled in the storage engine.
// Otherwise, an error is returned.
int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds) {
int error = 0;
if (tokudb_fast_update_debug) {
dump_item_list("fields", fields);
dump_item_list("values", values);
dump_item_list("fields", update_fields);
dump_item_list("values", update_values);
if (conds) {
fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n");
}
}
if (fields.elements < 1 || fields.elements != values.elements)
if (update_fields.elements < 1 || update_fields.elements != update_values.elements)
return ENOTSUP; // something is fishy with the parameters
rw_rdlock(&share->num_DBs_lock);
if (share->num_DBs > table->s->keys + test(hidden_primary_key)) // hot index in progress
error = ENOTSUP; // run on the slow path
if (error == 0 && !check_fast_update(thd, fields, values, conds))
if (error == 0 && !check_fast_update(thd, update_fields, update_values, conds))
error = ENOTSUP;
if (error == 0)
error = send_update_message(fields, values, conds, transaction);
rw_unlock(&share->num_DBs_lock);
error = send_update_message(update_fields, update_values, conds, transaction);
if (error != 0) {
if (get_disable_slow_update(thd))
......@@ -611,19 +604,27 @@ int ha_tokudb::send_update_message(List<Item> &fields, List<Item> &values, Item
marshall_simple_update(update_message, lhs_item, rhs_item, table, share);
}
// send the message
DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
update_dbt.data = update_message.data();
update_dbt.size = update_message.size();
error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
rw_rdlock(&share->num_DBs_lock);
if (share->num_DBs > table->s->keys + test(hidden_primary_key)) {// hot index in progress
error = ENOTSUP; // run on the slow path
} else {
// send the message
DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
update_dbt.data = update_message.data();
update_dbt.size = update_message.size();
error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
}
rw_unlock(&share->num_DBs_lock);
return error;
}
// Determine if an upsert operation can be offloaded to the storage engine.
// An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]).
// The function returns true is the upsert is handled in the storage engine. Otherwise, false is returned.
int ha_tokudb::upsert(THD *thd, uchar *record, List<Item> &update_fields, List<Item> &update_values) {
// The function returns 0 if the upsert is handled in the storage engine. Otherwise, an error code is returned.
int ha_tokudb::upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) {
int error = 0;
if (tokudb_upsert_debug) {
......@@ -635,18 +636,11 @@ int ha_tokudb::upsert(THD *thd, uchar *record, List<Item> &update_fields, List<I
if (update_fields.elements < 1 || update_fields.elements != update_values.elements)
return ENOTSUP; // not an upsert or something is fishy with the parameters
rw_rdlock(&share->num_DBs_lock);
if (share->num_DBs > table->s->keys + test(hidden_primary_key)) // hot index in progress
error = ENOTSUP; // run on the slow path
if (error == 0 && !check_upsert(thd, update_fields, update_values))
error = ENOTSUP;
if (error == 0)
error = send_upsert_message(thd, record, update_fields, update_values, transaction);
rw_unlock(&share->num_DBs_lock);
error = send_upsert_message(thd, update_fields, update_values, transaction);
if (error != 0) {
if (get_disable_slow_upsert(thd))
......@@ -693,17 +687,17 @@ bool ha_tokudb::check_upsert(THD *thd, List<Item> &update_fields, List<Item> &up
}
// Generate an upsert message and send it into the primary tree. Return 0 if successful.
int ha_tokudb::send_upsert_message(THD *thd, uchar *record, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn) {
int ha_tokudb::send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn) {
int error = 0;
// generate primary key
DBT key_dbt;
bool has_null;
create_dbt_key_from_table(&key_dbt, primary_key, primary_key_buff, record, &has_null);
create_dbt_key_from_table(&key_dbt, primary_key, primary_key_buff, table->record[0], &has_null);
// generate packed row
DBT row;
error = pack_row(&row, (const uchar *) record, primary_key);
error = pack_row(&row, (const uchar *) table->record[0], primary_key);
if (error)
return error;
......@@ -736,11 +730,19 @@ int ha_tokudb::send_upsert_message(THD *thd, uchar *record, List<Item> &update_f
marshall_simple_update(update_message, lhs_item, rhs_item, table, share);
}
// send the upsert message
DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
update_dbt.data = update_message.data();
update_dbt.size = update_message.size();
error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
rw_rdlock(&share->num_DBs_lock);
if (share->num_DBs > table->s->keys + test(hidden_primary_key)) { // hot index in progress
error = ENOTSUP; // run on the slow path
} else {
// send the upsert message
DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
update_dbt.data = update_message.data();
update_dbt.size = update_message.size();
error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
}
rw_unlock(&share->num_DBs_lock);
return error;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment