Commit 7fe32184 authored by Rusty Russell's avatar Rusty Russell

tdb2: TDB_ATTRIBUTE_FLOCK support

This allows overriding of low-level locking functions.  This allows
special effects such as non-blocking operations, and lock proxying.

We rename some local lock vars to l to avoid -Wshadow warnings.
parent 06935292
......@@ -37,7 +37,7 @@ static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
call);
}
/* If we fork, we no longer really own locks. */
/* If we fork, we no longer really own locks: preserves errno */
static bool check_lock_pid(struct tdb_context *tdb,
const char *call, bool log)
{
......@@ -60,34 +60,59 @@ static bool check_lock_pid(struct tdb_context *tdb,
return false;
}
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
int tdb_fcntl_lock(int fd, int rw, off_t off, off_t len, bool waitflag,
void *unused)
{
struct flock fl;
int ret;
do {
fl.l_type = rw;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
if (waitflag)
ret = fcntl(fd, F_SETLKW, &fl);
else
ret = fcntl(fd, F_SETLK, &fl);
} while (ret != 0 && errno == EINTR);
return ret;
}
int tdb_fcntl_unlock(int fd, int rw, off_t off, off_t len, void *unused)
{
struct flock fl;
int ret;
do {
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
fl.l_type = rw;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
fl.l_pid = 0;
ret = fcntl(fd, F_SETLKW, &fl);
} while (ret != 0 && errno == EINTR);
return ret;
}
static int lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
{
if (tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0) {
tdb->file->locker = getpid();
}
add_stat(tdb, lock_lowlevel, 1);
if (waitflag)
return fcntl(tdb->file->fd, F_SETLKW, &fl);
else {
if (!waitflag)
add_stat(tdb, lock_nonblock, 1);
return fcntl(tdb->file->fd, F_SETLK, &fl);
}
return tdb->lock_fn(tdb->file->fd, rw, off, len, waitflag,
tdb->lock_data);
}
static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
static int unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
{
struct flock fl;
#if 0 /* Check they matched up locks and unlocks correctly. */
char line[80];
FILE *locks;
......@@ -146,13 +171,7 @@ static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
fclose(locks);
#endif
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
fl.l_pid = 0;
return fcntl(tdb->file->fd, F_SETLKW, &fl);
return tdb->unlock_fn(tdb->file->fd, rw, off, len, tdb->lock_data);
}
/* a byte range locking function - return 0 on success
......@@ -183,16 +202,13 @@ static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
(long long)(offset + len));
}
do {
ret = fcntl_lock(tdb, rw_type, offset, len,
flags & TDB_LOCK_WAIT);
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
ret = lock(tdb, rw_type, offset, len, flags & TDB_LOCK_WAIT);
if (ret != 0) {
/* Generic lock error. errno set by fcntl.
* EAGAIN is an expected return from non-blocking
* locks. */
if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
if (!(flags & TDB_LOCK_PROBE)
&& (errno != EAGAIN && errno != EINTR)) {
tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_brlock failed (fd=%d) at"
" offset %zu rw_type=%d flags=%d len=%zu:"
......@@ -214,17 +230,15 @@ static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
return TDB_SUCCESS;
}
do {
ret = fcntl_unlock(tdb, rw_type, offset, len);
} while (ret == -1 && errno == EINTR);
ret = unlock(tdb, rw_type, offset, len);
/* If we fail, *then* we verify that we owned the lock. If not, ok. */
if (ret == -1 && check_lock_pid(tdb, "tdb_brunlock", false)) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_brunlock failed (fd=%d) at offset %zu"
" rw_type=%d len=%zu",
" rw_type=%d len=%zu: %s",
tdb->file->fd, (size_t)offset, rw_type,
(size_t)len);
(size_t)len, strerror(errno));
}
return TDB_SUCCESS;
}
......@@ -276,8 +290,11 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
tv.tv_usec = 1;
select(0, NULL, NULL, NULL, &tv);
}
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_allrecord_upgrade failed");
if (errno != EAGAIN && errno != EINTR)
tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_allrecord_upgrade failed");
return TDB_ERR_LOCK;
}
static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
......@@ -699,7 +716,7 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
int ltype, enum tdb_lock_flags waitflag)
{
/* FIXME: Do this properly, using hlock_range */
unsigned lock = TDB_HASH_LOCK_START
unsigned l = TDB_HASH_LOCK_START
+ (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
/* a allrecord lock allows us to avoid per chain locks */
......@@ -732,14 +749,14 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
" already have expansion lock");
}
return tdb_nest_lock(tdb, lock, ltype, waitflag);
return tdb_nest_lock(tdb, l, ltype, waitflag);
}
enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
tdb_off_t hash_lock,
tdb_len_t hash_range, int ltype)
{
unsigned lock = TDB_HASH_LOCK_START
unsigned l = TDB_HASH_LOCK_START
+ (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
if (tdb->flags & TDB_NOLOCK)
......@@ -755,7 +772,7 @@ enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
return TDB_SUCCESS;
}
return tdb_nest_unlock(tdb, lock, ltype);
return tdb_nest_unlock(tdb, l, ltype);
}
/* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
......
......@@ -212,6 +212,8 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb->access = NULL;
tdb->last_error = TDB_SUCCESS;
tdb->file = NULL;
tdb->lock_fn = fcntl_lock;
tdb->unlock_fn = fcntl_unlock;
tdb_hash_init(tdb);
tdb_io_init(tdb);
......@@ -237,6 +239,11 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
case TDB_ATTRIBUTE_OPENHOOK:
openhook = &attr->openhook;
break;
case TDB_ATTRIBUTE_FLOCK:
tdb->lock_fn = attr->flock.lock;
tdb->unlock_fn = attr->flock.unlock;
tdb->lock_data = attr->flock.data;
break;
default:
ecode = tdb_logerr(tdb, TDB_ERR_EINVAL,
TDB_LOG_USE_ERROR,
......@@ -340,7 +347,8 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
/* ensure there is only one process initialising at once */
ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
if (ecode != TDB_SUCCESS) {
goto fail;
saved_errno = errno;
goto fail_errno;
}
/* call their open hook if they gave us one. */
......
......@@ -376,6 +376,11 @@ struct tdb_context {
void *hash_data;
uint64_t hash_seed;
/* low level (fnctl) lock functions. */
int (*lock_fn)(int fd, int rw, off_t off, off_t len, bool w, void *);
int (*unlock_fn)(int fd, int rw, off_t off, off_t len, void *);
void *lock_data;
/* Set if we are in a transaction. */
struct tdb_transaction *transaction;
......@@ -584,6 +589,10 @@ bool tdb_has_expansion_lock(struct tdb_context *tdb);
/* If it needs recovery, grab all the locks and do it. */
enum TDB_ERROR tdb_lock_and_recover(struct tdb_context *tdb);
/* Default lock and unlock functions. */
int tdb_fcntl_lock(int fd, int rw, off_t off, off_t len, bool waitflag, void *);
int tdb_fcntl_unlock(int fd, int rw, off_t off, off_t len, void *);
/* transaction.c: */
enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb);
tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb);
......
......@@ -610,7 +610,8 @@ enum tdb_attribute_type {
TDB_ATTRIBUTE_HASH = 1,
TDB_ATTRIBUTE_SEED = 2,
TDB_ATTRIBUTE_STATS = 3,
TDB_ATTRIBUTE_OPENHOOK = 4
TDB_ATTRIBUTE_OPENHOOK = 4,
TDB_ATTRIBUTE_FLOCK = 5
};
/**
......@@ -736,6 +737,25 @@ struct tdb_attribute_openhook {
void *data;
};
/**
* struct tdb_attribute_flock - tdb special effects hook for file locking
*
* This attribute contains function to call to place locks on a file; it can
* be used to support non-blocking operations or lock proxying.
*
* They should return 0 on success, -1 on failure and set errno.
*
* An error will be logged on error if errno is neither EAGAIN nor EINTR
* (normally it would only return EAGAIN if waitflag is false, and
* loop internally on EINTR).
*/
struct tdb_attribute_flock {
struct tdb_attribute_base base; /* .attr = TDB_ATTRIBUTE_FLOCK */
int (*lock)(int fd,int rw, off_t off, off_t len, bool waitflag, void *);
int (*unlock)(int fd, int rw, off_t off, off_t len, void *);
void *data;
};
/**
* union tdb_attribute - tdb attributes.
*
......@@ -744,7 +764,7 @@ struct tdb_attribute_openhook {
* See also:
* struct tdb_attribute_log, struct tdb_attribute_hash,
* struct tdb_attribute_seed, struct tdb_attribute_stats,
* struct tdb_attribute_openhook.
* struct tdb_attribute_openhook, struct tdb_attribute_flock.
*/
union tdb_attribute {
struct tdb_attribute_base base;
......@@ -753,6 +773,7 @@ union tdb_attribute {
struct tdb_attribute_seed seed;
struct tdb_attribute_stats stats;
struct tdb_attribute_openhook openhook;
struct tdb_attribute_flock flock;
};
#ifdef __cplusplus
......
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/open.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/hash.c>
#include <ccan/tdb2/check.c>
#include <ccan/tdb2/transaction.c>
#include <ccan/tdb2/traverse.c>
#include <ccan/tap/tap.h>
#include "logging.h"
static int mylock(int fd, int rw, off_t off, off_t len, bool waitflag,
void *_err)
{
int *lock_err = _err;
struct flock fl;
int ret;
if (*lock_err) {
errno = *lock_err;
return -1;
}
do {
fl.l_type = rw;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
if (waitflag)
ret = fcntl(fd, F_SETLKW, &fl);
else
ret = fcntl(fd, F_SETLK, &fl);
} while (ret != 0 && errno == EINTR);
return ret;
}
static int myunlock(int fd, int rw, off_t off, off_t len, void *_err)
{
int *lock_err = _err;
struct flock fl;
int ret;
if (*lock_err) {
errno = *lock_err;
return -1;
}
do {
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
fl.l_start = off;
fl.l_len = len;
ret = fcntl(fd, F_SETLKW, &fl);
} while (ret != 0 && errno == EINTR);
return ret;
}
static int trav_err;
static int trav(struct tdb_context *tdb, TDB_DATA k, TDB_DATA d, int *err)
{
*err = trav_err;
return 0;
}
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb;
int flags[] = { TDB_DEFAULT, TDB_NOMMAP,
TDB_CONVERT, TDB_NOMMAP|TDB_CONVERT };
union tdb_attribute lock_attr;
struct tdb_data key = tdb_mkdata("key", 3);
struct tdb_data data = tdb_mkdata("data", 4);
int lock_err;
lock_attr.base.attr = TDB_ATTRIBUTE_FLOCK;
lock_attr.base.next = &tap_log_attr;
lock_attr.flock.lock = mylock;
lock_attr.flock.unlock = myunlock;
lock_attr.flock.data = &lock_err;
plan_tests(sizeof(flags) / sizeof(flags[0]) * 80);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
struct tdb_data d;
/* Nonblocking open; expect no error message. */
lock_err = EAGAIN;
tdb = tdb_open("run-82-lockattr.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
ok(errno == lock_err, "Errno is %u", errno);
ok1(!tdb);
ok1(tap_log_messages == 0);
lock_err = EINTR;
tdb = tdb_open("run-82-lockattr.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
ok(errno == lock_err, "Errno is %u", errno);
ok1(!tdb);
ok1(tap_log_messages == 0);
/* Forced fail open. */
lock_err = ENOMEM;
tdb = tdb_open("run-82-lockattr.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
ok1(errno == lock_err);
ok1(!tdb);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
lock_err = 0;
tdb = tdb_open("run-82-lockattr.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
if (!ok1(tdb))
continue;
ok1(tap_log_messages == 0);
/* Nonblocking store. */
lock_err = EAGAIN;
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
/* Nonblocking fetch. */
lock_err = EAGAIN;
ok1(!tdb_exists(tdb, key));
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(!tdb_exists(tdb, key));
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(!tdb_exists(tdb, key));
ok1(tap_log_messages == 1);
tap_log_messages = 0;
lock_err = EAGAIN;
ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
/* Nonblocking delete. */
lock_err = EAGAIN;
ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
/* Nonblocking locks. */
lock_err = EAGAIN;
ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
lock_err = EAGAIN;
ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
lock_err = EAGAIN;
ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
/* This actually does divide and conquer. */
ok1(tap_log_messages > 0);
tap_log_messages = 0;
lock_err = EAGAIN;
ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages > 0);
tap_log_messages = 0;
/* Nonblocking traverse; go nonblock partway through. */
lock_err = 0;
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
trav_err = EAGAIN;
ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
trav_err = EINTR;
lock_err = 0;
ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
trav_err = ENOMEM;
lock_err = 0;
ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
/* Nonblocking transactions. */
lock_err = EAGAIN;
ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = EINTR;
ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = ENOMEM;
ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 1);
tap_log_messages = 0;
/* Nonblocking transaction prepare. */
lock_err = 0;
ok1(tdb_transaction_start(tdb) == 0);
ok1(tdb_delete(tdb, key) == 0);
lock_err = EAGAIN;
ok1(tdb_transaction_prepare_commit(tdb) == TDB_ERR_LOCK);
ok1(tap_log_messages == 0);
lock_err = 0;
ok1(tdb_transaction_prepare_commit(tdb) == 0);
ok1(tdb_transaction_commit(tdb) == 0);
/* And the transaction was committed, right? */
ok1(!tdb_exists(tdb, key));
tdb_close(tdb);
ok1(tap_log_messages == 0);
}
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment