Commit e5487988 authored by John Esmet, committed by Yoni Fogel

closes #5864 add compression, decompression, serialization, deserialization...

Closes #5864: add compression, decompression, serialization, and deserialization statistics to engine status. All interesting code paths should be covered.


git-svn-id: file:///svn/toku/tokudb@51729 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3f038902
......@@ -83,10 +83,13 @@ struct ftnode_fetch_extra {
int child_to_read;
// when we read internal nodes, we want to read all the data off disk in one I/O
// then we'll treat it as normal and only decompress the needed partitions etc.
bool read_all_partitions;
// Accounting: How many bytes were fetched, and how much time did it take?
tokutime_t bytes_read;
uint64_t read_time;
// Accounting: How many bytes were read, and how much time did we spend doing I/O?
uint64_t bytes_read;
tokutime_t io_time;
tokutime_t decompress_time;
tokutime_t deserialize_time;
};
struct toku_fifo_entry_key_msn_heaviside_extra {
......@@ -533,7 +536,7 @@ int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_RO
void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, DESCRIPTOR desc, ft_compare_func cmp);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe);
int toku_deserialize_ftnode_from (int fd, BLOCKNUM off, uint32_t /*fullhash*/, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, struct ftnode_fetch_extra* bfe);
// <CER> For verifying old, non-upgraded nodes (versions 13 and 14).
......@@ -631,18 +634,21 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
#define WHEN_FTTRACE(x) ((void)0)
#endif
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe);
void toku_ft_status_update_flush_reason(FTNODE node, uint64_t uncompressed_bytes_flushed, uint64_t bytes_written, tokutime_t write_time, bool for_checkpoint);
extern void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
extern void toku_ftnode_checkpoint_complete_callback(void *value_data);
extern void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
extern int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
extern void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
extern int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *extraargs);
extern bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
// Account time spent on the write path into the FT engine status:
// serialize_time is added to FT_NODE_SERIALIZE_TOKUTIME and
// compress_time is added to FT_NODE_COMPRESS_TOKUTIME.
void toku_ft_status_update_serialize_times(tokutime_t serialize_time, tokutime_t compress_time);
// Account time spent on the read path into the FT engine status:
// deserialize_time is added to FT_NODE_DESERIALIZE_TOKUTIME and
// decompress_time is added to FT_NODE_DECOMPRESS_TOKUTIME.
void toku_ft_status_update_deserialize_times(tokutime_t deserialize_time, tokutime_t decompress_time);
void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
void toku_ftnode_checkpoint_complete_callback(void *value_data);
void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *extraargs);
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
int toku_ftnode_pf_callback(void* ftnode_pv, void* UU(disk_data), void* read_extraargs, int fd, PAIR_ATTR* sizep);
extern int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
// Given pinned node and pinned child, split child into two
// and update node with information about its new child.
......@@ -719,7 +725,8 @@ static inline void fill_bfe_for_full_read(struct ftnode_fetch_extra *bfe, FT h)
bfe->disable_prefetching = false;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
bfe->deserialize_time = 0;
}
//
......@@ -758,7 +765,7 @@ static inline void fill_bfe_for_subset_read(
bfe->disable_prefetching = disable_prefetching;
bfe->read_all_partitions = read_all_partitions;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
//
......@@ -780,7 +787,7 @@ static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) {
bfe->disable_prefetching = false;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) {
......@@ -813,7 +820,7 @@ static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe,
bfe->disable_prefetching = c->disable_prefetching;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
struct ancestors {
......@@ -1037,6 +1044,10 @@ typedef enum {
FT_NUM_MSG_BUFFER_FETCHED_WRITE,
FT_BYTES_MSG_BUFFER_FETCHED_WRITE,
FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE,
FT_NODE_COMPRESS_TOKUTIME, // seconds spent compressing nodes to memory
FT_NODE_SERIALIZE_TOKUTIME, // seconds spent serializing nodes to memory
FT_NODE_DECOMPRESS_TOKUTIME, // seconds spent decompressing nodes to memory
FT_NODE_DESERIALIZE_TOKUTIME, // seconds spent deserializing nodes to memory
FT_PRO_NUM_ROOT_SPLIT,
FT_PRO_NUM_ROOT_H0_INJECT,
FT_PRO_NUM_ROOT_H1_INJECT,
......
......@@ -235,7 +235,7 @@ status_init(void)
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, TOKUTIME, "buffers fetched for prefetch (seconds)");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (bytes)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, TOKUTIME, "buffers fetched for write (seconds)");
// Disk write statistics.
//
......@@ -258,6 +258,12 @@ status_init(void)
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, TOKUTIME, "nonleaf nodes flushed to disk (for checkpoint) (seconds)");
// CPU time statistics for [de]serialization and [de]compression.
STATUS_INIT(FT_NODE_COMPRESS_TOKUTIME, TOKUTIME, "node compression to memory (seconds)");
STATUS_INIT(FT_NODE_SERIALIZE_TOKUTIME, TOKUTIME, "node serialization to memory (seconds)");
STATUS_INIT(FT_NODE_DECOMPRESS_TOKUTIME, TOKUTIME, "node decompression to memory (seconds)");
STATUS_INIT(FT_NODE_DESERIALIZE_TOKUTIME, TOKUTIME, "node deserialization to memory (seconds)");
// Promotion statistics.
STATUS_INIT(FT_PRO_NUM_ROOT_SPLIT, PARCOUNT, "promotion: roots split");
STATUS_INIT(FT_PRO_NUM_ROOT_H0_INJECT, PARCOUNT, "promotion: leaf roots injected into");
......@@ -858,15 +864,15 @@ toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
if (bfe->type == ftnode_fetch_prefetch) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->io_time);
} else if (bfe->type == ftnode_fetch_all) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->io_time);
} else if (bfe->type == ftnode_fetch_subset) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->io_time);
}
}
......@@ -1158,7 +1164,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->io_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1166,7 +1172,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->io_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1174,7 +1180,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->io_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1182,7 +1188,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->io_time);
}
}
}
......@@ -1193,7 +1199,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->io_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1201,7 +1207,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->io_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1209,7 +1215,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->io_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1217,12 +1223,22 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->io_time);
}
}
}
}
// Fold the cost of writing a node out into the engine status counters.
//   serialize_time - elapsed time spent serializing; added to
//                    FT_NODE_SERIALIZE_TOKUTIME
//   compress_time  - elapsed time spent compressing; added to
//                    FT_NODE_COMPRESS_TOKUTIME
// Both rows are registered with type TOKUTIME in status_init().
void
toku_ft_status_update_serialize_times(tokutime_t serialize_time, tokutime_t compress_time)
{
    STATUS_INC(FT_NODE_SERIALIZE_TOKUTIME, serialize_time);
    STATUS_INC(FT_NODE_COMPRESS_TOKUTIME, compress_time);
}
// Fold the cost of reading a node in into the engine status counters.
//   deserialize_time - elapsed time spent deserializing; added to
//                      FT_NODE_DESERIALIZE_TOKUTIME
//   decompress_time  - elapsed time spent decompressing; added to
//                      FT_NODE_DECOMPRESS_TOKUTIME
// Both rows are registered with type TOKUTIME in status_init().
void
toku_ft_status_update_deserialize_times(tokutime_t deserialize_time, tokutime_t decompress_time)
{
    STATUS_INC(FT_NODE_DESERIALIZE_TOKUTIME, deserialize_time);
    STATUS_INC(FT_NODE_DECOMPRESS_TOKUTIME, decompress_time);
}
// callback for partially reading a node
// could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
......@@ -1252,14 +1268,10 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
enum pt_state state = BP_STATE(node, i);
if (state == PT_COMPRESSED) {
r = toku_deserialize_bp_from_compressed(node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
r = toku_deserialize_bp_from_compressed(node, i, bfe);
} else {
invariant(state == PT_ON_DISK);
tokutime_t io_t0 = toku_time_now();
r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
tokutime_t io_t1 = toku_time_now();
bfe->bytes_read = BP_SIZE(ndd, i);
bfe->read_time = io_t1 - io_t0;
}
ft_status_update_partial_fetch_reason(bfe, i, state, (node->height == 0));
}
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment