Commit 33b71945 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3117],[t:3118], merge handlerton pieces

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@26452 c7de825b-a66e-492c-adef-691d508d4ae1
parent b9e45955
...@@ -1205,6 +1205,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1205,6 +1205,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
mult_del_flags[i] = DB_DELETE_ANY; mult_del_flags[i] = DB_DELETE_ANY;
mult_dbt_flags[i] = DB_DBT_REALLOC; mult_dbt_flags[i] = DB_DBT_REALLOC;
} }
num_DBs_locked_in_bulk = false;
lock_count = 0;
} }
// //
...@@ -2958,13 +2960,18 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ...@@ -2958,13 +2960,18 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
delay_updating_ai_metadata = true; delay_updating_ai_metadata = true;
ai_metadata_update_required = false; ai_metadata_update_required = false;
abort_loader = false; abort_loader = false;
rw_rdlock(&share->num_DBs_lock);
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
num_DBs_locked_in_bulk = true;
lock_count = 0;
if (share->try_table_lock) { if (share->try_table_lock) {
if (get_prelock_empty(thd) && may_table_be_empty()) { if (get_prelock_empty(thd) && may_table_be_empty()) {
if (using_ignore || get_load_save_space(thd)) { if (using_ignore || get_load_save_space(thd)) {
acquire_table_lock(transaction, lock_write); acquire_table_lock(transaction, lock_write);
} }
else { else {
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
mult_dbt_flags[primary_key] = 0; mult_dbt_flags[primary_key] = 0;
if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) { if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) {
mult_put_flags[primary_key] = DB_NOOVERWRITE; mult_put_flags[primary_key] = DB_NOOVERWRITE;
...@@ -3002,6 +3009,13 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ...@@ -3002,6 +3009,13 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
share->try_table_lock = false; // RFP what good is the mutex? share->try_table_lock = false; // RFP what good is the mutex?
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
} }
for (uint i = 0; i < curr_num_DBs; i++) {
DB* curr_DB = share->key_file[i];
int error = curr_DB->pre_acquire_fileops_shared_lock(curr_DB, transaction);
if (!error) {
mult_put_flags[i] |= DB_PRELOCKED_FILE_READ;
}
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -3016,6 +3030,7 @@ int ha_tokudb::end_bulk_insert(bool abort) { ...@@ -3016,6 +3030,7 @@ int ha_tokudb::end_bulk_insert(bool abort) {
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
bool using_loader = (loader != NULL); bool using_loader = (loader != NULL);
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
if (ai_metadata_update_required) { if (ai_metadata_update_required) {
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment); error = update_max_auto_inc(share->status_block, share->last_auto_increment);
...@@ -3067,6 +3082,17 @@ int ha_tokudb::end_bulk_insert(bool abort) { ...@@ -3067,6 +3082,17 @@ int ha_tokudb::end_bulk_insert(bool abort) {
} }
cleanup: cleanup:
if (num_DBs_locked_in_bulk) {
rw_unlock(&share->num_DBs_lock);
}
num_DBs_locked_in_bulk = false;
lock_count = 0;
for (uint i = 0; i < curr_num_DBs; i++) {
u_int32_t prelocked_read_flag = DB_PRELOCKED_FILE_READ;
mult_put_flags[i] &= ~(prelocked_read_flag);
}
if (loader) { if (loader) {
error = sprintf(write_status_msg, "aborting bulk load"); error = sprintf(write_status_msg, "aborting bulk load");
thd_proc_info(thd, write_status_msg); thd_proc_info(thd, write_status_msg);
...@@ -3451,7 +3477,13 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { ...@@ -3451,7 +3477,13 @@ void ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
// //
// set the put flags for the main dictionary // set the put flags for the main dictionary
// //
void ha_tokudb::set_main_dict_put_flags(THD* thd, u_int32_t* put_flags) { void ha_tokudb::set_main_dict_put_flags(
THD* thd,
u_int32_t* put_flags,
bool no_overwrite_no_error_allowed
)
{
u_int32_t old_prelock_flags = (*put_flags)&(DB_PRELOCKED_FILE_READ);
// //
// optimization for "REPLACE INTO..." (and "INSERT IGNORE") command // optimization for "REPLACE INTO..." (and "INSERT IGNORE") command
// if the command is "REPLACE INTO" and the only table // if the command is "REPLACE INTO" and the only table
...@@ -3464,43 +3496,43 @@ void ha_tokudb::set_main_dict_put_flags(THD* thd, u_int32_t* put_flags) { ...@@ -3464,43 +3496,43 @@ void ha_tokudb::set_main_dict_put_flags(THD* thd, u_int32_t* put_flags) {
// consistency between indexes // consistency between indexes
// //
if (hidden_primary_key){ if (hidden_primary_key){
*put_flags = DB_YESOVERWRITE; *put_flags = DB_YESOVERWRITE|old_prelock_flags;
} }
else if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && else if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) &&
!is_replace_into(thd) && !is_replace_into(thd) &&
!is_insert_ignore(thd) !is_insert_ignore(thd)
) )
{ {
*put_flags = DB_YESOVERWRITE; *put_flags = DB_YESOVERWRITE|old_prelock_flags;
} }
else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) && else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) &&
is_replace_into(thd) is_replace_into(thd)
) )
{ {
*put_flags = DB_YESOVERWRITE; *put_flags = DB_YESOVERWRITE|old_prelock_flags;
} }
else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) && else if (do_ignore_flag_optimization(thd,table,share->replace_into_fast) &&
is_insert_ignore(thd) is_insert_ignore(thd) && no_overwrite_no_error_allowed
) )
{ {
*put_flags = DB_NOOVERWRITE_NO_ERROR; *put_flags = DB_NOOVERWRITE_NO_ERROR|old_prelock_flags;
} }
else else
{ {
*put_flags = DB_NOOVERWRITE; *put_flags = DB_NOOVERWRITE|old_prelock_flags;
} }
} }
int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) { int ha_tokudb::insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) {
int error = 0; int error = 0;
u_int32_t put_flags = 0; u_int32_t put_flags = mult_put_flags[primary_key];
THD *thd = ha_thd(); THD *thd = ha_thd();
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd); ulonglong wait_lock_time = get_write_lock_wait_time(thd);
assert(curr_num_DBs == 1); assert(curr_num_DBs == 1);
set_main_dict_put_flags(thd,&put_flags); set_main_dict_put_flags(thd, &put_flags, true);
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = share->file->put( error = share->file->put(
...@@ -3527,14 +3559,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN ...@@ -3527,14 +3559,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd); ulonglong wait_lock_time = get_write_lock_wait_time(thd);
set_main_dict_put_flags(thd, &mult_put_flags[primary_key]); set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false);
if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) {
//
//hopefully temporary, right now, put_multiple does not
// support use of DB_NOOVERWRITE_NO_ERROR as put_flag
//
mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = db_env->put_multiple( error = db_env->put_multiple(
...@@ -3622,7 +3647,17 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -3622,7 +3647,17 @@ int ha_tokudb::write_row(uchar * record) {
// //
// grab reader lock on numDBs_lock // grab reader lock on numDBs_lock
// //
if (!num_DBs_locked_in_bulk) {
rw_rdlock(&share->num_DBs_lock);
}
else {
lock_count++;
if (lock_count >= 2000) {
rw_unlock(&share->num_DBs_lock);
rw_rdlock(&share->num_DBs_lock); rw_rdlock(&share->num_DBs_lock);
lock_count = 0;
}
}
curr_num_DBs = share->num_DBs; curr_num_DBs = share->num_DBs;
if (hidden_primary_key) { if (hidden_primary_key) {
...@@ -3688,7 +3723,9 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -3688,7 +3723,9 @@ int ha_tokudb::write_row(uchar * record) {
track_progress(thd); track_progress(thd);
} }
cleanup: cleanup:
if (!num_DBs_locked_in_bulk) {
rw_unlock(&share->num_DBs_lock); rw_unlock(&share->num_DBs_lock);
}
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
} }
...@@ -3858,14 +3895,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3858,14 +3895,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
error = pack_old_row_for_update(&old_prim_row, old_row, primary_key); error = pack_old_row_for_update(&old_prim_row, old_row, primary_key);
if (error) { goto cleanup; } if (error) { goto cleanup; }
set_main_dict_put_flags(thd, &mult_put_flags[primary_key]); set_main_dict_put_flags(thd, &mult_put_flags[primary_key], false);
if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) {
//
//hopefully temporary, right now, put_multiple does not
// support use of DB_NOOVERWRITE_NO_ERROR as put_flag
//
mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = db_env->update_multiple( error = db_env->update_multiple(
db_env, db_env,
......
...@@ -282,6 +282,9 @@ private: ...@@ -282,6 +282,9 @@ private:
bool abort_loader; bool abort_loader;
int loader_error; int loader_error;
bool num_DBs_locked_in_bulk;
u_int32_t lock_count;
bool fix_rec_buff_for_blob(ulong length); bool fix_rec_buff_for_blob(ulong length);
bool fix_rec_update_buff_for_blob(ulong length); bool fix_rec_update_buff_for_blob(ulong length);
void fix_mult_rec_buff(); void fix_mult_rec_buff();
...@@ -348,7 +351,7 @@ private: ...@@ -348,7 +351,7 @@ private:
int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info); int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info);
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn); int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd); int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
void set_main_dict_put_flags(THD* thd, u_int32_t* put_flags); void set_main_dict_put_flags(THD* thd, u_int32_t* put_flags, bool no_overwrite_no_error_allowed);
int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn); int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd); int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd);
void test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val); void test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment