Commit bb4d59ee authored by Rusty Russell's avatar Rusty Russell

tdb2: allow multiple opens of the same file.

Finally, we enable sharing of the struct tdb_file.  We add a reference count
so we can free the struct tdb_file when the last tdb is closed.
parent 2960e90f
......@@ -853,7 +853,13 @@ Status
\end_layout
\begin_layout Standard
\change_deleted 0 1300360823
Incomplete.
\change_inserted 0 1300360824
Complete.
\change_unchanged
\end_layout
\begin_layout Subsection
......
......@@ -30,14 +30,11 @@
#include <ccan/build_assert/build_assert.h>
/* If we were threaded, we could wait for unlock, but we're not, so fail. */
static bool owner_conflict(struct tdb_context *tdb, struct tdb_lock *lock)
static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
{
if (lock->owner != tdb) {
tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"Lock already owned by another opener");
return true;
}
return false;
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"%s: lock owned by another tdb in this process.",
call);
}
static int fcntl_lock(struct tdb_context *tdb,
......@@ -226,6 +223,10 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
" already upgraded?");
}
if (tdb->file->allrecord_lock.owner != tdb) {
return owner_conflict(tdb, "tdb_allrecord_upgrade");
}
while (count--) {
struct timeval tv;
if (tdb_brlock(tdb, F_WRLCK,
......@@ -306,8 +307,9 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
new_lck = find_nestlock(tdb, offset, NULL);
if (new_lck) {
if (owner_conflict(tdb, new_lck))
return -1;
if (new_lck->owner != tdb) {
return owner_conflict(tdb, "tdb_nest_lock");
}
if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
......@@ -470,14 +472,17 @@ enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum TDB_ERROR ecode;
tdb_bool_err berr;
if (tdb->file->allrecord_lock.count
&& (ltype == F_RDLCK
|| tdb->file->allrecord_lock.ltype == F_WRLCK)) {
tdb->file->allrecord_lock.count++;
return TDB_SUCCESS;
}
if (tdb->file->allrecord_lock.count) {
if (tdb->file->allrecord_lock.owner != tdb) {
return owner_conflict(tdb, "tdb_allrecord_lock");
}
if (ltype == F_RDLCK
|| tdb->file->allrecord_lock.ltype == F_WRLCK) {
tdb->file->allrecord_lock.count++;
return TDB_SUCCESS;
}
/* a global lock of a different type exists */
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"tdb_allrecord_lock: already have %s lock",
......@@ -658,12 +663,14 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
+ (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->file->allrecord_lock.count &&
(ltype == tdb->file->allrecord_lock.ltype || ltype == F_RDLCK)) {
return TDB_SUCCESS;
}
if (tdb->file->allrecord_lock.count) {
if (tdb->file->allrecord_lock.owner != tdb)
return owner_conflict(tdb, "tdb_lock_hashes");
if (ltype == tdb->file->allrecord_lock.ltype
|| ltype == F_RDLCK) {
return TDB_SUCCESS;
}
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"tdb_lock_hashes:"
" already have %s allrecordlock",
......
#include "private.h"
#include <assert.h>
/* all lock info, to detect double-opens (fcntl file don't nest!) */
static struct tdb_file *files = NULL;
......@@ -9,6 +10,7 @@ static struct tdb_file *find_file(dev_t device, ino_t ino)
for (i = files; i; i = i->next) {
if (i->device == device && i->inode == ino) {
i->refcnt++;
break;
}
}
......@@ -174,6 +176,7 @@ static enum TDB_ERROR tdb_new_file(struct tdb_context *tdb)
tdb->file->num_lockrecs = 0;
tdb->file->lockrecs = NULL;
tdb->file->allrecord_lock.count = 0;
tdb->file->refcnt = 1;
return TDB_SUCCESS;
}
......@@ -291,7 +294,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
"tdb_open: could not open file %s: %s",
name, strerror(errno));
goto fail;
goto fail_errno;
}
/* on exec, don't inherit the fd */
......@@ -303,7 +306,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
"tdb_open: could not stat open %s: %s",
name, strerror(errno));
goto fail;
goto fail_errno;
}
ecode = tdb_new_file(tdb);
......@@ -316,13 +319,6 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb->file->inode = st.st_ino;
tdb->file->map_ptr = NULL;
tdb->file->map_size = sizeof(struct tdb_header);
} else {
/* FIXME */
ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
"tdb_open: %s (%d,%d) is already open in"
" this process",
name, (int)st.st_dev, (int)st.st_ino);
goto fail;
}
/* ensure there is only one process initialising at once */
......@@ -332,7 +328,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
}
/* If they used O_TRUNC, read will return 0. */
rlen = read(tdb->file->fd, &hdr, sizeof(hdr));
rlen = pread(tdb->file->fd, &hdr, sizeof(hdr), 0);
if (rlen == 0 && (open_flags & O_CREAT)) {
ecode = tdb_new_database(tdb, seed, &hdr);
if (ecode != TDB_SUCCESS) {
......@@ -416,51 +412,54 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
goto fail;
}
/* Add to linked list. */
files = tdb->file;
/* Add to linked list if we're new. */
if (tdb->file->refcnt == 1)
files = tdb->file;
return tdb;
fail:
/* Map ecode to some logical errno. */
if (!saved_errno) {
switch (ecode) {
case TDB_ERR_CORRUPT:
case TDB_ERR_IO:
saved_errno = EIO;
break;
case TDB_ERR_LOCK:
saved_errno = EWOULDBLOCK;
break;
case TDB_ERR_OOM:
saved_errno = ENOMEM;
break;
case TDB_ERR_EINVAL:
saved_errno = EINVAL;
break;
default:
saved_errno = EINVAL;
break;
}
switch (ecode) {
case TDB_ERR_CORRUPT:
case TDB_ERR_IO:
saved_errno = EIO;
break;
case TDB_ERR_LOCK:
saved_errno = EWOULDBLOCK;
break;
case TDB_ERR_OOM:
saved_errno = ENOMEM;
break;
case TDB_ERR_EINVAL:
saved_errno = EINVAL;
break;
default:
saved_errno = EINVAL;
break;
}
fail_errno:
#ifdef TDB_TRACE
close(tdb->tracefd);
#endif
free((char *)tdb->name);
if (tdb->file) {
tdb_unlock_all(tdb);
if (tdb->file->map_ptr) {
if (tdb->flags & TDB_INTERNAL) {
free(tdb->file->map_ptr);
} else
tdb_munmap(tdb->file);
if (--tdb->file->refcnt == 0) {
assert(tdb->file->num_lockrecs == 0);
if (tdb->file->map_ptr) {
if (tdb->flags & TDB_INTERNAL) {
free(tdb->file->map_ptr);
} else
tdb_munmap(tdb->file);
}
if (close(tdb->file->fd) != 0)
tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
"tdb_open: failed to close tdb fd"
" on error: %s", strerror(errno));
free(tdb->file->lockrecs);
free(tdb->file);
}
if (close(tdb->file->fd) != 0)
tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
"tdb_open: failed to close tdb fd"
" on error: %s", strerror(errno));
free(tdb->file->lockrecs);
free(tdb->file);
}
free(tdb);
......@@ -487,17 +486,21 @@ int tdb_close(struct tdb_context *tdb)
free((char *)tdb->name);
if (tdb->file) {
struct tdb_file **i;
ret = close(tdb->file->fd);
/* Remove from files list */
for (i = &files; *i; i = &(*i)->next) {
if (*i == tdb->file) {
*i = tdb->file->next;
break;
tdb_unlock_all(tdb);
if (--tdb->file->refcnt == 0) {
ret = close(tdb->file->fd);
/* Remove from files list */
for (i = &files; *i; i = &(*i)->next) {
if (*i == tdb->file) {
*i = tdb->file->next;
break;
}
}
free(tdb->file->lockrecs);
free(tdb->file);
}
free(tdb->file->lockrecs);
free(tdb->file);
}
#ifdef TDB_TRACE
......
......@@ -320,6 +320,9 @@ struct tdb_file {
/* Single list of all TDBs, to detect multiple opens. */
struct tdb_file *next;
/* How many are sharing us? */
unsigned int refcnt;
/* Mmap (if any), or malloc (for TDB_INTERNAL). */
void *map_ptr;
......
......@@ -4,9 +4,9 @@
#include <stdbool.h>
/* FIXME: Check these! */
#define INITIAL_TDB_MALLOC "open.c", 195, FAILTEST_MALLOC
#define URANDOM_OPEN "open.c", 42, FAILTEST_OPEN
#define URANDOM_READ "open.c", 22, FAILTEST_READ
#define INITIAL_TDB_MALLOC "open.c", 198, FAILTEST_MALLOC
#define URANDOM_OPEN "open.c", 44, FAILTEST_OPEN
#define URANDOM_READ "open.c", 24, FAILTEST_READ
bool exit_check_log(struct failtest_call *history, unsigned num);
bool failmatch(const struct failtest_call *call,
......
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/open.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/hash.c>
#include <ccan/tdb2/check.c>
#include <ccan/tdb2/transaction.c>
#include <ccan/tap/tap.h>
#include "logging.h"
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb, *tdb2;
struct tdb_data key = { (unsigned char *)&i, sizeof(i) };
struct tdb_data data = { (unsigned char *)&i, sizeof(i) };
struct tdb_data d = { NULL, 0 }; /* Bogus GCC warning */
int flags[] = { TDB_DEFAULT, TDB_NOMMAP,
TDB_CONVERT, TDB_NOMMAP|TDB_CONVERT };
plan_tests(sizeof(flags) / sizeof(flags[0]) * 28);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
tdb = tdb_open("run-open-multiple-times.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
ok1(tdb);
if (!tdb)
continue;
tdb2 = tdb_open("run-open-multiple-times.tdb", flags[i],
O_RDWR|O_CREAT, 0600, &tap_log_attr);
ok1(tdb_check(tdb, NULL, NULL) == 0);
ok1(tdb_check(tdb2, NULL, NULL) == 0);
/* Store in one, fetch in the other. */
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
ok1(tdb_fetch(tdb2, key, &d) == TDB_SUCCESS);
ok1(d.dptr && d.dsize == data.dsize
&& memcmp(d.dptr, data.dptr, d.dsize) == 0);
free(d.dptr);
/* Vice versa, with delete. */
ok1(tdb_delete(tdb2, key) == 0);
ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_NOEXIST);
/* OK, now close first one, check second still good. */
ok1(tdb_close(tdb) == 0);
ok1(tdb_store(tdb2, key, data, TDB_REPLACE) == 0);
ok1(tdb_fetch(tdb2, key, &d) == TDB_SUCCESS);
ok1(d.dptr && d.dsize == data.dsize
&& memcmp(d.dptr, data.dptr, d.dsize) == 0);
free(d.dptr);
/* Reopen */
tdb = tdb_open("run-open-multiple-times.tdb", flags[i],
O_RDWR|O_CREAT, 0600, &tap_log_attr);
ok1(tdb);
ok1(tdb_transaction_start(tdb2) == 0);
/* Anything in the other one should fail. */
ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
ok1(tap_log_messages == 2);
ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 3);
ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 4);
/* Transaciton should work as normal. */
ok1(tdb_store(tdb2, key, data, TDB_REPLACE) == TDB_SUCCESS);
/* Now... try closing with locks held. */
ok1(tdb_close(tdb2) == 0);
ok1(tdb_fetch(tdb, key, &d) == TDB_SUCCESS);
ok1(d.dptr && d.dsize == data.dsize
&& memcmp(d.dptr, data.dptr, d.dsize) == 0);
free(d.dptr);
ok1(tdb_close(tdb) == 0);
ok1(tap_log_messages == 4);
tap_log_messages = 0;
}
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment