Commit 06e0037d authored by Rusty Russell's avatar Rusty Russell

tdb2: tdb_expand on empty database now tested.

parent ebdd6451
......@@ -86,8 +86,8 @@ static bool check_header(struct tdb_context *tdb)
static int off_cmp(const tdb_off_t *a, const tdb_off_t *b)
{
/* Can overflow an int. */
return a > b ? 1
: a < b ? -1
return *a > *b ? 1
: *a < *b ? -1
: 0;
}
......
......@@ -45,10 +45,17 @@ static unsigned int quick_random(struct tdb_context *tdb)
}
/* Start by using a random zone to spread the load. */
uint64_t random_free_zone(struct tdb_context *tdb)
void tdb_zone_init(struct tdb_context *tdb)
{
/* num_zones might be out of date, but can only increase */
return quick_random(tdb) % tdb->header.v.num_zones;
/*
* We read num_zones without a proper lock, so we could have
* gotten a partial read. Since zone_bits is 1 byte long, we
* can trust that; even if it's increased, the number of zones
* cannot have decreased. And using the map size means we
* will not start with a zone which hasn't been filled yet.
*/
tdb->last_zone = quick_random(tdb)
% ((tdb->map_size >> tdb->header.v.zone_bits) + 1);
}
static unsigned fls64(uint64_t val)
......@@ -559,33 +566,42 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
bool growing)
{
uint64_t new_num_buckets, new_num_zones, new_zone_bits;
uint64_t old_num_total, i;
uint64_t i, old_num_total, old_num_zones, old_size, old_zone_bits;
tdb_len_t add, freebucket_size, needed;
tdb_off_t off, old_free_off;
const tdb_off_t *oldf;
struct tdb_used_record fhdr;
/* We need room for the record header too. */
needed = sizeof(struct tdb_used_record)
+ adjust_size(klen, dlen, growing);
/* tdb_allrecord_lock will update header; did zones change? */
old_zone_bits = tdb->header.v.zone_bits;
old_num_zones = tdb->header.v.num_zones;
/* FIXME: this is overkill. An expand lock? */
if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false) == -1)
return -1;
/* Someone may have expanded for us. */
if (update_header(tdb))
if (old_zone_bits != tdb->header.v.zone_bits
|| old_num_zones != tdb->header.v.num_zones)
goto success;
/* Make sure we have the latest size. */
/* They may have also expanded the underlying size (otherwise we'd
* have expanded our mmap to look at those offsets already). */
old_size = tdb->map_size;
tdb->methods->oob(tdb, tdb->map_size + 1, true);
if (tdb->map_size != old_size)
goto success;
/* Did we enlarge zones without enlarging file? */
if (tdb->map_size < tdb->header.v.num_zones<<tdb->header.v.zone_bits) {
add = (tdb->header.v.num_zones<<tdb->header.v.zone_bits)
- tdb->map_size;
/* Updates tdb->map_size. */
if (tdb->methods->expand_file(tdb, tdb->map_size, add) == -1)
if (tdb->methods->expand_file(tdb, add) == -1)
goto fail;
if (add_free_record(tdb, tdb->map_size - add, add) == -1)
goto fail;
......@@ -628,7 +644,7 @@ int tdb_expand(struct tdb_context *tdb, tdb_len_t klen, tdb_len_t dlen,
}
/* Updates tdb->map_size. */
if (tdb->methods->expand_file(tdb, tdb->map_size, add) == -1)
if (tdb->methods->expand_file(tdb, add) == -1)
goto fail;
/* Use first part as new free bucket array. */
......
......@@ -228,7 +228,7 @@ int zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
return 0;
} else {
char buf[8192] = { 0 };
return fill(tdb, buf, sizeof(buf), len, off);
return fill(tdb, buf, sizeof(buf), off, len);
}
}
......@@ -445,8 +445,7 @@ int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
/* expand a file. we prefer to use ftruncate, as that is what posix
says to use for mmap expansion */
static int tdb_expand_file(struct tdb_context *tdb,
tdb_len_t size, tdb_len_t addition)
static int tdb_expand_file(struct tdb_context *tdb, tdb_len_t addition)
{
char buf[8192];
......@@ -455,15 +454,33 @@ static int tdb_expand_file(struct tdb_context *tdb,
return -1;
}
/* If this fails, we try to fill anyway. */
if (ftruncate(tdb->fd, size+addition))
;
if (tdb->flags & TDB_INTERNAL) {
char *new = realloc(tdb->map_ptr, tdb->map_size + addition);
if (!new) {
tdb->ecode = TDB_ERR_OOM;
return -1;
}
tdb->map_ptr = new;
tdb->map_size += addition;
} else {
/* Unmap before trying to write; old TDB claimed OpenBSD had
* problem with this otherwise. */
tdb_munmap(tdb);
/* now fill the file with something. This ensures that the
file isn't sparse, which would be very bad if we ran out of
disk. This must be done with write, not via mmap */
memset(buf, 0x43, sizeof(buf));
return fill(tdb, buf, sizeof(buf), addition, size);
/* If this fails, we try to fill anyway. */
if (ftruncate(tdb->fd, tdb->map_size + addition))
;
/* now fill the file with something. This ensures that the
file isn't sparse, which would be very bad if we ran out of
disk. This must be done with write, not via mmap */
memset(buf, 0x43, sizeof(buf));
if (fill(tdb, buf, sizeof(buf), tdb->map_size, addition) == -1)
return -1;
tdb->map_size += addition;
tdb_mmap(tdb);
}
return 0;
}
const void *tdb_access_read(struct tdb_context *tdb,
......@@ -558,84 +575,6 @@ static void tdb_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
(*chain) = h;
}
/* expand the database by expanding the underlying file and doing the
mmap again if necessary */
int tdb_expand(struct tdb_context *tdb)
{
struct tdb_record rec;
tdb_off_t offset, new_size;
/* We have to lock every hash bucket and every free list. */
do {
if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
return -1;
}
/* must know about any previous expansions by another process */
tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
/* always make room for at least 100 more records, and at
least 25% more space. Round the database up to a multiple
of the page size */
new_size = MAX(tdb->map_size + size*100, tdb->map_size * 1.25);
size = TDB_ALIGN(new_size, tdb->page_size) - tdb->map_size;
if (!(tdb->flags & TDB_INTERNAL))
tdb_munmap(tdb);
/*
* We must ensure the file is unmapped before doing this
* to ensure consistency with systems like OpenBSD where
* writes and mmaps are not consistent.
*/
/* expand the file itself */
if (!(tdb->flags & TDB_INTERNAL)) {
if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
goto fail;
}
tdb->map_size += size;
if (tdb->flags & TDB_INTERNAL) {
char *new_map_ptr = (char *)realloc(tdb->map_ptr,
tdb->map_size);
if (!new_map_ptr) {
tdb->map_size -= size;
goto fail;
}
tdb->map_ptr = new_map_ptr;
} else {
/*
* We must ensure the file is remapped before adding the space
* to ensure consistency with systems like OpenBSD where
* writes and mmaps are not consistent.
*/
/* We're ok if the mmap fails as we'll fallback to read/write */
tdb_mmap(tdb);
}
/* form a new freelist record */
memset(&rec,'\0',sizeof(rec));
rec.rec_len = size - sizeof(rec);
/* link it into the free list */
offset = tdb->map_size - size;
if (tdb_free(tdb, offset, &rec) == -1)
goto fail;
tdb_unlock(tdb, -1, F_WRLCK);
return 0;
fail:
tdb_unlock(tdb, -1, F_WRLCK);
return -1;
}
/* read/write a tdb_off_t */
int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
{
......
......@@ -567,6 +567,7 @@ int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
tdb->allrecord_lock.count = 0;
tdb->allrecord_lock.ltype = 0;
tdb->header_uptodate = false;
hash_size = (1ULL << tdb->header.v.hash_bits);
......@@ -668,7 +669,7 @@ int tdb_lock_free_list(struct tdb_context *tdb, tdb_off_t flist,
enum tdb_lock_flags waitflag)
{
/* You're supposed to have a hash lock first! */
if (!tdb_has_locks(tdb)) {
if (!(tdb->flags & TDB_NOLOCK) && !tdb_has_locks(tdb)) {
tdb->ecode = TDB_ERR_LOCK;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
"tdb_lock_free_list without lock!\n");
......@@ -847,3 +848,10 @@ void tdb_release_transaction_locks(struct tdb_context *tdb)
tdb->header_uptodate = false;
}
#endif
void tdb_lock_init(struct tdb_context *tdb)
{
tdb->num_lockrecs = 0;
tdb->lockrecs = NULL;
tdb->allrecord_lock.count = 0;
}
......@@ -223,6 +223,9 @@ struct tdb_context {
tdb_hashfn_t khash;
void *hash_priv;
/* Set if we are in a transaction. */
struct tdb_transaction *transaction;
/* What zone of the tdb to use, for spreading load. */
uint64_t last_zone;
......@@ -234,9 +237,6 @@ struct tdb_context {
uint64_t num_lockrecs;
struct tdb_lock_type *lockrecs;
/* Set if we are in a transaction. */
struct tdb_transaction *transaction;
/* Single list of all TDBs, to avoid multiple opens. */
struct tdb_context *next;
dev_t device;
......@@ -247,7 +247,7 @@ struct tdb_methods {
int (*read)(struct tdb_context *, tdb_off_t, void *, tdb_len_t);
int (*write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
int (*oob)(struct tdb_context *, tdb_off_t, bool);
int (*expand_file)(struct tdb_context *, tdb_len_t, tdb_len_t);
int (*expand_file)(struct tdb_context *, tdb_len_t);
};
/*
......@@ -262,7 +262,7 @@ uint64_t tdb_hash(struct tdb_context *tdb, const void *ptr, size_t len);
/* free.c: */
uint64_t random_free_zone(struct tdb_context *tdb);
void tdb_zone_init(struct tdb_context *tdb);
/* If this fails, try tdb_expand. */
tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
......@@ -346,6 +346,8 @@ int tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off);
/* lock.c: */
void tdb_lock_init(struct tdb_context *tdb);
/* Lock/unlock a particular hash list. */
int tdb_lock_list(struct tdb_context *tdb, tdb_off_t list,
int ltype, enum tdb_lock_flags waitflag);
......
......@@ -25,13 +25,13 @@ bool update_header(struct tdb_context *tdb)
{
struct tdb_header_volatile pad, *v;
if (tdb->header_uptodate) {
if (!(tdb->flags & TDB_NOLOCK) && tdb->header_uptodate) {
tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
"warning: header uptodate already\n");
}
/* We could get a partial update if we're not holding any locks. */
assert(tdb_has_locks(tdb));
assert((tdb->flags & TDB_NOLOCK) || tdb_has_locks(tdb));
v = tdb_get(tdb, offsetof(struct tdb_header, v), &pad, sizeof(*v));
if (!v) {
......@@ -207,20 +207,28 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
uint64_t hash_test;
unsigned v;
if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
tdb = malloc(sizeof(*tdb));
if (!tdb) {
/* Can't log this */
errno = ENOMEM;
goto fail;
}
tdb->fd = -1;
tdb->name = NULL;
tdb->map_ptr = NULL;
tdb->fd = -1;
/* map_size will be set below. */
tdb->ecode = TDB_SUCCESS;
/* header will be read in below. */
tdb->header_uptodate = false;
tdb->flags = tdb_flags;
tdb->log = null_log_fn;
tdb->log_priv = NULL;
tdb->khash = jenkins_hash;
tdb->hash_priv = NULL;
tdb->transaction = NULL;
/* last_zone will be set below. */
tdb_io_init(tdb);
tdb_lock_init(tdb);
/* FIXME */
if (attr) {
......@@ -238,12 +246,13 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
}
if ((open_flags & O_ACCMODE) == O_RDONLY) {
tdb->read_only = 1;
tdb->read_only = true;
/* read only databases don't do locking */
tdb->flags |= TDB_NOLOCK;
}
} else
tdb->read_only = false;
/* internal databases don't mmap or lock */
/* internal databases don't need any of the rest. */
if (tdb->flags & TDB_INTERNAL) {
tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
if (tdb_new_database(tdb) != 0) {
......@@ -253,7 +262,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
}
TEST_IT(tdb->flags & TDB_CONVERT);
tdb_convert(tdb, &tdb->header, sizeof(tdb->header));
goto internal;
return tdb;
}
if ((tdb->fd = open(name, open_flags, mode)) == -1) {
......@@ -331,15 +340,10 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb->map_size = st.st_size;
tdb->device = st.st_dev;
tdb->inode = st.st_ino;
tdb_io_init(tdb);
tdb_mmap(tdb);
internal:
/* Internal (memory-only) databases skip all the code above to
* do with disk files, and resume here by releasing their
* open lock and hooking into the active list. */
tdb_unlock_open(tdb);
tdb->last_zone = random_free_zone(tdb);
tdb_zone_init(tdb);
tdb->next = tdbs;
tdbs = tdb;
return tdb;
......@@ -380,8 +384,7 @@ static void unlock_lists(struct tdb_context *tdb,
{
do {
tdb_unlock_list(tdb, start, ltype);
start = (start + ((1ULL << tdb->header.v.hash_bits) - 1))
& ((1ULL << tdb->header.v.hash_bits) - 1);
start = (start + 1) & ((1ULL << tdb->header.v.hash_bits) - 1);
} while (start != end);
}
......@@ -879,3 +882,8 @@ int tdb_close(struct tdb_context *tdb)
return ret;
}
enum TDB_ERROR tdb_error(struct tdb_context *tdb)
{
return tdb->ecode;
}
......@@ -133,6 +133,8 @@ int tdb_check(struct tdb_context *tdb,
int (*check)(TDB_DATA key, TDB_DATA data, void *private_data),
void *private_data);
enum TDB_ERROR tdb_error(struct tdb_context *tdb);
extern struct tdb_data tdb_null;
#ifdef __cplusplus
......
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <ccan/tap/tap.h>
#include "logging.h"
unsigned tap_log_messages;
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
const char *fmt, ...)
{
va_list ap;
char *p;
va_start(ap, fmt);
if (vasprintf(&p, fmt, ap) == -1)
abort();
diag("tdb log level %u: %s", level, p);
free(p);
tap_log_messages++;
va_end(ap);
}
#ifndef TDB2_TEST_LOGGING_H
#define TDB2_TEST_LOGGING_H
#include <ccan/tdb2/tdb2.h>
unsigned tap_log_messages;
void tap_log_fn(struct tdb_context *tdb,
enum tdb_debug_level level, void *priv,
const char *fmt, ...);
#endif /* TDB2_TEST_LOGGING_H */
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/check.c>
#include <ccan/tap/tap.h>
#include "logging.h"
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb;
int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
plan_tests(sizeof(flags) / sizeof(flags[0]) * 3 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("/tmp/run-new_database.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
ok1(tdb);
if (tdb) {
ok1(tdb_expand(tdb, 1, 1, false) == 0);
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
}
}
ok1(tap_log_messages == 0);
return exit_status();
}
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/check.c>
#include <ccan/tap/tap.h>
#include "logging.h"
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb;
int flags[] = { TDB_INTERNAL, TDB_DEFAULT,
TDB_INTERNAL|TDB_CONVERT, TDB_CONVERT };
plan_tests(sizeof(flags) / sizeof(flags[0]) * 2 + 1);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("/tmp/run-new_database.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, NULL);
tdb->log = tap_log_fn;
ok1(tdb);
if (tdb) {
ok1(tdb_check(tdb, NULL, NULL) == 0);
tdb_close(tdb);
}
}
ok1(tap_log_messages == 0);
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment