Commit 402c4c60 authored by Yoni Fogel's avatar Yoni Fogel

Refs Tokutek/ft-index#46 cleaned up serialization, upped version to 26

parent 494856ed
......@@ -125,7 +125,7 @@ void bn_data::remove_key(uint32_t keylen) {
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
uint32_t fixed_klpair_length) {
paranoid_invariant(version >= FT_LAYOUT_VERSION_25); // Support was added @25
paranoid_invariant(version >= FT_LAYOUT_VERSION_26); // Support was added @26
uint32_t ndone_before = rb->ndone;
init_zero();
invariant(all_keys_same_length); // Until otherwise supported.
......@@ -152,6 +152,44 @@ void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struc
invariant(rb->ndone - ndone_before == data_size);
}
static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
// need to pack the leafentry as it was in versions
// where the key was integrated into it (< 26)
uint32_t begin_spot UU() = wb->ndone;
uint32_t le_disk_size = leafentry_disksize(le);
wbuf_nocrc_uint8_t(wb, le->type);
wbuf_nocrc_uint32_t(wb, keylen);
if (le->type == LE_CLEAN) {
wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
wbuf_nocrc_literal_bytes(wb, key, keylen);
wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
}
else {
paranoid_invariant(le->type == LE_MVCC);
wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
wbuf_nocrc_literal_bytes(wb, key, keylen);
wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
}
uint32_t end_spot UU() = wb->ndone;
paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
return 0;
}
void bn_data::serialize_to_wbuf(struct wbuf *const wb) {
prepare_to_serialize();
serialize_header(wb);
if (m_buffer.value_length_is_fixed()) {
serialize_rest(wb);
} else {
//
// iterate over leafentries and place them into the buffer
//
dmt_iterate<struct wbuf, wbufwriteleafentry>(wb);
}
}
// If we have fixed-length keys, we prepare the dmt and mempool.
// The mempool is prepared by removing any fragmented space and ordering leafentries in the same order as their keys.
void bn_data::prepare_to_serialize(void) {
......@@ -188,40 +226,36 @@ void bn_data::serialize_rest(struct wbuf *wb) const {
wbuf_nocrc_literal_bytes(wb, toku_mempool_get_base(&m_buffer_mempool), val_data_size);
}
// No optimized (de)serialize method implemented (yet?) for non-fixed length keys.
bool bn_data::need_to_serialize_each_leafentry_with_key(void) const {
return !m_buffer.value_length_is_fixed();
}
// Deserialize from rbuf
void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version) {
uint32_t key_data_size = data_size; // overallocate if < version 25 (best guess that is guaranteed not too small)
uint32_t val_data_size = data_size; // overallocate if < version 25 (best guess that is guaranteed not too small)
uint32_t key_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)
uint32_t val_data_size = data_size; // overallocate if < version 26 (best guess that is guaranteed not too small)
bool all_keys_same_length = false;
bool keys_vals_separate = false;
uint32_t fixed_klpair_length = 0;
// In version 24 and older there is no header. Skip reading header for old version.
if (version >= FT_LAYOUT_VERSION_25) {
// In version 25 and older there is no header. Skip reading header for old version.
if (version >= FT_LAYOUT_VERSION_26) {
uint32_t ndone_before = rb->ndone;
key_data_size = rbuf_int(rb);
val_data_size = rbuf_int(rb);
fixed_klpair_length = rbuf_int(rb); // 0 if !all_keys_same_length
all_keys_same_length = rbuf_char(rb);
keys_vals_separate = rbuf_char(rb);
invariant(all_keys_same_length == keys_vals_separate); // Until we support this
invariant(all_keys_same_length == keys_vals_separate); // Until we support otherwise
uint32_t header_size = rb->ndone - ndone_before;
data_size -= header_size;
invariant(header_size == HEADER_LENGTH);
if (keys_vals_separate) {
invariant(fixed_klpair_length >= sizeof(klpair_struct));
initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
key_data_size, val_data_size, all_keys_same_length,
fixed_klpair_length);
return;
}
}
// Version >= 25 and version 24 deserialization are now identical except that <= 24 might allocate too much memory.
// Version >= 26 and version 25 deserialization are now identical except that <= 25 might allocate too much memory.
bytevec bytes;
rbuf_literal_bytes(rb, &bytes, data_size);
const unsigned char *CAST_FROM_VOIDP(buf, bytes);
......@@ -232,6 +266,7 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3
klpair_dmt_t::builder dmt_builder;
dmt_builder.create(num_entries, key_data_size);
// TODO(leif): clean this up (#149)
unsigned char *newmem = nullptr;
// add same wiggle room that toku_mempool_construct would, 25% extra
uint32_t allocated_bytes_vals = val_data_size + val_data_size/4;
......@@ -306,25 +341,13 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3
toku_mempool_init(&m_buffer_mempool, newmem, (size_t)(curr_dest_pos - newmem), allocated_bytes_vals);
paranoid_invariant(get_disk_size() == data_size);
// Versions older than 25 might have allocated too much memory. Try to shrink the mempool now that we
// Versions older than 26 might have allocated too much memory. Try to shrink the mempool now that we
// know how much memory we need.
if (version < FT_LAYOUT_VERSION_25) {
//Maybe shrink mempool. Unnecessary after version 25
size_t used = toku_mempool_get_used_space(&m_buffer_mempool);
size_t max_allowed = used + used / 4;
size_t allocated = toku_mempool_get_size(&m_buffer_mempool);
size_t footprint = toku_mempool_footprint(&m_buffer_mempool);
if (allocated > max_allowed && footprint > max_allowed) {
// Reallocate smaller mempool to save memory
invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
struct mempool new_mp;
toku_mempool_construct(&new_mp, used);
void * newbase = toku_mempool_malloc(&new_mp, used, 1);
invariant_notnull(newbase);
memcpy(newbase, toku_mempool_get_base(&m_buffer_mempool), used);
toku_mempool_destroy(&m_buffer_mempool);
m_buffer_mempool = new_mp;
}
if (version < FT_LAYOUT_VERSION_26) {
// Unnecessary after version 26
// Reallocate smaller mempool to save memory
invariant_zero(toku_mempool_get_frag_size(&m_buffer_mempool));
toku_mempool_realloc_larger(&m_buffer_mempool, toku_mempool_get_used_space(&m_buffer_mempool));
}
}
......@@ -370,7 +393,7 @@ static int move_it (const uint32_t, klpair_struct *klpair, const uint32_t idx UU
return 0;
}
// Compress things, and grow the mempool if needed.
// Compress things, and grow or shrink the mempool if needed.
// May (always if force_compress) have a side effect of putting contents of mempool in sorted order.
void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool force_compress) {
uint32_t total_size_needed = toku_mempool_get_used_space(&m_buffer_mempool) + added_size;
......
......@@ -316,6 +316,8 @@ public:
// Gets a leafentry given a klpair from this basement node.
LEAFENTRY get_le_from_klpair(const klpair_struct *klpair) const;
void serialize_to_wbuf(struct wbuf *const wb);
// Prepares this basement node for serialization.
// Must be called before serializing this basement node.
// Between calling prepare_to_serialize and actually serializing, the basement node may not be modified
......@@ -332,11 +334,6 @@ public:
// Currently only supported when all keys are fixed-length.
void serialize_rest(struct wbuf *wb) const;
// Returns true if we must use the old (version 24) serialization method for this basement node
// Requires prepare_to_serialize() to have been called first.
// In other words, the bndata does not know how to serialize the keys and leafentries.
bool need_to_serialize_each_leafentry_with_key(void) const;
static const uint32_t HEADER_LENGTH = 0
+ sizeof(uint32_t) // key_data_size
+ sizeof(uint32_t) // val_data_size
......
......@@ -462,6 +462,7 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
case FT_LAYOUT_VERSION_26:
case FT_LAYOUT_VERSION_25:
case FT_LAYOUT_VERSION_24:
case FT_LAYOUT_VERSION_23:
......
......@@ -118,7 +118,8 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection
FT_LAYOUT_VERSION_23 = 23, // Ming: Fix upgrade path #5902
FT_LAYOUT_VERSION_24 = 24, // Riddler: change logentries that log transactions to store TXNID_PAIRs instead of TXNIDs
FT_LAYOUT_VERSION_25 = 25, // SecretSquirrel: ROLLBACK_LOG_NODES (on disk and in memory) now just use blocknum (instead of blocknum + hash) to point to other log nodes. same for xstillopen log entry, basements store key/vals separately on disk
FT_LAYOUT_VERSION_25 = 25, // SecretSquirrel: ROLLBACK_LOG_NODES (on disk and in memory) now just use blocknum (instead of blocknum + hash) to point to other log nodes. same for xstillopen log entry
FT_LAYOUT_VERSION_26 = 26, // Hojo: basements store key/vals separately on disk for fixed klpair length BNs
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -284,31 +284,6 @@ serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
invariant(wbuf->ndone == wbuf->size);
}
static int
wbufwriteleafentry(const void* key, const uint32_t keylen, const LEAFENTRY &le, const uint32_t UU(idx), struct wbuf * const wb) {
// need to pack the leafentry as it was in versions
// where the key was integrated into it
uint32_t begin_spot UU() = wb->ndone;
uint32_t le_disk_size = leafentry_disksize(le);
wbuf_nocrc_uint8_t(wb, le->type);
wbuf_nocrc_uint32_t(wb, keylen);
if (le->type == LE_CLEAN) {
wbuf_nocrc_uint32_t(wb, le->u.clean.vallen);
wbuf_nocrc_literal_bytes(wb, key, keylen);
wbuf_nocrc_literal_bytes(wb, le->u.clean.val, le->u.clean.vallen);
}
else {
paranoid_invariant(le->type == LE_MVCC);
wbuf_nocrc_uint32_t(wb, le->u.mvcc.num_cxrs);
wbuf_nocrc_uint8_t(wb, le->u.mvcc.num_pxrs);
wbuf_nocrc_literal_bytes(wb, key, keylen);
wbuf_nocrc_literal_bytes(wb, le->u.mvcc.xrs, le_disk_size - (1 + 4 + 1));
}
uint32_t end_spot UU() = wb->ndone;
paranoid_invariant((end_spot - begin_spot) == keylen + sizeof(keylen) + le_disk_size);
return 0;
}
static uint32_t
serialize_ftnode_partition_size (FTNODE node, int i)
{
......@@ -380,16 +355,7 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
wbuf_nocrc_char(&wb, ch);
wbuf_nocrc_uint(&wb, bd->dmt_size());
bd->prepare_to_serialize();
bd->serialize_header(&wb);
if (bd->need_to_serialize_each_leafentry_with_key()) {
//
// iterate over leafentries and place them into the buffer
//
bd->dmt_iterate<struct wbuf, wbufwriteleafentry>(&wb);
} else {
bd->serialize_rest(&wb);
}
bd->serialize_to_wbuf(&wb);
}
uint32_t end_to_end_checksum = x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
wbuf_nocrc_int(&wb, end_to_end_checksum);
......
......@@ -1240,7 +1240,7 @@ void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::build(dmt<dmtdata_t, dm
// When we know the elements are fixed-length, we use the better dmt constructor.
// In practice, as of Jan 2014, we use the builder in two cases:
// - When we know the elements are not fixed-length.
// - During upgrade of a pre version 25 basement node.
// - During upgrade of a pre version 26 basement node.
// During upgrade, we will probably wildly overallocate because we don't account for the values that aren't stored in the dmt, so here we want to shrink the mempool.
// When we know the elements are not fixed-length, we still know how much memory they occupy in total, modulo alignment, so we want to allow for mempool overhead and worst-case alignment overhead, and not shrink the mempool.
const size_t max_allowed = used + (ALIGNMENT-1) * this->temp.size();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment