Commit 75c75798 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5560, merge to main

git-svn-id: file:///svn/toku/tokudb@49128 c7de825b-a66e-492c-adef-691d508d4ae1
parent b26ea7f3
......@@ -61,6 +61,7 @@ set(FT_SOURCES
rollback
rollback-apply
rollback-ct-callbacks
rollback_log_node_cache
roll
sub_block
txn
......
......@@ -29,11 +29,11 @@ block_allocator_validate (BLOCK_ALLOCATOR ba) {
uint64_t i;
uint64_t n_bytes_in_use = ba->reserve_at_beginning;
for (i=0; i<ba->n_blocks; i++) {
n_bytes_in_use += ba->blocks_array[i].size;
if (i>0) {
assert(ba->blocks_array[i].offset > ba->blocks_array[i-1].offset);
assert(ba->blocks_array[i].offset >= ba->blocks_array[i-1].offset + ba->blocks_array[i-1].size );
}
n_bytes_in_use += ba->blocks_array[i].size;
if (i>0) {
assert(ba->blocks_array[i].offset > ba->blocks_array[i-1].offset);
assert(ba->blocks_array[i].offset >= ba->blocks_array[i-1].offset + ba->blocks_array[i-1].size );
}
}
assert(n_bytes_in_use == ba->n_bytes_in_use);
}
......@@ -49,7 +49,7 @@ void
block_allocator_print (BLOCK_ALLOCATOR ba) {
uint64_t i;
for (i=0; i<ba->n_blocks; i++) {
printf("%" PRId64 ":%" PRId64 " ", ba->blocks_array[i].offset, ba->blocks_array[i].size);
printf("%" PRId64 ":%" PRId64 " ", ba->blocks_array[i].offset, ba->blocks_array[i].size);
}
printf("\n");
VALIDATE(ba);
......@@ -80,13 +80,13 @@ destroy_block_allocator (BLOCK_ALLOCATOR *bap) {
static void
grow_blocks_array_by (BLOCK_ALLOCATOR ba, uint64_t n_to_add) {
if (ba->n_blocks + n_to_add > ba->blocks_array_size) {
uint64_t new_size = ba->n_blocks + n_to_add;
uint64_t at_least = ba->blocks_array_size * 2;
if (at_least > new_size) {
new_size = at_least;
}
ba->blocks_array_size = new_size;
XREALLOC_N(ba->blocks_array_size, ba->blocks_array);
uint64_t new_size = ba->n_blocks + n_to_add;
uint64_t at_least = ba->blocks_array_size * 2;
if (at_least > new_size) {
new_size = at_least;
}
ba->blocks_array_size = new_size;
XREALLOC_N(ba->blocks_array_size, ba->blocks_array);
}
}
......@@ -98,37 +98,37 @@ grow_blocks_array (BLOCK_ALLOCATOR ba) {
void
block_allocator_merge_blockpairs_into (uint64_t d, struct block_allocator_blockpair dst[/*d*/],
uint64_t s, const struct block_allocator_blockpair src[/*s*/])
uint64_t s, const struct block_allocator_blockpair src[/*s*/])
{
uint64_t tail = d+s;
while (d>0 && s>0) {
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair const *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
assert(tail>0);
if (dp->offset > sp->offset) {
*tp = *dp;
d--;
tail--;
} else {
*tp = *sp;
s--;
tail--;
}
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair const *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
assert(tail>0);
if (dp->offset > sp->offset) {
*tp = *dp;
d--;
tail--;
} else {
*tp = *sp;
s--;
tail--;
}
}
while (d>0) {
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *dp;
d--;
tail--;
struct block_allocator_blockpair *dp = &dst[d-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *dp;
d--;
tail--;
}
while (s>0) {
struct block_allocator_blockpair const *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *sp;
s--;
tail--;
struct block_allocator_blockpair const *sp = &src[s-1];
struct block_allocator_blockpair *tp = &dst[tail-1];
*tp = *sp;
s--;
tail--;
}
}
......@@ -148,13 +148,14 @@ block_allocator_alloc_blocks_at (BLOCK_ALLOCATOR ba, uint64_t n_blocks, struct b
VALIDATE(ba);
qsort(pairs, n_blocks, sizeof(*pairs), compare_blockpairs);
for (uint64_t i=0; i<n_blocks; i++) {
assert(pairs[i].offset >= ba->reserve_at_beginning);
assert(pairs[i].offset%ba->alignment == 0);
ba->n_bytes_in_use += pairs[i].size;
assert(pairs[i].offset >= ba->reserve_at_beginning);
assert(pairs[i].offset%ba->alignment == 0);
ba->n_bytes_in_use += pairs[i].size;
invariant(pairs[i].size > 0); //Allocator does not support size 0 blocks. See block_allocator_free_block.
}
grow_blocks_array_by(ba, n_blocks);
block_allocator_merge_blockpairs_into(ba->n_blocks, ba->blocks_array,
n_blocks, pairs);
n_blocks, pairs);
ba->n_blocks += n_blocks;
VALIDATE(ba);
}
......@@ -177,46 +178,47 @@ align (uint64_t value, BLOCK_ALLOCATOR ba)
void
block_allocator_alloc_block (BLOCK_ALLOCATOR ba, uint64_t size, uint64_t *offset) {
invariant(size > 0); //Allocator does not support size 0 blocks. See block_allocator_free_block.
grow_blocks_array(ba);
ba->n_bytes_in_use += size;
if (ba->n_blocks==0) {
assert(ba->n_bytes_in_use == ba->reserve_at_beginning + size); // we know exactly how many are in use
ba->blocks_array[0].offset = align(ba->reserve_at_beginning, ba);
ba->blocks_array[0].size = size;
*offset = ba->blocks_array[0].offset;
ba->n_blocks++;
return;
assert(ba->n_bytes_in_use == ba->reserve_at_beginning + size); // we know exactly how many are in use
ba->blocks_array[0].offset = align(ba->reserve_at_beginning, ba);
ba->blocks_array[0].size = size;
*offset = ba->blocks_array[0].offset;
ba->n_blocks++;
return;
}
// Implement first fit.
// Implement first fit.
{
uint64_t end_of_reserve = align(ba->reserve_at_beginning, ba);
if (end_of_reserve + size <= ba->blocks_array[0].offset ) {
// Check to see if the space immediately after the reserve is big enough to hold the new block.
struct block_allocator_blockpair *bp = &ba->blocks_array[0];
memmove(bp+1, bp, (ba->n_blocks)*sizeof(*bp));
bp[0].offset = end_of_reserve;
bp[0].size = size;
ba->n_blocks++;
*offset = end_of_reserve;
VALIDATE(ba);
return;
}
uint64_t end_of_reserve = align(ba->reserve_at_beginning, ba);
if (end_of_reserve + size <= ba->blocks_array[0].offset ) {
// Check to see if the space immediately after the reserve is big enough to hold the new block.
struct block_allocator_blockpair *bp = &ba->blocks_array[0];
memmove(bp+1, bp, (ba->n_blocks)*sizeof(*bp));
bp[0].offset = end_of_reserve;
bp[0].size = size;
ba->n_blocks++;
*offset = end_of_reserve;
VALIDATE(ba);
return;
}
}
for (uint64_t blocknum = 0; blocknum +1 < ba->n_blocks; blocknum ++) {
// Consider the space after blocknum
struct block_allocator_blockpair *bp = &ba->blocks_array[blocknum];
uint64_t this_offset = bp[0].offset;
uint64_t this_size = bp[0].size;
uint64_t answer_offset = align(this_offset + this_size, ba);
if (answer_offset + size > bp[1].offset) continue; // The block we want doesn't fit after this block.
// It fits, so allocate it here.
memmove(bp+2, bp+1, (ba->n_blocks - blocknum -1)*sizeof(*bp));
bp[1].offset = answer_offset;
bp[1].size = size;
ba->n_blocks++;
*offset = answer_offset;
VALIDATE(ba);
return;
// Consider the space after blocknum
struct block_allocator_blockpair *bp = &ba->blocks_array[blocknum];
uint64_t this_offset = bp[0].offset;
uint64_t this_size = bp[0].size;
uint64_t answer_offset = align(this_offset + this_size, ba);
if (answer_offset + size > bp[1].offset) continue; // The block we want doesn't fit after this block.
// It fits, so allocate it here.
memmove(bp+2, bp+1, (ba->n_blocks - blocknum -1)*sizeof(*bp));
bp[1].offset = answer_offset;
bp[1].size = size;
ba->n_blocks++;
*offset = answer_offset;
VALIDATE(ba);
return;
}
// It didn't fit anywhere, so fit it on the end.
assert(ba->n_blocks < ba->blocks_array_size);
......@@ -236,26 +238,31 @@ find_block (BLOCK_ALLOCATOR ba, uint64_t offset)
{
VALIDATE(ba);
if (ba->n_blocks==1) {
assert(ba->blocks_array[0].offset == offset);
return 0;
assert(ba->blocks_array[0].offset == offset);
return 0;
}
uint64_t lo = 0;
uint64_t hi = ba->n_blocks;
while (1) {
assert(lo<hi); // otherwise no such block exists.
uint64_t mid = (lo+hi)/2;
uint64_t thisoff = ba->blocks_array[mid].offset;
//printf("lo=%" PRId64 " hi=%" PRId64 " mid=%" PRId64 " thisoff=%" PRId64 " offset=%" PRId64 "\n", lo, hi, mid, thisoff, offset);
if (thisoff < offset) {
lo = mid+1;
} else if (thisoff > offset) {
hi = mid;
} else {
return mid;
}
assert(lo<hi); // otherwise no such block exists.
uint64_t mid = (lo+hi)/2;
uint64_t thisoff = ba->blocks_array[mid].offset;
//printf("lo=%" PRId64 " hi=%" PRId64 " mid=%" PRId64 " thisoff=%" PRId64 " offset=%" PRId64 "\n", lo, hi, mid, thisoff, offset);
if (thisoff < offset) {
lo = mid+1;
} else if (thisoff > offset) {
hi = mid;
} else {
return mid;
}
}
}
// To support 0-sized blocks, we need to include size as an input to this function.
// All 0-sized blocks at the same offset can be considered identical, but
// a 0-sized block can share offset with a non-zero sized block.
// The non-zero sized block is not exchangable with a zero sized block (or vice versa),
// so inserting 0-sized blocks can cause corruption here.
void
block_allocator_free_block (BLOCK_ALLOCATOR ba, uint64_t offset) {
VALIDATE(ba);
......@@ -278,8 +285,8 @@ uint64_t
block_allocator_allocated_limit (BLOCK_ALLOCATOR ba) {
if (ba->n_blocks==0) return ba->reserve_at_beginning;
else {
struct block_allocator_blockpair *last = &ba->blocks_array[ba->n_blocks-1];
return last->offset + last->size;
struct block_allocator_blockpair *last = &ba->blocks_array[ba->n_blocks-1];
return last->offset + last->size;
}
}
......@@ -290,15 +297,15 @@ block_allocator_get_nth_block_in_layout_order (BLOCK_ALLOCATOR ba, uint64_t b, u
// Return 0 if there is a block that big, return nonzero if b is too big.
{
if (b==0) {
*offset=0;
*size =ba->reserve_at_beginning;
return 0;
*offset=0;
*size =ba->reserve_at_beginning;
return 0;
} else if (b > ba->n_blocks) {
return -1;
return -1;
} else {
*offset=ba->blocks_array[b-1].offset;
*size =ba->blocks_array[b-1].size;
return 0;
*offset=ba->blocks_array[b-1].offset;
*size =ba->blocks_array[b-1].size;
return 0;
}
}
......
......@@ -5,7 +5,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include "ft-internal.h" // ugly but pragmatic, need access to dirty bits while holding translation lock
#include "ft-internal.h" // ugly but pragmatic, need access to dirty bits while holding translation lock
#include "fttypes.h"
#include "block_table.h"
#include "memory.h"
......@@ -145,8 +145,8 @@ copy_translation(struct translation * dst, struct translation * src, enum transl
dst->length_of_array = dst->smallest_never_used_blocknum.b;
XMALLOC_N(dst->length_of_array, dst->block_translation);
memcpy(dst->block_translation,
src->block_translation,
dst->length_of_array * sizeof(*dst->block_translation));
src->block_translation,
dst->length_of_array * sizeof(*dst->block_translation));
//New version of btt is not yet stored on disk.
dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].size = 0;
dst->block_translation[RESERVED_BLOCKNUM_TRANSLATION].u.diskoff = diskoff_unused;
......@@ -241,7 +241,7 @@ cleanup_failed_checkpoint (BLOCK_TABLE bt) {
for (i = 0; i < t->length_of_array; i++) {
struct block_translation_pair *pair = &t->block_translation[i];
if (pair->size > 0 &&
if (pair->size > 0 &&
!translation_prevents_freeing(&bt->current, make_blocknum(i), pair) &&
!translation_prevents_freeing(&bt->checkpointed, make_blocknum(i), pair)) {
PRNTF("free", i, pair->size, pair->u.diskoff, bt);
......@@ -368,7 +368,7 @@ static int64_t
calculate_size_on_disk (struct translation *t) {
int64_t r = (8 + // smallest_never_used_blocknum
8 + // blocknum_freelist_head
t->smallest_never_used_blocknum.b * 16 + // Array
t->smallest_never_used_blocknum.b * 16 + // Array
4); // 4 for checksum
return r;
}
......@@ -400,18 +400,21 @@ PRNTF("Freed", b.b, old_pair.size, old_pair.u.diskoff, bt);
block_allocator_free_block(bt->block_allocator, old_pair.u.diskoff);
}
uint64_t allocator_offset;
//Allocate a new block
block_allocator_alloc_block(bt->block_allocator, size, &allocator_offset);
uint64_t allocator_offset = diskoff_unused;
t->block_translation[b.b].size = size;
if (size > 0) {
// Allocate a new block if the size is greater than 0,
// if the size is just 0, offset will be set to diskoff_unused
block_allocator_alloc_block(bt->block_allocator, size, &allocator_offset);
}
t->block_translation[b.b].u.diskoff = allocator_offset;
t->block_translation[b.b].size = size;
*offset = allocator_offset;
PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
//Update inprogress btt if appropriate (if called because Pending bit is set).
if (for_checkpoint) {
assert(b.b < bt->inprogress.length_of_array);
bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
assert(b.b < bt->inprogress.length_of_array);
bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
}
}
......@@ -630,15 +633,36 @@ toku_block_verify_no_free_blocknums(BLOCK_TABLE bt) {
assert(bt->current.blocknum_freelist_head.b == freelist_null.b);
}
// Frees blocknums that have a size of 0 and unused diskoff
// Currently used for eliminating unused cached rollback log nodes
void
toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root) {
lock_for_blocktable(bt);
int64_t smallest = bt->current.smallest_never_used_blocknum.b;
for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) {
if (i == root.b) {
continue;
}
BLOCKNUM b = make_blocknum(i);
if (bt->current.block_translation[b.b].size == 0) {
invariant(bt->current.block_translation[b.b].u.diskoff == diskoff_unused);
free_blocknum_in_translation(&bt->current, b);
}
}
unlock_for_blocktable(bt);
}
//Verify there are no data blocks except root.
void
toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root) {
toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) {
lock_for_blocktable(bt);
//Relies on checkpoint having used optimize_translation
assert(root.b >= RESERVED_BLOCKNUMS);
assert(bt->current.smallest_never_used_blocknum.b == root.b + 1);
int64_t i;
for (i=RESERVED_BLOCKNUMS; i < root.b; i++) {
int64_t smallest = bt->current.smallest_never_used_blocknum.b;
for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) {
if (i == root.b) {
continue;
}
BLOCKNUM b = make_blocknum(i);
assert(bt->current.block_translation[b.b].size == size_is_free);
}
......
......@@ -37,7 +37,8 @@ void toku_allocate_blocknum(BLOCK_TABLE bt, BLOCKNUM *res, FT h);
void toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT h);
void toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *b, FT h, bool for_checkpoint);
void toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b);
void toku_block_verify_no_data_blocks_except_root_unlocked(BLOCK_TABLE bt, BLOCKNUM root);
void toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root);
void toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root);
void toku_block_verify_no_free_blocknums(BLOCK_TABLE bt);
void toku_realloc_descriptor_on_disk(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h, int fd);
void toku_realloc_descriptor_on_disk_unlocked(BLOCK_TABLE bt, DISKOFF size, DISKOFF *offset, FT h);
......
......@@ -31,7 +31,6 @@
// so they are still easily available to the debugger and to save lots of typing.
static uint64_t cachetable_miss;
static uint64_t cachetable_misstime; // time spent waiting for disk read
static uint64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static uint64_t cachetable_evictions;
static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed
......@@ -53,7 +52,6 @@ status_init(void) {
STATUS_INIT(CT_MISS, UINT64, "miss");
STATUS_INIT(CT_MISSTIME, UINT64, "miss time");
STATUS_INIT(CT_PUTS, UINT64, "puts (new nodes created)");
STATUS_INIT(CT_PREFETCHES, UINT64, "prefetches");
STATUS_INIT(CT_SIZE_CURRENT, UINT64, "size current");
STATUS_INIT(CT_SIZE_LIMIT, UINT64, "size limit");
......@@ -107,7 +105,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
}
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions;
STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
......@@ -780,9 +777,9 @@ void pair_init(PAIR p,
// Requires pair list's write lock to be held on entry.
// On exit, get pair with mutex held
//
static PAIR cachetable_insert_at(CACHETABLE ct,
CACHEFILE cachefile, CACHEKEY key, void *value,
uint32_t fullhash,
static PAIR cachetable_insert_at(CACHETABLE ct,
CACHEFILE cachefile, CACHEKEY key, void *value,
uint32_t fullhash,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback,
enum cachetable_dirty dirty) {
......@@ -798,13 +795,20 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
fullhash,
write_callback,
&ct->ev,
&ct->list);
&ct->list
);
ct->list.put(p);
ct->ev.add_pair_attr(attr);
return p;
}
static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
ct->list.put(p);
ct->ev.add_pair_attr(attr);
}
// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
......@@ -813,41 +817,27 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
//
static void cachetable_put_internal(
CACHEFILE cachefile,
CACHEKEY key,
uint32_t fullhash,
PAIR p,
void *value,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback,
CACHETABLE_PUT_CALLBACK put_callback
)
{
CACHETABLE ct = cachefile->cachetable;
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
invariant_null(p);
// flushing could change the table size, but wont' change the fullhash
cachetable_puts++;
p = cachetable_insert_at(
ct,
cachefile,
key,
value,
fullhash,
attr,
write_callback,
CACHETABLE_DIRTY
);
invariant_notnull(p);
pair_lock(p);
p->value_rwlock.write_lock(true);
pair_unlock(p);
//note_hash_count(count);
//
//
// TODO: (Zardosht), make code run in debug only
//
//
//PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
//invariant_null(dummy_p);
cachetable_insert_pair_at(ct, p, attr);
invariant_notnull(put_callback);
put_callback(value, p);
}
// Pair mutex (p->mutex) is may or may not be held on entry,
// Holding the pair mutex on entry is not important
// Holding the pair mutex on entry is not important
// for performance or corrrectness
// Pair is pinned on entry
static void
......@@ -1061,10 +1051,41 @@ static void get_pairs(
}
}
// does NOT include the actual key and fullhash we eventually want
// a helper function for the two cachetable_put functions below
static inline PAIR malloc_and_init_pair(
CACHEFILE cachefile,
void *value,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback
)
{
CACHETABLE ct = cachefile->cachetable;
CACHEKEY dummy_key = {0};
uint32_t dummy_fullhash = 0;
PAIR XMALLOC(p);
memset(p, 0, sizeof *p);
pair_init(p,
cachefile,
dummy_key,
value,
attr,
CACHETABLE_DIRTY,
dummy_fullhash,
write_callback,
&ct->ev,
&ct->list
);
pair_lock(p);
p->value_rwlock.write_lock(true);
pair_unlock(p);
return p;
}
void toku_cachetable_put_with_dep_pairs(
CACHEFILE cachefile,
CACHEFILE cachefile,
CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
void *value,
void *value,
PAIR_ATTR attr,
CACHETABLE_WRITE_CALLBACK write_callback,
void *get_key_and_fullhash_extra,
......@@ -1088,15 +1109,18 @@ void toku_cachetable_put_with_dep_pairs(
if (ct->ev.should_client_wake_eviction_thread()) {
ct->ev.signal_eviction_thread();
}
PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
ct->list.write_list_lock();
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
p->key.b = key->b;
p->fullhash = *fullhash;
cachetable_put_internal(
cachefile,
*key,
*fullhash,
p,
value,
attr,
write_callback,
put_callback
);
PAIR dependent_pairs[num_dependent_pairs];
......@@ -1141,14 +1165,16 @@ void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, v
if (ct->ev.should_client_wake_eviction_thread()) {
ct->ev.signal_eviction_thread();
}
PAIR p = malloc_and_init_pair(cachefile, value, attr, write_callback);
ct->list.write_list_lock();
p->key.b = key.b;
p->fullhash = fullhash;
cachetable_put_internal(
cachefile,
key,
fullhash,
p,
value,
attr,
write_callback,
put_callback
);
ct->list.write_list_unlock();
......@@ -4475,7 +4501,6 @@ void
toku_cachetable_helgrind_ignore(void) {
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
......
......@@ -528,7 +528,6 @@ uint64_t toku_cachefile_size(CACHEFILE cf);
typedef enum {
CT_MISS = 0,
CT_MISSTIME, // how many usec spent waiting for disk read because of cache miss
CT_PUTS, // how many times has a newly created node been put into the cachetable?
CT_PREFETCHES, // how many times has a block been prefetched into the cachetable?
CT_SIZE_CURRENT, // the sum of the sizes of the nodes represented in the cachetable
CT_SIZE_LIMIT, // the limit to the sum of the node sizes
......
......@@ -2064,6 +2064,7 @@ deserialize_and_upgrade_leaf_node(FTNODE node,
static int
read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
DISKOFF offset, DISKOFF size,
FT h,
struct rbuf *rb,
/* out */ int *layout_version_p);
......@@ -2085,9 +2086,17 @@ deserialize_and_upgrade_ftnode(FTNODE node,
// I. First we need to de-compress the entire node, only then can
// we read the different sub-sections.
// get the file offset and block size for the block
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(bfe->h->blocktable,
blocknum,
&offset,
&size);
struct rbuf rb;
r = read_and_decompress_block_from_fd_into_rbuf(fd,
blocknum,
offset,
size,
bfe->h,
&rb,
&version);
......@@ -2838,15 +2847,13 @@ decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_blo
static int
read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
DISKOFF offset, DISKOFF size,
FT h,
struct rbuf *rb,
/* out */ int *layout_version_p) {
int r = 0;
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
// get the file offset and block size for the block
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
uint8_t *XMALLOC_N(size, raw_block);
{
// read the (partially compressed) block
......@@ -2903,12 +2910,26 @@ int
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash,
ROLLBACK_LOG_NODE *logp, FT h) {
toku_trace("deserial start");
int layout_version = 0;
int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
int layout_version = 0;
r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, h, &rb, &layout_version);
// get the file offset and block size for the block
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
// if the size is 0, then the blocknum is unused
if (size == 0) {
// blocknum is unused, just create an empty one and get out
ROLLBACK_LOG_NODE XMALLOC(log);
rollback_empty_log_init(log);
log->blocknum.b = blocknum.b;
log->hash = fullhash;
r = 0;
*logp = log;
goto cleanup;
}
r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, h, &rb, &layout_version);
if (r!=0) goto cleanup;
{
......
......@@ -20,6 +20,7 @@
#include "txn_manager.h"
#include <portability/toku_pthread.h>
#include <util/omt.h>
#include "rollback_log_node_cache.h"
using namespace toku;
// Locking for the logger
......@@ -93,6 +94,7 @@ struct tokulogger {
void (*remove_finalize_callback) (DICTIONARY_ID, void*); // ydb-level callback to be called when a transaction that ...
void * remove_finalize_callback_extra; // ... deletes a file is committed or when one that creates a file is aborted.
CACHEFILE rollback_cachefile;
rollback_log_node_cache rollback_cache;
TXN_MANAGER txn_manager;
};
......
......@@ -6,6 +6,7 @@
#include "includes.h"
#include "txn_manager.h"
#include "rollback_log_node_cache.h"
static const int log_format_version=TOKU_LOG_VERSION;
......@@ -176,19 +177,29 @@ bool toku_logger_rollback_is_open (TOKULOGGER logger) {
return logger->rollback_cachefile != NULL;
}
#define MAX_CACHED_ROLLBACK_NODES 4096
void
toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft) {
toku_free_unused_blocknums(ft->blocktable, ft->h->root_blocknum);
logger->rollback_cache.init(MAX_CACHED_ROLLBACK_NODES);
}
int
toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create) {
assert(logger->is_open);
assert(!logger->rollback_cachefile);
FT_HANDLE t = NULL; // Note, there is no DB associated with this BRT.
toku_ft_handle_create(&t);
int r = toku_ft_handle_open(t, ROLLBACK_CACHEFILE_NAME, create, create, cachetable, NULL_TXN);
assert_zero(r);
logger->rollback_cachefile = t->ft->cf;
toku_logger_initialize_rollback_cache(logger, t->ft);
//Verify it is empty
//Must have no data blocks (rollback logs or otherwise).
toku_block_verify_no_data_blocks_except_root_unlocked(t->ft->blocktable, t->ft->h->root_blocknum);
toku_block_verify_no_data_blocks_except_root(t->ft->blocktable, t->ft->h->root_blocknum);
bool is_empty;
is_empty = toku_ft_is_empty_fast(t);
assert(is_empty);
......@@ -205,11 +216,13 @@ void toku_logger_close_rollback(TOKULOGGER logger) {
if (cf) {
FT_HANDLE ft_to_close;
{ //Find "brt"
logger->rollback_cache.destroy();
FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
//Verify it is safe to close it.
assert(!ft->h->dirty); //Must not be dirty.
toku_free_unused_blocknums(ft->blocktable, ft->h->root_blocknum);
//Must have no data blocks (rollback logs or otherwise).
toku_block_verify_no_data_blocks_except_root_unlocked(ft->blocktable, ft->h->root_blocknum);
toku_block_verify_no_data_blocks_except_root(ft->blocktable, ft->h->root_blocknum);
assert(!ft->h->dirty);
ft_to_close = toku_ft_get_only_existing_ft_handle(ft);
{
......
......@@ -25,6 +25,7 @@ int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_open_with_last_xid(const char *directory, TOKULOGGER logger, TXNID last_xid);
void toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp);
void toku_logger_initialize_rollback_cache(TOKULOGGER logger, FT ft);
int toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, bool create);
void toku_logger_close_rollback(TOKULOGGER logger);
bool toku_logger_rollback_is_open (TOKULOGGER); // return true iff the rollback is open.
......
......@@ -33,8 +33,8 @@ void memarena_clear (MEMARENA ma) {
// Free the other bufs.
int i;
for (i=0; i<ma->n_other_bufs; i++) {
toku_free(ma->other_bufs[i]);
ma->other_bufs[i]=0;
toku_free(ma->other_bufs[i]);
ma->other_bufs[i]=0;
}
ma->n_other_bufs=0;
// But reuse the main buffer
......@@ -54,25 +54,25 @@ round_to_page (size_t size) {
void* malloc_in_memarena (MEMARENA ma, size_t size) {
if (ma->buf_size < ma->buf_used + size) {
// The existing block isn't big enough.
// Add the block to the vector of blocks.
if (ma->buf) {
int old_n = ma->n_other_bufs;
REALLOC_N(old_n+1, ma->other_bufs);
assert(ma->other_bufs);
ma->other_bufs[old_n]=ma->buf;
ma->n_other_bufs = old_n+1;
// The existing block isn't big enough.
// Add the block to the vector of blocks.
if (ma->buf) {
int old_n = ma->n_other_bufs;
REALLOC_N(old_n+1, ma->other_bufs);
assert(ma->other_bufs);
ma->other_bufs[old_n]=ma->buf;
ma->n_other_bufs = old_n+1;
ma->size_of_other_bufs += ma->buf_size;
}
// Make a new one
{
size_t new_size = 2*ma->buf_size;
if (new_size<size) new_size=size;
new_size=round_to_page(new_size); // at least size, but round to the next page size
XMALLOC_N(new_size, ma->buf);
ma->buf_used = 0;
ma->buf_size = new_size;
}
}
// Make a new one
{
size_t new_size = 2*ma->buf_size;
if (new_size<size) new_size=size;
new_size=round_to_page(new_size); // at least size, but round to the next page size
XMALLOC_N(new_size, ma->buf);
ma->buf_used = 0;
ma->buf_size = new_size;
}
}
// allocate in the existing block.
char *result=ma->buf+ma->buf_used;
......@@ -89,12 +89,12 @@ void *memarena_memdup (MEMARENA ma, const void *v, size_t len) {
void memarena_close(MEMARENA *map) {
MEMARENA ma=*map;
if (ma->buf) {
toku_free(ma->buf);
ma->buf=0;
toku_free(ma->buf);
ma->buf=0;
}
int i;
for (i=0; i<ma->n_other_bufs; i++) {
toku_free(ma->other_bufs[i]);
toku_free(ma->other_bufs[i]);
}
if (ma->other_bufs) toku_free(ma->other_bufs);
ma->other_bufs=0;
......@@ -116,15 +116,15 @@ void memarena_move_buffers(MEMARENA dest, MEMARENA source) {
REALLOC_N(dest->n_other_bufs + source->n_other_bufs + 1, other_bufs);
#if TOKU_WINDOWS_32
if (other_bufs == 0) {
char **new_other_bufs;
char **new_other_bufs;
printf("_CrtCheckMemory:%d\n", _CrtCheckMemory());
printf("Z: move_counter:%d dest:%p %p %d source:%p %p %d errno:%d\n",
move_counter,
dest, dest->other_bufs, dest->n_other_bufs,
source, source->other_bufs, source->n_other_bufs,
errno);
new_other_bufs = toku_malloc((dest->n_other_bufs + source->n_other_bufs + 1)*sizeof (char **));
printf("new_other_bufs=%p errno=%d\n", new_other_bufs, errno);
new_other_bufs = toku_malloc((dest->n_other_bufs + source->n_other_bufs + 1)*sizeof (char **));
printf("new_other_bufs=%p errno=%d\n", new_other_bufs, errno);
}
#endif
......@@ -134,7 +134,7 @@ void memarena_move_buffers(MEMARENA dest, MEMARENA source) {
assert(other_bufs);
dest->other_bufs = other_bufs;
for (i=0; i<source->n_other_bufs; i++) {
dest->other_bufs[dest->n_other_bufs++] = source->other_bufs[i];
dest->other_bufs[dest->n_other_bufs++] = source->other_bufs[i];
}
dest->other_bufs[dest->n_other_bufs++] = source->buf;
source->n_other_bufs = 0;
......
......@@ -420,6 +420,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re
toku_ft_handle_create(&t);
r = toku_ft_handle_open_recovery(t, ROLLBACK_CACHEFILE_NAME, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn);
renv->logger->rollback_cachefile = t->ft->cf;
toku_logger_initialize_rollback_cache(renv->logger, t->ft);
} else {
r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn);
assert(r==0);
......
......@@ -122,7 +122,18 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
txn->roll_info.spilled_rollback_head_hash = next_log_hash;
}
}
toku_rollback_log_unpin_and_remove(txn, log);
bool give_back = false;
// each txn tries to give back at most one rollback log node
// to the cache.
if (next_log.b == ROLLBACK_NONE.b) {
give_back = txn->logger->rollback_cache.give_rollback_log_node(
txn,
log
);
}
if (!give_back) {
toku_rollback_log_unpin_and_remove(txn, log);
}
}
return r;
}
......@@ -183,7 +194,17 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
}
toku_rollback_log_unpin_and_remove(txn, child_log);
// each txn tries to give back at most one rollback log node
// to the cache. All other rollback log nodes for this child
// transaction are included in the parent's rollback log,
// so this is the only node we can give back to the cache
bool give_back = txn->logger->rollback_cache.give_rollback_log_node(
txn,
child_log
);
if (!give_back) {
toku_rollback_log_unpin_and_remove(txn, child_log);
}
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
......
......@@ -14,37 +14,58 @@
#include "rollback.h"
// Address used as a sentinel. Otherwise unused.
static struct serialized_rollback_log_node cloned_rollback;
// Cleanup the rollback memory
static void
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
memarena_close(&log->rollentry_arena);
make_rollback_log_empty(log);
toku_free(log);
}
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
bool write_me, bool keep_me, bool for_checkpoint, bool is_clone) {
ROLLBACK_LOG_NODE log = nullptr;
SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
if (is_clone) {
CAST_FROM_VOIDP(serialized, rollback_v);
invariant(serialized->blocknum.b == logname.b);
// flush an ununused log to disk, by allocating a size 0 blocknum in
// the blocktable
static void
toku_rollback_flush_unused_log(
ROLLBACK_LOG_NODE log,
BLOCKNUM logname,
int fd,
FT ft,
bool write_me,
bool keep_me,
bool for_checkpoint,
bool is_clone
)
{
if (write_me) {
DISKOFF offset;
toku_blocknum_realloc_on_disk(ft->blocktable, logname, 0, &offset,
ft, fd, for_checkpoint);
}
else {
CAST_FROM_VOIDP(log, rollback_v);
invariant(log->blocknum.b == logname.b);
if (!keep_me && !is_clone) {
toku_free(log);
}
FT CAST_FROM_VOIDP(h, extraargs);
}
// flush a used log to disk by serializing and writing the node out
static void
toku_rollback_flush_used_log (
ROLLBACK_LOG_NODE log,
SERIALIZED_ROLLBACK_LOG_NODE serialized,
int fd,
FT ft,
bool write_me,
bool keep_me,
bool for_checkpoint,
bool is_clone
)
{
if (write_me) {
assert(h->cf == cachefile);
int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, h, for_checkpoint);
int r = toku_serialize_rollback_log_to(fd, log, serialized, is_clone, ft, for_checkpoint);
assert(r == 0);
}
*new_size = size;
if (!keep_me) {
if (is_clone) {
toku_serialized_rollback_log_destroy(serialized);
......@@ -55,12 +76,69 @@ void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname
}
}
// Write something out. Keep trying even if partial writes occur.
// On error: Return negative with errno set.
// On success return nbytes.
void toku_rollback_flush_callback (
CACHEFILE UU(cachefile),
int fd,
BLOCKNUM logname,
void *rollback_v,
void** UU(disk_data),
void *extraargs,
PAIR_ATTR size,
PAIR_ATTR* new_size,
bool write_me,
bool keep_me,
bool for_checkpoint,
bool is_clone
)
{
ROLLBACK_LOG_NODE log = nullptr;
SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
bool is_unused = false;
if (is_clone) {
is_unused = (rollback_v == &cloned_rollback);
CAST_FROM_VOIDP(serialized, rollback_v);
}
else {
CAST_FROM_VOIDP(log, rollback_v);
is_unused = rollback_log_is_unused(log);
}
*new_size = size;
FT ft;
CAST_FROM_VOIDP(ft, extraargs);
if (is_unused) {
toku_rollback_flush_unused_log(
log,
logname,
fd,
ft,
write_me,
keep_me,
for_checkpoint,
is_clone
);
}
else {
toku_rollback_flush_used_log(
log,
serialized,
fd,
ft,
write_me,
keep_me,
for_checkpoint,
is_clone
);
}
}
int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash,
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r;
FT CAST_FROM_VOIDP(h, extraargs);
assert(h->cf == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
if (r==0) {
......@@ -130,11 +208,15 @@ void toku_rollback_clone_callback(
)
{
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(log, value_data);
SERIALIZED_ROLLBACK_LOG_NODE XMALLOC(serialized);
toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
SERIALIZED_ROLLBACK_LOG_NODE serialized = nullptr;
if (!rollback_log_is_unused(log)) {
XMALLOC(serialized);
toku_serialize_rollback_log_to_memory_uncompressed(log, serialized);
*cloned_value_data = serialized;
}
else {
*cloned_value_data = &cloned_rollback;
}
new_attr->is_valid = false;
*cloned_value_data = serialized;
}
......@@ -6,18 +6,18 @@
#include "includes.h"
#include "rollback-ct-callbacks.h"
#include <inttypes.h>
static void rollback_unpin_remove_callback(CACHEKEY* cachekey, bool for_checkpoint, void* extra) {
FT CAST_FROM_VOIDP(h, extra);
toku_free_blocknum(
h->blocktable,
h->blocktable,
cachekey,
h,
for_checkpoint
);
}
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r;
CACHEFILE cf = txn->logger->rollback_cachefile;
......@@ -46,19 +46,21 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len)
static inline PAIR_ATTR make_rollback_pair_attr(long size) {
PAIR_ATTR result={
.size = size,
.nonleaf_size = 0,
.leaf_size = 0,
.rollback_size = size,
.nonleaf_size = 0,
.leaf_size = 0,
.rollback_size = size,
.cache_pressure_size = 0,
.is_valid = true
};
return result;
};
return result;
}
PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log) {
size_t size = sizeof(*log);
size += memarena_total_memory_size(log->rollentry_arena);
if (log->rollentry_arena) {
size += memarena_total_memory_size(log->rollentry_arena);
}
return make_rollback_pair_attr(size);
}
......@@ -67,37 +69,78 @@ static void toku_rollback_node_save_ct_pair(void *value_data, PAIR p) {
log->ct_pair = p;
}
// create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// node the next sequence number
static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previous_hash, ROLLBACK_LOG_NODE *result) {
ROLLBACK_LOG_NODE MALLOC(log);
assert(log);
CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
//
// initializes an empty rollback log node
// Does not touch the blocknum or hash, that is the
// responsibility of the caller
//
void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
// Having a txnid set to TXNID_NONE is how we determine if the
// rollback log node is empty or in use.
log->txnid = TXNID_NONE;
log->layout_version = FT_LAYOUT_VERSION;
log->layout_version_original = FT_LAYOUT_VERSION;
log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
log->dirty = true;
log->sequence = 0;
log->previous = {0};
log->previous_hash = 0;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
log->rollentry_arena = NULL;
log->rollentry_resident_bytecount = 0;
}
static void rollback_initialize_for_txn(
ROLLBACK_LOG_NODE log,
TOKUTXN txn,
BLOCKNUM previous,
uint32_t previous_hash
)
{
log->txnid = txn->txnid64;
log->sequence = txn->roll_info.num_rollback_nodes++;
toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
log->hash = toku_cachetable_hash(cf, log->blocknum);
log->previous = previous;
log->previous = previous;
log->previous_hash = previous_hash;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
log->rollentry_arena = memarena_create();
log->rollentry_resident_bytecount = 0;
}
void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
memarena_close(&log->rollentry_arena);
rollback_empty_log_init(log);
}
// create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// node the next sequence number
static void rollback_log_create (
TOKUTXN txn,
BLOCKNUM previous,
uint32_t previous_hash,
ROLLBACK_LOG_NODE *result
)
{
ROLLBACK_LOG_NODE XMALLOC(log);
rollback_empty_log_init(log);
CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
rollback_initialize_for_txn(log, txn, previous, previous_hash);
toku_allocate_blocknum(ft->blocktable, &log->blocknum, ft);
log->hash = toku_cachetable_hash(ft->cf, log->blocknum);
*result = log;
toku_cachetable_put(cf, log->blocknum, log->hash,
log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(h),
get_write_callbacks_for_rollback_log(ft),
toku_rollback_node_save_ct_pair);
txn->roll_info.current_rollback = log->blocknum;
txn->roll_info.current_rollback = log->blocknum;
txn->roll_info.current_rollback_hash = log->hash;
}
......@@ -211,26 +254,50 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
PL_WRITE_EXPENSIVE, // lock_type
PL_WRITE_CHEAP, // lock_type
h,
0, NULL, NULL, NULL, NULL
);
assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
assert(pinned_log->blocknum.b == blocknum.b);
assert(pinned_log->hash == hash);
*log = pinned_log;
}
void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
ROLLBACK_LOG_NODE pinned_log;
ROLLBACK_LOG_NODE pinned_log = NULL;
invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);
} else {
// create a new log for this transaction to use.
// this call asserts success internally
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
// For each transaction, we try to acquire the first rollback log
// from the rollback log node cache, so that we avoid
// putting something new into the cachetable. However,
// if transaction has spilled rollbacks, that means we
// have already done a lot of work for this transaction,
// and subsequent rollback log nodes are created
// and put into the cachetable. The idea is for
// transactions that don't do a lot of work to (hopefully)
// get a rollback log node from a cache, as opposed to
// taking the more expensive route of creating a new one.
if (!txn_has_spilled_rollback_logs(txn)) {
txn->logger->rollback_cache.get_rollback_log_node(txn, &pinned_log);
if (pinned_log != NULL) {
rollback_initialize_for_txn(
pinned_log,
txn,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.spilled_rollback_tail_hash
);
txn->roll_info.current_rollback = pinned_log->blocknum;
txn->roll_info.current_rollback_hash = pinned_log->hash;
}
}
if (pinned_log == NULL) {
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
}
}
assert(pinned_log->txnid == txn->txnid64);
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
......
......@@ -101,5 +101,12 @@ toku_serialized_rollback_log_destroy(SERIALIZED_ROLLBACK_LOG_NODE log) {
toku_free(log);
}
void rollback_empty_log_init(ROLLBACK_LOG_NODE log);
void make_rollback_log_empty(ROLLBACK_LOG_NODE log);
static inline bool rollback_log_is_unused(ROLLBACK_LOG_NODE log) {
return (log->txnid == TXNID_NONE);
}
#endif // TOKU_ROLLBACK_H
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: rollback.cc 49033 2012-10-17 18:48:30Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "rollback_log_node_cache.h"
void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
XMALLOC_N(max_num_avail_nodes, m_avail_blocknums);
XMALLOC_N(max_num_avail_nodes, m_hashes);
m_max_num_avail = max_num_avail_nodes;
m_first = 0;
m_num_avail = 0;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
toku_mutex_init(&m_mutex, &attr);
}
void rollback_log_node_cache::destroy() {
toku_mutex_destroy(&m_mutex);
toku_free(m_avail_blocknums);
toku_free(m_hashes);
}
// returns true if rollback log node was successfully added,
// false otherwise
bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log){
bool retval = false;
toku_mutex_lock(&m_mutex);
if (m_num_avail < m_max_num_avail) {
retval = true;
uint32_t index = m_first + m_num_avail;
if (index > m_max_num_avail) {
index -= m_max_num_avail;
}
m_avail_blocknums[index].b = log->blocknum.b;
m_hashes[index] = log->hash;
m_num_avail++;
}
toku_mutex_unlock(&m_mutex);
//
// now unpin the rollback log node
//
if (retval) {
make_rollback_log_empty(log);
toku_rollback_log_unpin(txn, log);
}
return retval;
}
// if a rollback log node is available, will set log to it,
// otherwise, will set log to NULL and caller is on his own
// for getting a rollback log node
void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){
BLOCKNUM b = ROLLBACK_NONE;
uint32_t hash;
toku_mutex_lock(&m_mutex);
if (m_num_avail > 0) {
b.b = m_avail_blocknums[m_first].b;
hash = m_hashes[m_first];
m_num_avail--;
if (++m_first >= m_max_num_avail) {
m_first = 0;
}
}
toku_mutex_unlock(&m_mutex);
if (b.b != ROLLBACK_NONE.b) {
toku_get_and_pin_rollback_log(txn, b, hash, log);
invariant(rollback_log_is_unused(*log));
} else {
*log = NULL;
}
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef TOKU_ROLLBACK_LOG_NODE_CACHE_H
#define TOKU_ROLLBACK_LOG_NODE_CACHE_H
#ident "$Id: rollback.h 49033 2012-10-17 18:48:30Z zardosht $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
class rollback_log_node_cache {
public:
void init (uint32_t max_num_avail_nodes);
void destroy();
// returns true if rollback log node was successfully added,
// false otherwise
bool give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE log);
// if a rollback log node is available, will set log to it,
// otherwise, will set log to NULL and caller is on his own
// for getting a rollback log node
void get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log);
private:
BLOCKNUM* m_avail_blocknums;
uint32_t* m_hashes;
uint32_t m_first;
uint32_t m_num_avail;
uint32_t m_max_num_avail;
toku_mutex_t m_mutex;
};
ENSURE_POD(rollback_log_node_cache);
#endif // TOKU_ROLLBACK_LOG_NODE_CACHE_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment