Commit 4e9ec6a6 authored by John Esmet's avatar John Esmet

FT-249 Move de/serialization code to the message buffer class itself,

further simplifying the logic that is contained in
ft/ft_node-serialize.cc
parent f6591118
This diff is collapsed.
...@@ -110,7 +110,63 @@ void message_buffer::destroy() { ...@@ -110,7 +110,63 @@ void message_buffer::destroy() {
} }
} }
void message_buffer::resize(size_t new_size) { void message_buffer::deserialize_from_rbuf(struct rbuf *rb,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **stale_offsets, int32_t *nstale,
int32_t **broadcast_offsets, int32_t *nbroadcast) {
// read the number of messages in this buffer
int n_in_this_buffer = rbuf_int(rb);
if (fresh_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *fresh_offsets);
}
if (stale_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *stale_offsets);
}
if (broadcast_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *broadcast_offsets);
}
_resize(rb->size + 64); // rb->size is a good hint for how big the buffer will be
// read in each message individually
for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
// this is weird but it's necessary to pass icc and gcc together
unsigned char ctype = rbuf_char(rb);
enum ft_msg_type type = (enum ft_msg_type) ctype;
bool is_fresh = rbuf_char(rb);
MSN msn = rbuf_msn(rb);
XIDS xids;
xids_create_from_buffer(rb, &xids);
rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rb, &val, &vallen);
int32_t *dest = nullptr;
if (ft_msg_type_applies_once(type)) {
if (is_fresh) {
dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
} else {
dest = stale_offsets ? *stale_offsets + (*nstale)++ : nullptr;
}
} else {
invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
}
// TODO: Function to parse stuff out of an rbuf into an FT_MSG
DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
enqueue(&msg, is_fresh, dest);
xids_destroy(&xids);
}
invariant(num_entries() == n_in_this_buffer);
}
void message_buffer::_resize(size_t new_size) {
XREALLOC_N(new_size, _memory); XREALLOC_N(new_size, _memory);
_memory_size = new_size; _memory_size = new_size;
} }
...@@ -134,7 +190,7 @@ void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) { ...@@ -134,7 +190,7 @@ void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) {
if (_memory == nullptr || need_space_total > _memory_size) { if (_memory == nullptr || need_space_total > _memory_size) {
// resize the buffer to the next power of 2 greater than the needed space // resize the buffer to the next power of 2 greater than the needed space
int next_2 = next_power_of_two(need_space_total); int next_2 = next_power_of_two(need_space_total);
resize(next_2); _resize(next_2);
} }
ITEMLEN keylen = ft_msg_get_keylen(msg); ITEMLEN keylen = ft_msg_get_keylen(msg);
ITEMLEN datalen = ft_msg_get_vallen(msg); ITEMLEN datalen = ft_msg_get_vallen(msg);
...@@ -212,6 +268,26 @@ bool message_buffer::equals(message_buffer *other) const { ...@@ -212,6 +268,26 @@ bool message_buffer::equals(message_buffer *other) const {
memcmp(_memory, other->_memory, _memory_used) == 0); memcmp(_memory, other->_memory, _memory_used) == 0);
} }
void message_buffer::serialize_to_wbuf(struct wbuf *wb) const {
wbuf_nocrc_int(wb, num_entries());
struct msg_serialize_fn {
struct wbuf *wb;
msg_serialize_fn(struct wbuf *w) : wb(w) { }
int operator()(FT_MSG msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type;
paranoid_invariant((int) type >= 0 && (int) type < 256);
wbuf_nocrc_char(wb, (unsigned char) type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, msg->msn);
wbuf_nocrc_xids(wb, ft_msg_get_xids(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
return 0;
}
} serialize_fn(wb);
iterate(serialize_fn);
}
size_t message_buffer::msg_memsize_in_buffer(FT_MSG msg) { size_t message_buffer::msg_memsize_in_buffer(FT_MSG msg) {
const uint32_t keylen = ft_msg_get_keylen(msg); const uint32_t keylen = ft_msg_get_keylen(msg);
const uint32_t datalen = ft_msg_get_vallen(msg); const uint32_t datalen = ft_msg_get_vallen(msg);
......
...@@ -102,7 +102,14 @@ public: ...@@ -102,7 +102,14 @@ public:
void destroy(); void destroy();
void resize(size_t new_size); // effect: deserializes a message buffer from the given rbuf
// returns: *fresh_offsets (etc) malloc'd to be num_entries large and
// populated with *nfresh (etc) offsets in the message buffer
// requires: if fresh_offsets (etc) != nullptr, then nfresh != nullptr
void deserialize_from_rbuf(struct rbuf *rb,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **stale_offsets, int32_t *nstale,
int32_t **broadcast_offsets, int32_t *nbroadcast);
void enqueue(FT_MSG msg, bool is_fresh, int32_t *offset); void enqueue(FT_MSG msg, bool is_fresh, int32_t *offset);
...@@ -139,9 +146,13 @@ public: ...@@ -139,9 +146,13 @@ public:
bool equals(message_buffer *other) const; bool equals(message_buffer *other) const;
void serialize_to_wbuf(struct wbuf *wb) const;
static size_t msg_memsize_in_buffer(FT_MSG msg); static size_t msg_memsize_in_buffer(FT_MSG msg);
private: private:
void _resize(size_t new_size);
// If this isn't packged, the compiler aligns the xids array and we waste a lot of space // If this isn't packged, the compiler aligns the xids array and we waste a lot of space
struct __attribute__((__packed__)) buffer_entry { struct __attribute__((__packed__)) buffer_entry {
unsigned int keylen; unsigned int keylen;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment