Commit eb935485 authored by John Esmet's avatar John Esmet

fixes #145 Destroy resources outside of the pair's write lock during

partial eviction
parent a08ad590
......@@ -4126,23 +4126,48 @@ exit:
return ret_val;
}
struct pair_unpin_with_new_attr_extra {
pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
ev(e), pair(p) {
}
evictor *ev;
PAIR pair;
};
static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
struct pair_unpin_with_new_attr_extra *info =
reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra);
PAIR p = info->pair;
evictor *ev = info->ev;
// change the attr in the evictor, then update the value in the pair
ev->change_pair_attr(p->attr, new_attr);
p->attr = new_attr;
// unpin
pair_lock(p);
p->value_rwlock.write_unlock();
pair_unlock(p);
}
//
// on entry and exit, pair's mutex is not held
// on exit, PAIR is unpinned
//
void evictor::do_partial_eviction(PAIR p) {
PAIR_ATTR new_attr;
// Copy the old attr
PAIR_ATTR old_attr = p->attr;
long long size_evicting_estimate = p->size_evicting_estimate;
p->pe_callback(p->value_data, old_attr, &new_attr, p->write_extraargs);
struct pair_unpin_with_new_attr_extra extra(this, p);
p->pe_callback(p->value_data, old_attr, p->write_extraargs,
// passed as the finalize continuation, which allows the
// pe_callback to unpin the node before doing expensive cleanup
pair_unpin_with_new_attr, &extra);
this->change_pair_attr(old_attr, new_attr);
p->attr = new_attr;
this->decrease_size_evicting(p->size_evicting_estimate);
pair_lock(p);
p->value_rwlock.write_unlock();
pair_unlock(p);
// now that the pe_callback (and its pair_unpin_with_new_attr continuation)
// have finished, we can safely decrease size_evicting
this->decrease_size_evicting(size_evicting_estimate);
}
//
......
......@@ -223,11 +223,15 @@ typedef void (*CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK)(void *ftnode_pv, void*
// The cachetable calls the partial eviction callback is to possibly try and partially evict pieces
// of the PAIR. The callback determines the strategy for what to evict. The callback may choose to free
// nothing, or may choose to free as much as possible.
// old_attr is the PAIR_ATTR of the PAIR when the callback is called.
// new_attr is set to the new PAIR_ATTR after the callback executes partial eviction
// Requires a write lock to be held on the PAIR in the cachetable while this function is called
typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *write_extraargs);
// nothing, or may choose to free as much as possible. When the partial eviction callback is finished,
// it must call finalize with the new PAIR_ATTR and the given finalize_extra. After this point, the
// write lock will be released on the PAIR and it is no longer safe to operate on any of the passed arguments.
// This is useful for doing expensive cleanup work outside of the PAIR's write lock (such as destroying objects, etc)
//
// on entry, requires a write lock to be held on the PAIR in the cachetable while this function is called
// on exit, the finalize continuation is called
typedef int (*CACHETABLE_PARTIAL_EVICTION_CALLBACK)(void *ftnode_pv, PAIR_ATTR old_attr, void *write_extraargs,
void (*finalize)(PAIR_ATTR new_attr, void *extra), void *finalize_extra);
// The cachetable calls this function to determine if get_and_pin call requires a partial fetch. If this function returns true,
// then the cachetable will subsequently call CACHETABLE_PARTIAL_FETCH_CALLBACK to perform
......
......@@ -730,7 +730,8 @@ void toku_ftnode_checkpoint_complete_callback(void *value_data);
void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *extraargs);
int toku_ftnode_pe_callback(void *ftnode_pv, PAIR_ATTR old_attr, void *extraargs,
void (*finalize)(PAIR_ATTR new_attr, void *extra), void *finalize_extra);
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
int toku_ftnode_pf_callback(void* ftnode_pv, void* UU(disk_data), void* read_extraargs, int fd, PAIR_ATTR* sizep);
int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
......
......@@ -1040,7 +1040,9 @@ exit:
return;
}
static void
// replace the child buffer with a compressed version of itself.
// @return the old child buffer
static NONLEAF_CHILDINFO
compress_internal_node_partition(FTNODE node, int i, enum toku_compression_method compression_method)
{
// if we should evict, compress the
......@@ -1051,10 +1053,11 @@ compress_internal_node_partition(FTNODE node, int i, enum toku_compression_metho
sub_block_init(sb);
toku_create_compressed_partition_from_available(node, i, compression_method, sb);
// now free the old partition and replace it with this
destroy_nonleaf_childinfo(BNC(node,i));
// now set the state to compressed and return the old, available partition
NONLEAF_CHILDINFO bnc = BNC(node, i);
set_BSB(node, i, sb);
BP_STATE(node,i) = PT_COMPRESSED;
return bnc;
}
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) {
......@@ -1076,11 +1079,21 @@ BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
}
// callback for partially evicting a node
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* new_attr, void* extraargs) {
FTNODE node = (FTNODE)ftnode_pv;
FT ft = (FT) extraargs;
long size_before = 0;
int toku_ftnode_pe_callback(void *ftnode_pv, PAIR_ATTR old_attr, void *write_extraargs,
void (*finalize)(PAIR_ATTR new_attr, void *extra), void *finalize_extra) {
FTNODE node = (FTNODE) ftnode_pv;
FT ft = (FT) write_extraargs;
int num_partial_evictions = 0;
// Hold things we intend to destroy here.
// They will be taken care of after finalize().
int num_basements_to_destroy = 0;
int num_buffers_to_destroy = 0;
int num_pointers_to_free = 0;
BASEMENTNODE basements_to_destroy[node->n_children];
NONLEAF_CHILDINFO buffers_to_destroy[node->n_children];
void *pointers_to_free[node->n_children * 2];
// Don't partially evict dirty nodes
if (node->dirty) {
goto exit;
......@@ -1097,12 +1110,10 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
if (num_partial_evictions++ == 0) {
size_before = ftnode_memory_size(node);
}
NONLEAF_CHILDINFO bnc;
if (ft_compress_buffers_before_eviction) {
// When partially evicting, always compress with quicklz
compress_internal_node_partition(
bnc = compress_internal_node_partition(
node,
i,
TOKU_QUICKLZ_METHOD
......@@ -1110,10 +1121,12 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
} else {
// We're not compressing buffers before eviction. Simply
// detach the buffer and set the child's state to on-disk.
destroy_nonleaf_childinfo(BNC(node, i));
bnc = BNC(node, i);
set_BNULL(node, i);
BP_STATE(node, i) = PT_ON_DISK;
}
buffers_to_destroy[num_buffers_to_destroy++] = bnc;
num_partial_evictions++;
}
else {
BP_SWEEP_CLOCK(node,i);
......@@ -1133,21 +1146,21 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
for (int i = 0; i < node->n_children; i++) {
// Get rid of compressed stuff no matter what.
if (BP_STATE(node,i) == PT_COMPRESSED) {
if (num_partial_evictions++ == 0) {
size_before = ftnode_memory_size(node);
}
SUB_BLOCK sb = BSB(node, i);
toku_free(sb->compressed_ptr);
toku_free(sb);
pointers_to_free[num_pointers_to_free++] = sb->compressed_ptr;
pointers_to_free[num_pointers_to_free++] = sb;
set_BNULL(node, i);
BP_STATE(node,i) = PT_ON_DISK;
num_partial_evictions++;
}
else if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) {
if (num_partial_evictions++ == 0) {
size_before = ftnode_memory_size(node);
}
toku_evict_bn_from_memory(node, i, ft);
BASEMENTNODE bn = BLB(node, i);
basements_to_destroy[num_basements_to_destroy++] = bn;
toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
set_BNULL(node, i);
BP_STATE(node, i) = PT_ON_DISK;
num_partial_evictions++;
}
else {
BP_SWEEP_CLOCK(node,i);
......@@ -1162,19 +1175,34 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
}
}
exit:
// call the finalize callback with a new pair attr
PAIR_ATTR new_attr = make_ftnode_pair_attr(node);
finalize(new_attr, finalize_extra);
// destroy everything now that we've called finalize(),
// and, by contract, and it's safe to do expensive work.
for (int i = 0; i < num_basements_to_destroy; i++) {
destroy_basement_node(basements_to_destroy[i]);
}
for (int i = 0; i < num_buffers_to_destroy; i++) {
destroy_nonleaf_childinfo(buffers_to_destroy[i]);
}
for (int i = 0; i < num_pointers_to_free; i++) {
toku_free(pointers_to_free[i]);
}
// stats
if (num_partial_evictions > 0) {
long delta = size_before - ftnode_memory_size(node);
if (node->height == 0) {
long delta = old_attr.leaf_size - new_attr.leaf_size;
STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, num_partial_evictions);
STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF_BYTES, delta);
} else {
long delta = old_attr.nonleaf_size - new_attr.nonleaf_size;
STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF, num_partial_evictions);
STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF_BYTES, delta);
}
}
exit:
*new_attr = make_ftnode_pair_attr(node);
return 0;
}
......
......@@ -249,13 +249,14 @@ void toku_rollback_pe_est_callback(
// callback for partially evicting a cachetable entry
int toku_rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
void* UU(extraargs)
PAIR_ATTR old_attr,
void* UU(extraargs),
void (*finalize)(PAIR_ATTR new_attr, void * extra),
void *finalize_extra
)
{
assert(rollback_v != NULL);
*new_attr = old_attr;
finalize(old_attr, finalize_extra);
return 0;
}
......
......@@ -107,10 +107,11 @@ void toku_rollback_pe_est_callback(
);
int toku_rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
void* UU(extraargs)
) ;
PAIR_ATTR old_attr,
void* UU(extraargs),
void (*finalize)(PAIR_ATTR new_attr, void * extra),
void *finalize_extra
);
bool toku_rollback_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) ;
int toku_rollback_pf_callback(void* UU(ftnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep));
void toku_rollback_clone_callback(void* value_data, void** cloned_value_data, long* clone_size, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
......
......@@ -103,15 +103,16 @@ static int
pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = make_pair_attr(1);
if (check_pe_callback) {
pe_callback_called = true;
}
usleep(4*1024*1024);
finalize(make_pair_attr(1), finalize_extra);
return 0;
}
......
......@@ -155,15 +155,16 @@ static int
pe_callback (
void *ftnode_pv,
PAIR_ATTR UU(bytes_to_free),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = make_pair_attr(bytes_to_free.size-1);
expected_bytes_to_free--;
int* CAST_FROM_VOIDP(foo, ftnode_pv);
int blah = *foo;
*foo = blah-1;
finalize(make_pair_attr(bytes_to_free.size-1), finalize_extra);
return 0;
}
......@@ -171,10 +172,12 @@ static int
other_pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed __attribute__((__unused__)),
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
finalize(bytes_to_free, finalize_extra);
return 0;
}
......
......@@ -169,17 +169,18 @@ static int
pe_callback (
void *ftnode_pv,
PAIR_ATTR UU(bytes_to_free),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = make_pair_attr(bytes_to_free.size-1);
usleep(1*1024*1024);
if (verbose) printf("calling pe_callback\n");
expected_bytes_to_free--;
int* CAST_FROM_VOIDP(foo, ftnode_pv);
int blah = *foo;
*foo = blah-1;
finalize(make_pair_attr(bytes_to_free.size-1), finalize_extra);
return 0;
}
......@@ -187,11 +188,12 @@ static int
other_pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed __attribute__((__unused__)),
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = bytes_to_free;
finalize(bytes_to_free, finalize_extra);
return 0;
}
......
......@@ -164,12 +164,13 @@ static int
pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
assert(false);
*bytes_freed = bytes_to_free;
finalize(bytes_to_free, finalize_extra);
return 0;
}
......
......@@ -154,12 +154,13 @@ static int
pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = make_pair_attr(bytes_to_free.size-7);
sleep(2);
finalize(bytes_to_free, finalize_extra);
return 0;
}
......
......@@ -110,12 +110,13 @@ static int
pe_callback (
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR new_attr, void *extra),
void *finalize_extra
)
{
sleep(3);
*bytes_freed = make_pair_attr(bytes_to_free.size-7);
finalize(make_pair_attr(bytes_to_free.size - 7), finalize_extra);
return 0;
}
......
......@@ -146,7 +146,7 @@ test_prefetch_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,0) == PT_AVAIL);
assert(BP_STATE(dn,1) == PT_AVAIL);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_COMPRESSED);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
......@@ -168,7 +168,7 @@ test_prefetch_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_AVAIL);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
......@@ -190,7 +190,7 @@ test_prefetch_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_AVAIL);
assert(BP_STATE(dn,2) == PT_ON_DISK);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_ON_DISK);
......@@ -211,7 +211,7 @@ test_prefetch_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
......@@ -232,7 +232,7 @@ test_prefetch_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,0) == PT_AVAIL);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_ON_DISK);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_COMPRESSED);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_ON_DISK);
......@@ -292,11 +292,11 @@ test_subset_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
// need to call this twice because we had a subset read before, that touched the clock
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
......@@ -317,11 +317,11 @@ test_subset_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,1) == PT_AVAIL);
assert(BP_STATE(dn,2) == PT_AVAIL);
// need to call this twice because we had a subset read before, that touched the clock
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
......@@ -341,11 +341,11 @@ test_subset_read(int fd, FT_HANDLE UU(brt), FT brt_h) {
assert(BP_STATE(dn,1) == PT_AVAIL);
assert(BP_STATE(dn,2) == PT_ON_DISK);
// need to call this twice because we had a subset read before, that touched the clock
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_AVAIL);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_ON_DISK);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(dn,0) == PT_COMPRESSED);
assert(BP_STATE(dn,1) == PT_COMPRESSED);
assert(BP_STATE(dn,2) == PT_ON_DISK);
......
......@@ -154,12 +154,12 @@ test1(int fd, FT brt_h, FTNODE *dn) {
// should sweep and NOT get rid of anything
PAIR_ATTR attr;
memset(&attr,0,sizeof(attr));
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
// should sweep and get compress all
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
if (!is_leaf) {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
......@@ -172,12 +172,12 @@ test1(int fd, FT brt_h, FTNODE *dn) {
bool req = toku_ftnode_pf_req_callback(*dn, &bfe_all);
assert(req);
toku_ftnode_pf_callback(*dn, ndd, &bfe_all, fd, &size);
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
// should sweep and get compress all
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
if (!is_leaf) {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
......@@ -190,15 +190,15 @@ test1(int fd, FT brt_h, FTNODE *dn) {
req = toku_ftnode_pf_req_callback(*dn, &bfe_all);
assert(req);
toku_ftnode_pf_callback(*dn, ndd, &bfe_all, fd, &size);
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
(*dn)->dirty = 1;
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
......@@ -252,11 +252,11 @@ test2(int fd, FT brt_h, FTNODE *dn) {
assert(!BP_SHOULD_EVICT(*dn, 1));
PAIR_ATTR attr;
memset(&attr,0,sizeof(attr));
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(*dn, 0) == (is_leaf) ? PT_ON_DISK : PT_COMPRESSED);
assert(BP_STATE(*dn, 1) == PT_AVAIL);
assert(BP_SHOULD_EVICT(*dn, 1));
toku_ftnode_pe_callback(*dn, attr, &attr, brt_h);
toku_ftnode_pe_callback(*dn, attr, brt_h, def_pe_finalize_impl, nullptr);
assert(BP_STATE(*dn, 1) == (is_leaf) ? PT_ON_DISK : PT_COMPRESSED);
bool req = toku_ftnode_pf_req_callback(*dn, &bfe_subset);
......
......@@ -187,8 +187,7 @@ setup_dn(enum ftnode_verify_type bft, int fd, FT brt_h, FTNODE *dn, FTNODE_DISK_
// if read_none, get rid of the compressed bp's
if (bft == read_none) {
if ((*dn)->height == 0) {
PAIR_ATTR attr;
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
// assert all bp's are on disk
for (int i = 0; i < (*dn)->n_children; i++) {
if ((*dn)->height == 0) {
......@@ -213,14 +212,14 @@ setup_dn(enum ftnode_verify_type bft, int fd, FT brt_h, FTNODE *dn, FTNODE_DISK_
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_AVAIL);
}
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
// assert all bp's are still available, because we touched the clock
assert(BP_STATE(*dn,i) == PT_AVAIL);
// now assert all should be evicted
assert(BP_SHOULD_EVICT(*dn, i));
}
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), &attr, brt_h);
toku_ftnode_pe_callback(*dn, make_pair_attr(0xffffffff), brt_h, def_pe_finalize_impl, nullptr);
for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_COMPRESSED);
}
......
......@@ -260,8 +260,7 @@ doit (bool keep_other_bn_in_memory) {
assert(node->n_children == 2);
// a hack to get the basement nodes evicted
for (int i = 0; i < 20; i++) {
PAIR_ATTR attr;
toku_ftnode_pe_callback(node, make_pair_attr(0xffffffff), &attr, brt->ft);
toku_ftnode_pe_callback(node, make_pair_attr(0xffffffff), brt->ft, def_pe_finalize_impl, nullptr);
}
// this ensures that when we do the lookups below,
// that the data is read off disk
......
......@@ -369,8 +369,7 @@ doit (void) {
assert_zero(r);
toku_pin_node_with_min_bfe(&node, node_internal, t);
for (int i = 0; i < 20; i++) {
PAIR_ATTR attr;
toku_ftnode_pe_callback(node, make_pair_attr(0xffffffff), &attr, t->ft);
toku_ftnode_pe_callback(node, make_pair_attr(0xffffffff), t->ft, def_pe_finalize_impl, nullptr);
}
assert(BP_STATE(node,0) == PT_COMPRESSED);
toku_unpin_ftnode(t->ft, node);
......
......@@ -239,17 +239,21 @@ def_pe_est_callback(
}
static UU() int
def_pe_callback (
def_pe_callback(
void *ftnode_pv __attribute__((__unused__)),
PAIR_ATTR bytes_to_free __attribute__((__unused__)),
PAIR_ATTR* bytes_freed,
void* extraargs __attribute__((__unused__))
)
void* extraargs __attribute__((__unused__)),
void (*finalize)(PAIR_ATTR bytes_freed, void *extra),
void *finalize_extra
)
{
*bytes_freed = bytes_to_free;
finalize(bytes_to_free, finalize_extra);
return 0;
}
static UU() void
def_pe_finalize_impl(PAIR_ATTR UU(bytes_freed), void *UU(extra)) { }
static UU() bool def_pf_req_callback(void* UU(ftnode_pv), void* UU(read_extraargs)) {
return false;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment