Commit 4916bc0a authored by Rusty Russell

tdb2: now we can store a key!

parent 06e0037d
@@ -130,7 +130,7 @@ tdb_off_t zone_of(struct tdb_context *tdb, tdb_off_t off)
return off >> tdb->header.v.zone_bits;
}
-/* Returns fl->max_bucket + 1, or list number to search. */
+/* Returns free_buckets + 1, or list number to search. */
static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
{
tdb_off_t first, off;
@@ -138,7 +138,7 @@ static tdb_off_t find_free_head(struct tdb_context *tdb, tdb_off_t bucket)
/* Speculatively search for a non-zero bucket. */
first = tdb->last_zone * (tdb->header.v.free_buckets+1) + bucket;
off = tdb_find_nonzero_off(tdb, free_list_off(tdb, first),
-tdb->header.v.free_buckets - bucket);
+tdb->header.v.free_buckets + 1 - bucket);
return bucket + off;
}
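The fix above is an off-by-one: each zone has free_buckets + 1 list heads (buckets 0 through free_buckets inclusive), so a scan starting at bucket must cover free_buckets + 1 - bucket heads. For example, with free_buckets = 7 and bucket = 5 the search must examine heads 5, 6 and 7, i.e. 7 + 1 - 5 = 3 entries; the old expression gave only 2 and could miss a populated top bucket.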
@@ -210,7 +210,7 @@ int add_free_record(struct tdb_context *tdb,
tdb->last_zone = zone_of(tdb, off);
list = tdb->last_zone * (tdb->header.v.free_buckets+1)
+ size_to_bucket(tdb, new.data_len);
if (tdb_lock_free_list(tdb, list, TDB_LOCK_WAIT) != 0)
return -1;
@@ -319,7 +319,7 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
tdb_len_t *actual)
{
tdb_off_t list;
-tdb_off_t off, prev, best_off;
+tdb_off_t off, best_off;
struct tdb_free_record pad, best = { 0 }, *r;
double multiplier;
@@ -331,27 +331,21 @@ again:
return TDB_OFF_ERR;
}
-prev = free_list_off(tdb, list);
-off = tdb_read_off(tdb, prev);
-if (unlikely(off == TDB_OFF_ERR))
-goto unlock_err;
best.data_len = -1ULL;
best_off = 0;
multiplier = 1.0;
/* Walk the list to see if any are large enough, getting less fussy
* as we go. */
-while (off) {
-prev = off;
-off = tdb_read_off(tdb, prev);
-if (unlikely(off == TDB_OFF_ERR))
-goto unlock_err;
+off = tdb_read_off(tdb, free_list_off(tdb, list));
+if (unlikely(off == TDB_OFF_ERR))
+goto unlock_err;
+while (off) {
r = tdb_get(tdb, off, &pad, sizeof(*r));
if (!r)
goto unlock_err;
if (r->magic != TDB_FREE_MAGIC) {
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"lock_and_alloc: %llu non-free 0x%llx\n",
@@ -364,18 +358,9 @@ again:
best = *r;
}
-if (best.data_len < size * multiplier && best_off) {
-/* We're happy with this size: take it. */
-if (remove_from_list(tdb, list, &best) != 0)
-goto unlock_err;
-tdb_unlock_free_list(tdb, list);
+if (best.data_len < size * multiplier && best_off)
+goto use_best;
-if (to_used_record(tdb, best_off, size, best.data_len,
-actual)) {
-return -1;
-}
-return best_off;
-}
multiplier *= 1.01;
/* Since we're going slow anyway, try coalescing here. */
@@ -387,6 +372,22 @@ again:
/* This has unlocked list, restart. */
goto again;
}
off = r->next;
}
+/* If we found anything at all, use it. */
+if (best_off) {
+use_best:
+/* We're happy with this size: take it. */
+if (remove_from_list(tdb, list, &best) != 0)
+goto unlock_err;
+tdb_unlock_free_list(tdb, list);
+if (to_used_record(tdb, best_off, size, best.data_len,
+actual)) {
+return -1;
+}
+return best_off;
+}
tdb_unlock_free_list(tdb, list);
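The restructured walk implements the "get less fussy as we go" policy: remember the smallest sufficient record seen so far, and accept it once it is within a slack factor of the request, with the slack growing 1% per record examined so long lists degrade toward first-fit. In the diff, goto use_best shares the take-the-record path between that mid-walk acceptance and the end-of-list fallback, which now uses the best record even if it never met the multiplier test. A self-contained toy of just the policy; the array-based free list and the toy_* names are illustrative stand-ins for tdb2's on-disk list, with locking and error paths omitted:

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct toy_free_record {
	uint64_t data_len;
	size_t next;		/* index of next free record; 0 terminates */
};

/* Walk the list, remembering the best (smallest sufficient) record,
 * and accept it once it is within `multiplier` of the request. */
static size_t toy_alloc(const struct toy_free_record *recs, size_t head,
			uint64_t size)
{
	uint64_t best_len = -1ULL;
	size_t off, best = 0;
	double multiplier = 1.0;

	for (off = head; off; off = recs[off].next) {
		if (recs[off].data_len >= size
		    && recs[off].data_len < best_len) {
			best = off;
			best_len = recs[off].data_len;
		}
		if (best && best_len < size * multiplier)
			break;	/* happy with this size: take it */
		multiplier *= 1.01;
	}
	return best;
}

int main(void)
{
	/* recs[0] is a dummy terminator. */
	const struct toy_free_record recs[] = {
		{ 0, 0 }, { 500, 2 }, { 64, 3 }, { 80, 0 },
	};

	/* A 60-byte request remembers the oversized 500-byte record,
	 * then settles on the tighter 64-byte one. */
	printf("chose record %zu\n", toy_alloc(recs, 1, 60));
	return 0;
}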
@@ -440,11 +441,11 @@ static tdb_off_t get_free(struct tdb_context *tdb, size_t size,
tdb_off_t b;
/* Start at exact size bucket, and search up... */
-for (b = bucket; b <= tdb->header.v.num_zones; b++) {
+for (b = bucket; b <= tdb->header.v.free_buckets; b++) {
b = find_free_head(tdb, b);
/* Non-empty list? Try getting block. */
-if (b <= tdb->header.v.num_zones) {
+if (b <= tdb->header.v.free_buckets) {
/* Try getting one from list. */
off = lock_and_alloc(tdb, b, size, actual);
if (off == TDB_OFF_ERR)
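The corrected bound matters because b indexes buckets, not zones: find_free_head() returns free_buckets + 1 when every remaining head is empty. With, say, num_zones = 10 and free_buckets = 7, the old test b <= num_zones would have accepted that not-found sentinel (8) as a real bucket; with num_zones = 4 it would have abandoned the search after bucket 4 and never checked buckets 5 through 7.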
@@ -698,7 +699,6 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
}
}
/* Free up the old free buckets. */
old_free_off -= sizeof(fhdr);
if (tdb_read_convert(tdb, old_free_off, &fhdr, sizeof(fhdr)) == -1)
@@ -714,6 +714,8 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
/* Start allocating from where the new space is. */
tdb->last_zone = zone_of(tdb, tdb->map_size - add);
tdb_access_release(tdb, oldf);
+if (write_header(tdb) == -1)
+goto fail;
success:
tdb_allrecord_unlock(tdb, F_WRLCK);
return 0;
@@ -502,7 +502,7 @@ again:
tdb->allrecord_lock.off = upgradable;
/* Now we re-check header, holding lock. */
-if (unlikely(update_header(tdb))) {
+if (unlikely(header_changed(tdb))) {
tdb_allrecord_unlock(tdb, ltype);
goto again;
}
@@ -625,13 +625,18 @@ int tdb_unlockall_read(struct tdb_context *tdb)
}
#endif
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
-int ltype, enum tdb_lock_flags waitflag)
+/* Returns the list we actually locked. */
+tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
+int ltype, enum tdb_lock_flags waitflag)
{
+tdb_off_t list = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+/* Header can change ONLY if we had no locks before. */
+bool can_change = tdb->num_lockrecs == 0;
/* An allrecord lock allows us to avoid per-chain locks. */
if (tdb->allrecord_lock.count &&
(ltype == tdb->allrecord_lock.ltype || ltype == F_RDLCK)) {
-return 0;
+return list;
}
if (tdb->allrecord_lock.count) {
@@ -640,11 +645,22 @@ int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
"tdb_lock_list: have %s allrecordlock\n",
tdb->allrecord_lock.ltype == F_RDLCK
? "read" : "write");
-return -1;
+return TDB_OFF_ERR;
}
-/* FIXME: Should we do header_uptodate and return retry here? */
-return tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag);
+again:
+if (tdb_nest_lock(tdb, TDB_HASH_LOCK_START + list, ltype, waitflag))
+return TDB_OFF_ERR;
+if (can_change && unlikely(header_changed(tdb))) {
+tdb_off_t new = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
+if (new != list) {
+tdb_nest_unlock(tdb, TDB_HASH_LOCK_START+list, ltype);
+list = new;
+goto again;
+}
+}
+return list;
}
int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype)
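tdb_lock_list() now takes the raw hash rather than a precomputed list number because hash_bits can grow (enlarge_hash()) while we block waiting for the lock: it locks the list computed from the cached header, revalidates via header_changed() (only needed when we held no locks before, hence can_change), and relocks if the list number moved. A self-contained toy of that revalidate-after-lock dance, with a global standing in for the on-disk header and the actual locking elided; all toy_* names are illustrative:

#include <stdint.h>
#include <stdio.h>

static unsigned cached_hash_bits = 4;	/* our possibly stale copy */
static unsigned disk_hash_bits = 5;	/* already enlarged "on disk" */

/* Refreshes the cache; returns nonzero if it was stale. */
static int toy_header_changed(void)
{
	if (cached_hash_bits != disk_hash_bits) {
		cached_hash_bits = disk_hash_bits;
		return 1;
	}
	return 0;
}

static uint64_t toy_lock_list(uint64_t hash)
{
	uint64_t list = hash & ((1ULL << cached_hash_bits) - 1);
again:
	/* take_lock(list) would go here; while we blocked, another
	 * process may have enlarged the hash table. */
	if (toy_header_changed()) {
		uint64_t new = hash & ((1ULL << cached_hash_bits) - 1);
		if (new != list) {
			/* drop_lock(list) */
			list = new;
			goto again;
		}
	}
	return list;
}

int main(void)
{
	/* 0x1f maps to list 15 under 4 bits but 31 under 5 bits, so
	 * the first lock lands on the wrong list and is retried. */
	printf("locked list %llu\n", (unsigned long long)toy_lock_list(0x1f));
	return 0;
}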
@@ -702,23 +718,14 @@ void tdb_unlock_free_list(struct tdb_context *tdb, tdb_off_t flist)
}
#if 0
-static int chainlock_loop(struct tdb_context *tdb, const TDB_DATA *key,
-int ltype, enum tdb_lock_flags waitflag,
-const char *func)
+static int chainlock(struct tdb_context *tdb, const TDB_DATA *key,
+int ltype, enum tdb_lock_flags waitflag,
+const char *func)
{
int ret;
uint64_t h = tdb_hash(tdb, key->dptr, key->dsize);
-again:
-ret = tdb_lock_list(tdb,
-h & ((1ULL << tdb->header.v.hash_bits) - 1),
-ltype, waitflag);
-if (likely(ret == 0) && unlikely(update_header(tdb))) {
-tdb_unlock_list(tdb, h & ((1ULL << tdb->header.v.hash_bits)-1),
-ltype);
-goto again;
-}
+ret = tdb_lock_list(tdb, h, ltype, waitflag) == TDB_OFF_ERR ? -1 : 0;
tdb_trace_1rec(tdb, func, *key);
return ret;
}
@@ -727,8 +734,7 @@ again:
contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{
-return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
-"tdb_chainlock");
+return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, "tdb_chainlock");
}
/* lock/unlock one hash chain, non-blocking. This is meant to be used
@@ -736,8 +742,8 @@ int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
locked */
int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
{
-return chainlock_loop(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
-"tdb_chainlock_nonblock");
+return chainlock(tdb, &key, F_WRLCK, TDB_LOCK_NOWAIT,
+"tdb_chainlock_nonblock");
}
int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
@@ -750,8 +756,8 @@ int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
{
-return chainlock_loop(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
-"tdb_chainlock_read");
+return chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
+"tdb_chainlock_read");
}
int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
@@ -136,7 +136,7 @@ static inline uint64_t rec_extra_padding(const struct tdb_used_record *r)
static inline uint64_t rec_hash(const struct tdb_used_record *r)
{
-return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1)) << (64 - 11);
+return ((r->magic_and_meta >> 32) & ((1ULL << 11) - 1));
}
static inline uint16_t rec_magic(const struct tdb_used_record *r)
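The used-record header keeps the top 11 bits of the key's 64-bit hash inside magic_and_meta (bits 32..42, going by the accessor above); the change makes rec_hash() return that 11-bit value directly instead of pre-shifting it back to the top of the word, leaving callers to shift when they compare against a full hash. A minimal sketch of the round trip, with illustrative pack_hash/rec_hash_toy helpers (the rest of the word's layout is ignored here):

#include <stdint.h>
#include <stdio.h>

/* Pack the top 11 bits of a 64-bit hash into bits 32..42. */
static uint64_t pack_hash(uint64_t h)
{
	return ((h >> (64 - 11)) & ((1ULL << 11) - 1)) << 32;
}

/* The new accessor: returns the raw 11-bit value. */
static uint64_t rec_hash_toy(uint64_t magic_and_meta)
{
	return (magic_and_meta >> 32) & ((1ULL << 11) - 1);
}

int main(void)
{
	uint64_t h = (uint64_t)0x2AB << (64 - 11);	/* only top 11 bits set */

	/* Callers shift the result back up when comparing against a
	 * full hash, as the updated unit test below does. */
	printf("%d\n", rec_hash_toy(pack_hash(h)) << (64 - 11) == h);
	return 0;
}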
@@ -254,8 +254,11 @@ struct tdb_methods {
internal prototypes
*/
/* tdb.c: */
-/* Returns true if header changed. */
-bool update_header(struct tdb_context *tdb);
+/* Returns true if header changed (and updates it). */
+bool header_changed(struct tdb_context *tdb);
+/* Commit header to disk. */
+int write_header(struct tdb_context *tdb);
/* Hash random memory. */
uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len);
@@ -349,8 +352,8 @@ uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off);
void tdb_lock_init(struct tdb_context *tdb);
/* Lock/unlock a particular hash list. */
-int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
-int ltype, enum tdb_lock_flags waitflag);
+tdb_off_t tdb_lock_list(struct tdb_context *tdb, uint64_t hash,
+int ltype, enum tdb_lock_flags waitflag);
int tdb_unlock_list(struct tdb_context *tdb, tdb_off_t list, int ltype);
/* Lock/unlock a particular free list. */
@@ -21,9 +21,9 @@ null_log_fn(struct tdb_context *tdb,
/* We do a lot of work assuming our copy of the header volatile area
* is uptodate, and usually it is. However, once we grab a lock, we have to
* re-check it. */
-bool update_header(struct tdb_context *tdb)
+bool header_changed(struct tdb_context *tdb)
{
-struct tdb_header_volatile pad, *v;
+uint64_t gen;
if (!(tdb->flags & TDB_NOLOCK) && tdb->header_uptodate) {
tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
@@ -33,17 +33,23 @@ bool update_header(struct tdb_context *tdb)
/* We could get a partial update if we're not holding any locks. */
assert((tdb->flags & TDB_NOLOCK) || tdb_has_locks(tdb));
-v = tdb_get(tdb, offsetof(struct tdb_header, v), &pad, sizeof(*v));
-if (!v) {
-/* On failure, imply we updated header so they retry. */
-return true;
-}
tdb->header_uptodate = true;
-if (likely(memcmp(&tdb->header.v, v, sizeof(*v)) == 0)) {
-return false;
+gen = tdb_read_off(tdb, offsetof(struct tdb_header, v.generation));
+if (unlikely(gen != tdb->header.v.generation)) {
+tdb_read_convert(tdb, offsetof(struct tdb_header, v),
+&tdb->header.v, sizeof(tdb->header.v));
+return true;
}
-tdb->header.v = *v;
-return true;
+return false;
}
+int write_header(struct tdb_context *tdb)
+{
+assert(tdb_read_off(tdb, offsetof(struct tdb_header, v.generation))
+== tdb->header.v.generation);
+tdb->header.v.generation++;
+return tdb_write_convert(tdb, offsetof(struct tdb_header, v),
+&tdb->header.v, sizeof(tdb->header.v));
+}
static uint64_t jenkins_hash(const void *key, size_t length, uint64_t seed,
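write_header() is the new counterpart to header_changed(): a writer (holding locks) bumps v.generation and writes the volatile header back, so readers can revalidate their cached copy with a single word read, where update_header() previously fetched the whole area and memcmp'd it. A self-contained sketch of the protocol; the in-memory "disk" and the toy_* names are illustrative, while the real code reads via tdb_read_off/tdb_read_convert under a lock:

#include <stdint.h>
#include <stdio.h>

struct volatile_hdr {
	uint64_t generation;
	uint64_t hash_bits;
};

static struct volatile_hdr disk = { 1, 10 };	/* authoritative copy */
static struct volatile_hdr cache = { 1, 10 };	/* our cached copy */

/* header_changed(): one cheap word read; the full header is re-read
 * only when the generation differs. */
static int toy_header_changed(void)
{
	if (disk.generation != cache.generation) {
		cache = disk;
		return 1;
	}
	return 0;
}

int main(void)
{
	/* Another process does what write_header() does: updates the
	 * volatile fields, bumps the generation, writes it back. */
	disk.hash_bits = 11;
	disk.generation++;

	printf("changed=%d hash_bits=%llu\n", toy_header_changed(),
	       (unsigned long long)cache.hash_bits);	/* changed=1 ... 11 */
	return 0;
}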
@@ -382,10 +388,12 @@ static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
static void unlock_lists(struct tdb_context *tdb,
uint64_t start, uint64_t end, int ltype)
{
-do {
+for (;;) {
tdb_unlock_list(tdb, start, ltype);
+if (start == end)
+return;
start = (start + 1) & ((1ULL << tdb->header.v.hash_bits) - 1);
-} while (start != end);
+}
}
/* FIXME: Return header copy? */
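The for (;;) form in unlock_lists also fixes the degenerate one-list range: with hash_bits = 3 and start == end == 5, the old do/while unlocked list 5, advanced, and kept unlocking 6, 7, 0, ..., 4 (lists it never locked) before start wrapped back around to end; the rewrite checks start == end after each unlock, releasing exactly the lists from start through end inclusive.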
@@ -402,26 +410,19 @@ static tdb_off_t find_bucket_and_lock(struct tdb_context *tdb,
uint64_t hextra;
tdb_off_t off;
-/* hash_bits might be out of date... */
-again:
-*start = *end = hash & ((1ULL << tdb->header.v.hash_bits) - 1);
-hextra = hash >> tdb->header.v.hash_bits;
/* FIXME: can we avoid locks for some fast paths? */
-if (tdb_lock_list(tdb, *end, ltype, TDB_LOCK_WAIT) == -1)
+*start = tdb_lock_list(tdb, hash, ltype, TDB_LOCK_WAIT);
+if (*start == TDB_OFF_ERR)
return TDB_OFF_ERR;
-/* We only need to check this for first lock. */
-if (unlikely(update_header(tdb))) {
-tdb_unlock_list(tdb, *end, ltype);
-goto again;
-}
+*end = *start;
+hextra = hash >> tdb->header.v.hash_bits;
while ((off = tdb_read_off(tdb, tdb->header.v.hash_off
+ *end * sizeof(tdb_off_t)))
!= TDB_OFF_ERR) {
struct tdb_used_record pad, *r;
-uint64_t keylen, next;
+uint64_t keylen;
/* Didn't find it? */
if (!off)
@@ -470,16 +471,10 @@
lock_next:
/* Lock next bucket. */
/* FIXME: We can deadlock if this wraps! */
-next = (*end + 1) & ((1ULL << tdb->header.v.hash_bits) - 1);
-if (next == *start) {
-tdb->ecode = TDB_ERR_CORRUPT;
-tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-"find_bucket_and_lock: full hash table!\n");
+off = tdb_lock_list(tdb, ++hash, ltype, TDB_LOCK_WAIT);
+if (off == TDB_OFF_ERR)
goto unlock_err;
-}
-if (tdb_lock_list(tdb, next, ltype, TDB_LOCK_WAIT) == -1)
-goto unlock_err;
-*end = next;
+*end = off;
}
unlock_err:
@@ -514,11 +509,9 @@ static void enlarge_hash(struct tdb_context *tdb)
if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
return;
-if (unlikely(update_header(tdb))) {
-/* Someone else enlarged for us? Nothing to do. */
-if ((1ULL << tdb->header.v.hash_bits) != num)
-goto unlock;
-}
+/* Someone else enlarged for us? Nothing to do. */
+if ((1ULL << tdb->header.v.hash_bits) != num)
+goto unlock;
newoff = alloc(tdb, 0, num * 2, 0, false);
if (unlikely(newoff == TDB_OFF_ERR))
@@ -728,7 +721,8 @@ again:
num++;
for (i = chain + 1; i < chain + 1 + num; i++) {
-if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT) == -1) {
+if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)
+== TDB_OFF_ERR) {
if (i != chain + 1)
unlock_lists(tdb, chain + 1, i-1, F_WRLCK);
return -1;
@@ -741,7 +735,8 @@ again:
1ULL << tdb->header.v.hash_bits);
(*extra_locks)++;
for (i = 0; i < *extra_locks; i++) {
-if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT)) {
+if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_NOWAIT)
+== TDB_OFF_ERR) {
/* Failed. Caller must lock in order. */
if (i)
unlock_lists(tdb, 0, i-1, F_WRLCK);
@@ -827,7 +822,8 @@ int tdb_delete(struct tdb_context *tdb, struct tdb_data key)
/* We need extra locks at the start. */
for (i = 0; i < extra_locks; i++) {
-if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)) {
+if (tdb_lock_list(tdb, i, F_WRLCK, TDB_LOCK_WAIT)
+== TDB_OFF_ERR) {
if (i)
unlock_lists(tdb, 0, i-1, F_WRLCK);
return -1;
@@ -34,7 +34,7 @@ int main(int argc, char *argv[])
ok1(rec_key_length(&rec) == klen);
ok1(rec_data_length(&rec) == dlen);
ok1(rec_extra_padding(&rec) == xlen);
-ok1(rec_hash(&rec) == h);
+ok1(rec_hash(&rec) << (64 - 11) == h);
ok1(rec_magic(&rec) == TDB_MAGIC);
}
ok1(tap_log_messages == 0);
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/check.c>
#include <ccan/tap/tap.h>
#include "logging.h"
int main(int argc, char *argv[])
{
	unsigned int i;
	struct tdb_context *tdb;
	int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
			TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
	struct tdb_data key = { (unsigned char *)"key", 3 };
	struct tdb_data data = { (unsigned char *)"data", 4 };

	plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
	for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
		tdb = tdb_open("/tmp/run-new_database.tdb", flags[i],
			       O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
		ok1(tdb);
		if (tdb) {
			/* Only dereference tdb once we know the open
			 * succeeded. */
			tdb->log = tap_log_fn;
			/* Modify should fail. */
			ok1(tdb_store(tdb, key, data, TDB_MODIFY) == -1);
			ok1(tdb_error(tdb) == TDB_ERR_NOEXIST);
			ok1(tdb_check(tdb, NULL, NULL) == 0);
			/* Insert should succeed. */
			ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
			ok1(tdb_check(tdb, NULL, NULL) == 0);
			/* Second insert should fail. */
			ok1(tdb_store(tdb, key, data, TDB_INSERT) == -1);
			ok1(tdb_error(tdb) == TDB_ERR_EXISTS);
			ok1(tdb_check(tdb, NULL, NULL) == 0);
			tdb_close(tdb);
		}
	}
	ok1(tap_log_messages == 0);
	return exit_status();
}
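With four flag combinations, nine assertions per iteration and the final log check, plan_tests() announces 37 tests; on success the TAP output from ccan/tap should look roughly like this (middle lines elided):

1..37
ok 1 - tdb
ok 2 - tdb_store(tdb, key, data, TDB_MODIFY) == -1
...
ok 37 - tap_log_messages == 0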