Commit 2088fa3c authored by Rusty Russell's avatar Rusty Russell

tdb2: more tests, hash collision fixes, attribute support.

parent 0939b691
......@@ -665,6 +665,8 @@ again:
int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
{
list &= ((1ULL << tdb->header.v.hash_bits) - 1);
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->allrecord_lock.count) {
if (tdb->allrecord_lock.ltype == F_RDLCK
......
......@@ -236,12 +236,24 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb_io_init(tdb);
tdb_lock_init(tdb);
/* FIXME */
if (attr) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_open: attributes not yet supported\n");
errno = EINVAL;
goto fail;
while (attr) {
switch (attr->base.attr) {
case TDB_ATTRIBUTE_LOG:
tdb->log = attr->log.log_fn;
tdb->log_priv = attr->log.log_private;
break;
case TDB_ATTRIBUTE_HASH:
tdb->khash = attr->hash.hash_fn;
tdb->hash_priv = attr->hash.hash_private;
break;
default:
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_open: unknown attribute type %u\n",
attr->base.attr);
errno = EINVAL;
goto fail;
}
attr = attr->base.next;
}
if ((open_flags & O_ACCMODE) == O_WRONLY) {
......@@ -402,6 +414,8 @@ static tdb_off_t entry_matches(struct tdb_context *tdb,
uint64_t keylen;
const unsigned char *rkey;
list &= ((1ULL << tdb->header.v.hash_bits) - 1);
off = tdb_read_off(tdb, tdb->header.v.hash_off
+ list * sizeof(tdb_off_t));
if (off == 0 || off == TDB_OFF_ERR)
......@@ -455,7 +469,8 @@ static int lock_lists(struct tdb_context *tdb,
tdb_off_t i;
for (i = list; i < list + num; i++) {
if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT) != 0) {
if (tdb_lock_list(tdb, i, ltype, TDB_LOCK_WAIT)
== TDB_OFF_ERR) {
unlock_lists(tdb, list, i - list, ltype);
return -1;
}
......@@ -469,7 +484,7 @@ static int lock_lists(struct tdb_context *tdb,
static tdb_len_t relock_hash_to_zero(struct tdb_context *tdb,
tdb_off_t start, int ltype)
{
tdb_len_t num, len, pre_locks;
tdb_len_t num, len;
again:
num = 1ULL << tdb->header.v.hash_bits;
......@@ -477,39 +492,45 @@ again:
if (unlikely(len == num - start)) {
/* We hit the end of the hash range. Drop lock: we have
to lock start of hash first. */
tdb_len_t pre_locks;
tdb_unlock_list(tdb, start, ltype);
/* Grab something, so header is stable. */
if (tdb_lock_list(tdb, 0, ltype, TDB_LOCK_WAIT))
return TDB_OFF_ERR;
len = tdb_find_zero_off(tdb, hash_off(tdb, 0), num);
if (lock_lists(tdb, 1, len, ltype) == -1) {
pre_locks = tdb_find_zero_off(tdb, hash_off(tdb, 0), num);
/* We want to lock the zero entry as well. */
pre_locks++;
if (lock_lists(tdb, 1, pre_locks - 1, ltype) == -1) {
tdb_unlock_list(tdb, 0, ltype);
return TDB_OFF_ERR;
}
pre_locks = len;
len = num - start;
} else {
/* We already have lock on start. */
start++;
pre_locks = 0;
}
if (unlikely(lock_lists(tdb, start, len, ltype) == -1)) {
if (pre_locks)
/* Now lock later ones. */
if (unlikely(lock_lists(tdb, start, len, ltype) == -1)) {
unlock_lists(tdb, 0, pre_locks, ltype);
else
return TDB_OFF_ERR;
}
len += pre_locks;
} else {
/* We want to lock the zero entry as well. */
len++;
/* But we already have lock on start. */
if (unlikely(lock_lists(tdb, start+1, len-1, ltype) == -1)) {
tdb_unlock_list(tdb, start, ltype);
return TDB_OFF_ERR;
return TDB_OFF_ERR;
}
}
/* Now, did we lose the race, and it's not zero any more? */
if (unlikely(tdb_read_off(tdb, hash_off(tdb, pre_locks + len)) != 0)) {
unlock_lists(tdb, 0, pre_locks, ltype);
if (unlikely(tdb_read_off(tdb, hash_off(tdb, start + len - 1)) != 0)) {
/* Leave the start locked, as expected. */
unlock_lists(tdb, start + 1, len - 1, ltype);
goto again;
}
return pre_locks + len;
return len;
}
/* FIXME: modify, don't rewrite! */
......@@ -632,13 +653,14 @@ int tdb_store(struct tdb_context *tdb,
for (i = start; i < start + num_locks; i++) {
off = entry_matches(tdb, i, h, &key, &rec);
/* Empty entry or we found it? */
if (off == 0 || off != TDB_OFF_ERR) {
old_bucket = i;
if (off == 0 || off != TDB_OFF_ERR)
break;
}
}
if (i == start + num_locks)
off = 0;
/* Even if not found, this is where we put the new entry. */
old_bucket = i;
}
/* Now we have lock on this hash bucket. */
......@@ -707,7 +729,7 @@ write:
/* FIXME: by simple simulation, this approximated 60% full.
* Check in real case! */
if (unlikely(num_locks > 4 * tdb->header.v.hash_bits - 31))
if (unlikely(num_locks > 4 * tdb->header.v.hash_bits - 30))
enlarge_hash(tdb);
return 0;
......
......@@ -3,6 +3,7 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "logging.h"
struct tdb_layout *new_tdb_layout(void)
{
......@@ -208,7 +209,7 @@ struct tdb_context *tdb_layout_get(struct tdb_layout *layout)
mem = malloc(len);
/* Now populate our header, cribbing from a real TDB header. */
tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, NULL);
tdb = tdb_open(NULL, TDB_INTERNAL, O_RDWR, 0, &tap_log_attr);
hdr = (void *)mem;
*hdr = tdb->header;
hdr->v.generation++;
......
......@@ -7,6 +7,11 @@
unsigned tap_log_messages;
union tdb_attribute tap_log_attr = {
.log = { .base = { .attr = TDB_ATTRIBUTE_LOG },
.log_fn = tap_log_fn }
};
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
const char *fmt, ...)
......@@ -19,7 +24,8 @@ void tap_log_fn(struct tdb_context *tdb,
abort();
diag("tdb log level %u: %s", level, p);
free(p);
tap_log_messages++;
if (level != TDB_DEBUG_TRACE)
tap_log_messages++;
va_end(ap);
}
......@@ -4,7 +4,9 @@
#include <stdbool.h>
#include <string.h>
unsigned tap_log_messages;
extern unsigned tap_log_messages;
extern union tdb_attribute tap_log_attr;
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
......
......@@ -41,7 +41,6 @@ int main(int argc, char *argv[])
tdb_layout_add_freetable(layout, 1, 16, 12, 0);
tdb_layout_add_free(layout, 1024);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(tdb_check(tdb, NULL, NULL) == 0);
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
......@@ -64,7 +63,6 @@ int main(int argc, char *argv[])
tdb_layout_add_free(layout, 1024);
tdb_layout_add_used(layout, key, data, 6);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(tdb_check(tdb, NULL, NULL) == 0);
......@@ -87,7 +85,6 @@ int main(int argc, char *argv[])
tdb_layout_add_free(layout, 1024);
tdb_layout_add_free(layout, 512);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(tdb_check(tdb, NULL, NULL) == 0);
......@@ -113,7 +110,6 @@ int main(int argc, char *argv[])
tdb_layout_add_free(layout, 512);
tdb_layout_add_used(layout, key, data, 6);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(tdb_check(tdb, NULL, NULL) == 0);
......@@ -139,7 +135,6 @@ int main(int argc, char *argv[])
tdb_layout_add_free(layout, 512);
tdb_layout_add_free(layout, 32);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1024);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 512);
ok1(free_record_length(tdb, layout->elem[4].base.off) == 32);
......@@ -166,7 +161,6 @@ int main(int argc, char *argv[])
tdb_layout_add_free(layout, 32768);
tdb_layout_add_free(layout, 30000);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 32768);
ok1(zone_of(tdb, layout->elem[2].base.off) == 0);
ok1(free_record_length(tdb, layout->elem[3].base.off) == 30000);
......@@ -197,7 +191,6 @@ int main(int argc, char *argv[])
}
total -= sizeof(struct tdb_used_record);
tdb = tdb_layout_get(layout);
tdb->log = tap_log_fn;
ok1(free_record_length(tdb, layout->elem[2].base.off) == 1 << 4);
ok1(tdb_check(tdb, NULL, NULL) == 0);
......
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/check.c>
#include <ccan/tap/tap.h>
#include "logging.h"
/* We rig the hash so adjacent-numbered records always clash. */
static uint64_t clash(const void *key, size_t len, uint64_t seed, void *priv)
{
return *(unsigned int *)key / 2;
}
static void test_val(struct tdb_context *tdb, unsigned int val)
{
unsigned int v;
struct tdb_data key = { (unsigned char *)&v, sizeof(v) };
struct tdb_data data = { (unsigned char *)&v, sizeof(v) };
/* Insert an entry, then delete it. */
v = val;
/* Delete should fail. */
ok1(tdb_delete(tdb, key) == -1);
ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Insert should succeed. */
ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Delete should succeed. */
ok1(tdb_delete(tdb, key) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Re-add it, then add collision. */
ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
v = val + 1;
ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Can find both? */
ok1(tdb_fetch(tdb, key).dsize == data.dsize);
v = val;
ok1(tdb_fetch(tdb, key).dsize == data.dsize);
/* Delete second one. */
v = val + 1;
ok1(tdb_delete(tdb, key) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Re-add */
ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Now, try deleting first one. */
v = val;
ok1(tdb_delete(tdb, key) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
/* Can still find second? */
v = val + 1;
ok1(tdb_fetch(tdb, key).dsize == data.dsize);
/* Delete that, so we are empty. */
ok1(tdb_delete(tdb, key) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
}
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb;
union tdb_attribute hattr = { .hash = { .base = { TDB_ATTRIBUTE_HASH },
.hash_fn = clash } };
int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
hattr.base.next = &tap_log_attr;
plan_tests(sizeof(flags) / sizeof(flags[0]) * 44 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-delete.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &hattr);
ok1(tdb);
if (!tdb)
continue;
/* Check start of hash table. */
test_val(tdb, 0);
/* Check end of hash table (will wrap around!). */
test_val(tdb, ((1 << tdb->header.v.hash_bits) - 1) * 2);
ok1(!tdb_has_locks(tdb));
tdb_close(tdb);
}
ok1(tap_log_messages == 0);
return exit_status();
}
......@@ -16,14 +16,14 @@ int main(int argc, char *argv[])
plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-enlarge-hash.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
enlarge_hash(tdb);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
}
/* FIXME: Test enlarging with hash clash. */
}
ok1(tap_log_messages == 0);
return exit_status();
......
......@@ -28,8 +28,7 @@ int main(int argc, char *argv[])
/* First, lower level expansion tests. */
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-expand.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
......@@ -102,8 +101,7 @@ int main(int argc, char *argv[])
/* Now using tdb_expand. */
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-expand.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
......
......@@ -16,8 +16,7 @@ int main(int argc, char *argv[])
plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-new_database", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
ok1(tdb_check(tdb, NULL, NULL) == 0);
......
......@@ -25,8 +25,7 @@ int main(int argc, char *argv[])
* (3 + (1 + (MAX_SIZE/SIZE_STEP)) * 2) + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-record-expand.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
......
......@@ -18,8 +18,7 @@ int main(int argc, char *argv[])
plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-delete.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
/* Delete should fail. */
......
......@@ -18,8 +18,7 @@ int main(int argc, char *argv[])
plan_tests(sizeof(flags) / sizeof(flags[0]) * 8 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-fetch.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
struct tdb_data d;
......
......@@ -18,8 +18,7 @@ int main(int argc, char *argv[])
plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-simple-store.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (tdb) {
/* Modify should fail. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment