Commit adbd5ee0 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:4241] use DBTs instead of kv_pairs. closes #4241

removes kv-pair.h completely. now childkeys are DBTs.

two new DBT functions help this. toku_clone_dbt memdup's the data.
toku_copyref_dbt steals the existing data and does not malloc.


git-svn-id: file:///svn/toku/tokudb@43495 c7de825b-a66e-492c-adef-691d508d4ae1
parent 67feab1e
......@@ -378,10 +378,8 @@ ct_maybe_merge_child(struct flusher_advice *fa,
ctme.is_last_child = FALSE;
pivot_to_save = childnum;
}
struct kv_pair *pivot = parent->childkeys[pivot_to_save];
size_t pivotlen = kv_pair_keylen(pivot);
char *buf = toku_xmemdup(kv_pair_key_const(pivot), pivotlen);
toku_fill_dbt(&ctme.target_key, buf, pivotlen);
const DBT *pivot = &parent->childkeys[pivot_to_save];
toku_clone_dbt(&ctme.target_key, *pivot);
// at this point, ctme is properly setup, now we can do the merge
struct flusher_advice new_fa;
......@@ -420,7 +418,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
(void) __sync_fetch_and_add(&STATUS_VALUE(BRT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), -1);
toku_free(buf);
toku_free(ctme.target_key.data);
}
}
......@@ -495,7 +493,7 @@ handle_split_of_child(
int i;
printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(i=0; i<node->n_children-1; i++) printf(" %s", (char*)node->childkeys[i]);
for(i=0; i<node->n_children-1; i++) printf(" %s", (char *) node->childkeys[i].data);
printf("\n");
}
)
......@@ -524,21 +522,19 @@ handle_split_of_child(
// Slide the keys over
{
struct kv_pair *pivot = splitk->data;
for (cnum=node->n_children-2; cnum>childnum; cnum--) {
node->childkeys[cnum] = node->childkeys[cnum-1];
toku_copyref_dbt(&node->childkeys[cnum], node->childkeys[cnum-1]);
}
//if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
node->childkeys[childnum]= pivot;
node->totalchildkeylens += toku_brt_pivot_key_len(pivot);
toku_copyref_dbt(&node->childkeys[childnum], *splitk);
node->totalchildkeylens += splitk->size;
}
WHEN_NOT_GCOV(
if (toku_brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
for(i=0; i<node->n_children-2; i++) printf(" %s", (char*)node->childkeys[i]);
for(i=0; i<node->n_children-2; i++) printf(" %s", (char*)node->childkeys[i].data);
printf("\n");
}
)
......@@ -876,15 +872,15 @@ brtleaf_split(
int base_index = (split_on_boundary ? last_bn_on_left + 1 : last_bn_on_left);
// make pivots in B
for (int i=0; i < num_children_in_b-1; i++) {
B->childkeys[i] = node->childkeys[i+base_index];
B->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i+base_index]);
node->totalchildkeylens -= toku_brt_pivot_key_len(node->childkeys[i+base_index]);
node->childkeys[i+base_index] = NULL;
toku_copyref_dbt(&B->childkeys[i], node->childkeys[i+base_index]);
B->totalchildkeylens += node->childkeys[i+base_index].size;
node->totalchildkeylens -= node->childkeys[i+base_index].size;
toku_init_dbt(&node->childkeys[i+base_index]);
}
if (split_on_boundary) {
// destroy the extra childkey between the nodes, we'll
// recreate it in splitk below
toku_free(node->childkeys[last_bn_on_left]);
toku_free(node->childkeys[last_bn_on_left].data);
}
REALLOC_N(num_children_in_node, node->bp);
REALLOC_N(num_children_in_node-1, node->childkeys);
......@@ -896,9 +892,9 @@ brtleaf_split(
int r=toku_omt_fetch(BLB_BUFFER(node, last_bn_on_left), toku_omt_size(BLB_BUFFER(node, last_bn_on_left))-1, &lev);
assert_zero(r); // that fetch should have worked.
LEAFENTRY le=lev;
splitk->size = le_keylen(le);
splitk->data = kv_pair_malloc(le_key(le), le_keylen(le), 0, 0);
splitk->flags=0;
uint32_t keylen;
void *key = le_key_and_len(le, &keylen);
toku_fill_dbt(splitk, toku_xmemdup(key, keylen), keylen);
}
verify_all_in_mempool(node);
......@@ -959,19 +955,18 @@ brt_nonleaf_split(
{
assert(i>0);
if (i>n_children_in_a) {
B->childkeys[targchild-1] = node->childkeys[i-1];
B->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i-1]);
node->totalchildkeylens -= toku_brt_pivot_key_len(node->childkeys[i-1]);
node->childkeys[i-1] = 0;
toku_copyref_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]);
B->totalchildkeylens += node->childkeys[i-1].size;
node->totalchildkeylens -= node->childkeys[i-1].size;
toku_init_dbt(&node->childkeys[i-1]);
}
}
}
node->n_children=n_children_in_a;
splitk->data = (void*)(node->childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(node->childkeys[n_children_in_a-1]);
node->totalchildkeylens -= toku_brt_pivot_key_len(node->childkeys[n_children_in_a-1]);
toku_copyref_dbt(splitk, node->childkeys[n_children_in_a-1]);
node->totalchildkeylens -= node->childkeys[n_children_in_a-1].size;
REALLOC_N(n_children_in_a, node->bp);
REALLOC_N(n_children_in_a-1, node->childkeys);
......@@ -1137,8 +1132,10 @@ merge_leaf_nodes(BRTNODE a, BRTNODE b)
BLB_BUFFER(a, a->n_children-1),
toku_omt_size(BLB_BUFFER(a, a->n_children-1))-1
);
a->childkeys[a->n_children-1] = kv_pair_malloc(le_key(le), le_keylen(le), 0, 0);
a->totalchildkeylens += le_keylen(le);
uint32_t keylen;
void *key = le_key_and_len(le, &keylen);
toku_fill_dbt(&a->childkeys[a->n_children-1], toku_xmemdup(key, keylen), keylen);
a->totalchildkeylens += keylen;
}
u_int32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
......@@ -1146,8 +1143,8 @@ merge_leaf_nodes(BRTNODE a, BRTNODE b)
a->bp[i+offset] = b->bp[i];
memset(&b->bp[i],0,sizeof(b->bp[0]));
if (i < (b->n_children-1)) {
a->childkeys[i+offset] = b->childkeys[i];
b->childkeys[i] = NULL;
toku_copyref_dbt(&a->childkeys[i+offset], b->childkeys[i]);
toku_init_dbt(&b->childkeys[i]);
}
}
a->totalchildkeylens += b->totalchildkeylens;
......@@ -1163,19 +1160,17 @@ static int
balance_leaf_nodes(
BRTNODE a,
BRTNODE b,
struct kv_pair **splitk)
DBT *splitk)
// Effect:
// If b is bigger then move stuff from b to a until b is the smaller.
// If a is bigger then move stuff from a to b until a is the smaller.
{
STATUS_VALUE(BRT_FLUSHER_BALANCE_LEAF)++;
DBT splitk_dbt;
// first merge all the data into a
merge_leaf_nodes(a,b);
// now split them
// because we are not creating a new node, we can pass in no dependent nodes
brtleaf_split(NULL, a, &a, &b, &splitk_dbt, FALSE, 0, NULL);
*splitk = splitk_dbt.data;
brtleaf_split(NULL, a, &a, &b, splitk, FALSE, 0, NULL);
return 0;
}
......@@ -1184,10 +1179,10 @@ static void
maybe_merge_pinned_leaf_nodes(
BRTNODE a,
BRTNODE b,
struct kv_pair *parent_splitk,
DBT *parent_splitk,
BOOL *did_merge,
BOOL *did_rebalance,
struct kv_pair **splitk)
DBT *splitk)
// Effect: Either merge a and b into one one node (merge them into a) and set *did_merge = TRUE.
// (We do this if the resulting node is not fissible)
// or distribute the leafentries evenly between a and b, and set *did_rebalance = TRUE.
......@@ -1201,11 +1196,11 @@ maybe_merge_pinned_leaf_nodes(
if (sizea*4 > a->nodesize && sizeb*4 > a->nodesize) {
// no need to do anything if both are more than 1/4 of a node.
*did_rebalance = FALSE;
*splitk = parent_splitk;
toku_clone_dbt(splitk, *parent_splitk);
return;
}
// one is less than 1/4 of a node, and together they are more than 3/4 of a node.
toku_free(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk (if we don't merge) we'll malloc a new one.
toku_free(parent_splitk->data); // We don't need the parent_splitk any more. If we need a splitk (if we don't merge) we'll malloc a new one.
*did_rebalance = TRUE;
int r = balance_leaf_nodes(a, b, splitk);
assert(r==0);
......@@ -1213,24 +1208,24 @@ maybe_merge_pinned_leaf_nodes(
// we are merging them.
*did_merge = TRUE;
*did_rebalance = FALSE;
*splitk = 0;
toku_free(parent_splitk); // if we are merging, the splitk gets freed.
toku_init_dbt(splitk);
toku_free(parent_splitk->data); // if we are merging, the splitk gets freed.
merge_leaf_nodes(a, b);
}
}
static void
maybe_merge_pinned_nonleaf_nodes(
struct kv_pair *parent_splitk,
const DBT *parent_splitk,
BRTNODE a,
BRTNODE b,
BOOL *did_merge,
BOOL *did_rebalance,
struct kv_pair **splitk)
DBT *splitk)
{
toku_assert_entire_node_in_memory(a);
toku_assert_entire_node_in_memory(b);
assert(parent_splitk);
assert(parent_splitk->data);
int old_n_children = a->n_children;
int new_n_children = old_n_children + b->n_children;
XREALLOC_N(new_n_children, a->bp);
......@@ -1240,11 +1235,11 @@ maybe_merge_pinned_nonleaf_nodes(
memset(b->bp,0,b->n_children*sizeof(b->bp[0]));
XREALLOC_N(new_n_children-1, a->childkeys);
a->childkeys[old_n_children-1] = parent_splitk;
toku_copyref_dbt(&a->childkeys[old_n_children-1], *parent_splitk);
memcpy(a->childkeys + old_n_children,
b->childkeys,
(b->n_children-1)*sizeof(b->childkeys[0]));
a->totalchildkeylens += b->totalchildkeylens + toku_brt_pivot_key_len(parent_splitk);
a->totalchildkeylens += b->totalchildkeylens + parent_splitk->size;
a->n_children = new_n_children;
b->totalchildkeylens = 0;
......@@ -1255,7 +1250,7 @@ maybe_merge_pinned_nonleaf_nodes(
*did_merge = TRUE;
*did_rebalance = FALSE;
*splitk = NULL;
toku_init_dbt(splitk);
STATUS_VALUE(BRT_FLUSHER_MERGE_NONLEAF)++;
}
......@@ -1263,12 +1258,12 @@ maybe_merge_pinned_nonleaf_nodes(
static void
maybe_merge_pinned_nodes(
BRTNODE parent,
struct kv_pair *parent_splitk,
DBT *parent_splitk,
BRTNODE a,
BRTNODE b,
BOOL *did_merge,
BOOL *did_rebalance,
struct kv_pair **splitk)
DBT *splitk)
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = TRUE.
// (We do this if the resulting node is not fissible)
// or distribute a and b evenly and set *did_merge = FALSE and *did_rebalance = TRUE
......@@ -1390,15 +1385,24 @@ brt_merge_child(
BOOL did_merge, did_rebalance;
{
struct kv_pair *splitk_kvpair = 0;
struct kv_pair *old_split_key = node->childkeys[childnuma];
unsigned int deleted_size = toku_brt_pivot_key_len(old_split_key);
maybe_merge_pinned_nodes(node, node->childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk_kvpair);
if (childa->height>0) { int i; for (i=0; i+1<childa->n_children; i++) assert(childa->childkeys[i]); }
DBT splitk;
toku_init_dbt(&splitk);
DBT *old_split_key = &node->childkeys[childnuma];
unsigned int deleted_size = old_split_key->size;
maybe_merge_pinned_nodes(node, &node->childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk);
if (childa->height>0) {
for (int i=0; i+1<childa->n_children; i++) {
assert(childa->childkeys[i].data);
}
}
//toku_verify_estimates(t,childa);
// the tree did react if a merge (did_merge) or rebalance (new spkit key) occurred
*did_react = (BOOL)(did_merge || did_rebalance);
if (did_merge) assert(!splitk_kvpair); else assert(splitk_kvpair);
if (did_merge) {
assert(!splitk.data);
} else {
assert(splitk.data);
}
node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
......@@ -1418,10 +1422,9 @@ brt_merge_child(
toku_mark_node_dirty(childa); // just to make sure
toku_mark_node_dirty(childb); // just to make sure
} else {
assert(splitk_kvpair);
// If we didn't merge the nodes, then we need the correct pivot.
node->childkeys[childnuma] = splitk_kvpair;
node->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[childnuma]);
toku_copyref_dbt(&node->childkeys[childnuma], splitk);
node->totalchildkeylens += node->childkeys[childnuma].size;
toku_mark_node_dirty(node);
}
}
......
......@@ -91,16 +91,13 @@ static void
hot_set_key(DBT *key, BRTNODE parent, int childnum)
{
// assert that childnum is less than number of children - 1.
DBT pivot;
struct kv_pair *pair;
pair = parent->childkeys[childnum];
pivot = kv_pair_key_to_dbt(pair);
DBT *pivot = &parent->childkeys[childnum];
void *data = key->data;
u_int32_t size = pivot.size;
u_int32_t size = pivot->size;
data = toku_xrealloc(data, size);
memcpy(data, pivot.data, size);
memcpy(data, pivot->data, size);
toku_fill_dbt(key, data, size);
}
......
......@@ -23,7 +23,6 @@
#include "fifo.h"
#include "brt.h"
#include "toku_list.h"
#include "kv-pair.h"
#include "omt.h"
#include "leafentry.h"
#include "block_table.h"
......@@ -103,8 +102,7 @@ struct toku_fifo_entry_key_msn_heaviside_extra {
DESCRIPTOR desc;
brt_compare_func cmp;
FIFO fifo;
bytevec key;
ITEMLEN keylen;
const DBT *key;
MSN msn;
};
......@@ -252,7 +250,7 @@ struct brtnode {
int n_children; //for internal nodes, if n_children==TREE_FANOUT+1 then the tree needs to be rebalanced.
// for leaf nodes, represents number of basement nodes
unsigned int totalchildkeylens;
struct kv_pair **childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
DBT *childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Child 1's keys are > childkeys[0]. */
// array of size n_children, consisting of brtnode partitions
// each one is associated with a child
......@@ -493,7 +491,7 @@ void toku_assert_entire_node_in_memory(BRTNODE node);
void bring_node_fully_into_memory(BRTNODE node, struct brt_header* h);
// append a child node to a parent node
void toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize);
void toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, const DBT *pivotkey);
// append a cmd to a nonleaf node child buffer
void toku_brt_append_to_child_buffer(brt_compare_func compare_fun, DESCRIPTOR desc, BRTNODE node, int childnum, enum brt_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val);
......@@ -555,9 +553,6 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(struct brt_
static const BRTNODE null_brtnode=0;
// How long is the pivot key?
unsigned int toku_brt_pivot_key_len (struct kv_pair *);
// Values to be used to update brtcursor if a search is successful.
struct brt_cursor_leaf_info_to_be {
u_int32_t index;
......@@ -704,8 +699,8 @@ struct ancestors {
ANCESTORS next; // Parent of this node (so next->node.(next->childnum) refers to this node).
};
struct pivot_bounds {
struct kv_pair const * const lower_bound_exclusive;
struct kv_pair const * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages).
const DBT * const lower_bound_exclusive;
const DBT * const upper_bound_inclusive; // NULL to indicate negative or positive infinity (which are in practice exclusive since there are now transfinite keys in messages).
};
// FIXME needs toku prefix
......@@ -795,8 +790,8 @@ int
toku_verify_brtnode (BRT brt,
MSN rootmsn, MSN parentmsn,
BRTNODE node, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
struct kv_pair *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
int (*progress_callback)(void *extra, float progress), void *progress_extra,
int recurse, int verbose, int keep_going_on_failure)
__attribute__ ((warn_unused_result));
......
......@@ -419,7 +419,7 @@ static void serialize_brtnode_info(BRTNODE node,
wbuf_nocrc_int (&wb, node->height);
// pivot information
for (int i = 0; i < node->n_children-1; i++) {
wbuf_nocrc_bytes(&wb, kv_pair_key(node->childkeys[i]), toku_brt_pivot_key_len(node->childkeys[i]));
wbuf_nocrc_bytes(&wb, node->childkeys[i].data, node->childkeys[i].size);
}
// child blocks, only for internal nodes
if (node->height > 0) {
......@@ -599,14 +599,13 @@ rebalance_brtnode_leaf(BRTNODE node, unsigned int basementnodesize)
// first the pivots
for (int i = 0; i < num_pivots; i++) {
LEAFENTRY curr_le_pivot = leafpointers[new_pivots[i]];
node->childkeys[i] = kv_pair_malloc(
le_key(curr_le_pivot),
le_keylen(curr_le_pivot),
0,
0
);
assert(node->childkeys[i]);
node->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i]);
uint32_t keylen;
void *key = le_key_and_len(curr_le_pivot, &keylen);
toku_fill_dbt(&node->childkeys[i],
toku_xmemdup(key, keylen),
keylen);
assert(node->childkeys[i].data);
node->totalchildkeylens += keylen;
}
uint32_t baseindex_this_bn = 0;
......@@ -1285,8 +1284,10 @@ deserialize_brtnode_info(
bytevec childkeyptr;
unsigned int cklen;
rbuf_bytes(&rb, &childkeyptr, &cklen);
node->childkeys[i] = kv_pair_malloc((void*)childkeyptr, cklen, 0, 0);
node->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i]);
toku_fill_dbt(&node->childkeys[i],
toku_xmemdup(childkeyptr, cklen),
cklen);
node->totalchildkeylens += cklen;
}
}
else {
......@@ -1724,11 +1725,10 @@ deserialize_and_upgrade_internal_node(BRTNODE node,
bytevec childkeyptr;
unsigned int cklen;
rbuf_bytes(rb, &childkeyptr, &cklen);
node->childkeys[i] = kv_pair_malloc((void*)childkeyptr,
cklen,
0,
0);
node->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i]);
toku_fill_dbt(&node->childkeys[i],
toku_xmemdup(childkeyptr, cklen),
cklen);
node->totalchildkeylens += cklen;
}
// Create space for the child node buffers (a.k.a. partitions).
......@@ -3226,11 +3226,6 @@ exit:
return e;
}
unsigned int
toku_brt_pivot_key_len (struct kv_pair *pk) {
return kv_pair_keylen(pk);
}
int
toku_db_badformat(void) {
return DB_BADFORMAT;
......
......@@ -42,7 +42,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum, int n_children, char **keys
}
for (i=0; i+1<n_children; i++) {
node->childkeys[i] = kv_pair_malloc(keys[i], keylens[i], 0, 0);
toku_fill_dbt(&node->childkeys[i], toku_xmemdup(keys[i], keylens[i]), keylens[i]);
node->totalchildkeylens += keylens[i];
}
......@@ -63,7 +63,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child
BP_STATE(node,i) = PT_AVAIL;
}
for (i=0; i+1<n_children; i++) {
node->childkeys[i] = kv_pair_malloc(keys[i], keylens[i], 0, 0);
toku_fill_dbt(&node->childkeys[i], toku_xmemdup(keys[i], keylens[i]), keylens[i]);
node->totalchildkeylens += keylens[i];
}
*blocknum = node->thisnodename;
......
......@@ -16,12 +16,9 @@
#include "brt-cachetable-wrappers.h"
static int
compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) {
DBT x,y;
compare_pairs (BRT brt, const DBT *a, const DBT *b) {
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db,
toku_fill_dbt(&x, kv_pair_key(a), kv_pair_keylen(a)),
toku_fill_dbt(&y, kv_pair_key(b), kv_pair_keylen(b)));
int cmp = brt->compare_fun(&db, a, b);
return cmp;
}
......@@ -36,31 +33,27 @@ compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) {
}
static int
compare_pair_to_leafentry (BRT brt, struct kv_pair *a, LEAFENTRY b) {
DBT x,y;
compare_pair_to_leafentry (BRT brt, const DBT *a, LEAFENTRY b) {
DBT y;
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db,
toku_fill_dbt(&x, kv_pair_key(a), kv_pair_keylen(a)),
toku_fill_dbt(&y, le_key(b), le_keylen(b)));
int cmp = brt->compare_fun(&db, a, toku_fill_dbt(&y, le_key(b), le_keylen(b)));
return cmp;
}
static int
compare_pair_to_key (BRT brt, struct kv_pair *a, bytevec key, ITEMLEN keylen) {
DBT x, y;
compare_pair_to_key (BRT brt, const DBT *a, bytevec key, ITEMLEN keylen) {
DBT y;
FAKE_DB(db, &brt->h->cmp_descriptor);
int cmp = brt->compare_fun(&db,
toku_fill_dbt(&x, kv_pair_key(a), kv_pair_keylen(a)),
toku_fill_dbt(&y, key, keylen));
int cmp = brt->compare_fun(&db, a, toku_fill_dbt(&y, key, keylen));
return cmp;
}
static int
verify_msg_in_child_buffer(BRT brt, enum brt_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), struct kv_pair *lesser_pivot, struct kv_pair *greatereq_pivot)
verify_msg_in_child_buffer(BRT brt, enum brt_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot)
__attribute__((warn_unused_result));
static int
verify_msg_in_child_buffer(BRT brt, enum brt_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), struct kv_pair *lesser_pivot, struct kv_pair *greatereq_pivot) {
verify_msg_in_child_buffer(BRT brt, enum brt_msg_type type, MSN msn, bytevec key, ITEMLEN keylen, bytevec UU(data), ITEMLEN UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) {
int result = 0;
if (msn.msn == ZERO_MSN.msn)
result = EINVAL;
......@@ -189,9 +182,9 @@ verify_sorted_by_key_msn(BRT brt, FIFO fifo, OMT mt) {
}
static int
count_eq_key_msn(BRT brt, FIFO fifo, OMT mt, const void *key, size_t keylen, MSN msn) {
count_eq_key_msn(BRT brt, FIFO fifo, OMT mt, const DBT *key, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = {
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo, .key = key, .keylen = keylen, .msn = msn
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo, .key = key, .msn = msn
};
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(mt, toku_fifo_entry_key_msn_heaviside, &extra, &v, &idx);
......@@ -232,8 +225,8 @@ int
toku_verify_brtnode (BRT brt,
MSN rootmsn, MSN parentmsn,
BRTNODE node, int height,
struct kv_pair *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
struct kv_pair *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
int (*progress_callback)(void *extra, float progress), void *progress_extra,
int recurse, int verbose, int keep_going_on_failure)
{
......@@ -258,24 +251,24 @@ toku_verify_brtnode (BRT brt,
}
// Verify that all the pivot keys are in order.
for (int i = 0; i < node->n_children-2; i++) {
int compare = compare_pairs(brt, node->childkeys[i], node->childkeys[i+1]);
int compare = compare_pairs(brt, &node->childkeys[i], &node->childkeys[i+1]);
VERIFY_ASSERTION(compare < 0, i, "Value is >= the next value");
}
// Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
for (int i = 0; i < node->n_children-1; i++) {
if (lesser_pivot) {
int compare = compare_pairs(brt, lesser_pivot, node->childkeys[i]);
int compare = compare_pairs(brt, lesser_pivot, &node->childkeys[i]);
VERIFY_ASSERTION(compare < 0, i, "Pivot is >= the lower-bound pivot");
}
if (greatereq_pivot) {
int compare = compare_pairs(brt, greatereq_pivot, node->childkeys[i]);
int compare = compare_pairs(brt, greatereq_pivot, &node->childkeys[i]);
VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
}
}
for (int i = 0; i < node->n_children; i++) {
struct kv_pair *curr_less_pivot = (i==0) ? lesser_pivot : node->childkeys[i-1];
struct kv_pair *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->childkeys[i];
const DBT *curr_less_pivot = (i==0) ? lesser_pivot : &node->childkeys[i-1];
const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : &node->childkeys[i];
if (node->height > 0) {
MSN last_msn = ZERO_MSN;
// Verify that messages in the buffers are in the right place.
......@@ -292,7 +285,8 @@ toku_verify_brtnode (BRT brt,
VERIFY_ASSERTION((msn.msn > last_msn.msn), i, "msn per msg must be monotonically increasing toward newer messages in buffer");
VERIFY_ASSERTION((msn.msn <= this_msn.msn), i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
int count;
count = count_eq_key_msn(brt, bnc->buffer, bnc->fresh_message_tree, key, keylen, msn);
DBT keydbt;
count = count_eq_key_msn(brt, bnc->buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
if (brt_msg_type_applies_all(type) || brt_msg_type_does_nothing(type)) {
VERIFY_ASSERTION(count == 0, i, "a broadcast message was found in the fresh message tree");
} else {
......@@ -303,7 +297,7 @@ toku_verify_brtnode (BRT brt,
VERIFY_ASSERTION(count == 0, i, "a stale message was found in the fresh message tree");
}
}
count = count_eq_key_msn(brt, bnc->buffer, bnc->stale_message_tree, key, keylen, msn);
count = count_eq_key_msn(brt, bnc->buffer, bnc->stale_message_tree, &keydbt, msn);
if (brt_msg_type_applies_all(type) || brt_msg_type_does_nothing(type)) {
VERIFY_ASSERTION(count == 0, i, "a broadcast message was found in the stale message tree");
} else {
......@@ -314,8 +308,7 @@ toku_verify_brtnode (BRT brt,
VERIFY_ASSERTION(count == 1, i, "a stale message was not found in the stale message tree");
}
}
DBT keydbt;
struct count_msgs_extra extra = { .count = 0, .key = toku_fill_dbt(&keydbt, key, keylen),
struct count_msgs_extra extra = { .count = 0, .key = &keydbt,
.msn = msn, .fifo = bnc->buffer,
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun };
extra.count = 0;
......@@ -371,8 +364,8 @@ toku_verify_brtnode (BRT brt,
toku_get_node_for_verify(BP_BLOCKNUM(node, i), brt, &child_node);
int r = toku_verify_brtnode(brt, rootmsn, this_msn,
child_node, node->height-1,
(i==0) ? lesser_pivot : node->childkeys[i-1],
(i==node->n_children-1) ? greatereq_pivot : node->childkeys[i],
(i==0) ? lesser_pivot : &node->childkeys[i-1],
(i==node->n_children-1) ? greatereq_pivot : &node->childkeys[i],
progress_callback, progress_extra,
recurse, verbose, keep_going_on_failure);
if (r) {
......
......@@ -340,19 +340,19 @@ toku_bnc_n_entries(NONLEAF_CHILDINFO bnc)
return toku_fifo_n_entries(bnc->buffer);
}
static struct kv_pair const *prepivotkey (BRTNODE node, int childnum, struct kv_pair const * const lower_bound_exclusive) {
static const DBT *prepivotkey (BRTNODE node, int childnum, const DBT * const lower_bound_exclusive) {
if (childnum==0)
return lower_bound_exclusive;
else {
return node->childkeys[childnum-1];
return &node->childkeys[childnum-1];
}
}
static struct kv_pair const *postpivotkey (BRTNODE node, int childnum, struct kv_pair const * const upper_bound_inclusive) {
static const DBT *postpivotkey (BRTNODE node, int childnum, const DBT * const upper_bound_inclusive) {
if (childnum+1 == node->n_children)
return upper_bound_inclusive;
else {
return node->childkeys[childnum];
return &node->childkeys[childnum];
}
}
static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct pivot_bounds const * const old_pb) {
......@@ -694,12 +694,7 @@ void toku_brtnode_clone_callback(
XMALLOC_N(node->n_children, cloned_node->bp);
// clone pivots
for (int i = 0; i < node->n_children-1; i++) {
cloned_node->childkeys[i] = kv_pair_malloc(
kv_pair_key(node->childkeys[i]),
kv_pair_keylen(node->childkeys[i]),
0,
0
);
toku_clone_dbt(&cloned_node->childkeys[i], node->childkeys[i]);
}
// clone partition
brtnode_clone_partitions(node, cloned_node);
......@@ -1252,13 +1247,11 @@ toku_cmd_leafval_heaviside (OMTVALUE lev, void *extra) {
}
static int
brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, bytevec ck)
brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, const DBT *pivot)
{
int r;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
FAKE_DB(db, desc);
r = cmp(&db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
r = cmp(&db, key, pivot);
return r;
}
......@@ -1270,7 +1263,7 @@ brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, bytevec
void toku_destroy_brtnode_internals(BRTNODE node)
{
for (int i=0; i<node->n_children-1; i++) {
toku_free(node->childkeys[i]);
toku_free(node->childkeys[i].data);
}
toku_free(node->childkeys);
node->childkeys = NULL;
......@@ -1381,7 +1374,7 @@ brt_init_new_root(struct brt_header *h, BRTNODE nodea, BRTNODE nodeb, DBT splitk
toku_initialize_empty_brtnode (newroot, newroot_diskoff, new_height, 2, h->layout_version, h->nodesize, h->flags, h);
//printf("new_root %lld %d %lld %lld\n", newroot_diskoff, newroot->height, nodea->thisnodename, nodeb->thisnodename);
//printf("%s:%d Splitkey=%p %s\n", __FILE__, __LINE__, splitkey, splitkey);
newroot->childkeys[0] = splitk.data;
toku_copyref_dbt(&newroot->childkeys[0], splitk);
newroot->totalchildkeylens=splitk.size;
BP_BLOCKNUM(newroot,0)=nodea->thisnodename;
BP_BLOCKNUM(newroot,1)=nodeb->thisnodename;
......@@ -1412,14 +1405,14 @@ init_childinfo(BRTNODE node, int childnum, BRTNODE child) {
}
static void
init_childkey(BRTNODE node, int childnum, struct kv_pair *pivotkey, size_t pivotkeysize) {
node->childkeys[childnum] = pivotkey;
node->totalchildkeylens += pivotkeysize;
init_childkey(BRTNODE node, int childnum, const DBT *pivotkey) {
toku_copyref_dbt(&node->childkeys[childnum], *pivotkey);
node->totalchildkeylens += pivotkey->size;
}
// Used only by test programs: append a child node to a parent node
void
toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) {
toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, const DBT *pivotkey) {
int childnum = node->n_children;
node->n_children++;
XREALLOC_N(node->n_children, node->bp);
......@@ -1427,7 +1420,7 @@ toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivot
XREALLOC_N(node->n_children-1, node->childkeys);
if (pivotkey) {
invariant(childnum > 0);
init_childkey(node, childnum-1, pivotkey, pivotkeysize);
init_childkey(node, childnum-1, pivotkey);
}
toku_mark_node_dirty(node);
}
......@@ -1884,9 +1877,9 @@ toku_fifo_entry_key_msn_heaviside(OMTVALUE v, void *extrap)
const struct toku_fifo_entry_key_msn_heaviside_extra *extra = extrap;
const long offset = (long) v;
const struct fifo_entry *query = toku_fifo_get_entry(extra->fifo, offset);
DBT qdbt, tdbt;
DBT qdbt;
const DBT *query_key = fill_dbt_for_fifo_entry(&qdbt, query);
const DBT *target_key = toku_fill_dbt(&tdbt, extra->key, extra->keylen);
const DBT *target_key = extra->key;
return key_msn_cmp(query_key, target_key, query->msn, extra->msn,
extra->desc, extra->cmp);
}
......@@ -1918,7 +1911,8 @@ toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, cons
int r = toku_fifo_enq(bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, &offset);
assert_zero(r);
if (brt_msg_type_applies_once(type)) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer, .key = key, .keylen = keylen, .msn = msn };
DBT keydbt;
struct toku_fifo_entry_key_msn_heaviside_extra extra = { .desc = desc, .cmp = cmp, .fifo = bnc->buffer, .key = toku_fill_dbt(&keydbt, key, keylen), .msn = msn };
if (is_fresh) {
r = toku_omt_insert(bnc->fresh_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &extra, NULL);
assert_zero(r);
......@@ -1963,7 +1957,7 @@ unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
#if DO_PIVOT_SEARCH_LR
int i;
for (i=0; i<node->n_children-1; i++) {
int c = brt_compare_pivot(desc, cmp, k, d, node->childkeys[i]);
int c = brt_compare_pivot(desc, cmp, k, d, &node->childkeys[i]);
if (c > 0) continue;
if (c < 0) return i;
return i;
......@@ -1977,7 +1971,7 @@ unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
// random keys
int i;
for (i = node->n_children-2; i >= 0; i--) {
int c = brt_compare_pivot(desc, cmp, k, d, node->childkeys[i]);
int c = brt_compare_pivot(desc, cmp, k, d, &node->childkeys[i]);
if (c > 0) return i+1;
}
return 0;
......@@ -1989,7 +1983,7 @@ unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
// check the last key to optimize seq insertions
int n = node->n_children-1;
int c = brt_compare_pivot(desc, cmp, k, node->childkeys[n-1]);
int c = brt_compare_pivot(desc, cmp, k, &node->childkeys[n-1]);
if (c > 0) return n;
// binary search the pivots
......@@ -1998,7 +1992,7 @@ unsigned int toku_brtnode_which_child(BRTNODE node, const DBT *k,
int mi;
while (lo < hi) {
mi = (lo + hi) / 2;
c = brt_compare_pivot(desc, cmp, k, node->childkeys[mi]);
c = brt_compare_pivot(desc, cmp, k, &node->childkeys[mi]);
if (c > 0) {
lo = mi+1;
continue;
......@@ -2024,7 +2018,7 @@ toku_brtnode_hot_next_child(BRTNODE node,
int mi;
while (low < hi) {
mi = (low + hi) / 2;
int r = brt_compare_pivot(desc, cmp, k, node->childkeys[mi]);
int r = brt_compare_pivot(desc, cmp, k, &node->childkeys[mi]);
if (r > 0) {
low = mi + 1;
} else if (r < 0) {
......@@ -3841,7 +3835,7 @@ copy_to_stale(OMTVALUE v, u_int32_t UU(idx), void *extrap)
entry->is_fresh = false;
DBT keydbt;
DBT *key = fill_dbt_for_fifo_entry(&keydbt, entry);
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->brt->h->cmp_descriptor, .cmp = extra->brt->compare_fun, .fifo = extra->bnc->buffer, .key = key->data, .keylen = key->size, .msn = entry->msn };
struct toku_fifo_entry_key_msn_heaviside_extra heaviside_extra = { .desc = &extra->brt->h->cmp_descriptor, .cmp = extra->brt->compare_fun, .fifo = extra->bnc->buffer, .key = key, .msn = entry->msn };
int r = toku_omt_insert(extra->bnc->stale_message_tree, (OMTVALUE) offset, toku_fifo_entry_key_msn_heaviside, &heaviside_extra, NULL);
assert_zero(r);
return r;
......@@ -3971,8 +3965,7 @@ find_bounds_within_message_tree(
struct toku_fifo_entry_key_msn_heaviside_extra lbi_extra = {
.desc = desc, .cmp = cmp,
.fifo = buffer,
.key = kv_pair_key((struct kv_pair *) bounds->lower_bound_exclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->lower_bound_exclusive),
.key = bounds->lower_bound_exclusive,
.msn = MAX_MSN };
OMTVALUE found_lb;
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
......@@ -3989,12 +3982,12 @@ find_bounds_within_message_tree(
// Check if what we found for lbi is greater than the upper
// bound inclusive that we have. If so, there are no relevant
// messages between these bounds.
DBT ubidbt_tmp = kv_pair_key_to_dbt((struct kv_pair *) bounds->upper_bound_inclusive);
const DBT *ubi = bounds->upper_bound_inclusive;
const long offset = (long) found_lb;
DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
FAKE_DB(db, desc);
int c = cmp(&db, &found_lbidbt, &ubidbt_tmp);
int c = cmp(&db, &found_lbidbt, ubi);
// These DBTs really are both inclusive bounds, so we need
// strict inequality in order to determine that there's
// nothing between them. If they're equal, then we actually
......@@ -4019,8 +4012,7 @@ find_bounds_within_message_tree(
struct toku_fifo_entry_key_msn_heaviside_extra ube_extra = {
.desc = desc, .cmp = cmp,
.fifo = buffer,
.key = kv_pair_key((struct kv_pair *) bounds->upper_bound_inclusive),
.keylen = kv_pair_keylen((struct kv_pair *) bounds->upper_bound_inclusive),
.key = bounds->upper_bound_inclusive,
.msn = MAX_MSN };
r = toku_omt_find(message_tree, toku_fifo_entry_key_msn_heaviside,
&ube_extra, +1, NULL, ube);
......@@ -4565,9 +4557,7 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
static inline int
search_which_child_cmp_with_bound(DB *db, brt_compare_func cmp, BRTNODE node, int childnum, brt_search_t *search, DBT *dbt)
{
struct kv_pair *pivot = node->childkeys[childnum];
toku_fill_dbt(dbt, kv_pair_key(pivot), kv_pair_keylen(pivot));
return cmp(db, dbt, &search->pivot_bound);
return cmp(db, toku_copyref_dbt(dbt, node->childkeys[childnum]), &search->pivot_bound);
}
int
......@@ -4589,8 +4579,7 @@ toku_brt_search_which_child(
int mi;
while (lo < hi) {
mi = (lo + hi) / 2;
struct kv_pair *pivot = node->childkeys[mi];
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot));
toku_copyref_dbt(&pivotkey, node->childkeys[mi]);
// search->compare is really strange, and only works well with a
// linear search, it makes binary search a pita.
//
......@@ -4652,8 +4641,7 @@ toku_brt_search_which_child(
}
for (c = 0; c < node->n_children-1; c++) {
int p = (search->direction == BRT_SEARCH_LEFT) ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->childkeys[p];
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot));
toku_copyref_dbt(&pivotkey, node->childkeys[p]);
if (search_pivot_is_bounded(search, desc, cmp, &pivotkey) && search->compare(search, &pivotkey)) {
return child[c];
}
......@@ -4671,9 +4659,7 @@ maybe_search_save_bound(
{
int p = (search->direction == BRT_SEARCH_LEFT) ? child_searched : child_searched - 1;
if (p >= 0 && p < node->n_children-1) {
struct kv_pair const * pivot = node->childkeys[p];
DBT pivotkey = { .data = kv_pair_key((struct kv_pair *) pivot), .size = kv_pair_keylen(pivot) };
search_save_bound(search, &pivotkey);
search_save_bound(search, &node->childkeys[p]);
}
}
......@@ -4749,13 +4735,13 @@ brt_search_node(
else {
if (node->height == 0) {
// when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
struct kv_pair const * pivot = NULL;
const DBT *pivot = NULL;
if (search->direction == BRT_SEARCH_LEFT)
pivot = next_bounds.upper_bound_inclusive; // left -> right
else
pivot = next_bounds.lower_bound_exclusive; // right -> left
if (pivot) {
int rr = getf(kv_pair_keylen(pivot), kv_pair_key_const(pivot), 0, NULL, getf_v, true);
int rr = getf(pivot->size, pivot->data, 0, NULL, getf_v, true);
if (rr != 0)
return rr; // lock was not granted
}
......@@ -5474,7 +5460,7 @@ toku_brt_stat64 (BRT brt, TOKUTXN UU(txn), struct brtstat64_s *s) {
/* ********************* debugging dump ************************ */
static int
toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_pair *lorange, struct kv_pair *hirange) {
toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) {
int result=0;
BRTNODE node;
void* node_v;
......@@ -5502,12 +5488,12 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
fprintf(file, "%*sNode=%p\n", depth, "", node);
fprintf(file, "%*sNode %"PRId64" nodesize=%u height=%d n_children=%d keyrange=%s %s\n",
depth, "", blocknum.b, node->nodesize, node->height, node->n_children, (char*)(lorange ? kv_pair_key(lorange) : 0), (char*)(hirange ? kv_pair_key(hirange) : 0));
depth, "", blocknum.b, node->nodesize, node->height, node->n_children, (char*)(lorange ? lorange->data : 0), (char*)(hirange ? hirange->data : 0));
{
int i;
for (i=0; i+1< node->n_children; i++) {
fprintf(file, "%*spivotkey %d =", depth+1, "", i);
toku_print_BYTESTRING(file, toku_brt_pivot_key_len(node->childkeys[i]), node->childkeys[i]->key);
toku_print_BYTESTRING(file, node->childkeys[i].size, node->childkeys[i].data);
fprintf(file, "\n");
}
for (i=0; i< node->n_children; i++) {
......@@ -5541,12 +5527,12 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
for (i=0; i<node->n_children; i++) {
fprintf(file, "%*schild %d\n", depth, "", i);
if (i>0) {
char *key = node->childkeys[i-1]->key;
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->childkeys[i-1]->keylen, (unsigned)toku_dtoh32(*(int*)key));
char *key = node->childkeys[i-1].data;
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->childkeys[i-1].size, (unsigned)toku_dtoh32(*(int*)key));
}
toku_dump_brtnode(file, brt, BP_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->childkeys[i-1],
(i==node->n_children-1) ? hirange : node->childkeys[i]);
(i==0) ? lorange : &node->childkeys[i-1],
(i==node->n_children-1) ? hirange : &node->childkeys[i]);
}
}
}
......
......@@ -151,10 +151,10 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
printf(" pivots:\n");
for (int i=0; i<n->n_children-1; i++) {
struct kv_pair *piv = n->childkeys[i];
const DBT *piv = &n->childkeys[i];
printf(" pivot %2d:", i);
assert(n->flags == 0);
print_item(kv_pair_key_const(piv), kv_pair_keylen(piv));
print_item(piv->data, piv->size);
printf("\n");
}
printf(" children:\n");
......
......@@ -2945,21 +2945,13 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
int result = 0;
BRTNODE XMALLOC(node);
toku_initialize_empty_brtnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
BRT_LAYOUT_VERSION, target_nodesize, 0, out->h);
for (int i=0; i<n_children-1; i++)
node->childkeys[i] = NULL;
unsigned int totalchildkeylens = 0;
toku_initialize_empty_brtnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
BRT_LAYOUT_VERSION, target_nodesize, 0, out->h);
node->totalchildkeylens = 0;
for (int i=0; i<n_children-1; i++) {
struct kv_pair *childkey = kv_pair_malloc(pivots[i].data, pivots[i].size, NULL, 0);
if (childkey == NULL) {
result = errno;
break;
}
node->childkeys[i] = childkey;
totalchildkeylens += kv_pair_keylen(childkey);
toku_clone_dbt(&node->childkeys[i], pivots[i]);
node->totalchildkeylens += pivots[i].size;
}
node->totalchildkeylens = totalchildkeylens;
assert(node->bp);
for (int i=0; i<n_children; i++) {
BP_BLOCKNUM(node,i) = make_blocknum(subtree_info[i].block);
......@@ -2992,7 +2984,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
for (int i=0; i<n_children-1; i++) {
toku_free(pivots[i].data);
toku_free(node->childkeys[i]);
toku_free(node->childkeys[i].data);
}
for (int i=0; i<n_children; i++) {
destroy_nonleaf_childinfo(BNC(node,i));
......
......@@ -43,7 +43,6 @@
#include "fifo.h"
#include "toku_list.h"
#include "key.h"
#include "kv-pair.h"
#include "leafentry.h"
#include "log-internal.h"
#include "log_header.h"
......
#ifndef KV_PAIR_H
#define KV_PAIR_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "memory.h"
#include <string.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
/*
* the key value pair contains a key and a value in a contiguous space. the
* key is right after the length fields and the value is right after the key.
*/
struct kv_pair {
unsigned int keylen;
unsigned int vallen;
char key[];
};
/* return the size of a kv pair */
static inline unsigned int kv_pair_size(struct kv_pair *pair) {
return sizeof (struct kv_pair) + pair->keylen + pair->vallen;
}
static inline void kv_pair_init(struct kv_pair *pair, const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
pair->keylen = keylen;
memcpy(pair->key, key, (size_t)keylen);
pair->vallen = vallen;
memcpy(pair->key + keylen, val, (size_t)vallen);
}
static inline struct kv_pair *kv_pair_malloc(const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
struct kv_pair *pair = (struct kv_pair *) toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair)
kv_pair_init(pair, key, keylen, val, vallen);
return pair;
}
/* replace the val, keep the same key */
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, unsigned int newvallen) {
struct kv_pair *pair = (struct kv_pair *) toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
if (pair) {
pair->vallen = newvallen;
memcpy(pair->key + pair->keylen, newval, (size_t)newvallen);
}
return pair;
}
static inline void kv_pair_free(struct kv_pair *pair) {
toku_free_n(pair, sizeof(struct kv_pair)+pair->keylen+pair->vallen);
}
static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key;
}
static inline const void *kv_pair_key_const(const struct kv_pair *pair) {
return pair->key;
}
static inline unsigned int kv_pair_keylen(const struct kv_pair *pair) {
return pair->keylen;
}
static inline DBT kv_pair_key_to_dbt (const struct kv_pair *pair) {
const DBT d = {.data = (void*)kv_pair_key_const(pair),
.size = kv_pair_keylen(pair)};
return d;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -309,8 +309,8 @@ test_prefetching(void) {
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(sn.n_children-1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc(&key1, sizeof(key1), 0, 0);
sn.childkeys[1] = kv_pair_malloc(&key2, sizeof(key2), 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup(&key1, sizeof(key1)), sizeof key1);
toku_fill_dbt(&sn.childkeys[1], toku_xmemdup(&key2, sizeof(key2)), sizeof key2);
sn.totalchildkeylens = sizeof(key1) + sizeof(key2);
BP_BLOCKNUM(&sn, 0).b = 30;
BP_BLOCKNUM(&sn, 1).b = 35;
......@@ -370,8 +370,8 @@ test_prefetching(void) {
test_prefetch_read(fd, brt, brt_h);
test_subset_read(fd, brt, brt_h);
kv_pair_free(sn.childkeys[0]);
kv_pair_free(sn.childkeys[1]);
toku_free(sn.childkeys[0].data);
toku_free(sn.childkeys[1].data);
destroy_nonleaf_childinfo(BNC(&sn, 0));
destroy_nonleaf_childinfo(BNC(&sn, 1));
destroy_nonleaf_childinfo(BNC(&sn, 2));
......
......@@ -248,7 +248,7 @@ test_serialize_nonleaf(void) {
hello_string = toku_strdup("hello");
MALLOC_N(2, sn.bp);
MALLOC_N(1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc(hello_string, 6, 0, 0);
toku_fill_dbt(&sn.childkeys[0], hello_string, 6);
sn.totalchildkeylens = 6;
BP_BLOCKNUM(&sn, 0).b = 30;
BP_BLOCKNUM(&sn, 1).b = 35;
......@@ -307,7 +307,6 @@ test_serialize_nonleaf(void) {
test1(fd, brt_h, &dn);
test2(fd, brt_h, &dn);
kv_pair_free(sn.childkeys[0]);
toku_free(hello_string);
destroy_nonleaf_childinfo(BNC(&sn, 0));
destroy_nonleaf_childinfo(BNC(&sn, 1));
......@@ -349,7 +348,7 @@ test_serialize_leaf(void) {
elts[2] = le_malloc("x", "xval");
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("b", 2, 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup("b", 2), 2);
sn.totalchildkeylens = 2;
BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL;
......@@ -396,7 +395,7 @@ test_serialize_leaf(void) {
test3_leaf(fd, brt_h,&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < 3; ++i) {
toku_free(elts[i]);
......
......@@ -96,7 +96,7 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
}
BLB_NBYTESINBUF(&sn, ck) = nperbn*(KEY_VALUE_OVERHEAD+(sizeof(long)+valsize)) + toku_omt_size(BLB_BUFFER(&sn, ck));
if (ck < 7) {
sn.childkeys[ck] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[ck], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += sizeof k;
}
}
......@@ -154,7 +154,7 @@ test_serialize_leaf(int valsize, int nelts, double entropy) {
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < nelts; ++i) {
if (les[i]) { toku_free(les[i]); }
......@@ -227,7 +227,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
r = toku_bnc_insert_msg(bnc, &k, sizeof k, buf, valsize, BRT_NONE, next_dummymsn(), xids_123, true, NULL, long_key_cmp); assert_zero(r);
}
if (ck < 7) {
sn.childkeys[ck] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[ck], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += sizeof k;
}
}
......@@ -289,7 +289,7 @@ test_serialize_nonleaf(int valsize, int nelts, double entropy) {
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; ++i) {
destroy_nonleaf_childinfo(BNC(&sn, i));
......
......@@ -224,7 +224,7 @@ test_serialize_leaf_check_msn(enum brtnode_verify_type bft, BOOL do_clone) {
sn.dirty = 1;
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("b", 2, 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup("b", 2), 2);
sn.totalchildkeylens = 2;
BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL;
......@@ -313,7 +313,7 @@ test_serialize_leaf_check_msn(enum brtnode_verify_type bft, BOOL do_clone) {
toku_omt_iterate(BLB_BUFFER(dn, i), check_leafentries, &extra);
u_int32_t keylen;
if (i < npartitions-1) {
assert(strcmp(kv_pair_key(dn->childkeys[i]), le_key_and_len(elts[extra.i-1], &keylen))==0);
assert(strcmp(dn->childkeys[i].data, le_key_and_len(elts[extra.i-1], &keylen))==0);
}
// don't check soft_copy_is_up_to_date or seqinsert
assert(BLB_NBYTESINBUF(dn, i) == (extra.i-last_i)*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(dn, i)));
......@@ -325,7 +325,7 @@ test_serialize_leaf_check_msn(enum brtnode_verify_type bft, BOOL do_clone) {
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
BASEMENTNODE bn = BLB(&sn, i);
......@@ -388,7 +388,7 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft, BOOL do_clon
if (i < nrows-1) {
u_int32_t keylen;
char *keyp = le_key_and_len(le, &keylen);
sn.childkeys[i] = kv_pair_malloc(keyp, keylen, 0, 0);
toku_fill_dbt(&sn.childkeys[i], toku_xmemdup(keyp, keylen), keylen);
}
}
......@@ -468,7 +468,7 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft, BOOL do_clon
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
toku_free(sn.childkeys);
for (int i = 0; i < sn.n_children; i++) {
......@@ -605,7 +605,7 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft, BOOL do_clone)
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
bn = BLB(&sn, i);
......@@ -754,7 +754,7 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft, BOOL do_clone)
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
bn = BLB(&sn, i);
......@@ -797,12 +797,12 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft, BOOL
sn.dirty = 1;
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(sn.n_children-1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("A", 2, 0, 0);
sn.childkeys[1] = kv_pair_malloc("a", 2, 0, 0);
sn.childkeys[2] = kv_pair_malloc("a", 2, 0, 0);
sn.childkeys[3] = kv_pair_malloc("b", 2, 0, 0);
sn.childkeys[4] = kv_pair_malloc("b", 2, 0, 0);
sn.childkeys[5] = kv_pair_malloc("x", 2, 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup("A", 2), 2);
toku_fill_dbt(&sn.childkeys[1], toku_xmemdup("a", 2), 2);
toku_fill_dbt(&sn.childkeys[2], toku_xmemdup("a", 2), 2);
toku_fill_dbt(&sn.childkeys[3], toku_xmemdup("b", 2), 2);
toku_fill_dbt(&sn.childkeys[4], toku_xmemdup("b", 2), 2);
toku_fill_dbt(&sn.childkeys[5], toku_xmemdup("x", 2), 2);
sn.totalchildkeylens = (sn.n_children-1)*2;
for (int i = 0; i < sn.n_children; ++i) {
BP_STATE(&sn,i) = PT_AVAIL;
......@@ -904,7 +904,7 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft, BOOL
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
BASEMENTNODE bn = BLB(&sn, i);
......@@ -946,9 +946,9 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
sn.dirty = 1;
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(sn.n_children-1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("A", 2, 0, 0);
sn.childkeys[1] = kv_pair_malloc("A", 2, 0, 0);
sn.childkeys[2] = kv_pair_malloc("A", 2, 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup("A", 2), 2);
toku_fill_dbt(&sn.childkeys[1], toku_xmemdup("A", 2), 2);
toku_fill_dbt(&sn.childkeys[2], toku_xmemdup("A", 2), 2);
sn.totalchildkeylens = (sn.n_children-1)*2;
for (int i = 0; i < sn.n_children; ++i) {
BP_STATE(&sn,i) = PT_AVAIL;
......@@ -1021,7 +1021,7 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
destroy_basement_node(BLB(&sn, i));
......@@ -1064,7 +1064,7 @@ test_serialize_leaf(enum brtnode_verify_type bft, BOOL do_clone) {
sn.dirty = 1;
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc("b", 2, 0, 0);
toku_fill_dbt(&sn.childkeys[0], toku_xmemdup("b", 2), 2);
sn.totalchildkeylens = 2;
BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL;
......@@ -1147,7 +1147,7 @@ test_serialize_leaf(enum brtnode_verify_type bft, BOOL do_clone) {
toku_omt_iterate(BLB_BUFFER(dn, i), check_leafentries, &extra);
u_int32_t keylen;
if (i < npartitions-1) {
assert(strcmp(kv_pair_key(dn->childkeys[i]), le_key_and_len(elts[extra.i-1], &keylen))==0);
assert(strcmp(dn->childkeys[i].data, le_key_and_len(elts[extra.i-1], &keylen))==0);
}
// don't check soft_copy_is_up_to_date or seqinsert
assert(BLB_NBYTESINBUF(dn, i) == (extra.i-last_i)*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(dn, i)));
......@@ -1159,7 +1159,7 @@ test_serialize_leaf(enum brtnode_verify_type bft, BOOL do_clone) {
toku_brtnode_free(&dn);
for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]);
toku_free(sn.childkeys[i].data);
}
for (int i = 0; i < sn.n_children; i++) {
BASEMENTNODE bn = BLB(&sn, i);
......@@ -1205,7 +1205,7 @@ test_serialize_nonleaf(enum brtnode_verify_type bft, BOOL do_clone) {
hello_string = toku_strdup("hello");
MALLOC_N(2, sn.bp);
MALLOC_N(1, sn.childkeys);
sn.childkeys[0] = kv_pair_malloc(hello_string, 6, 0, 0);
toku_fill_dbt(&sn.childkeys[0], hello_string, 6);
sn.totalchildkeylens = 6;
BP_BLOCKNUM(&sn, 0).b = 30;
BP_BLOCKNUM(&sn, 1).b = 35;
......@@ -1269,8 +1269,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft, BOOL do_clone) {
assert(dn->layout_version_read_from_disk ==BRT_LAYOUT_VERSION);
assert(dn->height == 1);
assert(dn->n_children==2);
assert(strcmp(kv_pair_key(dn->childkeys[0]), "hello")==0);
assert(toku_brt_pivot_key_len(dn->childkeys[0])==6);
assert(strcmp(dn->childkeys[0].data, "hello")==0);
assert(dn->childkeys[0].size==6);
assert(dn->totalchildkeylens==6);
assert(BP_BLOCKNUM(dn,0).b==30);
assert(BP_BLOCKNUM(dn,1).b==35);
......@@ -1282,11 +1282,10 @@ test_serialize_nonleaf(enum brtnode_verify_type bft, BOOL do_clone) {
assert(toku_are_fifos_same(src_fifo_1, dest_fifo_1));
assert(toku_are_fifos_same(src_fifo_2, dest_fifo_2));
toku_brtnode_free(&dn);
kv_pair_free(sn.childkeys[0]);
toku_free(hello_string);
toku_free(sn.childkeys[0].data);
destroy_nonleaf_childinfo(BNC(&sn, 0));
destroy_nonleaf_childinfo(BNC(&sn, 1));
toku_free(sn.bp);
......
......@@ -82,12 +82,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = maxkeys[childnum-1]; // use the max of the left tree
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
......
......@@ -409,7 +409,7 @@ flush_to_internal_multiple(BRT t) {
set_BNC(child, i, child_bncs[i]);
BP_STATE(child, i) = PT_AVAIL;
if (i < 7) {
child->childkeys[i] = kv_pair_malloc(childkeys[i]->u.id.key->data, childkeys[i]->u.id.key->size, NULL, 0);
toku_clone_dbt(&child->childkeys[i], *childkeys[i]->u.id.key);
}
}
......@@ -578,7 +578,7 @@ flush_to_leaf(BRT t, bool make_leaf_up_to_date, bool use_flush) {
int num_parent_messages = i;
for (i = 0; i < 7; ++i) {
child->childkeys[i] = kv_pair_malloc(childkeys[i].data, childkeys[i].size, NULL, 0);
toku_clone_dbt(&child->childkeys[i], childkeys[i]);
}
if (make_leaf_up_to_date) {
......@@ -801,7 +801,7 @@ flush_to_leaf_with_keyrange(BRT t, bool make_leaf_up_to_date) {
int num_parent_messages = i;
for (i = 0; i < 7; ++i) {
child->childkeys[i] = kv_pair_malloc(childkeys[i].data, childkeys[i].size, NULL, 0);
toku_clone_dbt(&child->childkeys[i], childkeys[i]);
}
if (make_leaf_up_to_date) {
......@@ -838,7 +838,11 @@ flush_to_leaf_with_keyrange(BRT t, bool make_leaf_up_to_date) {
BP_STATE(parentnode, 0) = PT_AVAIL;
parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
const struct pivot_bounds bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = kv_pair_malloc(childkeys[7].data, childkeys[7].size, NULL, 0) };
DBT lbe, ubi;
const struct pivot_bounds bounds = {
.lower_bound_exclusive = toku_init_dbt(&lbe),
.upper_bound_inclusive = toku_clone_dbt(&ubi, childkeys[7])
};
BOOL msgs_applied;
maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied);
......@@ -893,7 +897,7 @@ flush_to_leaf_with_keyrange(BRT t, bool make_leaf_up_to_date) {
for (i = 0; i < num_child_messages; ++i) {
toku_free(child_messages[i]);
}
toku_free((struct kv_pair *) bounds.upper_bound_inclusive);
toku_free(ubi.data);
toku_brtnode_free(&child);
toku_free(parent_messages);
toku_free(child_messages);
......@@ -990,8 +994,8 @@ compare_apply_and_flush(BRT t, bool make_leaf_up_to_date) {
int num_parent_messages = i;
for (i = 0; i < 7; ++i) {
child1->childkeys[i] = kv_pair_malloc(child1keys[i].data, child1keys[i].size, NULL, 0);
child2->childkeys[i] = kv_pair_malloc(child2keys[i].data, child2keys[i].size, NULL, 0);
toku_clone_dbt(&child1->childkeys[i], child1keys[i]);
toku_clone_dbt(&child2->childkeys[i], child2keys[i]);
}
if (make_leaf_up_to_date) {
......
......@@ -111,7 +111,7 @@ static void
destroy_brtnode_and_internals(struct brtnode *node)
{
for (int i = 0; i < node->n_children - 1; ++i) {
kv_pair_free(node->childkeys[i]);
toku_free(node->childkeys[i].data);
}
for (int i = 0; i < node->n_children; ++i) {
BASEMENTNODE bn = BLB(node, i);
......@@ -156,7 +156,7 @@ test_split_on_boundary(void)
insert_dummy_value(&sn, bn, k);
}
if (bn < sn.n_children - 1) {
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
}
}
......@@ -216,7 +216,7 @@ test_split_with_everything_on_the_left(void)
k = bn * eltsperbn + i;
big_val_size += insert_dummy_value(&sn, bn, k);
}
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
} else {
k = bn * eltsperbn;
......@@ -288,7 +288,7 @@ test_split_on_boundary_of_last_node(void)
k = bn * eltsperbn + i;
big_val_size += insert_dummy_value(&sn, bn, k);
}
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
} else {
k = bn * eltsperbn;
......@@ -357,7 +357,7 @@ test_split_at_begin(void)
totalbytes += insert_dummy_value(&sn, bn, k);
}
if (bn < sn.n_children - 1) {
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
}
}
......@@ -436,7 +436,7 @@ test_split_at_end(void)
}
}
if (bn < sn.n_children - 1) {
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
}
}
......@@ -490,7 +490,7 @@ test_split_odd_nodes(void)
insert_dummy_value(&sn, bn, k);
}
if (bn < sn.n_children - 1) {
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
toku_fill_dbt(&sn.childkeys[bn], toku_xmemdup(&k, sizeof k), sizeof k);
sn.totalchildkeylens += (sizeof k);
}
}
......
......@@ -88,12 +88,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = maxkeys[childnum-1]; // use the max of the left tree
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
......
......@@ -59,12 +59,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = minkeys[childnum]; // use the min key of the right subtree, which creates a broken tree
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
}
......
......@@ -59,12 +59,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = maxkeys[0]; // use duplicate pivots, should result in a broken tree
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
}
......
......@@ -73,12 +73,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = maxkeys[childnum-1]; // use the max of the left tree
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
insert_into_child_buffer(brt, node, childnum, minkeys[childnum], maxkeys[childnum]);
......
......@@ -59,12 +59,12 @@ make_tree(BRT brt, int height, int fanout, int nperleaf, int *seq, int *minkey,
int minkeys[fanout], maxkeys[fanout];
for (int childnum = 0; childnum < fanout; childnum++) {
BRTNODE child = make_tree(brt, height-1, fanout, nperleaf, seq, &minkeys[childnum], &maxkeys[childnum]);
if (childnum == 0)
toku_brt_nonleaf_append_child(node, child, NULL, 0);
else {
if (childnum == 0) {
toku_brt_nonleaf_append_child(node, child, NULL);
} else {
int k = minkeys[fanout - childnum - 1]; // use unsorted pivots
struct kv_pair *pivotkey = kv_pair_malloc(&k, sizeof k, NULL, 0);
toku_brt_nonleaf_append_child(node, child, pivotkey, sizeof k);
DBT pivotkey;
toku_brt_nonleaf_append_child(node, child, toku_fill_dbt(&pivotkey, toku_xmemdup(&k, sizeof k), sizeof k));
}
toku_unpin_brtnode(brt->h, child);
}
......
......@@ -36,6 +36,14 @@ toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
return dbt;
}
DBT *toku_copyref_dbt(DBT *dst, const DBT src) {
return toku_fill_dbt(dst, src.data, src.size);
}
DBT *toku_clone_dbt(DBT *dst, const DBT src) {
return toku_fill_dbt(dst, toku_xmemdup(src.data, src.size), src.size);
}
void
toku_sdbt_cleanup(struct simple_dbt *sdbt) {
if (sdbt->data) toku_free(sdbt->data);
......
......@@ -17,6 +17,8 @@ DBT *toku_init_dbt(DBT *);
DBT *toku_init_dbt_flags(DBT *, uint32_t flags);
void toku_destroy_dbt(DBT *);
DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
DBT *toku_copyref_dbt(DBT *dst, const DBT src);
DBT *toku_clone_dbt(DBT *dst, const DBT src);
int toku_dbt_set(ITEMLEN len, bytevec val, DBT *d, struct simple_dbt *sdbt);
int toku_dbt_set_value(DBT *, bytevec *val, ITEMLEN vallen, void **staticptrp, BOOL ybt1_disposable);
void toku_sdbt_cleanup(struct simple_dbt *sdbt);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment