Commit 77e16ce7 authored by Sergei Petrunia's avatar Sergei Petrunia

MDEV-7648: Extra data in ANALYZE FORMAT=JSON $stmt

Switch from relying on PERFORMANCE_SCHEMA to using our
own hooks for counting the time spent reading rows from
tables.
parent b273e4a5
......@@ -37,7 +37,6 @@
#define PSI_CALL_get_table_share PSI_TABLE_CALL(get_table_share)
#define PSI_CALL_release_table_share PSI_TABLE_CALL(release_table_share)
#define PSI_CALL_drop_table_share PSI_TABLE_CALL(drop_table_share)
#define PSI_CALL_get_table_current_stats PSI_TABLE_CALL(get_table_current_stats)
#else
#define PSI_CALL_unbind_table(A1) /* no-op */
#define PSI_CALL_rebind_table(A1,A2,A3) NULL
......@@ -46,7 +45,6 @@
#define PSI_CALL_get_table_share(A1,A2) NULL
#define PSI_CALL_release_table_share(A1) /* no-op */
#define PSI_CALL_drop_table_share(A1,A2,A3,A4,A5) /* no-op */
#define PSI_CALL_get_table_current_stats(A1,A2,A3) /* no-op */
#endif
/**
......
......@@ -1921,16 +1921,6 @@ typedef struct PSI_digest_locker* (*digest_add_token_v1_t)
typedef int (*set_thread_connect_attrs_v1_t)(const char *buffer, uint length,
const void *from_cs);
/**
Get current row read statistics for the specific instance of a table
@param table Instance of table we need statistics for
@param count OUT Number of operations
@param sum OUT Total duration of operations
*/
typedef void (*get_table_current_stats_v1_t)(PSI_table *table,
ulonglong *count,
ulonglong *sum);
/**
Performance Schema Interface, version 1.
@since PSI_VERSION_1
......@@ -2132,8 +2122,6 @@ struct PSI_v1
digest_add_token_v1_t digest_add_token;
/** @sa set_thread_connect_attrs_v1_t. */
set_thread_connect_attrs_v1_t set_thread_connect_attrs;
/** @sa get_table_current_stats_v1 */
get_table_current_stats_v1_t get_table_current_stats;
};
/** @} (end of group Group_PSI_v1) */
......
......@@ -512,9 +512,6 @@ typedef struct PSI_digest_locker* (*digest_add_token_v1_t)
(struct PSI_digest_locker *locker, uint token, struct OPAQUE_LEX_YYSTYPE *yylval);
typedef int (*set_thread_connect_attrs_v1_t)(const char *buffer, uint length,
const void *from_cs);
typedef void (*get_table_current_stats_v1_t)(PSI_table *table,
ulonglong *count,
ulonglong *sum);
struct PSI_v1
{
register_mutex_v1_t register_mutex;
......@@ -615,7 +612,6 @@ struct PSI_v1
digest_start_v1_t digest_start;
digest_add_token_v1_t digest_add_token;
set_thread_connect_attrs_v1_t set_thread_connect_attrs;
get_table_current_stats_v1_t get_table_current_stats;
};
typedef struct PSI_v1 PSI;
typedef struct PSI_mutex_info_v1 PSI_mutex_info;
......
......@@ -104,6 +104,7 @@ SET (SQL_SOURCE
# added in MariaDB:
sql_explain.h sql_explain.cc
sql_analyze_stmt.h
sql_lifo_buffer.h sql_join_cache.h sql_join_cache.cc
create_options.cc multi_range_read.cc
opt_index_cond_pushdown.cc opt_subselect.cc
......
......@@ -43,6 +43,8 @@
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_audit.h"
#include "sql_analyze_stmt.h" // tracker in TABLE_IO_WAIT
#ifdef WITH_PARTITION_STORAGE_ENGINE
#include "ha_partition.h"
#endif
......@@ -54,6 +56,17 @@
#include "wsrep_mysqld.h"
#include "wsrep.h"
#define TABLE_IO_WAIT(TRACKER, PSI, OP, INDEX, FLAGS, PAYLOAD) \
{ \
if (unlikely(tracker)) \
tracker->start_tracking(); \
\
MYSQL_TABLE_IO_WAIT(PSI, OP, INDEX, FLAGS, PAYLOAD); \
\
if (unlikely(tracker)) \
tracker->stop_tracking(); \
}
/*
While we have legacy_db_type, we have this array to
check for dups and to find handlerton from legacy_db_type.
......@@ -2561,11 +2574,37 @@ int handler::ha_close(void)
PSI_CALL_close_table(m_psi);
m_psi= NULL; /* instrumentation handle, invalid after close_table() */
/* Detach from ANALYZE tracker */
tracker= NULL;
DBUG_ASSERT(m_lock_type == F_UNLCK);
DBUG_ASSERT(inited == NONE);
DBUG_RETURN(close());
}
inline int handler::ha_write_tmp_row(uchar *buf)
{
int error;
MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_write_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
return error;
}
inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data)
{
int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_update_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
return error;
}
int handler::ha_rnd_next(uchar *buf)
{
int result;
......@@ -2574,7 +2613,7 @@ int handler::ha_rnd_next(uchar *buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited == RND);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
{ result= rnd_next(buf); })
if (!result)
{
......@@ -2599,7 +2638,7 @@ int handler::ha_rnd_pos(uchar *buf, uchar *pos)
/* TODO: Find out how to solve ha_rnd_pos when finding duplicate update. */
/* DBUG_ASSERT(inited == RND); */
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
{ result= rnd_pos(buf, pos); })
increment_statistics(&SSV::ha_read_rnd_count);
if (!result)
......@@ -2618,7 +2657,7 @@ int handler::ha_index_read_map(uchar *buf, const uchar *key,
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_read_map(buf, key, keypart_map, find_flag); })
increment_statistics(&SSV::ha_read_key_count);
if (!result)
......@@ -2642,7 +2681,7 @@ int handler::ha_index_read_idx_map(uchar *buf, uint index, const uchar *key,
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type != F_UNLCK);
DBUG_ASSERT(end_range == NULL);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, index, 0,
{ result= index_read_idx_map(buf, index, key, keypart_map, find_flag); })
increment_statistics(&SSV::ha_read_key_count);
if (!result)
......@@ -2662,7 +2701,7 @@ int handler::ha_index_next(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_next(buf); })
increment_statistics(&SSV::ha_read_next_count);
if (!result)
......@@ -2679,7 +2718,7 @@ int handler::ha_index_prev(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_prev(buf); })
increment_statistics(&SSV::ha_read_prev_count);
if (!result)
......@@ -2695,7 +2734,7 @@ int handler::ha_index_first(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_first(buf); })
increment_statistics(&SSV::ha_read_first_count);
if (!result)
......@@ -2711,7 +2750,7 @@ int handler::ha_index_last(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_last(buf); })
increment_statistics(&SSV::ha_read_last_count);
if (!result)
......@@ -2727,7 +2766,7 @@ int handler::ha_index_next_same(uchar *buf, const uchar *key, uint keylen)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
{ result= index_next_same(buf, key, keylen); })
increment_statistics(&SSV::ha_read_next_count);
if (!result)
......@@ -5852,7 +5891,7 @@ int handler::ha_write_row(uchar *buf)
mark_trx_read_write();
increment_statistics(&SSV::ha_write_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
......@@ -5885,7 +5924,7 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data)
mark_trx_read_write();
increment_statistics(&SSV::ha_update_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
......@@ -5913,7 +5952,7 @@ int handler::ha_delete_row(const uchar *buf)
mark_trx_read_write();
increment_statistics(&SSV::ha_delete_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_DELETE_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, active_index, 0,
{ error= delete_row(buf);})
MYSQL_DELETE_ROW_DONE(error);
if (unlikely(error))
......
......@@ -1571,6 +1571,7 @@ typedef struct {
#define UNDEF_NODEGROUP 65535
class Item;
class Exec_time_tracker;
struct st_table_log_memory_entry;
class partition_info;
......@@ -2595,6 +2596,9 @@ public:
/* One bigger than needed to avoid to test if key == MAX_KEY */
ulonglong index_rows_read[MAX_KEY+1];
/* ANALYZE time tracker, if present */
Exec_time_tracker *tracker;
Item *pushed_idx_cond;
uint pushed_idx_cond_keyno; /* The index which the above condition is for */
......@@ -2648,6 +2652,7 @@ public:
ft_handler(0), inited(NONE),
implicit_emptied(0),
pushed_cond(0), next_insert_id(0), insert_id_for_cur_row(0),
tracker(NULL),
pushed_idx_cond(NULL),
pushed_idx_cond_keyno(MAX_KEY),
auto_inc_intervals_count(0),
......
/*
Copyright (c) 2015 MariaDB Corporation Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
A class for tracking time it takes to do a certain action
*/
class Exec_time_tracker
{
ulonglong count;
ulonglong cycles;
ulonglong last_start;
public:
Exec_time_tracker() : count(0), cycles(0) {}
// interface for collecting time
void start_tracking()
{
last_start= my_timer_cycles();
}
void stop_tracking()
{
ulonglong last_end= my_timer_cycles();
count++;
cycles += last_end - last_start;
}
// interface for getting the time
ulonglong get_loops() { return count; }
double get_time_ms()
{
// convert 'cycles' to milliseconds.
return 1000 * ((double)cycles) / sys_timer_info.cycles.frequency;
}
};
......@@ -5201,28 +5201,6 @@ inline int handler::ha_read_first_row(uchar *buf, uint primary_key)
return error;
}
inline int handler::ha_write_tmp_row(uchar *buf)
{
int error;
MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_write_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
return error;
}
inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data)
{
int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_update_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
return error;
}
extern pthread_attr_t *get_connection_attrib(void);
/**
......
......@@ -1295,8 +1295,11 @@ void Explain_table_access::print_explain_json(Explain_query *query,
else
writer->add_null();
op_tracker.end_tracking();
op_tracker.print_json(writer);
if (op_tracker.get_loops())
{
writer->add_member("r_total_time_ms").
add_double(op_tracker.get_time_ms());
}
}
/* `filtered` */
......@@ -1979,40 +1982,3 @@ void create_explain_query_if_not_exists(LEX *lex, MEM_ROOT *mem_root)
create_explain_query(lex, mem_root);
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
void Table_op_tracker::start_tracking(TABLE *table)
{
//TODO: will this compile without P_S ?
start_count= end_count= 0;
if ((psi_table= table->file->m_psi))
{
PSI_CALL_get_table_current_stats(psi_table, &start_count, &start_sum);
}
}
void Table_op_tracker::end_tracking()
{
if (psi_table)
{
PSI_CALL_get_table_current_stats(psi_table, &end_count, &end_sum);
}
}
void Table_op_tracker::print_json(Json_writer *writer)
{
if (start_count != end_count)
{
/*
We have time in picoseconds, we want to print in milli-seconds
picosecond is sec* 10^ -12
millisecond is sec * 10^-3
*/
double ms= double(end_sum - start_sum) / 1e9;
writer->add_member("r_total_time_ms").add_double(ms);
}
}
......@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "sql_analyze_stmt.h"
/*
......@@ -105,7 +106,7 @@ public:
inline void on_record_after_where() { r_rows_after_where++; }
};
#if 0
/*
A class to track operations (currently, row reads) on a PSI_table.
*/
......@@ -128,7 +129,7 @@ public:
// this may print nothing if the table was not tracked.
void print_json(Json_writer *writer);
};
#endif
#define ANALYZE_START_TRACKING(tracker) \
if (tracker) \
......@@ -138,38 +139,6 @@ public:
if (tracker) \
{ tracker->stop_tracking(); }
/*
A class for tracking time it takes to do a certain action
*/
class Exec_time_tracker
{
ulonglong count;
ulonglong cycles;
ulonglong last_start;
public:
Exec_time_tracker() : count(0), cycles(0) {}
// interface for collecting time
void start_tracking()
{
last_start= my_timer_cycles();
}
void stop_tracking()
{
ulonglong last_end= my_timer_cycles();
count++;
cycles += last_end - last_start;
}
// interface for getting the time
ulonglong get_loops() { return count; }
double get_time_ms()
{
// convert 'cycles' to milliseconds.
return 1000 * ((double)cycles) / sys_timer_info.cycles.frequency;
}
};
/**************************************************************************************
......@@ -772,7 +741,7 @@ public:
/* Tracker for reading the table */
Table_access_tracker tracker;
Table_op_tracker op_tracker;
Exec_time_tracker op_tracker;
Table_access_tracker jbuf_tracker;
int print_explain(select_result_sink *output, uint8 explain_flags,
......
......@@ -23419,7 +23419,7 @@ void JOIN_TAB::save_explain_data(Explain_table_access *eta, table_map prefix_tab
tab->tracker= &eta->tracker;
tab->jbuf_tracker= &eta->jbuf_tracker;
eta->op_tracker.start_tracking(table);
tab->table->file->tracker= &eta->op_tracker;
/* id and select_type are kept in Explain_select */
/* table */
......
......@@ -1728,34 +1728,6 @@ static void close_table_v1(PSI_table *table)
destroy_table(pfs);
}
/**
Used in ANALYZE-stmt: get current table statistics.
*/
static void get_table_current_stats_v1(PSI_table *table, ulonglong *count, ulonglong *sum)
{
PFS_table *pfs= reinterpret_cast<PFS_table*> (table);
if (unlikely(pfs == NULL))
{
*count= 0;
*sum= 0;
return;
}
longlong cur_count= 0;
longlong cur_sum= 0;
time_normalizer *normalizer= time_normalizer::get(wait_timer);
for (int i=0; i <= MAX_INDEXES; i++)
{
ulonglong wait= pfs->m_table_stat.m_index_stat[i].m_fetch.m_sum;
cur_sum += normalizer->wait_to_pico(wait);
cur_count += pfs->m_table_stat.m_index_stat[i].m_fetch.m_count;
}
*count= cur_count;
*sum= cur_sum;
}
static PSI_socket*
init_socket_v1(PSI_socket_key key, const my_socket *fd,
const struct sockaddr *addr, socklen_t addr_len)
......@@ -5273,8 +5245,6 @@ PSI_v1 PFS_v1=
pfs_digest_start_v1,
pfs_digest_add_token_v1,
set_thread_connect_attrs_v1,
/* MariaDB's extension: */
get_table_current_stats_v1
};
static void* get_interface(int version)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment