Commit 55b38998 authored by John Esmet

FT-278 Put ft_msg in its own class and file, organize deserialization

and upgrade code so there's less complexity in ft/ft_node-serialize.cc
parent a13a8e84
...@@ -35,7 +35,6 @@ set(FT_SOURCES ...@@ -35,7 +35,6 @@ set(FT_SOURCES
ft-cachetable-wrappers ft-cachetable-wrappers
ft-flusher ft-flusher
ft-hot-flusher ft-hot-flusher
ft_msg
ft_node-serialize ft_node-serialize
ft-node-deserialize ft-node-deserialize
ft-ops ft-ops
...@@ -52,6 +51,7 @@ set(FT_SOURCES ...@@ -52,6 +51,7 @@ set(FT_SOURCES
logfilemgr logfilemgr
logger logger
log_upgrade log_upgrade
msg
msg_buffer msg_buffer
node node
pivotkeys pivotkeys
......
...@@ -1682,7 +1682,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p ...@@ -1682,7 +1682,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) { ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
stats_delta = { 0, 0 }; stats_delta = { 0, 0 };
} }
int operator()(FT_MSG msg, bool is_fresh) { int operator()(const ft_msg &msg, bool is_fresh) {
size_t flow_deltas[] = { 0, 0 }; size_t flow_deltas[] = { 0, 0 };
size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg); size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
if (remaining_memsize <= bnc->flow[0]) { if (remaining_memsize <= bnc->flow[0]) {
......
...@@ -119,8 +119,6 @@ PATENT RIGHTS GRANT: ...@@ -119,8 +119,6 @@ PATENT RIGHTS GRANT:
struct block_table; struct block_table;
struct ft_search; struct ft_search;
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { FT_MSG_OVERHEAD = (2 + sizeof(MSN)) }; // the type plus freshness plus MSN
enum { FT_DEFAULT_FANOUT = 16 }; enum { FT_DEFAULT_FANOUT = 16 };
enum { FT_DEFAULT_NODE_SIZE = 4 * 1024 * 1024 }; enum { FT_DEFAULT_NODE_SIZE = 4 * 1024 * 1024 };
enum { FT_DEFAULT_BASEMENT_NODE_SIZE = 128 * 1024 }; enum { FT_DEFAULT_BASEMENT_NODE_SIZE = 128 * 1024 };
...@@ -493,7 +491,7 @@ toku_bfe_rightmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node); ...@@ -493,7 +491,7 @@ toku_bfe_rightmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node);
// allocate a block number // allocate a block number
// allocate and initialize a ftnode // allocate and initialize a ftnode
// put the ftnode into the cache table // put the ftnode into the cache table
void toku_create_new_ftnode (FT_HANDLE t, FTNODE *result, int height, int n_children); void toku_create_new_ftnode(FT_HANDLE ft_handle, FTNODE *result, int height, int n_children);
/* Stuff for testing */ /* Stuff for testing */
// toku_testsetup_initialize() must be called before any other test_setup_xxx() functions are called. // toku_testsetup_initialize() must be called before any other test_setup_xxx() functions are called.
...@@ -506,14 +504,10 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_h, BLOCKNUM, const char *key, in ...@@ -506,14 +504,10 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_h, BLOCKNUM, const char *key, in
int toku_testsetup_insert_to_nonleaf (FT_HANDLE ft_h, BLOCKNUM, enum ft_msg_type, const char *key, int keylen, const char *val, int vallen); int toku_testsetup_insert_to_nonleaf (FT_HANDLE ft_h, BLOCKNUM, enum ft_msg_type, const char *key, int keylen, const char *val, int vallen);
void toku_pin_node_with_min_bfe(FTNODE* node, BLOCKNUM b, FT_HANDLE t); void toku_pin_node_with_min_bfe(FTNODE* node, BLOCKNUM b, FT_HANDLE t);
void toku_ft_root_put_msg(FT ft, FT_MSG msg, txn_gc_info *gc_info); void toku_ft_root_put_msg(FT ft, const ft_msg &msg, txn_gc_info *gc_info);
void // TODO: Rename
toku_get_node_for_verify( void toku_get_node_for_verify(BLOCKNUM blocknum, FT_HANDLE ft_h, FTNODE* nodep);
BLOCKNUM blocknum,
FT_HANDLE ft_h,
FTNODE* nodep
);
int int
toku_verify_ftnode (FT_HANDLE ft_h, toku_verify_ftnode (FT_HANDLE ft_h,
......
This diff is collapsed.
...@@ -99,7 +99,7 @@ PATENT RIGHTS GRANT: ...@@ -99,7 +99,7 @@ PATENT RIGHTS GRANT:
#include "cachetable.h" #include "cachetable.h"
#include "log.h" #include "log.h"
#include "compress.h" #include "compress.h"
#include "ft_msg.h" #include "ft/msg.h"
int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *, int nodesize, int basementnodesize, enum toku_compression_method compression_method, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*)) __attribute__ ((warn_unused_result)); int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *, int nodesize, int basementnodesize, enum toku_compression_method compression_method, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*)) __attribute__ ((warn_unused_result));
......
...@@ -224,25 +224,22 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_handle, BLOCKNUM blocknum, const ...@@ -224,25 +224,22 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_handle, BLOCKNUM blocknum, const
toku_verify_or_set_counts(node); toku_verify_or_set_counts(node);
assert(node->height==0); assert(node->height==0);
DBT keydbt,valdbt; DBT kdbt, vdbt;
MSN msn = next_dummymsn(); ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen),
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), FT_INSERT, next_dummymsn(), xids_get_root_xids());
.u = { .id = { toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen) } } };
static size_t zero_flow_deltas[] = { 0, 0 }; static size_t zero_flow_deltas[] = { 0, 0 };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ftnode_put_msg( toku_ftnode_put_msg(ft_handle->ft->cmp,
ft_handle->ft->cmp, ft_handle->ft->update_fun,
ft_handle->ft->update_fun, node,
node, -1,
-1, msg,
&msg, true,
true, &gc_info,
&gc_info, zero_flow_deltas,
zero_flow_deltas, NULL
NULL );
);
toku_verify_or_set_counts(node); toku_verify_or_set_counts(node);
......
...@@ -204,13 +204,13 @@ int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct ve ...@@ -204,13 +204,13 @@ int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct ve
int keep_going_on_failure = e->keep_going_on_failure; int keep_going_on_failure = e->keep_going_on_failure;
int result = 0; int result = 0;
DBT k, v; DBT k, v;
FT_MSG_S msg = e->msg_buffer->get_message(offset, &k, &v); ft_msg msg = e->msg_buffer->get_message(offset, &k, &v);
bool is_fresh = e->msg_buffer->get_freshness(offset); bool is_fresh = e->msg_buffer->get_freshness(offset);
if (e->broadcast) { if (e->broadcast) {
VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type), VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type()) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type()),
e->i, "message found in broadcast list that is not a broadcast"); e->i, "message found in broadcast list that is not a broadcast");
} else { } else {
VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type), VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type()),
e->i, "message found in fresh or stale message tree that does not apply once"); e->i, "message found in fresh or stale message tree that does not apply once");
if (e->is_fresh) { if (e->is_fresh) {
if (e->messages_have_been_moved) { if (e->messages_have_been_moved) {
...@@ -322,14 +322,14 @@ struct verify_msg_fn { ...@@ -322,14 +322,14 @@ struct verify_msg_fn {
blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) { blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) {
} }
int operator()(FT_MSG msg, bool is_fresh) { int operator()(const ft_msg &msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type; enum ft_msg_type type = (enum ft_msg_type) msg.type();
MSN msn = msg->msn; MSN msn = msg.msn();
XIDS xid = msg->xids; XIDS xid = msg.xids();
const void *key = ft_msg_get_key(msg); const void *key = msg.kdbt()->data;
const void *data = ft_msg_get_val(msg); const void *data = msg.vdbt()->data;
ITEMLEN keylen = ft_msg_get_keylen(msg); ITEMLEN keylen = msg.kdbt()->size;
ITEMLEN datalen = ft_msg_get_vallen(msg); ITEMLEN datalen = msg.vdbt()->size;
int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid, int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid,
curr_less_pivot, curr_less_pivot,
......
This diff is collapsed.
...@@ -99,7 +99,7 @@ PATENT RIGHTS GRANT: ...@@ -99,7 +99,7 @@ PATENT RIGHTS GRANT:
#include "ft/txn_manager.h" #include "ft/txn_manager.h"
#include "ft/rbuf.h" #include "ft/rbuf.h"
#include "ft/ft_msg.h" #include "ft/msg.h"
/* /*
Memory format of packed leaf entry Memory format of packed leaf entry
...@@ -248,7 +248,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored ...@@ -248,7 +248,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored
class bn_data; class bn_data;
void void
toku_le_apply_msg(FT_MSG msg, toku_le_apply_msg(const ft_msg &msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data. LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
......
...@@ -2941,16 +2941,12 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2941,16 +2941,12 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
// #3588 TODO can do the rebalancing here and avoid a lot of work later // #3588 TODO can do the rebalancing here and avoid a lot of work later
FTNODE leafnode = lbuf->node; FTNODE leafnode = lbuf->node;
uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs(); uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs();
DBT thekey = { .data = key, .size = (uint32_t) keylen }; DBT kdbt, vdbt;
DBT theval = { .data = val, .size = (uint32_t) vallen }; ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen), FT_INSERT, ZERO_MSN, lbuf->xids);
FT_MSG_S msg = { .type = FT_INSERT, uint64_t workdone = 0;
.msn = ZERO_MSN,
.xids = lbuf->xids,
.u = { .id = { &thekey, &theval } } };
uint64_t workdone=0;
// there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info // there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, &workdone, stats_to_update); toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, &workdone, stats_to_update);
} }
static int write_literal(struct dbout *out, void*data, size_t len) { static int write_literal(struct dbout *out, void*data, size_t len) {
......
...@@ -88,46 +88,84 @@ PATENT RIGHTS GRANT: ...@@ -88,46 +88,84 @@ PATENT RIGHTS GRANT:
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#include "portability/toku_portability.h"
#include <toku_portability.h> #include "ft/fttypes.h"
#include "fttypes.h" #include "ft/msg.h"
#include "xids.h" #include "ft/xids.h"
#include "ft_msg.h" #include "ft/ybt.h"
// Builds a message out of a key, a value, a message type, an MSN and XIDS.
//   key, val - the DBT structs are copied into the message (the DBT struct
//              itself is copied, via *key / *val). A null pointer is
//              replaced by whatever toku_empty_dbt() returns.
//   t, m, x  - stored as given and handed back by type(), msn() and xids().
ft_msg::ft_msg(const DBT *key, const DBT *val, enum ft_msg_type t, MSN m, XIDS x)
    : _key(key != nullptr ? *key : toku_empty_dbt()),
      _val(val != nullptr ? *val : toku_empty_dbt()),
      _type(t),
      _msn(m),
      _xids(x) {
}
// Reads a single message out of `rb' and returns it.
// Fields are consumed from the rbuf in this order: type byte, freshness
// byte, MSN, XIDS, key bytes, value bytes.
//   rb       - buffer to read from
//   x        - out: filled in by xids_create_from_buffer(); the returned
//              message is constructed with *x as its XIDS
//   is_fresh - out: receives the freshness byte read from the buffer
// NOTE(review): rbuf_bytes() presumably hands back pointers into the rbuf
// rather than copies, in which case the returned message's key/val refer to
// rb's memory - confirm before outliving `rb'.
ft_msg ft_msg::deserialize_from_rbuf(struct rbuf *rb, XIDS *x, bool *is_fresh) {
    const enum ft_msg_type msg_type = (enum ft_msg_type) rbuf_char(rb);
    *is_fresh = rbuf_char(rb);
    const MSN msg_msn = rbuf_msn(rb);
    xids_create_from_buffer(rb, x);

    bytevec key_bytes;
    ITEMLEN key_len;
    rbuf_bytes(rb, &key_bytes, &key_len);

    bytevec val_bytes;
    ITEMLEN val_len;
    rbuf_bytes(rb, &val_bytes, &val_len);

    // The constructor copies the DBT structs, so stack-local DBTs are fine here.
    DBT key_storage, val_storage;
    const DBT *key_dbt = toku_fill_dbt(&key_storage, key_bytes, key_len);
    const DBT *val_dbt = toku_fill_dbt(&val_storage, val_bytes, val_len);
    return ft_msg(key_dbt, val_dbt, msg_type, msg_msn, *x);
}
// Reads a single message out of `rb' using the layout handled for older
// (v13) buffers and returns it.
// Fields are consumed from the rbuf in this order: type byte, XIDS, key
// bytes, value bytes. No freshness byte and no MSN are read; the MSN of the
// returned message is the caller-supplied `m'.
//   rb - buffer to read from
//   m  - MSN to assign to the deserialized message
//   x  - out: filled in by xids_create_from_buffer(); the returned message
//        is constructed with *x as its XIDS
ft_msg ft_msg::deserialize_from_rbuf_v13(struct rbuf *rb, MSN m, XIDS *x) {
    const enum ft_msg_type msg_type = (enum ft_msg_type) rbuf_char(rb);
    xids_create_from_buffer(rb, x);

    bytevec key_bytes;
    ITEMLEN key_len;
    rbuf_bytes(rb, &key_bytes, &key_len);

    bytevec val_bytes;
    ITEMLEN val_len;
    rbuf_bytes(rb, &val_bytes, &val_len);

    // The constructor copies the DBT structs, so stack-local DBTs are fine here.
    DBT key_storage, val_storage;
    const DBT *key_dbt = toku_fill_dbt(&key_storage, key_bytes, key_len);
    const DBT *val_dbt = toku_fill_dbt(&val_storage, val_bytes, val_len);
    return ft_msg(key_dbt, val_dbt, msg_type, m, *x);
}
uint32_t const DBT *ft_msg::kdbt() const {
ft_msg_get_keylen(FT_MSG ft_msg) { return &_key;
uint32_t rval = ft_msg->u.id.key->size;
return rval;
} }
uint32_t const DBT *ft_msg::vdbt() const {
ft_msg_get_vallen(FT_MSG ft_msg) { return &_val;
uint32_t rval = ft_msg->u.id.val->size;
return rval;
} }
XIDS enum ft_msg_type ft_msg::type() const {
ft_msg_get_xids(FT_MSG ft_msg) { return _type;
XIDS rval = ft_msg->xids;
return rval;
} }
void * MSN ft_msg::msn() const {
ft_msg_get_key(FT_MSG ft_msg) { return _msn;
void * rval = ft_msg->u.id.key->data;
return rval;
} }
void * XIDS ft_msg::xids() const {
ft_msg_get_val(FT_MSG ft_msg) { return _xids;
void * rval = ft_msg->u.id.val->data;
return rval;
} }
enum ft_msg_type size_t ft_msg::total_size() const {
ft_msg_get_type(FT_MSG ft_msg) { // Must store two 4-byte lengths
enum ft_msg_type rval = ft_msg->type; static const size_t key_val_overhead = 8;
return rval;
// 1 byte type, 1 byte freshness, then 8 byte MSN
static const size_t msg_overhead = 2 + sizeof(MSN);
static const size_t total_overhead = key_val_overhead + msg_overhead;
const size_t keyval_size = _key.size + _val.size;
const size_t xids_size = xids_get_serialize_size(xids());
return total_overhead + keyval_size + xids_size;
} }
void ft_msg::serialize_to_wbuf(struct wbuf *wb, bool is_fresh) const {
wbuf_nocrc_char(wb, (unsigned char) _type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, _msn);
wbuf_nocrc_xids(wb, _xids);
wbuf_nocrc_bytes(wb, _key.data, _key.size);
wbuf_nocrc_bytes(wb, _val.data, _val.size);
}
...@@ -181,32 +181,36 @@ ft_msg_type_does_nothing(enum ft_msg_type type) ...@@ -181,32 +181,36 @@ ft_msg_type_does_nothing(enum ft_msg_type type)
typedef struct xids_t *XIDS; typedef struct xids_t *XIDS;
/* tree commands */ class ft_msg {
struct ft_msg { public:
enum ft_msg_type type; ft_msg(const DBT *key, const DBT *val, enum ft_msg_type t, MSN m, XIDS x);
MSN msn; // message sequence number
XIDS xids; enum ft_msg_type type() const;
union {
/* insert or delete */ MSN msn() const;
struct ft_msg_insert_delete {
const DBT *key; // for insert, delete, upsertdel
const DBT *val; // for insert, delete, (and it is the "extra" for upsertdel, upsertdel_broadcast_all)
} id;
} u;
};
// Message sent into the ft to implement insert, delete, update, etc XIDS xids() const;
typedef struct ft_msg FT_MSG_S;
typedef struct ft_msg *FT_MSG;
uint32_t ft_msg_get_keylen(FT_MSG ft_msg); const DBT *kdbt() const;
uint32_t ft_msg_get_vallen(FT_MSG ft_msg); const DBT *vdbt() const;
XIDS ft_msg_get_xids(FT_MSG ft_msg); size_t total_size() const;
void *ft_msg_get_key(FT_MSG ft_msg); void serialize_to_wbuf(struct wbuf *wb, bool is_fresh) const;
void *ft_msg_get_val(FT_MSG ft_msg); // deserialization goes through a static factory function so the ft msg
// API stays completely const and there's no default constructor
static ft_msg deserialize_from_rbuf(struct rbuf *rb, XIDS *xids, bool *is_fresh);
enum ft_msg_type ft_msg_get_type(FT_MSG ft_msg); // Version 13/14 messages did not have an msn - so `m' is the MSN
// that will be assigned to the message that gets deserialized.
static ft_msg deserialize_from_rbuf_v13(struct rbuf *rb, MSN m, XIDS *xids);
private:
const DBT _key;
const DBT _val;
enum ft_msg_type _type;
MSN _msn;
XIDS _xids;
};
...@@ -128,42 +128,75 @@ void message_buffer::deserialize_from_rbuf(struct rbuf *rb, ...@@ -128,42 +128,75 @@ void message_buffer::deserialize_from_rbuf(struct rbuf *rb,
_resize(rb->size + 64); // rb->size is a good hint for how big the buffer will be _resize(rb->size + 64); // rb->size is a good hint for how big the buffer will be
// read in each message individually // deserialize each message individually, noting whether it was fresh
// and putting its buffer offset in the appropriate offsets array
for (int i = 0; i < n_in_this_buffer; i++) { for (int i = 0; i < n_in_this_buffer; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
// this is weird but it's necessary to pass icc and gcc together
unsigned char ctype = rbuf_char(rb);
enum ft_msg_type type = (enum ft_msg_type) ctype;
bool is_fresh = rbuf_char(rb);
MSN msn = rbuf_msn(rb);
XIDS xids; XIDS xids;
xids_create_from_buffer(rb, &xids); bool is_fresh;
rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */ const ft_msg msg = ft_msg::deserialize_from_rbuf(rb, &xids, &is_fresh);
rbuf_bytes(rb, &val, &vallen);
int32_t *dest = nullptr; int32_t *dest;
if (ft_msg_type_applies_once(type)) { if (ft_msg_type_applies_once(msg.type())) {
if (is_fresh) { if (is_fresh) {
dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr; dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
} else { } else {
dest = stale_offsets ? *stale_offsets + (*nstale)++ : nullptr; dest = stale_offsets ? *stale_offsets + (*nstale)++ : nullptr;
} }
} else { } else {
invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)); invariant(ft_msg_type_applies_all(msg.type()) || ft_msg_type_does_nothing(msg.type()));
dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
}
enqueue(msg, is_fresh, dest);
xids_destroy(&xids);
}
invariant(_num_entries == n_in_this_buffer);
}
MSN message_buffer::deserialize_from_rbuf_v13(struct rbuf *rb,
MSN *highest_unused_msn_for_upgrade,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **broadcast_offsets, int32_t *nbroadcast) {
// read the number of messages in this buffer
int n_in_this_buffer = rbuf_int(rb);
if (fresh_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *fresh_offsets);
}
if (broadcast_offsets != nullptr) {
XMALLOC_N(n_in_this_buffer, *broadcast_offsets);
}
// Atomically decrement the header's MSN count by the number
// of messages in the buffer.
MSN highest_msn_in_this_buffer = {
.msn = toku_sync_sub_and_fetch(&highest_unused_msn_for_upgrade->msn, n_in_this_buffer)
};
// Create the message buffers from the deserialized buffer.
for (int i = 0; i < n_in_this_buffer; i++) {
XIDS xids;
// There were no stale messages at this version, so call it fresh.
const bool is_fresh = true;
// Increment our MSN, the last message should have the
// newest/highest MSN. See above for a full explanation.
highest_msn_in_this_buffer.msn++;
const ft_msg msg = ft_msg::deserialize_from_rbuf_v13(rb, highest_msn_in_this_buffer, &xids);
int32_t *dest;
if (ft_msg_type_applies_once(msg.type())) {
dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
} else {
invariant(ft_msg_type_applies_all(msg.type()) || ft_msg_type_does_nothing(msg.type()));
dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr; dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
} }
// TODO: Function to parse stuff out of an rbuf into an FT_MSG enqueue(msg, is_fresh, dest);
DBT k, v;
FT_MSG_S msg = {
type, msn, xids,
.u = { .id = { toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen) } }
};
enqueue(&msg, is_fresh, dest);
xids_destroy(&xids); xids_destroy(&xids);
} }
invariant(num_entries() == n_in_this_buffer); return highest_msn_in_this_buffer;
} }
void message_buffer::_resize(size_t new_size) { void message_buffer::_resize(size_t new_size) {
...@@ -184,7 +217,7 @@ struct message_buffer::buffer_entry *message_buffer::get_buffer_entry(int32_t of ...@@ -184,7 +217,7 @@ struct message_buffer::buffer_entry *message_buffer::get_buffer_entry(int32_t of
return (struct buffer_entry *) (_memory + offset); return (struct buffer_entry *) (_memory + offset);
} }
void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) { void message_buffer::enqueue(const ft_msg &msg, bool is_fresh, int32_t *offset) {
int need_space_here = msg_memsize_in_buffer(msg); int need_space_here = msg_memsize_in_buffer(msg);
int need_space_total = _memory_used + need_space_here; int need_space_total = _memory_used + need_space_here;
if (_memory == nullptr || need_space_total > _memory_size) { if (_memory == nullptr || need_space_total > _memory_size) {
...@@ -192,18 +225,18 @@ void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) { ...@@ -192,18 +225,18 @@ void message_buffer::enqueue(FT_MSG msg, bool is_fresh, int32_t *offset) {
int next_2 = next_power_of_two(need_space_total); int next_2 = next_power_of_two(need_space_total);
_resize(next_2); _resize(next_2);
} }
ITEMLEN keylen = ft_msg_get_keylen(msg); ITEMLEN keylen = msg.kdbt()->size;
ITEMLEN datalen = ft_msg_get_vallen(msg); ITEMLEN datalen = msg.vdbt()->size;
struct buffer_entry *entry = get_buffer_entry(_memory_used); struct buffer_entry *entry = get_buffer_entry(_memory_used);
entry->type = (unsigned char) ft_msg_get_type(msg); entry->type = (unsigned char) msg.type();
entry->msn = msg->msn; entry->msn = msg.msn();
xids_cpy(&entry->xids_s, ft_msg_get_xids(msg)); xids_cpy(&entry->xids_s, msg.xids());
entry->is_fresh = is_fresh; entry->is_fresh = is_fresh;
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s); unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
entry->keylen = keylen; entry->keylen = keylen;
memcpy(e_key, ft_msg_get_key(msg), keylen); memcpy(e_key, msg.kdbt()->data, keylen);
entry->vallen = datalen; entry->vallen = datalen;
memcpy(e_key + keylen, ft_msg_get_val(msg), datalen); memcpy(e_key + keylen, msg.vdbt()->data, datalen);
if (offset) { if (offset) {
*offset = _memory_used; *offset = _memory_used;
} }
...@@ -221,7 +254,7 @@ bool message_buffer::get_freshness(int32_t offset) const { ...@@ -221,7 +254,7 @@ bool message_buffer::get_freshness(int32_t offset) const {
return entry->is_fresh; return entry->is_fresh;
} }
FT_MSG_S message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const { ft_msg message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const {
struct buffer_entry *entry = get_buffer_entry(offset); struct buffer_entry *entry = get_buffer_entry(offset);
ITEMLEN keylen = entry->keylen; ITEMLEN keylen = entry->keylen;
ITEMLEN vallen = entry->vallen; ITEMLEN vallen = entry->vallen;
...@@ -230,11 +263,7 @@ FT_MSG_S message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) c ...@@ -230,11 +263,7 @@ FT_MSG_S message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) c
const XIDS xids = (XIDS) &entry->xids_s; const XIDS xids = (XIDS) &entry->xids_s;
bytevec key = xids_get_end_of_array(xids); bytevec key = xids_get_end_of_array(xids);
bytevec val = (uint8_t *) key + entry->keylen; bytevec val = (uint8_t *) key + entry->keylen;
FT_MSG_S msg = { return ft_msg(toku_fill_dbt(keydbt, key, keylen), toku_fill_dbt(valdbt, val, vallen), type, msn, xids);
type, msn, xids,
.u = { .id = { toku_fill_dbt(keydbt, key, keylen), toku_fill_dbt(valdbt, val, vallen) } }
};
return msg;
} }
void message_buffer::get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const { void message_buffer::get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const {
...@@ -269,28 +298,21 @@ bool message_buffer::equals(message_buffer *other) const { ...@@ -269,28 +298,21 @@ bool message_buffer::equals(message_buffer *other) const {
} }
void message_buffer::serialize_to_wbuf(struct wbuf *wb) const { void message_buffer::serialize_to_wbuf(struct wbuf *wb) const {
wbuf_nocrc_int(wb, num_entries()); wbuf_nocrc_int(wb, _num_entries);
struct msg_serialize_fn { struct msg_serialize_fn {
struct wbuf *wb; struct wbuf *wb;
msg_serialize_fn(struct wbuf *w) : wb(w) { } msg_serialize_fn(struct wbuf *w) : wb(w) { }
int operator()(FT_MSG msg, bool is_fresh) { int operator()(const ft_msg &msg, bool is_fresh) {
enum ft_msg_type type = (enum ft_msg_type) msg->type; msg.serialize_to_wbuf(wb, is_fresh);
paranoid_invariant((int) type >= 0 && (int) type < 256);
wbuf_nocrc_char(wb, (unsigned char) type);
wbuf_nocrc_char(wb, (unsigned char) is_fresh);
wbuf_MSN(wb, msg->msn);
wbuf_nocrc_xids(wb, ft_msg_get_xids(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_key(msg), ft_msg_get_keylen(msg));
wbuf_nocrc_bytes(wb, ft_msg_get_val(msg), ft_msg_get_vallen(msg));
return 0; return 0;
} }
} serialize_fn(wb); } serialize_fn(wb);
iterate(serialize_fn); iterate(serialize_fn);
} }
size_t message_buffer::msg_memsize_in_buffer(FT_MSG msg) { size_t message_buffer::msg_memsize_in_buffer(const ft_msg &msg) {
const uint32_t keylen = ft_msg_get_keylen(msg); const uint32_t keylen = msg.kdbt()->size;
const uint32_t datalen = ft_msg_get_vallen(msg); const uint32_t datalen = msg.vdbt()->size;
const size_t xidslen = xids_get_size(msg->xids); const size_t xidslen = xids_get_size(msg.xids());
return sizeof(struct buffer_entry) + keylen + datalen + xidslen - sizeof(XIDS_S); return sizeof(struct buffer_entry) + keylen + datalen + xidslen - sizeof(XIDS_S);
} }
...@@ -91,7 +91,7 @@ PATENT RIGHTS GRANT: ...@@ -91,7 +91,7 @@ PATENT RIGHTS GRANT:
#include "ft/fttypes.h" #include "ft/fttypes.h"
#include "ft/xids-internal.h" #include "ft/xids-internal.h"
#include "ft/xids.h" #include "ft/xids.h"
#include "ft/ft_msg.h" #include "ft/msg.h"
#include "ft/ybt.h" #include "ft/ybt.h"
class message_buffer { class message_buffer {
...@@ -111,13 +111,24 @@ public: ...@@ -111,13 +111,24 @@ public:
int32_t **stale_offsets, int32_t *nstale, int32_t **stale_offsets, int32_t *nstale,
int32_t **broadcast_offsets, int32_t *nbroadcast); int32_t **broadcast_offsets, int32_t *nbroadcast);
void enqueue(FT_MSG msg, bool is_fresh, int32_t *offset); // effect: deserializes a message buffer whose messages are at version 13/14
// returns: similar to deserialize_from_rbuf(), except there are no stale messages
// and each message is assigned a sequential value from *highest_unused_msn_for_upgrade,
// which is modified as needed using toku_sync_sub_and_fetch()
// returns: the highest MSN assigned to any message in this buffer
// requires: similar to deserialize_from_rbuf(), and highest_unused_msn_for_upgrade != nullptr
MSN deserialize_from_rbuf_v13(struct rbuf *rb,
MSN *highest_unused_msn_for_upgrade,
int32_t **fresh_offsets, int32_t *nfresh,
int32_t **broadcast_offsets, int32_t *nbroadcast);
void enqueue(const ft_msg &msg, bool is_fresh, int32_t *offset);
void set_freshness(int32_t offset, bool is_fresh); void set_freshness(int32_t offset, bool is_fresh);
bool get_freshness(int32_t offset) const; bool get_freshness(int32_t offset) const;
FT_MSG_S get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const; ft_msg get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const;
void get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const; void get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const;
...@@ -133,13 +144,13 @@ public: ...@@ -133,13 +144,13 @@ public:
int iterate(F &fn) const { int iterate(F &fn) const {
for (int32_t offset = 0; offset < _memory_used; ) { for (int32_t offset = 0; offset < _memory_used; ) {
DBT k, v; DBT k, v;
FT_MSG_S msg = get_message(offset, &k, &v); const ft_msg msg = get_message(offset, &k, &v);
bool is_fresh = get_freshness(offset); bool is_fresh = get_freshness(offset);
int r = fn(&msg, is_fresh); int r = fn(msg, is_fresh);
if (r != 0) { if (r != 0) {
return r; return r;
} }
offset += msg_memsize_in_buffer(&msg); offset += msg_memsize_in_buffer(msg);
} }
return 0; return 0;
} }
...@@ -148,7 +159,7 @@ public: ...@@ -148,7 +159,7 @@ public:
void serialize_to_wbuf(struct wbuf *wb) const; void serialize_to_wbuf(struct wbuf *wb) const;
static size_t msg_memsize_in_buffer(FT_MSG msg); static size_t msg_memsize_in_buffer(const ft_msg &msg);
private: private:
void _resize(size_t new_size); void _resize(size_t new_size);
......
This diff is collapsed.
...@@ -495,20 +495,20 @@ int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator ...@@ -495,20 +495,20 @@ int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator
void toku_ftnode_put_msg(const toku::comparator &cmp, ft_update_func update_fun, void toku_ftnode_put_msg(const toku::comparator &cmp, ft_update_func update_fun,
FTNODE node, int target_childnum, FTNODE node, int target_childnum,
FT_MSG msg, bool is_fresh, txn_gc_info *gc_info, const ft_msg &msg, bool is_fresh, txn_gc_info *gc_info,
size_t flow_deltas[], STAT64INFO stats_to_update); size_t flow_deltas[], STAT64INFO stats_to_update);
void toku_ft_bn_apply_msg_once(BASEMENTNODE bn, const FT_MSG msg, uint32_t idx, void toku_ft_bn_apply_msg_once(BASEMENTNODE bn, const ft_msg &msg, uint32_t idx,
uint32_t le_keylen, LEAFENTRY le, txn_gc_info *gc_info, uint32_t le_keylen, LEAFENTRY le, txn_gc_info *gc_info,
uint64_t *workdonep, STAT64INFO stats_to_update); uint64_t *workdonep, STAT64INFO stats_to_update);
void toku_ft_bn_apply_msg(const toku::comparator &cmp, ft_update_func update_fun, void toku_ft_bn_apply_msg(const toku::comparator &cmp, ft_update_func update_fun,
BASEMENTNODE bn, FT_MSG msg, txn_gc_info *gc_info, BASEMENTNODE bn, const ft_msg &msg, txn_gc_info *gc_info,
uint64_t *workdone, STAT64INFO stats_to_update); uint64_t *workdone, STAT64INFO stats_to_update);
void toku_ft_leaf_apply_msg(const toku::comparator &cmp, ft_update_func update_fun, void toku_ft_leaf_apply_msg(const toku::comparator &cmp, ft_update_func update_fun,
FTNODE node, int target_childnum, FTNODE node, int target_childnum,
FT_MSG msg, txn_gc_info *gc_info, const ft_msg &msg, txn_gc_info *gc_info,
uint64_t *workdone, STAT64INFO stats_to_update); uint64_t *workdone, STAT64INFO stats_to_update);
CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT ft); CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT ft);
......
...@@ -257,13 +257,11 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, ...@@ -257,13 +257,11 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
XIDS xids; XIDS xids;
xids = toku_txn_get_xids(txn); xids = toku_txn_get_xids(txn);
{ {
FT_MSG_S ftmsg = { type, ZERO_MSN, xids, const DBT *kdbt = key.len > 0 ? toku_fill_dbt(&key_dbt, key.data, key.len) :
.u = { .id = { (key.len > 0) toku_init_dbt(&key_dbt);
? toku_fill_dbt(&key_dbt, key.data, key.len) const DBT *vdbt = data ? toku_fill_dbt(&data_dbt, data->data, data->len) :
: toku_init_dbt(&key_dbt), toku_init_dbt(&data_dbt);
data ft_msg msg(kdbt, vdbt, type, ZERO_MSN, xids);
? toku_fill_dbt(&data_dbt, data->data, data->len)
: toku_init_dbt(&data_dbt) } } };
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger); TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger);
txn_manager_state txn_state_for_gc(txn_manager); txn_manager_state txn_state_for_gc(txn_manager);
...@@ -274,7 +272,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, ...@@ -274,7 +272,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
// no messages above us, we can implicitly promote uxrs based on this xid // no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate, oldest_referenced_xid_estimate,
!txn->for_recovery); !txn->for_recovery);
toku_ft_root_put_msg(ft, &ftmsg, &gc_info); toku_ft_root_put_msg(ft, msg, &gc_info);
if (reset_root_xid_that_created) { if (reset_root_xid_that_created) {
TXNID new_root_xid_that_created = xids_get_outermost_xid(xids); TXNID new_root_xid_that_created = xids_get_outermost_xid(xids);
toku_reset_root_xid_that_created(ft, new_root_xid_that_created); toku_reset_root_xid_that_created(ft, new_root_xid_that_created);
......
...@@ -136,10 +136,8 @@ test_enqueue(int n) { ...@@ -136,10 +136,8 @@ test_enqueue(int n) {
startmsn = msn; startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type) i; enum ft_msg_type type = (enum ft_msg_type) i;
DBT k, v; DBT k, v;
FT_MSG_S msg = { ft_msg msg(toku_fill_dbt(&k, thekey, thekeylen), toku_fill_dbt(&v, theval, thevallen), type, msn, xids);
type, msn, xids, .u = { .id = { toku_fill_dbt(&k, thekey, thekeylen), toku_fill_dbt(&v, theval, thevallen) } } msg_buffer.enqueue(msg, true, nullptr);
};
msg_buffer.enqueue(&msg, true, nullptr);
xids_destroy(&xids); xids_destroy(&xids);
toku_free(thekey); toku_free(thekey);
toku_free(theval); toku_free(theval);
...@@ -152,20 +150,20 @@ test_enqueue(int n) { ...@@ -152,20 +150,20 @@ test_enqueue(int n) {
checkit_fn(MSN smsn, bool v) checkit_fn(MSN smsn, bool v)
: startmsn(smsn), verbose(v), i(0) { : startmsn(smsn), verbose(v), i(0) {
} }
int operator()(FT_MSG msg, bool UU(is_fresh)) { int operator()(const ft_msg &msg, bool UU(is_fresh)) {
int thekeylen = i + 1; int thekeylen = i + 1;
int thevallen = i + 2; int thevallen = i + 2;
char *thekey = buildkey(thekeylen); char *thekey = buildkey(thekeylen);
char *theval = buildval(thevallen); char *theval = buildval(thevallen);
MSN msn = msg->msn; MSN msn = msg.msn();
enum ft_msg_type type = ft_msg_get_type(msg); enum ft_msg_type type = msg.type();
if (verbose) printf("checkit %d %d %" PRIu64 "\n", i, type, msn.msn); if (verbose) printf("checkit %d %d %" PRIu64 "\n", i, type, msn.msn);
assert(msn.msn == startmsn.msn + i); assert(msn.msn == startmsn.msn + i);
assert((int) ft_msg_get_keylen(msg) == thekeylen); assert(memcmp(ft_msg_get_key(msg), thekey, ft_msg_get_keylen(msg)) == 0); assert((int) msg.kdbt()->size == thekeylen); assert(memcmp(msg.kdbt()->data, thekey, msg.kdbt()->size) == 0);
assert((int) ft_msg_get_vallen(msg) == thevallen); assert(memcmp(ft_msg_get_val(msg), theval, ft_msg_get_vallen(msg)) == 0); assert((int) msg.vdbt()->size == thevallen); assert(memcmp(msg.vdbt()->data, theval, msg.vdbt()->size) == 0);
assert(i % 256 == (int)type); assert(i % 256 == (int)type);
assert((TXNID)i==xids_get_innermost_xid(ft_msg_get_xids(msg))); assert((TXNID)i==xids_get_innermost_xid(msg.xids()));
i += 1; i += 1;
toku_free(thekey); toku_free(thekey);
toku_free(theval); toku_free(theval);
......
...@@ -125,8 +125,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -125,8 +125,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u = {.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn; leafnode->max_msn_applied_to_node_on_disk = msn;
......
...@@ -131,18 +131,18 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val ...@@ -131,18 +131,18 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
ft->ft->h->max_msn_in_ft = msn; ft->ft->h->max_msn_in_ft = msn;
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, &msg, &gc_info, nullptr, nullptr); toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg, &gc_info, nullptr, nullptr);
{ {
int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair); int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair);
assert(r==0); assert(r==0);
assert(pair.call_count==1); assert(pair.call_count==1);
} }
FT_MSG_S badmsg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} }; ft_msg badmsg(&thekey, &badval, FT_INSERT, msn, xids_get_root_xids());
toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, &badmsg, &gc_info, nullptr, nullptr); toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, badmsg, &gc_info, nullptr, nullptr);
// message should be rejected for duplicate msn, row should still have original val // message should be rejected for duplicate msn, row should still have original val
{ {
...@@ -154,8 +154,8 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val ...@@ -154,8 +154,8 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val
// now verify that message with proper msn gets through // now verify that message with proper msn gets through
msn = next_dummymsn(); msn = next_dummymsn();
ft->ft->h->max_msn_in_ft = msn; ft->ft->h->max_msn_in_ft = msn;
FT_MSG_S msg2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} }; ft_msg msg2(&thekey, &val2, FT_INSERT, msn, xids_get_root_xids());
toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, &msg2, &gc_info, nullptr, nullptr); toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg2, &gc_info, nullptr, nullptr);
// message should be accepted, val should have new value // message should be accepted, val should have new value
{ {
...@@ -166,8 +166,8 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val ...@@ -166,8 +166,8 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val
// now verify that message with lesser (older) msn is rejected // now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10; msn.msn = msn.msn - 10;
FT_MSG_S msg3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }}; ft_msg msg3(&thekey, &badval, FT_INSERT, msn, xids_get_root_xids());
toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, &msg3, &gc_info, nullptr, nullptr); toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg3, &gc_info, nullptr, nullptr);
// message should be rejected, val should still have value in pair2 // message should be rejected, val should still have value in pair2
{ {
......
This diff is collapsed.
...@@ -111,17 +111,6 @@ static void add_committed_entry(ULE ule, DBT *val, TXNID xid) { ...@@ -111,17 +111,6 @@ static void add_committed_entry(ULE ule, DBT *val, TXNID xid) {
ule->uxrs[index].xid = xid; ule->uxrs[index].xid = xid;
} }
static FT_MSG_S
msg_init(enum ft_msg_type type, XIDS xids,
DBT *key, DBT *val) {
FT_MSG_S msg;
msg.type = type;
msg.xids = xids;
msg.u.id.key = key;
msg.u.id.val = val;
return msg;
}
//Test all the different things that can happen to a //Test all the different things that can happen to a
//committed leafentry (logical equivalent of a committed insert). //committed leafentry (logical equivalent of a committed insert).
static void static void
...@@ -161,41 +150,45 @@ run_test(void) { ...@@ -161,41 +150,45 @@ run_test(void) {
add_committed_entry(&ule_initial, &val, 10); add_committed_entry(&ule_initial, &val, 10);
// now do the application of xids to the ule // now do the application of xids to the ule
FT_MSG_S msg;
// do a commit // do a commit
msg = msg_init(FT_COMMIT_ANY, msg_xids_2, &key, &val); {
test_msg_modify_ule(&ule_initial, &msg); ft_msg msg(&key, &val, FT_COMMIT_ANY, ZERO_MSN, msg_xids_2);
assert(ule->num_cuxrs == 2); test_msg_modify_ule(&ule_initial, msg);
assert(ule->uxrs[0].xid == TXNID_NONE); assert(ule->num_cuxrs == 2);
assert(ule->uxrs[1].xid == 10); assert(ule->uxrs[0].xid == TXNID_NONE);
assert(ule->uxrs[0].valp == &val_data_one); assert(ule->uxrs[1].xid == 10);
assert(ule->uxrs[1].valp == &val_data_two); assert(ule->uxrs[0].valp == &val_data_one);
assert(ule->uxrs[1].valp == &val_data_two);
}
// do an abort // do an abort
msg = msg_init(FT_ABORT_ANY, msg_xids_2, &key, &val); {
test_msg_modify_ule(&ule_initial, &msg); ft_msg msg(&key, &val, FT_ABORT_ANY, ZERO_MSN, msg_xids_2);
assert(ule->num_cuxrs == 2); test_msg_modify_ule(&ule_initial, msg);
assert(ule->uxrs[0].xid == TXNID_NONE); assert(ule->num_cuxrs == 2);
assert(ule->uxrs[1].xid == 10); assert(ule->uxrs[0].xid == TXNID_NONE);
assert(ule->uxrs[0].valp == &val_data_one); assert(ule->uxrs[1].xid == 10);
assert(ule->uxrs[1].valp == &val_data_two); assert(ule->uxrs[0].valp == &val_data_one);
assert(ule->uxrs[1].valp == &val_data_two);
}
// do an insert // do an insert
val.data = &val_data_three; val.data = &val_data_three;
msg = msg_init(FT_INSERT, msg_xids_2, &key, &val); {
test_msg_modify_ule(&ule_initial, &msg); ft_msg msg(&key, &val, FT_INSERT, ZERO_MSN, msg_xids_2);
// now that message applied, verify that things are good test_msg_modify_ule(&ule_initial, msg);
assert(ule->num_cuxrs == 2); // now that message applied, verify that things are good
assert(ule->num_puxrs == 2); assert(ule->num_cuxrs == 2);
assert(ule->uxrs[0].xid == TXNID_NONE); assert(ule->num_puxrs == 2);
assert(ule->uxrs[1].xid == 10); assert(ule->uxrs[0].xid == TXNID_NONE);
assert(ule->uxrs[2].xid == 1000); assert(ule->uxrs[1].xid == 10);
assert(ule->uxrs[3].xid == 10); assert(ule->uxrs[2].xid == 1000);
assert(ule->uxrs[0].valp == &val_data_one); assert(ule->uxrs[3].xid == 10);
assert(ule->uxrs[1].valp == &val_data_two); assert(ule->uxrs[0].valp == &val_data_one);
assert(ule->uxrs[2].type == XR_PLACEHOLDER); assert(ule->uxrs[1].valp == &val_data_two);
assert(ule->uxrs[3].valp == &val_data_three); assert(ule->uxrs[2].type == XR_PLACEHOLDER);
assert(ule->uxrs[3].valp == &val_data_three);
}
xids_destroy(&msg_xids_2); xids_destroy(&msg_xids_2);
xids_destroy(&msg_xids_1); xids_destroy(&msg_xids_1);
......
This diff is collapsed.
...@@ -127,9 +127,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -127,9 +127,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
// apply an insert to the leaf node // apply an insert to the leaf node
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// Create bad tree (don't do following): // Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn; // leafnode->max_msn_applied_to_node = msn;
......
...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -116,9 +116,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -116,9 +116,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -116,9 +116,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -116,9 +116,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -118,9 +118,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -118,9 +118,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) ...@@ -115,9 +115,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; ft_msg msg(&thekey, &theval, FT_INSERT, msn, xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL); toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// don't forget to dirty the node // don't forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -280,14 +280,14 @@ static void dump_node(int fd, BLOCKNUM blocknum, FT ft) { ...@@ -280,14 +280,14 @@ static void dump_node(int fd, BLOCKNUM blocknum, FT ft) {
} }
if (do_dump_data) { if (do_dump_data) {
struct dump_data_fn { struct dump_data_fn {
int operator()(FT_MSG msg, bool UU(is_fresh)) { int operator()(const ft_msg &msg, bool UU(is_fresh)) {
enum ft_msg_type type = (enum ft_msg_type) msg->type; enum ft_msg_type type = (enum ft_msg_type) msg.type();
MSN msn = msg->msn; MSN msn = msg.msn();
XIDS xids = msg->xids; XIDS xids = msg.xids();
const void *key = ft_msg_get_key(msg); const void *key = msg.kdbt()->data;
const void *data = ft_msg_get_val(msg); const void *data = msg.vdbt()->data;
ITEMLEN keylen = ft_msg_get_keylen(msg); ITEMLEN keylen = msg.kdbt()->size;
ITEMLEN datalen = ft_msg_get_vallen(msg); ITEMLEN datalen = msg.vdbt()->size;
printf(" msn=%" PRIu64 " (0x%" PRIx64 ") ", msn.msn, msn.msn); printf(" msn=%" PRIu64 " (0x%" PRIx64 ") ", msn.msn, msn.msn);
printf(" TYPE="); printf(" TYPE=");
switch (type) { switch (type) {
......
...@@ -135,7 +135,7 @@ typedef struct ule { // unpacked leaf entry ...@@ -135,7 +135,7 @@ typedef struct ule { // unpacked leaf entry
void test_msg_modify_ule(ULE ule, FT_MSG msg); void test_msg_modify_ule(ULE ule, const ft_msg &msg);
////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////
......
...@@ -105,7 +105,7 @@ PATENT RIGHTS GRANT: ...@@ -105,7 +105,7 @@ PATENT RIGHTS GRANT:
#include <toku_portability.h> #include <toku_portability.h>
#include "ft/fttypes.h" #include "ft/fttypes.h"
#include "ft/ft-internal.h" #include "ft/ft-internal.h"
#include "ft/ft_msg.h" #include "ft/msg.h"
#include "ft/leafentry.h" #include "ft/leafentry.h"
#include "ft/logger.h" #include "ft/logger.h"
#include "ft/txn.h" #include "ft/txn.h"
...@@ -216,7 +216,7 @@ const UXR_S committed_delete = { ...@@ -216,7 +216,7 @@ const UXR_S committed_delete = {
// Local functions: // Local functions:
static void msg_init_empty_ule(ULE ule); static void msg_init_empty_ule(ULE ule);
static void msg_modify_ule(ULE ule, FT_MSG msg); static void msg_modify_ule(ULE ule, const ft_msg &msg);
static void ule_init_empty_ule(ULE ule); static void ule_init_empty_ule(ULE ule);
static void ule_do_implicit_promotions(ULE ule, XIDS xids); static void ule_do_implicit_promotions(ULE ule, XIDS xids);
static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid); static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid);
...@@ -496,7 +496,7 @@ enum { ...@@ -496,7 +496,7 @@ enum {
// Otherwise the new_leafentry_p points at the new leaf entry. // Otherwise the new_leafentry_p points at the new leaf entry.
// As of October 2011, this function always returns 0. // As of October 2011, this function always returns 0.
void void
toku_le_apply_msg(FT_MSG msg, toku_le_apply_msg(const ft_msg &msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data. LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
...@@ -510,7 +510,7 @@ toku_le_apply_msg(FT_MSG msg, ...@@ -510,7 +510,7 @@ toku_le_apply_msg(FT_MSG msg,
int64_t oldnumbytes = 0; int64_t oldnumbytes = 0;
int64_t newnumbytes = 0; int64_t newnumbytes = 0;
uint64_t oldmemsize = 0; uint64_t oldmemsize = 0;
uint32_t keylen = ft_msg_get_keylen(msg); uint32_t keylen = msg.kdbt()->size;
if (old_leafentry == NULL) { if (old_leafentry == NULL) {
msg_init_empty_ule(&ule); msg_init_empty_ule(&ule);
...@@ -555,7 +555,7 @@ toku_le_apply_msg(FT_MSG msg, ...@@ -555,7 +555,7 @@ toku_le_apply_msg(FT_MSG msg,
&ule, // create packed leafentry &ule, // create packed leafentry
data_buffer, data_buffer,
idx, idx,
ft_msg_get_key(msg), // contract of this function is caller has this set, always msg.kdbt()->data, // contract of this function is caller has this set, always
keylen, // contract of this function is caller has this set, always keylen, // contract of this function is caller has this set, always
old_keylen, old_keylen,
oldmemsize, oldmemsize,
...@@ -693,10 +693,10 @@ msg_init_empty_ule(ULE ule) { ...@@ -693,10 +693,10 @@ msg_init_empty_ule(ULE ule) {
// Purpose is to modify the unpacked leafentry in our private workspace. // Purpose is to modify the unpacked leafentry in our private workspace.
// //
static void static void
msg_modify_ule(ULE ule, FT_MSG msg) { msg_modify_ule(ULE ule, const ft_msg &msg) {
XIDS xids = ft_msg_get_xids(msg); XIDS xids = msg.xids();
invariant(xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS); invariant(xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS);
enum ft_msg_type type = ft_msg_get_type(msg); enum ft_msg_type type = msg.type();
if (type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE) { if (type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE) {
ule_do_implicit_promotions(ule, xids); ule_do_implicit_promotions(ule, xids);
} }
...@@ -709,9 +709,9 @@ msg_modify_ule(ULE ule, FT_MSG msg) { ...@@ -709,9 +709,9 @@ msg_modify_ule(ULE ule, FT_MSG msg) {
//fall through to FT_INSERT on purpose. //fall through to FT_INSERT on purpose.
} }
case FT_INSERT: { case FT_INSERT: {
uint32_t vallen = ft_msg_get_vallen(msg); uint32_t vallen = msg.vdbt()->size;
invariant(IS_VALID_LEN(vallen)); invariant(IS_VALID_LEN(vallen));
void * valp = ft_msg_get_val(msg); void * valp = msg.vdbt()->data;
ule_apply_insert(ule, xids, vallen, valp); ule_apply_insert(ule, xids, vallen, valp);
break; break;
} }
...@@ -738,17 +738,15 @@ msg_modify_ule(ULE ule, FT_MSG msg) { ...@@ -738,17 +738,15 @@ msg_modify_ule(ULE ule, FT_MSG msg) {
assert(false); // These messages don't get this far. Instead they get translated (in setval_fun in do_update) into FT_INSERT messages. assert(false); // These messages don't get this far. Instead they get translated (in setval_fun in do_update) into FT_INSERT messages.
break; break;
default: default:
assert(false /* illegal FT_MSG.type */); assert(false); /* illegal ft msg type */
break; break;
} }
} }
void void test_msg_modify_ule(ULE ule, const ft_msg &msg){
test_msg_modify_ule(ULE ule, FT_MSG msg){
msg_modify_ule(ule,msg); msg_modify_ule(ule,msg);
} }
static void ule_optimize(ULE ule, XIDS xids) { static void ule_optimize(ULE ule, XIDS xids) {
if (ule->num_puxrs) { if (ule->num_puxrs) {
TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid; // outermost uncommitted TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid; // outermost uncommitted
......
...@@ -90,23 +90,30 @@ PATENT RIGHTS GRANT: ...@@ -90,23 +90,30 @@ PATENT RIGHTS GRANT:
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <db.h> #include <db.h>
#include <memory.h>
#include <string.h> #include <string.h>
#include <fttypes.h>
#include "ybt.h" #include "portability/memory.h"
#include "ft/fttypes.h"
#include "ft/ybt.h"
DBT * DBT *
toku_init_dbt(DBT *ybt) { toku_init_dbt(DBT *dbt) {
memset(ybt, 0, sizeof(*ybt)); memset(dbt, 0, sizeof(*dbt));
return ybt; return dbt;
}
DBT
toku_empty_dbt(void) {
static const DBT empty_dbt = { .data = 0, .size = 0, .ulen = 0, .flags = 0 };
return empty_dbt;
} }
DBT * DBT *
toku_init_dbt_flags(DBT *ybt, uint32_t flags) { toku_init_dbt_flags(DBT *dbt, uint32_t flags) {
toku_init_dbt(ybt); toku_init_dbt(dbt);
ybt->flags = flags; dbt->flags = flags;
return ybt; return dbt;
} }
DBT_ARRAY * DBT_ARRAY *
......
...@@ -102,6 +102,9 @@ PATENT RIGHTS GRANT: ...@@ -102,6 +102,9 @@ PATENT RIGHTS GRANT:
DBT *toku_init_dbt(DBT *); DBT *toku_init_dbt(DBT *);
// returns: an initialized but empty dbt (for which toku_dbt_is_empty() is true)
DBT toku_empty_dbt(void);
DBT *toku_init_dbt_flags(DBT *, uint32_t flags); DBT *toku_init_dbt_flags(DBT *, uint32_t flags);
void toku_destroy_dbt(DBT *); void toku_destroy_dbt(DBT *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment