Commit 024a5647 authored by Rusty Russell's avatar Rusty Russell

tdb2: use counters to decide when to coalesce records.

This simply uses a 7 bit counter which gets incremented on each addition
to the list (but not decremented on removals).  When it wraps, we walk the
entire list looking for things to coalesce.

This causes performance problems, especially when appending records, so
we limit it in the next patch:

Before:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real	0m59.687s
user	0m11.593s
sys	0m4.100s
-rw------- 1 rusty rusty 752004064 2011-04-27 21:14 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real	1m17.738s
user	0m0.348s
sys	0m0.580s
-rw------- 1 rusty rusty 663360 2011-04-27 21:15 torture.tdb
Adding 2000000 records:  926 ns (110556088 bytes)
Finding 2000000 records:  592 ns (110556088 bytes)
Missing 2000000 records:  416 ns (110556088 bytes)
Traversing 2000000 records:  422 ns (110556088 bytes)
Deleting 2000000 records:  741 ns (244003768 bytes)
Re-adding 2000000 records:  799 ns (244003768 bytes)
Appending 2000000 records:  1147 ns (295244592 bytes)
Churning 2000000 records:  1827 ns (568411440 bytes)

After:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real	1m17.022s
user	0m27.206s
sys	0m3.920s
-rw------- 1 rusty rusty 570130576 2011-04-27 21:17 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real	1m27.355s
user	0m0.296s
sys	0m0.516s
-rw------- 1 rusty rusty 617352 2011-04-27 21:18 torture.tdb
Adding 2000000 records:  890 ns (110556088 bytes)
Finding 2000000 records:  565 ns (110556088 bytes)
Missing 2000000 records:  390 ns (110556088 bytes)
Traversing 2000000 records:  410 ns (110556088 bytes)
Deleting 2000000 records:  8623 ns (244003768 bytes)
Re-adding 2000000 records:  7089 ns (244003768 bytes)
Appending 2000000 records:  33708 ns (244003768 bytes)
Churning 2000000 records:  2029 ns (268404160 bytes)
parent a8b30ad4
......@@ -533,11 +533,13 @@ static enum TDB_ERROR check_free_table(struct tdb_context *tdb,
h = bucket_off(ftable_off, i);
for (off = tdb_read_off(tdb, h); off; off = f.next) {
if (!first)
first = off;
if (TDB_OFF_IS_ERR(off)) {
return off;
}
if (!first) {
off &= TDB_OFF_MASK;
first = off;
}
ecode = tdb_read_convert(tdb, off, &f, sizeof(f));
if (ecode != TDB_SUCCESS) {
return ecode;
......
......@@ -109,7 +109,7 @@ static void check_list(struct tdb_context *tdb, tdb_off_t b_off)
tdb_off_t off, prev = 0, first;
struct tdb_free_record r;
first = off = tdb_read_off(tdb, b_off);
first = off = (tdb_read_off(tdb, b_off) & TDB_OFF_MASK);
while (off != 0) {
tdb_read_convert(tdb, off, &r, sizeof(r));
if (frec_magic(&r) != TDB_FREE_MAGIC)
......@@ -150,17 +150,21 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
/* If prev->next == 0, we were head: update bucket to point to next. */
if (prev_next == 0) {
#ifdef CCAN_TDB2_DEBUG
if (tdb_read_off(tdb, b_off) != r_off) {
/* We must preserve upper bits. */
head = tdb_read_off(tdb, b_off);
if (TDB_OFF_IS_ERR(head))
return head;
if ((head & TDB_OFF_MASK) != r_off) {
return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
"remove_from_list:"
" %llu head %llu on list %llu",
(long long)r_off,
(long long)tdb_read_off(tdb, b_off),
(long long)head,
(long long)b_off);
}
#endif
ecode = tdb_write_off(tdb, b_off, r->next);
head = ((head & ~TDB_OFF_MASK) | r->next);
ecode = tdb_write_off(tdb, b_off, head);
if (ecode != TDB_SUCCESS)
return ecode;
} else {
......@@ -175,6 +179,7 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
head = tdb_read_off(tdb, b_off);
if (TDB_OFF_IS_ERR(head))
return head;
head &= TDB_OFF_MASK;
off = head + offsetof(struct tdb_free_record, magic_and_prev);
} else {
/* off = &r->next->prev */
......@@ -195,26 +200,29 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
return tdb_write_off(tdb, off, r->magic_and_prev);
}
/* Enqueue in this free bucket. */
/* Enqueue in this free bucket: sets coalesce if we've added 128
* entries to it. */
static enum TDB_ERROR enqueue_in_free(struct tdb_context *tdb,
tdb_off_t b_off,
tdb_off_t off,
tdb_len_t len)
tdb_len_t len,
bool *coalesce)
{
struct tdb_free_record new;
enum TDB_ERROR ecode;
tdb_off_t prev;
tdb_off_t prev, head;
uint64_t magic = (TDB_FREE_MAGIC << (64 - TDB_OFF_UPPER_STEAL));
head = tdb_read_off(tdb, b_off);
if (TDB_OFF_IS_ERR(head))
return head;
/* We only need to set ftable_and_len; rest is set in enqueue_in_free */
new.ftable_and_len = ((uint64_t)tdb->ftable << (64 - TDB_OFF_UPPER_STEAL))
| len;
/* new->next = head. */
new.next = tdb_read_off(tdb, b_off);
if (TDB_OFF_IS_ERR(new.next)) {
return new.next;
}
new.next = (head & TDB_OFF_MASK);
/* First element? Prev points to ourselves. */
if (!new.next) {
......@@ -255,65 +263,23 @@ static enum TDB_ERROR enqueue_in_free(struct tdb_context *tdb,
}
#endif
}
/* head = new */
ecode = tdb_write_off(tdb, b_off, off);
if (ecode != TDB_SUCCESS) {
return ecode;
}
return tdb_write_convert(tdb, off, &new, sizeof(new));
}
/* Update enqueue count, but don't set high bit: see TDB_OFF_IS_ERR */
if (*coalesce)
head += (1ULL << (64 - TDB_OFF_UPPER_STEAL));
head &= ~(TDB_OFF_MASK | (1ULL << 63));
head |= off;
/* List need not be locked. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len_with_header,
enum tdb_lock_flags waitflag)
{
tdb_off_t b_off;
tdb_len_t len;
enum TDB_ERROR ecode;
assert(len_with_header >= sizeof(struct tdb_free_record));
len = len_with_header - sizeof(struct tdb_used_record);
b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
ecode = tdb_write_off(tdb, b_off, head);
if (ecode != TDB_SUCCESS) {
return ecode;
}
ecode = enqueue_in_free(tdb, b_off, off, len);
check_list(tdb, b_off);
tdb_unlock_free_bucket(tdb, b_off);
return ecode;
}
/* Total record data size for a key/data pair: at least TDB_MIN_DATA_LEN,
 * rounded up so records stay uint64_t-aligned on disk. */
static size_t adjust_size(size_t keylen, size_t datalen)
{
	size_t size = keylen + datalen;

	if (size < TDB_MIN_DATA_LEN)
		size = TDB_MIN_DATA_LEN;

	/* Round to next uint64_t boundary. */
	return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
}
/* If we have enough left over to be useful, split that off. */
static size_t record_leftover(size_t keylen, size_t datalen,
bool want_extra, size_t total_len)
{
ssize_t leftover;
if (want_extra)
datalen += datalen / 2;
leftover = total_len - adjust_size(keylen, datalen);
if (leftover < (ssize_t)sizeof(struct tdb_free_record))
return 0;
/* It's time to coalesce if counter wrapped. */
if (*coalesce)
*coalesce = ((head & ~TDB_OFF_MASK) == 0);
return leftover;
return tdb_write_convert(tdb, off, &new, sizeof(new));
}
static tdb_off_t ftable_offset(struct tdb_context *tdb, unsigned int ftable)
......@@ -334,13 +300,12 @@ static tdb_off_t ftable_offset(struct tdb_context *tdb, unsigned int ftable)
return off;
}
/* Note: we unlock the current bucket if fail (-ve), or coalesce (-ve) and
* need to blatt either of the *protect records (which is set to an error). */
/* Note: we unlock the current bucket if fail (-ve), or coalesce (+ve) and
* need to blatt the *protect record (which is set to an error). */
static tdb_len_t coalesce(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t b_off,
tdb_len_t data_len,
tdb_off_t *protect1,
tdb_off_t *protect2)
tdb_off_t *protect)
{
tdb_off_t end;
struct tdb_free_record rec;
......@@ -405,8 +370,8 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
}
/* Did we just mess up a record you were hoping to use? */
if (end == *protect1 || end == *protect2)
*protect1 = TDB_ERR_NOEXIST;
if (end == *protect)
*protect = TDB_ERR_NOEXIST;
ecode = remove_from_list(tdb, nb_off, end, &rec);
check_list(tdb, nb_off);
......@@ -425,8 +390,8 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
return 0;
/* Before we expand, check this isn't one you wanted protected? */
if (off == *protect1 || off == *protect2)
*protect1 = TDB_ERR_EXISTS;
if (off == *protect)
*protect = TDB_ERR_EXISTS;
/* OK, expand initial record */
ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
......@@ -447,11 +412,11 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
goto err;
}
/* Try locking violation first... */
ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT);
/* Try locking violation first. We don't allow coalesce recursion! */
ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT, false);
if (ecode != TDB_SUCCESS) {
/* Need to drop lock. Can't rely on anything stable. */
*protect1 = TDB_ERR_CORRUPT;
*protect = TDB_ERR_CORRUPT;
/* We have to drop this to avoid deadlocks, so make sure record
* doesn't get coalesced by someone else! */
......@@ -469,11 +434,12 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
tdb->stats.alloc_coalesce_succeeded++;
tdb_unlock_free_bucket(tdb, b_off);
ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT);
ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT,
false);
if (ecode != TDB_SUCCESS) {
return ecode;
}
} else if (TDB_OFF_IS_ERR(*protect1)) {
} else if (TDB_OFF_IS_ERR(*protect)) {
/* For simplicity, we always drop lock if they can't continue */
tdb_unlock_free_bucket(tdb, b_off);
}
......@@ -487,6 +453,109 @@ err:
return ecode;
}
/* Walk an entire free list, trying to coalesce each entry with its
 * neighbours.  List is locked on entry: we unlock it on every path,
 * except where coalesce() has already dropped the lock for us.
 *
 * NOTE(review): ftable_off is currently unused in this body — confirm
 * whether it was intended to be passed down to coalesce(). */
static enum TDB_ERROR coalesce_list(struct tdb_context *tdb,
				    tdb_off_t ftable_off, tdb_off_t b_off)
{
	enum TDB_ERROR ecode;
	tdb_off_t off;

	off = tdb_read_off(tdb, b_off);
	if (TDB_OFF_IS_ERR(off)) {
		ecode = off;
		goto unlock_err;
	}
	/* A little bit of paranoia: the bucket head word carries the
	 * enqueue counter in its upper bits; keep only the offset. */
	off &= TDB_OFF_MASK;

	while (off) {
		struct tdb_free_record rec;
		tdb_len_t coal;
		tdb_off_t next;

		ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
		if (ecode != TDB_SUCCESS)
			goto unlock_err;

		/* Save successor before coalesce() can rewrite the record;
		 * coalesce() sets next to an error if it clobbered it. */
		next = rec.next;
		coal = coalesce(tdb, off, b_off, frec_len(&rec), &next);
		if (TDB_OFF_IS_ERR(coal)) {
			/* This has already unlocked on error. */
			return coal;
		}
		if (TDB_OFF_IS_ERR(next)) {
			/* Coalescing had to unlock, so stop. */
			return TDB_SUCCESS;
		}
		off = next;
	}
	tdb_unlock_free_bucket(tdb, b_off);
	return TDB_SUCCESS;

unlock_err:
	tdb_unlock_free_bucket(tdb, b_off);
	return ecode;
}
/* Put a free record of len_with_header bytes at off onto the appropriate
 * free-bucket list.  List must not be locked if coalesce_ok is set, since
 * coalescing will take and drop the bucket lock itself.
 *
 * Returns TDB_SUCCESS or a TDB_ERR_* code. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
			       tdb_off_t off, tdb_len_t len_with_header,
			       enum tdb_lock_flags waitflag,
			       bool coalesce)
{
	tdb_len_t len;
	tdb_off_t b_off;
	enum TDB_ERROR ecode;

	assert(len_with_header >= sizeof(struct tdb_free_record));

	/* The used-record header stays; only the remainder is free space. */
	len = len_with_header - sizeof(struct tdb_used_record);
	b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));

	ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
	if (ecode != TDB_SUCCESS)
		return ecode;

	/* enqueue_in_free() clears coalesce unless the counter wrapped. */
	ecode = enqueue_in_free(tdb, b_off, off, len, &coalesce);
	check_list(tdb, b_off);

	/* coalesce_list() unlocks the free list itself. */
	if (ecode == TDB_SUCCESS && coalesce)
		return coalesce_list(tdb, tdb->ftable_off, b_off);

	tdb_unlock_free_bucket(tdb, b_off);
	return ecode;
}
/* Total record data size for a key/data pair: at least TDB_MIN_DATA_LEN,
 * rounded up so records stay uint64_t-aligned on disk. */
static size_t adjust_size(size_t keylen, size_t datalen)
{
	const size_t align = sizeof(uint64_t);
	size_t size = keylen + datalen;

	if (size < TDB_MIN_DATA_LEN)
		size = TDB_MIN_DATA_LEN;

	/* Round up to the next multiple of align (a power of two). */
	return (size + align - 1) & ~(align - 1);
}
/* How many trailing bytes of a total_len-byte record could be split off
 * as a separate free record, given the key/data we want to store?
 * Returns 0 when the remainder is too small to hold a free record
 * (including when total_len is smaller than what we need).
 * want_extra pads datalen by 50% to leave room for growth. */
static size_t record_leftover(size_t keylen, size_t datalen,
			      bool want_extra, size_t total_len)
{
	ssize_t excess;

	if (want_extra)
		datalen += datalen / 2;

	/* Signed on purpose: goes negative when total_len is too small. */
	excess = total_len - adjust_size(keylen, datalen);
	if (excess < (ssize_t)sizeof(struct tdb_free_record))
		return 0;

	return excess;
}
/* We need size bytes to put our key and data in. */
static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
tdb_off_t ftable_off,
......@@ -499,12 +568,10 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
tdb_off_t off, b_off,best_off;
struct tdb_free_record best = { 0 };
double multiplier;
bool coalesce_after_best = false; /* Damn GCC warning! */
size_t size = adjust_size(keylen, datalen);
enum TDB_ERROR ecode;
tdb->stats.allocs++;
again:
b_off = bucket_off(ftable_off, bucket);
/* FIXME: Try non-blocking wait first, to measure contention. */
......@@ -530,10 +597,11 @@ again:
ecode = off;
goto unlock_err;
}
off &= TDB_OFF_MASK;
while (off) {
const struct tdb_free_record *r;
tdb_len_t len, coal;
tdb_len_t len;
tdb_off_t next;
r = tdb_access_read(tdb, off, sizeof(*r), true);
......@@ -555,7 +623,6 @@ again:
if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
best_off = off;
best = *r;
coalesce_after_best = false;
}
if (frec_len(&best) <= size * multiplier && best_off) {
......@@ -568,19 +635,6 @@ again:
next = r->next;
len = frec_len(r);
tdb_access_release(tdb, r);
/* Since we're going slow anyway, try coalescing here. */
coal = coalesce(tdb, off, b_off, len, &best_off, &next);
if (TDB_OFF_IS_ERR(coal)) {
/* This has already unlocked on error. */
return coal;
}
if (TDB_OFF_IS_ERR(best_off)) {
/* This has unlocked list, restart. */
goto again;
}
if (coal > 0)
coalesce_after_best = true;
off = next;
}
......@@ -589,14 +643,6 @@ again:
struct tdb_used_record rec;
size_t leftover;
/* If we coalesced, we might have change prev/next ptrs. */
if (coalesce_after_best) {
ecode = tdb_read_convert(tdb, best_off, &best,
sizeof(best));
if (ecode != TDB_SUCCESS)
goto unlock_err;
}
/* We're happy with this size: take it. */
ecode = remove_from_list(tdb, b_off, best_off, &best);
check_list(tdb, b_off);
......@@ -637,7 +683,7 @@ again:
ecode = add_free_record(tdb,
best_off + sizeof(rec)
+ frec_len(&best) - leftover,
leftover, TDB_LOCK_WAIT);
leftover, TDB_LOCK_WAIT, false);
if (ecode != TDB_SUCCESS) {
best_off = ecode;
}
......@@ -811,7 +857,7 @@ static enum TDB_ERROR tdb_expand(struct tdb_context *tdb, tdb_len_t size)
tdb_unlock_expand(tdb, F_WRLCK);
tdb->stats.expands++;
return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT);
return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT, true);
}
/* This won't fail: it will expand the database if it has to. */
......
......@@ -466,7 +466,8 @@ tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
/* Put this record in a free list. */
enum TDB_ERROR add_free_record(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len_with_header,
enum tdb_lock_flags waitflag);
enum tdb_lock_flags waitflag,
bool coalesce_ok);
/* Set up header for a used/ftable/htable/chain record. */
enum TDB_ERROR set_header(struct tdb_context *tdb,
......
......@@ -42,7 +42,7 @@ static enum TDB_ERROR replace_data(struct tdb_context *tdb,
ecode = add_free_record(tdb, old_off,
sizeof(struct tdb_used_record)
+ key.dsize + old_room,
TDB_LOCK_WAIT);
TDB_LOCK_WAIT, true);
if (ecode == TDB_SUCCESS)
ecode = replace_in_hash(tdb, h, new_off);
} else {
......@@ -292,7 +292,7 @@ enum TDB_ERROR tdb_delete(struct tdb_context *tdb, struct tdb_data key)
+ rec_key_length(&rec)
+ rec_data_length(&rec)
+ rec_extra_padding(&rec),
TDB_LOCK_WAIT);
TDB_LOCK_WAIT, true);
if (tdb->flags & TDB_SEQNUM)
tdb_inc_seqnum(tdb);
......
......@@ -150,7 +150,7 @@ static void add_to_freetable(struct tdb_context *tdb,
tdb->ftable_off = freetable->base.off;
tdb->ftable = ftable;
add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen,
TDB_LOCK_WAIT);
TDB_LOCK_WAIT, false);
}
static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
......
......@@ -52,7 +52,7 @@ int main(int argc, char *argv[])
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
test = layout->elem[1].base.off;
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test, &test)
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test)
== 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
......@@ -75,7 +75,7 @@ int main(int argc, char *argv[])
/* Lock and fail to coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
test = layout->elem[1].base.off;
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
== 0);
tdb_unlock_free_bucket(tdb, b_off);
ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
......@@ -99,7 +99,7 @@ int main(int argc, char *argv[])
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
test = layout->elem[2].base.off;
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
== 1024 + sizeof(struct tdb_used_record) + 2048);
/* Should tell us it's erased this one... */
ok1(test == TDB_ERR_NOEXIST);
......@@ -126,7 +126,7 @@ int main(int argc, char *argv[])
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
test = layout->elem[2].base.off;
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
== 1024 + sizeof(struct tdb_used_record) + 512);
ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
ok1(free_record_length(tdb, layout->elem[1].base.off)
......@@ -153,7 +153,7 @@ int main(int argc, char *argv[])
/* Lock and coalesce. */
ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
test = layout->elem[2].base.off;
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
== 1024 + sizeof(struct tdb_used_record) + 512
+ sizeof(struct tdb_used_record) + 256);
ok1(tdb->file->allrecord_lock.count == 0
......
......@@ -177,7 +177,7 @@ int main(int argc, char *argv[])
+ rec_key_length(&rec)
+ rec_data_length(&rec)
+ rec_extra_padding(&rec),
TDB_LOCK_NOWAIT) == 0);
TDB_LOCK_NOWAIT, false) == 0);
ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
F_WRLCK) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
......
......@@ -43,11 +43,11 @@ int main(int argc, char *argv[])
/* This makes a sparse file */
ok1(ftruncate(tdb->file->fd, 0xFFFFFFF0) == 0);
ok1(add_free_record(tdb, old_size, 0xFFFFFFF0 - old_size,
TDB_LOCK_WAIT) == TDB_SUCCESS);
TDB_LOCK_WAIT, false) == TDB_SUCCESS);
/* Now add a little record past the 4G barrier. */
ok1(tdb_expand_file(tdb, 100) == TDB_SUCCESS);
ok1(add_free_record(tdb, 0xFFFFFFF0, 100, TDB_LOCK_WAIT)
ok1(add_free_record(tdb, 0xFFFFFFF0, 100, TDB_LOCK_WAIT, false)
== TDB_SUCCESS);
ok1(tdb_check(tdb, NULL, NULL) == TDB_SUCCESS);
......
......@@ -689,7 +689,7 @@ static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_head,
sizeof(rec) + rec.max_len,
TDB_LOCK_WAIT);
TDB_LOCK_WAIT, true);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment