Commit ee9f0389 authored by Rusty Russell's avatar Rusty Russell

tdb: test and resolultion for tdb_lockall starvation.

parent bb4b8b6b
......@@ -485,11 +485,9 @@ int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false);
}
/* lock/unlock entire database. It can only be upgradable if you have some
* other way of guaranteeing exclusivity (ie. transaction write lock). */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
/* Returns 0 if all done, -1 if error, 1 if ok. */
static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
/* There are no locks on read-only dbs */
if (tdb->read_only || tdb->traverse_read) {
......@@ -519,6 +517,20 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
tdb->ecode = TDB_ERR_LOCK;
return -1;
}
return 1;
}
/* lock/unlock entire database. It can only be upgradable if you have some
* other way of guaranteeing exclusivity (ie. transaction write lock). */
int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable)
{
switch (tdb_allrecord_check(tdb, ltype, flags, upgradable)) {
case -1:
return -1;
case 0:
return 0;
}
if (tdb_brlock(tdb, ltype, FREELIST_TOP, 0, flags)) {
if (flags & TDB_LOCK_WAIT) {
......@@ -648,6 +660,85 @@ int tdb_unlockall_read(struct tdb_context *tdb)
return tdb_allrecord_unlock(tdb, F_RDLCK, false);
}
/* We only need to lock individual bytes, but Linux merges consecutive locks
* so we lock in contiguous ranges. */
static int tdb_chainlock_gradual(struct tdb_context *tdb,
size_t off, size_t len)
{
int ret;
if (len <= 4) {
/* Single record. Just do blocking lock. */
return tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_WAIT);
}
/* First we try non-blocking. */
ret = tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_NOWAIT);
if (ret == 0) {
return 0;
}
/* Try locking first half, then second. */
ret = tdb_chainlock_gradual(tdb, off, len / 2);
if (ret == -1)
return -1;
ret = tdb_chainlock_gradual(tdb, off + len / 2, len - len / 2);
if (ret == -1) {
tdb_brunlock(tdb, F_WRLCK, off, len / 2);
return -1;
}
return 0;
}
/* We do the locking gradually to avoid being starved by smaller locks. */
int tdb_lockall_gradual(struct tdb_context *tdb)
{
int ret;
/* This checks for other locks, nesting. */
ret = tdb_allrecord_check(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
if (ret == -1 || ret == 0)
return ret;
/* We cover two kinds of locks:
* 1) Normal chain locks. Taken for almost all operations.
* 3) Individual records locks. Taken after normal or free
* chain locks.
*
* It is (1) which cause the starvation problem, so we're only
* gradual for that. */
if (tdb_chainlock_gradual(tdb, FREELIST_TOP,
tdb->header.hash_size * 4) == -1) {
return -1;
}
/* Grab individual record locks. */
if (tdb_brlock(tdb, F_WRLCK, lock_offset(tdb->header.hash_size), 0,
TDB_LOCK_WAIT) == -1) {
tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP,
tdb->header.hash_size * 4);
return -1;
}
/* That adds up to an allrecord lock. */
tdb->allrecord_lock.count = 1;
tdb->allrecord_lock.ltype = F_WRLCK;
tdb->allrecord_lock.off = false;
/* Just check we don't need recovery... */
if (tdb_needs_recovery(tdb)) {
tdb_allrecord_unlock(tdb, F_WRLCK, false);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
/* Try again. */
return tdb_lockall_gradual(tdb);
}
return 0;
}
/* lock/unlock one hash chain. This is meant to be used to reduce
contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
......
......@@ -132,6 +132,7 @@ int tdb_lockall_read_nonblock(struct tdb_context *tdb);
int tdb_unlockall_read(struct tdb_context *tdb);
int tdb_lockall_mark(struct tdb_context *tdb);
int tdb_lockall_unmark(struct tdb_context *tdb);
int tdb_lockall_gradual(struct tdb_context *tdb);
const char *tdb_name(struct tdb_context *tdb);
int tdb_fd(struct tdb_context *tdb);
tdb_log_func tdb_log_fn(struct tdb_context *tdb);
......
......@@ -2,7 +2,7 @@ LDLIBS:=../../tdb.o
CFLAGS:=-I../../.. -Wall -O3 #-g -pg
LDFLAGS:=-L../../..
default: replay_trace tdbtorture tdbdump tdbtool
default: replay_trace tdbtorture tdbdump tdbtool starvation
benchmark: replay_trace
@trap "rm -f /tmp/trace.$$$$" 0; for f in benchmarks/*.rz; do if runzip -k $$f -o /tmp/trace.$$$$ && echo -n "$$f": && ./replay_trace --quiet -n 5 replay.tdb /tmp/trace.$$$$ && rm /tmp/trace.$$$$; then rm -f /tmp/trace.$$$$; else exit 1; fi; done
......
/* Demonstrate starvation of tdb_lockall */
#include <ccan/tdb/tdb.h>
#include <err.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
static void usage(const char *extra)
{
errx(1, "%s%s"
"Usage: starvation [lockall|gradual] <num> <worktime-in-ms>\n"
" Each locker holds lock for between 1/2 and 1 1/2 times\n"
" worktime, then sleeps for one second.\n\n"
" Main process tries tdb_lockall or tdb_lockall_gradual.",
extra ? extra : "", extra ? "\n" : "");
}
static void run_and_sleep(struct tdb_context *tdb, int parentfd, unsigned time)
{
char c;
struct timespec hold;
unsigned rand, randtime;
TDB_DATA key;
key.dptr = (void *)&rand;
key.dsize = sizeof(rand);
while (read(parentfd, &c, 1) != 0) {
/* Lock a random key. */
rand = random();
if (tdb_chainlock(tdb, key) != 0)
errx(1, "chainlock failed: %s", tdb_errorstr(tdb));
/* Hold it for some variable time. */
randtime = time / 2 + (random() % time);
hold.tv_sec = randtime / 1000;
hold.tv_nsec = (randtime % 1000) * 1000000;
nanosleep(&hold, NULL);
if (tdb_chainunlock(tdb, key) != 0)
errx(1, "chainunlock failed: %s", tdb_errorstr(tdb));
/* Wait for a second without the lock. */
sleep(1);
}
exit(0);
}
static void logfn(struct tdb_context *tdb,
enum tdb_debug_level level,
const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
}
int main(int argc, char *argv[])
{
int (*lockall)(struct tdb_context *);
unsigned int num, worktime, i;
int pfd[2];
struct tdb_context *tdb;
struct tdb_logging_context log = { logfn, NULL };
struct timeval start, end, duration;
if (argc != 4)
usage(NULL);
if (strcmp(argv[1], "lockall") == 0)
lockall = tdb_lockall;
else if (strcmp(argv[1], "gradual") == 0)
lockall = tdb_lockall_gradual;
else
usage("Arg1 should be 'lockall' or 'gradual'");
num = atoi(argv[2]);
worktime = atoi(argv[3]);
if (!num || !worktime)
usage("Number of threads and worktime must be non-zero");
if (pipe(pfd) != 0)
err(1, "Creating pipe");
tdb = tdb_open_ex("/tmp/starvation.tdb", 10000, TDB_DEFAULT,
O_RDWR|O_CREAT|O_TRUNC, 0600, &log, NULL);
if (!tdb)
err(1, "Opening tdb /tmp/starvation.tdb");
for (i = 0; i < num; i++) {
switch (fork()) {
case 0:
close(pfd[1]);
fcntl(pfd[0], F_SETFL,
fcntl(pfd[0], F_GETFL)|O_NONBLOCK);
srandom(getpid() + i);
if (tdb_reopen(tdb) != 0)
err(1, "Reopening tdb %s", tdb_name(tdb));
run_and_sleep(tdb, pfd[0], worktime);
case -1:
err(1, "forking");
}
/* Stagger the children. */
usleep(random() % (1000000 / num));
}
close(pfd[0]);
sleep(1);
gettimeofday(&start, NULL);
if (lockall(tdb) != 0)
errx(1, "lockall failed: %s", tdb_errorstr(tdb));
gettimeofday(&end, NULL);
duration.tv_sec = end.tv_sec - start.tv_sec;
duration.tv_usec = end.tv_usec - start.tv_usec;
if (duration.tv_usec < 0) {
--duration.tv_sec;
duration.tv_usec += 1000000;
}
if (tdb_unlockall(tdb) != 0)
errx(1, "unlockall failed: %s", tdb_errorstr(tdb));
tdb_close(tdb);
unlink("/tmp/starvation.tdb");
printf("Took %lu.%06lu seconds\n", duration.tv_sec, duration.tv_usec);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment