Commit d3c01870 authored by Rusty Russell's avatar Rusty Russell

tdb: use tdb_lockall_gradual by default.

This has the advantage the transactions will also use it, preventing them
from blocking.
parent ee9f0389
......@@ -152,14 +152,6 @@ int tdb_brlock(struct tdb_context *tdb,
return -1;
}
/* Sanity check */
if (tdb->transaction && offset >= lock_offset(-1) && len != 0) {
tdb->ecode = TDB_ERR_RDONLY;
TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_brlock attempted in transaction at offset %d rw_type=%d flags=%d len=%d\n",
offset, rw_type, flags, (int)len));
return -1;
}
do {
ret = fcntl_lock(tdb, rw_type, offset, len,
flags & TDB_LOCK_WAIT);
......@@ -520,8 +512,43 @@ static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
return 1;
}
/* We only need to lock individual bytes, but Linux merges consecutive locks
* so we lock in contiguous ranges. */
static int tdb_chainlock_gradual(struct tdb_context *tdb,
int ltype, enum tdb_lock_flags flags,
size_t off, size_t len)
{
int ret;
enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
if (len <= 4) {
/* Single record. Just do blocking lock. */
return tdb_brlock(tdb, ltype, off, len, flags);
}
/* First we try non-blocking. */
ret = tdb_brlock(tdb, ltype, off, len, nb_flags);
if (ret == 0) {
return 0;
}
/* Try locking first half, then second. */
ret = tdb_chainlock_gradual(tdb, ltype, flags, off, len / 2);
if (ret == -1)
return -1;
ret = tdb_chainlock_gradual(tdb, ltype, flags,
off + len / 2, len - len / 2);
if (ret == -1) {
tdb_brunlock(tdb, ltype, off, len / 2);
return -1;
}
return 0;
}
/* lock/unlock entire database. It can only be upgradable if you have some
* other way of guaranteeing exclusivity (ie. transaction write lock). */
* other way of guaranteeing exclusivity (ie. transaction write lock).
* We do the locking gradually to avoid being starved by smaller locks. */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
......@@ -532,10 +559,23 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
return 0;
}
if (tdb_brlock(tdb, ltype, FREELIST_TOP, 0, flags)) {
if (flags & TDB_LOCK_WAIT) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
}
/* We cover two kinds of locks:
* 1) Normal chain locks. Taken for almost all operations.
* 3) Individual records locks. Taken after normal or free
* chain locks.
*
* It is (1) which cause the starvation problem, so we're only
* gradual for that. */
if (tdb_chainlock_gradual(tdb, ltype, flags, FREELIST_TOP,
tdb->header.hash_size * 4) == -1) {
return -1;
}
/* Grab individual record locks. */
if (tdb_brlock(tdb, ltype, lock_offset(tdb->header.hash_size), 0,
flags) == -1) {
tdb_brunlock(tdb, ltype, FREELIST_TOP,
tdb->header.hash_size * 4);
return -1;
}
......@@ -660,85 +700,6 @@ int tdb_unlockall_read(struct tdb_context *tdb)
return tdb_allrecord_unlock(tdb, F_RDLCK, false);
}
/* We only need to lock individual bytes, but Linux merges consecutive locks
* so we lock in contiguous ranges. */
static int tdb_chainlock_gradual(struct tdb_context *tdb,
size_t off, size_t len)
{
int ret;
if (len <= 4) {
/* Single record. Just do blocking lock. */
return tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_WAIT);
}
/* First we try non-blocking. */
ret = tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_NOWAIT);
if (ret == 0) {
return 0;
}
/* Try locking first half, then second. */
ret = tdb_chainlock_gradual(tdb, off, len / 2);
if (ret == -1)
return -1;
ret = tdb_chainlock_gradual(tdb, off + len / 2, len - len / 2);
if (ret == -1) {
tdb_brunlock(tdb, F_WRLCK, off, len / 2);
return -1;
}
return 0;
}
/* We do the locking gradually to avoid being starved by smaller locks. */
int tdb_lockall_gradual(struct tdb_context *tdb)
{
int ret;
/* This checks for other locks, nesting. */
ret = tdb_allrecord_check(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
if (ret == -1 || ret == 0)
return ret;
/* We cover two kinds of locks:
* 1) Normal chain locks. Taken for almost all operations.
* 3) Individual records locks. Taken after normal or free
* chain locks.
*
* It is (1) which cause the starvation problem, so we're only
* gradual for that. */
if (tdb_chainlock_gradual(tdb, FREELIST_TOP,
tdb->header.hash_size * 4) == -1) {
return -1;
}
/* Grab individual record locks. */
if (tdb_brlock(tdb, F_WRLCK, lock_offset(tdb->header.hash_size), 0,
TDB_LOCK_WAIT) == -1) {
tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP,
tdb->header.hash_size * 4);
return -1;
}
/* That adds up to an allrecord lock. */
tdb->allrecord_lock.count = 1;
tdb->allrecord_lock.ltype = F_WRLCK;
tdb->allrecord_lock.off = false;
/* Just check we don't need recovery... */
if (tdb_needs_recovery(tdb)) {
tdb_allrecord_unlock(tdb, F_WRLCK, false);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
/* Try again. */
return tdb_lockall_gradual(tdb);
}
return 0;
}
/* lock/unlock one hash chain. This is meant to be used to reduce
contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
......
......@@ -132,7 +132,6 @@ int tdb_lockall_read_nonblock(struct tdb_context *tdb);
int tdb_unlockall_read(struct tdb_context *tdb);
int tdb_lockall_mark(struct tdb_context *tdb);
int tdb_lockall_unmark(struct tdb_context *tdb);
int tdb_lockall_gradual(struct tdb_context *tdb);
const char *tdb_name(struct tdb_context *tdb);
int tdb_fd(struct tdb_context *tdb);
tdb_log_func tdb_log_fn(struct tdb_context *tdb);
......
......@@ -77,9 +77,13 @@ int main(int argc, char *argv[])
if (strcmp(argv[1], "lockall") == 0)
lockall = tdb_lockall;
else if (strcmp(argv[1], "gradual") == 0)
else if (strcmp(argv[1], "gradual") == 0) {
#if 0
lockall = tdb_lockall_gradual;
else
#else
errx(1, "gradual is now the default implementation");
#endif
} else
usage("Arg1 should be 'lockall' or 'gradual'");
num = atoi(argv[2]);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment