Commit 67c7a832 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2033], add process info for queries

git-svn-id: file:///svn/mysql/tokudb-engine/src@14784 c7de825b-a66e-492c-adef-691d508d4ae1
parent a48828aa
......@@ -1005,9 +1005,6 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
using_ignore = 0;
last_cursor_error = 0;
range_lock_grabbed = false;
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
blob_buff = NULL;
num_blob_bytes = 0;
delay_updating_ai_metadata = false;
......@@ -2635,7 +2632,7 @@ int ha_tokudb::write_row(uchar * record) {
TOKUDB_DBUG_ENTER("ha_tokudb::write_row");
DBT row, prim_key, key;
int error;
THD *thd = NULL;
THD *thd = ha_thd();
u_int32_t put_flags;
bool has_null;
DB_TXN* sub_trans = NULL;
......@@ -2643,10 +2640,9 @@ int ha_tokudb::write_row(uchar * record) {
bool is_replace_into;
tokudb_trx_data *trx = NULL;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
declare_lockretry;
declare_lockretry;
thd = ha_thd();
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
......@@ -2807,11 +2803,8 @@ int ha_tokudb::write_row(uchar * record) {
if (!error) {
added_rows++;
num_added_rows_in_stmt++;
if ((num_added_rows_in_stmt % 1000) == 0) {
sprintf(write_status_msg, "Inserted about %llu rows", num_added_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
trx->stmt_progress.inserted++;
track_progress(thd);
}
cleanup:
if (error == DB_KEYEXIST) {
......@@ -2915,6 +2908,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
THD* thd = ha_thd();
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
......@@ -3050,11 +3044,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
if (!error) {
num_updated_rows_in_stmt++;
if ((num_updated_rows_in_stmt % 1000) == 0) {
sprintf(write_status_msg, "Updated about %llu rows", num_updated_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
trx->stmt_progress.updated++;
track_progress(thd);
}
......@@ -3167,6 +3158,7 @@ int ha_tokudb::delete_row(const uchar * record) {
key_map keys = table_share->keys_in_use;
bool has_null;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
......@@ -3190,11 +3182,8 @@ int ha_tokudb::delete_row(const uchar * record) {
}
else {
deleted_rows++;
num_deleted_rows_in_stmt++;
if ((num_deleted_rows_in_stmt % 1000) == 0) {
sprintf(write_status_msg, "Deleted about %llu rows", num_deleted_rows_in_stmt);
thd_proc_info(thd, write_status_msg);
}
trx->stmt_progress.deleted++;
track_progress(thd);
}
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
......@@ -3585,6 +3574,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
bool has_null;
int cmp;
u_int32_t flags;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
......@@ -3613,7 +3604,8 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
if (cmp) {
error = HA_ERR_END_OF_FILE;
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3643,6 +3635,8 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
DBT lookup_key;
int error;
u_int32_t flags = 0;
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
struct smart_dbt_info info;
struct index_read_info ir_info;
......@@ -3718,6 +3712,9 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) {
TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3738,6 +3735,8 @@ int ha_tokudb::index_next(uchar * buf) {
int error;
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
......@@ -3753,6 +3752,8 @@ int ha_tokudb::index_next(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3777,6 +3778,8 @@ int ha_tokudb::index_prev(uchar * buf) {
int error;
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
......@@ -3792,7 +3795,9 @@ int ha_tokudb::index_prev(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3811,6 +3816,8 @@ int ha_tokudb::index_first(uchar * buf) {
int error;
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status);
......@@ -3827,7 +3834,9 @@ int ha_tokudb::index_first(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3846,6 +3855,8 @@ int ha_tokudb::index_last(uchar * buf) {
int error;
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status);
......@@ -3862,6 +3873,11 @@ int ha_tokudb::index_last(uchar * buf) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
if (trx) {
trx->stmt_progress.queried++;
}
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3921,6 +3937,9 @@ int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error;
u_int32_t flags = SET_READ_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
struct smart_dbt_info info;
HANDLE_INVALID_CURSOR();
......@@ -3935,11 +3954,36 @@ int ha_tokudb::rnd_next(uchar * buf) {
info.keynr = primary_key;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key);
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
void ha_tokudb::track_progress(THD* thd) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (trx) {
bool update_status = (trx->stmt_progress.queried % 1000) == 1 ||
(trx->stmt_progress.inserted% 1000) == 1 ||
(trx->stmt_progress.updated% 1000) == 1 ||
(trx->stmt_progress.deleted% 1000) == 1;
if (update_status) {
sprintf(
write_status_msg,
"Queried about %llu rows, inserted about %llu rows, updated about %llu rows, deleted about %llu rows",
trx->stmt_progress.queried,
trx->stmt_progress.inserted,
trx->stmt_progress.updated,
trx->stmt_progress.deleted
);
thd_proc_info(thd, write_status_msg);
}
}
}
DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
TOKUDB_DBUG_ENTER("ha_tokudb::get_pos");
/* We don't need to set app_data here */
......@@ -4359,6 +4403,7 @@ int ha_tokudb::create_txn(THD* thd, tokudb_trx_data* trx) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("stmt:%p:%p\n", trx->sp_level, trx->stmt);
}
reset_stmt_progress(&trx->stmt_progress);
trans_register_ha(thd, FALSE, tokudb_hton);
cleanup:
return error;
......@@ -4392,12 +4437,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
declare_lockretry;
#endif
//
// reset per-stmt variables
//
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!trx) {
......@@ -4447,6 +4486,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
*/
DBUG_PRINT("trans", ("commiting non-updating transaction"));
error = trx->stmt->commit(trx->stmt, 0);
reset_stmt_progress(&trx->stmt_progress);
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
trx->stmt = NULL;
......@@ -4471,12 +4511,6 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt");
int error = 0;
//
// reset per-stmt variables
//
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
DBUG_ASSERT(trx);
......
......@@ -196,19 +196,6 @@ private:
ulonglong deleted_rows;
//
// count on number of rows inserted by statement
// this is to help give user progress on what is happening
// the reason that the variables added_rows and deleted_rows
// are not used is that those variables are also used to help
// estimate the number of rows in the DB. There are tricky things that
// can happen with "lock tables", so I do not want to couple these
// two features together. There is a little duplicate work, but I think it is fine
//
ulonglong num_added_rows_in_stmt;
ulonglong num_deleted_rows_in_stmt;
ulonglong num_updated_rows_in_stmt;
uint last_dup_key;
//
// if set to 0, then the primary key is not hidden
......@@ -445,6 +432,8 @@ public:
return tokudb_prefix_cmp_dbt_key(share->key_file[keynr], first_key, second_key);
}
void track_progress(THD* thd);
int heavi_ret_val;
//
......
......@@ -83,17 +83,33 @@ typedef enum {
typedef struct st_tokudb_stmt_progress {
ulonglong inserted;
ulonglong updated;
ulonglong deleted;
ulonglong queried;
} tokudb_stmt_progress;
typedef struct st_tokudb_trx_data {
DB_TXN *all;
DB_TXN *stmt;
DB_TXN *sp_level;
uint tokudb_lock_count;
HA_TOKU_ISO_LEVEL iso_level;
tokudb_stmt_progress stmt_progress;
} tokudb_trx_data;
extern char *tokudb_data_dir;
extern const char *ha_tokudb_ext;
static void reset_stmt_progress (tokudb_stmt_progress* val) {
val->deleted = 0;
val->inserted = 0;
val->updated = 0;
val->queried = 0;
}
static int get_name_length(const char *name) {
int n = 0;
const char *newname = name;
......
......@@ -442,6 +442,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
if (all) {
trx->iso_level = hatoku_iso_not_set;
}
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error);
}
......@@ -467,6 +468,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
if (all) {
trx->iso_level = hatoku_iso_not_set;
}
reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment