Commit 78bd0d25 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #4800 report progress to the client and show processlist for offline...

refs #4800 report progress to the client and show progress in SHOW PROCESSLIST for offline indexing, loads, and OPTIMIZE TABLE. For hot optimize, only show progress in SHOW PROCESSLIST.



git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@45132 c7de825b-a66e-492c-adef-691d508d4ae1
parent ff449e0f
...@@ -382,19 +382,27 @@ static int ai_poll_fun(void *extra, float progress) { ...@@ -382,19 +382,27 @@ static int ai_poll_fun(void *extra, float progress) {
sprintf(context->write_status_msg, "The process has been killed, aborting add index."); sprintf(context->write_status_msg, "The process has been killed, aborting add index.");
return ER_ABORTING_CONNECTION; return ER_ABORTING_CONNECTION;
} }
sprintf(context->write_status_msg, "Adding of indexes about %.1f%% done", progress*100); float percentage = progress * 100;
sprintf(context->write_status_msg, "Adding of indexes about %.1f%% done", percentage);
thd_proc_info(context->thd, context->write_status_msg); thd_proc_info(context->thd, context->write_status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
thd_progress_report(context->thd, (unsigned long long) percentage, 100);
#endif
return 0; return 0;
} }
// Progress callback handed to the TokuDB bulk loader.  The loader invokes it
// periodically with a completion fraction in [0, 1].  It publishes a
// human-readable status line via thd_proc_info() and, when built against
// MariaDB 5.3+ (HA_TOKUDB_HAS_THD_PROGRESS), also pushes a numeric progress
// report for SHOW PROCESSLIST.  Returns 0 to continue the load, or
// ER_ABORTING_CONNECTION to abort it once the client connection is killed.
//
// NOTE(review): this span was a two-column diff scrape with old/new text
// merged per line; this is the reconstructed post-commit (new) version.
static int loader_poll_fun(void *extra, float progress) {
    LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
    if (context->thd->killed) {
        sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
        return ER_ABORTING_CONNECTION;
    }
    float percentage = progress * 100;
    sprintf(context->write_status_msg, "Loading of data about %.1f%% done", percentage);
    thd_proc_info(context->thd, context->write_status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
    // Single-stage report: percentage complete out of a fixed denominator of 100.
    thd_progress_report(context->thd, (unsigned long long) percentage, 100);
#endif
    return 0;
}
...@@ -3182,6 +3190,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ...@@ -3182,6 +3190,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
} }
u_int32_t loader_flags = (get_load_save_space(thd)) ? u_int32_t loader_flags = (get_load_save_space(thd)) ?
LOADER_USE_PUTS : 0; LOADER_USE_PUTS : 0;
int error = db_env->create_loader( int error = db_env->create_loader(
db_env, db_env,
transaction, transaction,
...@@ -3201,7 +3210,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) { ...@@ -3201,7 +3210,7 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
lc.thd = thd; lc.thd = thd;
lc.ha = this; lc.ha = this;
error = loader->set_poll_function(loader, poll_fun, &lc); error = loader->set_poll_function(loader, loader_poll_fun, &lc);
assert(!error); assert(!error);
error = loader->set_error_callback(loader, loader_dup_fun, &lc); error = loader->set_error_callback(loader, loader_dup_fun, &lc);
...@@ -3285,7 +3294,6 @@ cleanup: ...@@ -3285,7 +3294,6 @@ cleanup:
} }
num_DBs_locked_in_bulk = false; num_DBs_locked_in_bulk = false;
lock_count = 0; lock_count = 0;
if (loader) { if (loader) {
error = sprintf(write_status_msg, "aborting bulk load"); error = sprintf(write_status_msg, "aborting bulk load");
thd_proc_info(thd, write_status_msg); thd_proc_info(thd, write_status_msg);
...@@ -7407,7 +7415,6 @@ int ha_tokudb::tokudb_add_index( ...@@ -7407,7 +7415,6 @@ int ha_tokudb::tokudb_add_index(
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound. char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
thd_proc_info(thd, "Adding indexes"); thd_proc_info(thd, "Adding indexes");
// //
// in unpack_row, MySQL passes a buffer that is this long, // in unpack_row, MySQL passes a buffer that is this long,
...@@ -7508,7 +7515,14 @@ int ha_tokudb::tokudb_add_index( ...@@ -7508,7 +7515,14 @@ int ha_tokudb::tokudb_add_index(
rw_unlock(&share->num_DBs_lock); rw_unlock(&share->num_DBs_lock);
rw_lock_taken = false; rw_lock_taken = false;
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
// initialize a one phase progress report.
// incremental reports are done in the indexer's callback function.
thd_progress_init(thd, 1);
#endif
error = indexer->build(indexer); error = indexer->build(indexer);
if (error) { goto cleanup; } if (error) { goto cleanup; }
rw_wrlock(&share->num_DBs_lock); rw_wrlock(&share->num_DBs_lock);
...@@ -7542,7 +7556,7 @@ int ha_tokudb::tokudb_add_index( ...@@ -7542,7 +7556,7 @@ int ha_tokudb::tokudb_add_index(
); );
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = loader->set_poll_function(loader, poll_fun, &lc); error = loader->set_poll_function(loader, loader_poll_fun, &lc);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = loader->set_error_callback(loader, loader_ai_err_fun, &lc); error = loader->set_error_callback(loader, loader_ai_err_fun, &lc);
...@@ -7574,6 +7588,12 @@ int ha_tokudb::tokudb_add_index( ...@@ -7574,6 +7588,12 @@ int ha_tokudb::tokudb_add_index(
bulk_fetch_iteration = HA_TOKU_BULK_FETCH_ITERATION_MAX; bulk_fetch_iteration = HA_TOKU_BULK_FETCH_ITERATION_MAX;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED,smart_dbt_bf_callback, &bf_info); cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED,smart_dbt_bf_callback, &bf_info);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
// initialize a two phase progress report.
// first phase: putting rows into the loader
thd_progress_init(thd, 2);
#endif
while (cursor_ret_val != DB_NOTFOUND || ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0)) { while (cursor_ret_val != DB_NOTFOUND || ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0)) {
if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) == 0) { if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) == 0) {
invalidate_bulk_fetch(); // reset the buffers invalidate_bulk_fetch(); // reset the buffers
...@@ -7620,6 +7640,11 @@ int ha_tokudb::tokudb_add_index( ...@@ -7620,6 +7640,11 @@ int ha_tokudb::tokudb_add_index(
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows); sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
} }
thd_proc_info(thd, status_msg); thd_proc_info(thd, status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
thd_progress_report(thd, num_processed, (long long unsigned) share->rows);
#endif
if (thd->killed) { if (thd->killed) {
error = ER_ABORTING_CONNECTION; error = ER_ABORTING_CONNECTION;
goto cleanup; goto cleanup;
...@@ -7630,8 +7655,15 @@ int ha_tokudb::tokudb_add_index( ...@@ -7630,8 +7655,15 @@ int ha_tokudb::tokudb_add_index(
assert(error==0); assert(error==0);
tmp_cursor = NULL; tmp_cursor = NULL;
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
// next progress report phase: closing the loader.
// incremental reports are done in the loader's callback function.
thd_progress_next_stage(thd);
#endif
error = loader->close(loader); error = loader->close(loader);
loader = NULL; loader = NULL;
if (error) goto cleanup; if (error) goto cleanup;
} }
curr_index = curr_num_DBs; curr_index = curr_num_DBs;
...@@ -7672,6 +7704,9 @@ int ha_tokudb::tokudb_add_index( ...@@ -7672,6 +7704,9 @@ int ha_tokudb::tokudb_add_index(
error = 0; error = 0;
cleanup: cleanup:
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
thd_progress_end(thd);
#endif
if (rw_lock_taken) { if (rw_lock_taken) {
rw_unlock(&share->num_DBs_lock); rw_unlock(&share->num_DBs_lock);
rw_lock_taken = false; rw_lock_taken = false;
......
...@@ -5,6 +5,14 @@ ...@@ -5,6 +5,14 @@
#pragma interface /* gcc class implementation */ #pragma interface /* gcc class implementation */
#endif #endif
// In MariaDB 5.3, thread progress reporting was introduced.
// Only include that functionality if we're using maria 5.3 +
#ifdef MARIADB_BASE_VERSION
#if MYSQL_VERSION_ID >= 50300
#define HA_TOKUDB_HAS_THD_PROGRESS
#endif
#endif
#if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699 #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
#define TOKU_INCLUDE_ALTER_56 1 #define TOKU_INCLUDE_ALTER_56 1
#define TOKU_INCLUDE_ROW_TYPE_COMPRESSION 0 #define TOKU_INCLUDE_ROW_TYPE_COMPRESSION 0
...@@ -51,6 +59,7 @@ typedef struct hot_optimize_context { ...@@ -51,6 +59,7 @@ typedef struct hot_optimize_context {
THD *thd; THD *thd;
char* write_status_msg; char* write_status_msg;
ha_tokudb *ha; ha_tokudb *ha;
uint progress_stage;
uint current_table; uint current_table;
uint num_tables; uint num_tables;
} *HOT_OPTIMIZE_CONTEXT; } *HOT_OPTIMIZE_CONTEXT;
......
...@@ -45,11 +45,6 @@ ha_tokudb::analyze(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -45,11 +45,6 @@ ha_tokudb::analyze(THD * thd, HA_CHECK_OPT * check_opt) {
} }
#endif #endif
struct hot_poll_fun_extra {
uint current_table;
uint num_tables;
};
static int static int
hot_poll_fun(void *extra, float progress) { hot_poll_fun(void *extra, float progress) {
HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra; HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
...@@ -57,14 +52,25 @@ hot_poll_fun(void *extra, float progress) { ...@@ -57,14 +52,25 @@ hot_poll_fun(void *extra, float progress) {
sprintf(context->write_status_msg, "The process has been killed, aborting hot optimize."); sprintf(context->write_status_msg, "The process has been killed, aborting hot optimize.");
return ER_ABORTING_CONNECTION; return ER_ABORTING_CONNECTION;
} }
sprintf(context->write_status_msg, "Optimization of index %u of %u about %.lf%% done", context->current_table + 1, context->num_tables, progress*100); float percentage = progress * 100;
sprintf(context->write_status_msg, "Optimization of index %u of %u about %.lf%% done", context->current_table + 1, context->num_tables, percentage);
thd_proc_info(context->thd, context->write_status_msg); thd_proc_info(context->thd, context->write_status_msg);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
if (context->progress_stage < context->current_table) {
// the progress stage is behind the current table, so move up
// to the next stage and set the progress stage to current.
thd_progress_next_stage(context->thd);
context->progress_stage = context->current_table;
}
// the percentage we report here is for the current stage/db
thd_progress_report(context->thd, (unsigned long long) percentage, 100);
#endif
return 0; return 0;
} }
volatile int ha_tokudb_optimize_wait = 0; // debug volatile int ha_tokudb_optimize_wait = 0; // debug
// flatten all DB's in this table, to do so, just do a full scan on every DB // flatten all DB's in this table, to do so, perform hot optimize on each db
int int
ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) { ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
TOKUDB_DBUG_ENTER("ha_tokudb::optimize"); TOKUDB_DBUG_ENTER("ha_tokudb::optimize");
...@@ -72,6 +78,13 @@ ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -72,6 +78,13 @@ ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
int error; int error;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
// each DB is its own stage. as HOT goes through each db, we'll
// move on to the next stage.
thd_progress_init(thd, curr_num_DBs);
#endif
// //
// for each DB, run optimize and hot_optimize // for each DB, run optimize and hot_optimize
// //
...@@ -96,6 +109,11 @@ ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -96,6 +109,11 @@ ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
error = 0; error = 0;
cleanup: cleanup:
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
thd_progress_end(thd);
#endif
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment