Commit 570d2ff5 authored by John Esmet's avatar John Esmet Committed by John Esmet

Fixes #226. When serializing a nonleaf node, include the offsets stored
in each message tree. This removes a sort during deserialization, which
can be expensive when there are many messages and I/O is fast. This
change supports auto-upgrade from older versions.
parent fd761b36
......@@ -890,6 +890,11 @@ void toku_ftnode_clone_callback(
for (int i = 0; i < node->n_children-1; i++) {
toku_clone_dbt(&cloned_node->childkeys[i], node->childkeys[i]);
}
if (node->height > 0) {
// need to move messages here so that we don't serialize stale
// messages to the fresh tree - ft verify code complains otherwise.
toku_move_ftnode_messages_to_stale(ft, node);
}
// clone partition
ftnode_clone_partitions(node, cloned_node);
......@@ -932,11 +937,14 @@ void toku_ftnode_flush_callback(
int height = ftnode->height;
if (write_me) {
toku_assert_entire_node_in_memory(ftnode);
if (height == 0) {
if (height > 0 && !is_clone) {
// cloned nodes already had their stale messages moved, see toku_ftnode_clone_callback()
toku_move_ftnode_messages_to_stale(h, ftnode);
} else if (height == 0) {
ft_leaf_run_gc(h, ftnode);
}
if (height == 0 && !is_clone) {
ftnode_update_disk_stats(ftnode, h, for_checkpoint);
if (!is_clone) {
ftnode_update_disk_stats(ftnode, h, for_checkpoint);
}
}
int r = toku_serialize_ftnode_to(fd, ftnode->thisnodename, ftnode, ndd, !is_clone, h, for_checkpoint);
assert_zero(r);
......@@ -1150,11 +1158,20 @@ int toku_ftnode_pe_callback(void *ftnode_pv, PAIR_ATTR old_attr, void *write_ext
if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
NONLEAF_CHILDINFO bnc;
if (ft_compress_buffers_before_eviction) {
// When partially evicting, always compress with quicklz
if (ft_compress_buffers_before_eviction &&
// We may not serialize and compress a partition in memory if its
// in memory layout version is different than what's on disk (and
// therefore requires upgrade).
//
// Auto-upgrade code assumes that if a node's layout version read
// from disk is not current, it MUST require upgrade. Breaking
// this rule would cause upgrade code to upgrade this partition
// again after we serialize it as the current version, which is bad.
node->layout_version == node->layout_version_read_from_disk) {
bnc = compress_internal_node_partition(
node,
i,
// Always compress with quicklz
TOKU_QUICKLZ_METHOD
);
} else {
......
......@@ -462,6 +462,7 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
case FT_LAYOUT_VERSION_27:
case FT_LAYOUT_VERSION_26:
case FT_LAYOUT_VERSION_25:
case FT_LAYOUT_VERSION_24:
......
......@@ -120,6 +120,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_24 = 24, // Riddler: change logentries that log transactions to store TXNID_PAIRs instead of TXNIDs
FT_LAYOUT_VERSION_25 = 25, // SecretSquirrel: ROLLBACK_LOG_NODES (on disk and in memory) now just use blocknum (instead of blocknum + hash) to point to other log nodes. same for xstillopen log entry
FT_LAYOUT_VERSION_26 = 26, // Hojo: basements store key/vals separately on disk for fixed klpair length BNs
FT_LAYOUT_VERSION_27 = 27, // serialize message trees with nonleaf buffers to avoid key, msn sort on deserialize
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -291,8 +291,13 @@ serialize_ftnode_partition_size (FTNODE node, int i)
paranoid_invariant(node->bp[i].state == PT_AVAIL);
result++; // Byte that states what the partition is
if (node->height > 0) {
result += 4; // size of bytes in buffer table
result += toku_bnc_nbytesinbuf(BNC(node, i));
NONLEAF_CHILDINFO bnc = BNC(node, i);
// number of messages (4 bytes) plus size of the buffer
result += (4 + toku_bnc_nbytesinbuf(bnc));
// number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
result += (4 + (4 * bnc->fresh_message_tree.size()));
result += (4 + (4 * bnc->stale_message_tree.size()));
result += (4 + (4 * bnc->broadcast_list.size()));
}
else {
result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
......@@ -305,8 +310,14 @@ serialize_ftnode_partition_size (FTNODE node, int i)
#define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_FIFO_MSG 0xbb
// Iteration callback used when serializing a message tree: appends one
// message offset to wb as an integer. The index argument is unused.
// Returns 0 unconditionally.
static int
wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
    wbuf_nocrc_int(wb, offset);
    return 0;
}
static void
serialize_nonleaf_childinfo(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
{
unsigned char ch = FTNODE_PARTITION_FIFO_MSG;
wbuf_nocrc_char(wb, ch);
......@@ -323,6 +334,17 @@ serialize_nonleaf_childinfo(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
wbuf_nocrc_bytes(wb, key, keylen);
wbuf_nocrc_bytes(wb, data, datalen);
});
// serialize the message trees (num entries, offsets array):
// fresh, stale, broadcast
wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);
wbuf_nocrc_int(wb, bnc->broadcast_list.size());
bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
}
//
......@@ -346,7 +368,7 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
if (node->height > 0) {
// TODO: (Zardosht) possibly exit early if there are no messages
serialize_nonleaf_childinfo(BNC(node, i), &wb);
serialize_child_buffer(BNC(node, i), &wb);
}
else {
unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
......@@ -1024,8 +1046,8 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
}
static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
DESCRIPTOR desc, ft_compare_func cmp) {
deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
DESCRIPTOR desc, ft_compare_func cmp) {
int r;
int n_in_this_buffer = rbuf_int(rbuf);
int32_t *fresh_offsets = NULL, *stale_offsets = NULL;
......@@ -1090,6 +1112,59 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
}
}
// Effect: read exactly one serialized message from rbuf and enqueue it into fifo.
//  Fields are consumed in this order: message type (1 byte), freshness flag
//  (1 byte), msn, xids, key bytes, value bytes.
//  The xids object created while reading is destroyed before returning.
static void
fifo_deserialize_msg_from_rbuf(FIFO fifo, struct rbuf *rbuf) {
    enum ft_msg_type msg_type = (enum ft_msg_type) rbuf_char(rbuf);
    bool fresh = rbuf_char(rbuf);
    MSN msg_msn = rbuf_msn(rbuf);

    XIDS msg_xids;
    xids_create_from_buffer(rbuf, &msg_xids);

    // rbuf_bytes() returns pointers into the rbuf rather than copies.
    bytevec key_bytes;
    ITEMLEN key_len;
    rbuf_bytes(rbuf, &key_bytes, &key_len);

    bytevec val_bytes;
    ITEMLEN val_len;
    rbuf_bytes(rbuf, &val_bytes, &val_len);

    int r = toku_fifo_enq(fifo, key_bytes, key_len, val_bytes, val_len,
                          msg_type, msg_msn, msg_xids, fresh, nullptr);
    lazy_assert_zero(r);

    xids_destroy(&msg_xids);
}
// Effect: deserialize a nonleaf child buffer that was serialized together
//  with its message trees.
//  Reads, in order: the number of messages, that many messages (each one is
//  enqueued into bnc->buffer), and then three length-prefixed arrays of
//  int32 offsets for the fresh, stale and broadcast message trees.
//  Each offset array is passed to create_steal_sorted_array(), so no sort
//  is performed here; the arrays are not freed by this function.
// Requires: every count read from rbuf lies in [0, n_in_this_buffer].
//  This is asserted, because each offset array is allocated with exactly
//  n_in_this_buffer entries and an unchecked count from a corrupt block
//  would write past the end of it.
static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf) {
    int n_in_this_buffer = rbuf_int(rbuf);
    // A negative message count is never valid and must not reach the allocator.
    assert(n_in_this_buffer >= 0);

    int nfresh = 0, nstale = 0, nbroadcast_offsets = 0;
    int32_t *XMALLOC_N(n_in_this_buffer, stale_offsets);
    int32_t *XMALLOC_N(n_in_this_buffer, fresh_offsets);
    int32_t *XMALLOC_N(n_in_this_buffer, broadcast_offsets);

    toku_fifo_resize(bnc->buffer, rbuf->size + 64);
    for (int i = 0; i < n_in_this_buffer; i++) {
        fifo_deserialize_msg_from_rbuf(bnc->buffer, rbuf);
    }

    // read in each message tree (fresh, stale, broadcast)
    // Every count is validated before it is used as a loop bound.
    nfresh = rbuf_int(rbuf);
    assert(nfresh >= 0 && nfresh <= n_in_this_buffer);
    for (int i = 0; i < nfresh; i++) {
        fresh_offsets[i] = rbuf_int(rbuf);
    }

    nstale = rbuf_int(rbuf);
    assert(nstale >= 0 && nstale <= n_in_this_buffer);
    for (int i = 0; i < nstale; i++) {
        stale_offsets[i] = rbuf_int(rbuf);
    }

    nbroadcast_offsets = rbuf_int(rbuf);
    assert(nbroadcast_offsets >= 0 && nbroadcast_offsets <= n_in_this_buffer);
    for (int i = 0; i < nbroadcast_offsets; i++) {
        broadcast_offsets[i] = rbuf_int(rbuf);
    }

    // build OMTs out of each offset array
    bnc->fresh_message_tree.destroy();
    bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer);
    bnc->stale_message_tree.destroy();
    bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer);
    bnc->broadcast_list.destroy();
    bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer);
}
// dump a buffer to stderr
// no locking around this for now
void
......@@ -1161,13 +1236,16 @@ NONLEAF_CHILDINFO toku_create_empty_nl(void) {
return cn;
}
// Effect: allocate and return a copy of orig_childinfo.
//  The FIFO and all three message trees (fresh, stale, broadcast) are cloned,
//  since the trees are serialized along with the FIFO.
//  The flow counters of the copy are zeroed rather than copied.
NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
    NONLEAF_CHILDINFO XMALLOC(cloned);

    // message buffer
    toku_fifo_clone(orig_childinfo->buffer, &cloned->buffer);

    // message trees: initialize each one empty, then copy the original's contents
    cloned->fresh_message_tree.create_no_array();
    cloned->fresh_message_tree.clone(orig_childinfo->fresh_message_tree);

    cloned->stale_message_tree.create_no_array();
    cloned->stale_message_tree.clone(orig_childinfo->stale_message_tree);

    cloned->broadcast_list.create_no_array();
    cloned->broadcast_list.clone(orig_childinfo->broadcast_list);

    memset(cloned->flow, 0, sizeof cloned->flow);
    return cloned;
}
......@@ -1513,7 +1591,13 @@ deserialize_ftnode_partition(
if (node->height > 0) {
assert(ch == FTNODE_PARTITION_FIFO_MSG);
deserialize_child_buffer(BNC(node, childnum), &rb, desc, cmp);
NONLEAF_CHILDINFO bnc = BNC(node, childnum);
if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
// Layout version <= 26 did not serialize sorted message trees to disk.
deserialize_child_buffer_v26(bnc, &rb, desc, cmp);
} else {
deserialize_child_buffer(bnc, &rb);
}
BP_WORKDONE(node, childnum) = 0;
}
else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment