Commit 96b169e9 authored by Rusty Russell's avatar Rusty Russell

tdb2: remove tdb_get()

We have four internal helpers for reading data from the database:
1) tdb_read_convert() - read (and convert) into a buffer.
2) tdb_read_off() - read (and convert) and offset.
3) tdb_access_read() - malloc or direct access to the database.
4) tdb_get() - copy into a buffer or direct access to the database.

The last one doesn't really buy us anything, so remove it (except for
tdb_read_off/tdb_write_off, see next patch).

Before:
Adding 1000000 records:  6480 ns (59900296 bytes)
Finding 1000000 records:  2839 ns (59900296 bytes)
Missing 1000000 records:  2485 ns (59900296 bytes)
Traversing 1000000 records:  2598 ns (59900296 bytes)
Deleting 1000000 records:  5342 ns (59900296 bytes)
Re-adding 1000000 records:  5613 ns (59900296 bytes)
Appending 1000000 records:  12194 ns (93594224 bytes)
Churning 1000000 records:  14549 ns (93594224 bytes)

After:
Adding 1000000 records:  6497 ns (59900296 bytes)
Finding 1000000 records:  2854 ns (59900296 bytes)
Missing 1000000 records:  2563 ns (59900296 bytes)
Traversing 1000000 records:  2735 ns (59900296 bytes)
Deleting 1000000 records:  11357 ns (59900296 bytes)
Re-adding 1000000 records:  8145 ns (59900296 bytes)
Appending 1000000 records:  10939 ns (93594224 bytes)
Churning 1000000 records:  18479 ns (93594224 bytes)
parent afc3c1e7
......@@ -436,24 +436,23 @@ static bool check_linear(struct tdb_context *tdb,
struct tdb_used_record u;
struct tdb_free_record f;
struct tdb_recovery_record r;
} pad, *p;
} rec;
/* r is larger: only get that if we need to. */
p = tdb_get(tdb, off, &pad, sizeof(pad.f));
if (!p)
if (tdb_read_convert(tdb, off, &rec, sizeof(rec.f)) == -1)
return false;
/* If we crash after ftruncate, we can get zeroes or fill. */
if (p->r.magic == TDB_RECOVERY_INVALID_MAGIC
|| p->r.magic == 0x4343434343434343ULL) {
p = tdb_get(tdb, off, &pad, sizeof(pad.r));
if (!p)
if (rec.r.magic == TDB_RECOVERY_INVALID_MAGIC
|| rec.r.magic == 0x4343434343434343ULL) {
if (tdb_read_convert(tdb, off, &rec, sizeof(rec.r)))
return false;
if (recovery == off) {
found_recovery = true;
len = sizeof(p->r) + p->r.max_len;
len = sizeof(rec.r) + rec.r.max_len;
} else {
len = dead_space(tdb, off);
if (len < sizeof(p->r)) {
if (len < sizeof(rec.r)) {
tdb->log(tdb, TDB_DEBUG_ERROR,
tdb->log_priv,
"tdb_check: invalid dead space"
......@@ -466,9 +465,8 @@ static bool check_linear(struct tdb_context *tdb,
(size_t)off, (size_t)(off + len),
(size_t)tdb->map_size);
}
} else if (p->r.magic == TDB_RECOVERY_MAGIC) {
p = tdb_get(tdb, off, &pad, sizeof(pad.r));
if (!p)
} else if (rec.r.magic == TDB_RECOVERY_MAGIC) {
if (tdb_read_convert(tdb, off, &rec, sizeof(rec.r)))
return false;
if (recovery != off) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
......@@ -477,23 +475,23 @@ static bool check_linear(struct tdb_context *tdb,
(size_t)off);
return false;
}
if (p->r.len > p->r.max_len) {
if (rec.r.len > rec.r.max_len) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: invalid recovery length"
" %zu\n", (size_t)p->r.len);
" %zu\n", (size_t)rec.r.len);
return false;
}
if (p->r.eof > tdb->map_size) {
if (rec.r.eof > tdb->map_size) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: invalid old EOF"
" %zu\n", (size_t)p->r.eof);
" %zu\n", (size_t)rec.r.eof);
return false;
}
found_recovery = true;
len = sizeof(p->r) + p->r.max_len;
} else if (frec_magic(&p->f) == TDB_FREE_MAGIC
|| frec_magic(&p->f) == TDB_COALESCING_MAGIC) {
len = sizeof(p->u) + frec_len(&p->f);
len = sizeof(rec.r) + rec.r.max_len;
} else if (frec_magic(&rec.f) == TDB_FREE_MAGIC
|| frec_magic(&rec.f) == TDB_COALESCING_MAGIC) {
len = sizeof(rec.u) + frec_len(&rec.f);
if (off + len > tdb->map_size) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: free overlength %llu"
......@@ -502,18 +500,18 @@ static bool check_linear(struct tdb_context *tdb,
return false;
}
/* This record is free! */
if (frec_magic(&p->f) == TDB_FREE_MAGIC
if (frec_magic(&rec.f) == TDB_FREE_MAGIC
&& !append(free, num_free, off))
return false;
} else {
uint64_t klen, dlen, extra;
/* This record is used! */
if (rec_magic(&p->u) != TDB_MAGIC) {
if (rec_magic(&rec.u) != TDB_MAGIC) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: Bad magic 0x%llx"
" at offset %llu\n",
(long long)rec_magic(&p->u),
(long long)rec_magic(&rec.u),
(long long)off);
return false;
}
......@@ -521,11 +519,11 @@ static bool check_linear(struct tdb_context *tdb,
if (!append(used, num_used, off))
return false;
klen = rec_key_length(&p->u);
dlen = rec_data_length(&p->u);
extra = rec_extra_padding(&p->u);
klen = rec_key_length(&rec.u);
dlen = rec_data_length(&rec.u);
extra = rec_extra_padding(&rec.u);
len = sizeof(p->u) + klen + dlen + extra;
len = sizeof(rec.u) + klen + dlen + extra;
if (off + len > tdb->map_size) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: used overlength %llu"
......@@ -534,7 +532,7 @@ static bool check_linear(struct tdb_context *tdb,
return false;
}
if (len < sizeof(p->f)) {
if (len < sizeof(rec.f)) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_check: too short record %llu at"
" %llu\n",
......
......@@ -105,7 +105,7 @@ static tdb_off_t find_free_head(struct tdb_context *tdb,
/* Remove from free bucket. */
static int remove_from_list(struct tdb_context *tdb,
tdb_off_t b_off, tdb_off_t r_off,
struct tdb_free_record *r)
const struct tdb_free_record *r)
{
tdb_off_t off;
......@@ -262,27 +262,30 @@ static tdb_off_t flist_offset(struct tdb_context *tdb, unsigned int flist)
static int coalesce(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t b_off, tdb_len_t data_len)
{
struct tdb_free_record pad, *r;
tdb_off_t end;
struct tdb_free_record rec;
add_stat(tdb, alloc_coalesce_tried, 1);
end = off + sizeof(struct tdb_used_record) + data_len;
while (end < tdb->map_size) {
const struct tdb_free_record *r;
tdb_off_t nb_off;
unsigned flist, bucket;
/* FIXME: do tdb_get here and below really win? */
r = tdb_get(tdb, end, &pad, sizeof(pad));
r = tdb_access_read(tdb, end, sizeof(*r), true);
if (!r)
goto err;
if (frec_magic(r) != TDB_FREE_MAGIC)
if (frec_magic(r) != TDB_FREE_MAGIC) {
tdb_access_release(tdb, r);
break;
}
flist = frec_flist(r);
bucket = size_to_bucket(frec_len(r));
nb_off = bucket_off(flist_offset(tdb, flist), bucket);
tdb_access_release(tdb, r);
/* We may be violating lock order here, so best effort. */
if (tdb_lock_free_bucket(tdb, nb_off, TDB_LOCK_NOWAIT) == -1) {
......@@ -291,31 +294,30 @@ static int coalesce(struct tdb_context *tdb,
}
/* Now we have lock, re-check. */
r = tdb_get(tdb, end, &pad, sizeof(pad));
if (!r) {
if (tdb_read_convert(tdb, end, &rec, sizeof(rec))) {
tdb_unlock_free_bucket(tdb, nb_off);
goto err;
}
if (unlikely(frec_magic(r) != TDB_FREE_MAGIC)) {
if (unlikely(frec_magic(&rec) != TDB_FREE_MAGIC)) {
add_stat(tdb, alloc_coalesce_race, 1);
tdb_unlock_free_bucket(tdb, nb_off);
break;
}
if (unlikely(frec_flist(r) != flist)
|| unlikely(size_to_bucket(frec_len(r)) != bucket)) {
if (unlikely(frec_flist(&rec) != flist)
|| unlikely(size_to_bucket(frec_len(&rec)) != bucket)) {
add_stat(tdb, alloc_coalesce_race, 1);
tdb_unlock_free_bucket(tdb, nb_off);
break;
}
if (remove_from_list(tdb, nb_off, end, r) == -1) {
if (remove_from_list(tdb, nb_off, end, &rec) == -1) {
tdb_unlock_free_bucket(tdb, nb_off);
goto err;
}
end += sizeof(struct tdb_used_record) + frec_len(r);
end += sizeof(struct tdb_used_record) + frec_len(&rec);
tdb_unlock_free_bucket(tdb, nb_off);
add_stat(tdb, alloc_coalesce_num_merged, 1);
}
......@@ -324,32 +326,28 @@ static int coalesce(struct tdb_context *tdb,
if (end == off + sizeof(struct tdb_used_record) + data_len)
return 0;
/* OK, expand record */
r = tdb_get(tdb, off, &pad, sizeof(pad));
if (!r)
/* OK, expand initial record */
if (tdb_read_convert(tdb, off, &rec, sizeof(rec)))
goto err;
if (frec_len(r) != data_len) {
if (frec_len(&rec) != data_len) {
tdb->ecode = TDB_ERR_CORRUPT;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
"coalesce: expected data len %llu not %llu\n",
(long long)data_len, (long long)frec_len(r));
(long long)data_len, (long long)frec_len(&rec));
goto err;
}
if (remove_from_list(tdb, b_off, off, r) == -1)
goto err;
r = tdb_access_write(tdb, off, sizeof(*r), true);
if (!r)
if (remove_from_list(tdb, b_off, off, &rec) == -1)
goto err;
/* We have to drop this to avoid deadlocks, so make sure record
* doesn't get coalesced by someone else! */
r->magic_and_prev = TDB_COALESCING_MAGIC << (64 - TDB_OFF_UPPER_STEAL);
rec.magic_and_prev = TDB_COALESCING_MAGIC
<< (64 - TDB_OFF_UPPER_STEAL);
/* FIXME: Use 255 as invalid free list? */
r->flist_and_len = end - off - sizeof(struct tdb_used_record);
if (tdb_access_commit(tdb, r) != 0)
rec.flist_and_len = end - off - sizeof(struct tdb_used_record);
if (tdb_write_convert(tdb, off, &rec, sizeof(rec)) != 0)
goto err;
add_stat(tdb, alloc_coalesce_succeeded, 1);
......@@ -374,7 +372,7 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
unsigned hashlow)
{
tdb_off_t off, b_off,best_off;
struct tdb_free_record pad, best = { 0 }, *r;
struct tdb_free_record best = { 0 };
double multiplier;
size_t size = adjust_size(keylen, datalen);
......@@ -404,12 +402,16 @@ again:
goto unlock_err;
while (off) {
/* FIXME: Does tdb_get win anything here? */
r = tdb_get(tdb, off, &pad, sizeof(*r));
const struct tdb_free_record *r;
tdb_len_t len;
tdb_off_t next;
r = tdb_access_read(tdb, off, sizeof(*r), true);
if (!r)
goto unlock_err;
if (frec_magic(r) != TDB_FREE_MAGIC) {
tdb_access_release(tdb, r);
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"lock_and_alloc: %llu non-free 0x%llx\n",
(long long)off, (long long)r->magic_and_prev);
......@@ -421,13 +423,19 @@ again:
best = *r;
}
if (frec_len(&best) < size * multiplier && best_off)
if (frec_len(&best) < size * multiplier && best_off) {
tdb_access_release(tdb, r);
break;
}
multiplier *= 1.01;
next = r->next;
len = frec_len(r);
tdb_access_release(tdb, r);
/* Since we're going slow anyway, try coalescing here. */
switch (coalesce(tdb, off, b_off, frec_len(r))) {
switch (coalesce(tdb, off, b_off, len)) {
case -1:
/* This has already unlocked on error. */
return -1;
......@@ -435,7 +443,7 @@ again:
/* This has unlocked list, restart. */
goto again;
}
off = r->next;
off = next;
}
/* If we found anything at all, use it. */
......
......@@ -42,17 +42,19 @@ uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len)
uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
{
struct tdb_used_record pad, *r;
const struct tdb_used_record *r;
const void *key;
uint64_t klen, hash;
r = tdb_get(tdb, off, &pad, sizeof(pad));
r = tdb_access_read(tdb, off, sizeof(*r), true);
if (!r)
/* FIXME */
return 0;
klen = rec_key_length(r);
key = tdb_access_read(tdb, off + sizeof(pad), klen, false);
tdb_access_release(tdb, r);
key = tdb_access_read(tdb, off + sizeof(*r), klen, false);
if (!key)
return 0;
......
......@@ -123,19 +123,6 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
return 0;
}
/* Either make a copy into pad and return that, or return ptr into mmap. */
/* Note: pad has to be a real object, so we can't get here if len
* overflows size_t */
void *tdb_get(struct tdb_context *tdb, tdb_off_t off, void *pad, size_t len)
{
if (likely(!(tdb->flags & TDB_CONVERT))) {
void *ret = tdb->methods->direct(tdb, off, len);
if (ret)
return ret;
}
return tdb_read_convert(tdb, off, pad, len) == -1 ? NULL : pad;
}
/* Endian conversion: we only ever deal with 8 byte quantities */
void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size)
{
......@@ -208,13 +195,11 @@ int zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off)
{
tdb_off_t pad, *ret;
tdb_off_t ret;
ret = tdb_get(tdb, off, &pad, sizeof(pad));
if (!ret) {
if (tdb_read_convert(tdb, off, &ret, sizeof(ret)) == -1)
return TDB_OFF_ERR;
}
return *ret;
return ret;
}
/* Even on files, we can get partial writes due to signals. */
......@@ -505,14 +490,19 @@ void *tdb_access_write(struct tdb_context *tdb,
return ret;
}
bool is_direct(const struct tdb_context *tdb, const void *p)
{
return (tdb->map_ptr
&& (char *)p >= (char *)tdb->map_ptr
&& (char *)p < (char *)tdb->map_ptr + tdb->map_size);
}
void tdb_access_release(struct tdb_context *tdb, const void *p)
{
if (!tdb->map_ptr
|| (char *)p < (char *)tdb->map_ptr
|| (char *)p >= (char *)tdb->map_ptr + tdb->map_size)
free((struct tdb_access_hdr *)p - 1);
else
if (is_direct(tdb, p))
tdb->direct_access--;
else
free((struct tdb_access_hdr *)p - 1);
}
int tdb_access_commit(struct tdb_context *tdb, void *p)
......
......@@ -418,16 +418,15 @@ void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size);
void tdb_munmap(struct tdb_context *tdb);
void tdb_mmap(struct tdb_context *tdb);
/* Either make a copy into pad and return that, or return ptr into mmap.
* Converts endian (ie. will use pad in that case). */
void *tdb_get(struct tdb_context *tdb, tdb_off_t off, void *pad, size_t len);
/* Either alloc a copy, or give direct access. Release frees or noop. */
const void *tdb_access_read(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len, bool convert);
void *tdb_access_write(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len, bool convert);
/* Is this pointer direct? (Otherwise it's malloced) */
bool is_direct(const struct tdb_context *tdb, const void *p);
/* Release result of tdb_access_read/write. */
void tdb_access_release(struct tdb_context *tdb, const void *p);
/* Commit result of tdb_acces_write. */
......
......@@ -50,25 +50,29 @@ static bool summarize(struct tdb_context *tdb,
tdb_len_t unc = 0;
for (off = sizeof(struct tdb_header); off < tdb->map_size; off += len) {
union {
const union {
struct tdb_used_record u;
struct tdb_free_record f;
struct tdb_recovery_record r;
} pad, *p;
} *p;
/* We might not be able to get the whole thing. */
p = tdb_get(tdb, off, &pad, sizeof(p->f));
p = tdb_access_read(tdb, off, sizeof(p->f), true);
if (!p)
return false;
if (p->r.magic == TDB_RECOVERY_INVALID_MAGIC
|| p->r.magic == TDB_RECOVERY_MAGIC) {
if (unc) {
tally_add(uncoal, unc);
unc = 0;
}
len = sizeof(p->r) + p->r.max_len;
} else if (rec_magic(&p->u) != TDB_MAGIC) {
} else if (frec_magic(&p->f) == TDB_FREE_MAGIC) {
len = frec_len(&p->f);
tally_add(free, len);
tally_add(buckets, size_to_bucket(len));
len += sizeof(p->u);
unc++;
} else if (frec_magic(&p->f) == TDB_FREE_MAGIC) {
} else if (rec_magic(&p->u) == TDB_MAGIC) {
if (unc) {
tally_add(uncoal, unc);
unc = 0;
......@@ -98,6 +102,7 @@ static bool summarize(struct tdb_context *tdb,
tally_add(extra, rec_extra_padding(&p->u));
} else
len = dead_space(tdb, off);
tdb_access_release(tdb, p);
}
if (unc)
tally_add(uncoal, unc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment