Commit 729fc5b9 authored by Rusty Russell's avatar Rusty Russell

tdb: handle processes dying during transaction commit.

tdb transactions were designed to be robust against the machine
powering off, but interestingly were never designed to handle the case
where an administrator kill -9's a process during commit.  Because
recovery is only done on tdb_open, processes with the tdb already
mapped will simply use it despite it being corrupt and needing
recovery.

The solution to this is to check for recovery every time we grab a
data lock: we could have gained the lock because a process just died.
This has no measurable cost: here is the time for tdbtorture -s 0 -n 1
-l 10000:

Before:
	2.75 2.50 2.81 3.19 2.91 2.53 2.72 2.50 2.78 2.77 = Avg 2.75

After:
	2.81 2.57 3.42 2.49 3.02 2.49 2.84 2.48 2.80 2.43 = Avg 2.74
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 497e23b2
......@@ -309,10 +309,44 @@ int tdb_nest_lock(struct tdb_context *tdb, uint32_t offset, int ltype,
return 0;
}
static int tdb_lock_and_recover(struct tdb_context *tdb)
{
int ret;
/* We need to match locking order in transaction commit. */
if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) {
return -1;
}
if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) {
tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
return -1;
}
ret = tdb_transaction_recover(tdb);
tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0);
return ret;
}
static bool have_data_locks(const struct tdb_context *tdb)
{
unsigned int i;
for (i = 0; i < tdb->num_lockrecs; i++) {
if (tdb->lockrecs[i].off >= lock_offset(-1))
return true;
}
return false;
}
static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype,
enum tdb_lock_flags waitflag)
{
int ret;
bool check = false;
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->allrecord_lock.count &&
......@@ -324,7 +358,18 @@ static int tdb_lock_list(struct tdb_context *tdb, int list, int ltype,
tdb->ecode = TDB_ERR_LOCK;
ret = -1;
} else {
/* Only check when we grab first data lock. */
check = !have_data_locks(tdb);
ret = tdb_nest_lock(tdb, lock_offset(list), ltype, waitflag);
if (ret == 0 && check && tdb_needs_recovery(tdb)) {
tdb_nest_unlock(tdb, lock_offset(list), ltype, false);
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
return tdb_lock_list(tdb, list, ltype, waitflag);
}
}
return ret;
}
......@@ -488,6 +533,21 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
tdb->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
tdb->allrecord_lock.off = upgradable;
if (tdb_needs_recovery(tdb)) {
bool mark = flags & TDB_LOCK_MARK_ONLY;
tdb_allrecord_unlock(tdb, ltype, mark);
if (mark) {
tdb->ecode = TDB_ERR_LOCK;
TDB_LOG((tdb, TDB_DEBUG_ERROR,
"tdb_lockall_mark cannot do recovery\n"));
return -1;
}
if (tdb_lock_and_recover(tdb) == -1) {
return -1;
}
return tdb_allrecord_lock(tdb, ltype, flags, upgradable);
}
return 0;
}
......
......@@ -282,6 +282,7 @@ int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
bool tdb_needs_recovery(struct tdb_context *tdb);
int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec);
int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec);
int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec);
......
......@@ -76,11 +76,7 @@ static int do_operation(enum operation op, const char *name)
} else if (op == CHECK_KEEP_OPENED) {
return tdb_check(tdb, NULL, 0) == 0;
} else if (op == NEEDS_RECOVERY_KEEP_OPENED) {
#if 0
return tdb_maybe_needs_recovery(tdb);
#else
return 0;
#endif
return tdb_needs_recovery(tdb);
}
alarmed = 0;
......
......@@ -1194,3 +1194,28 @@ int tdb_transaction_recover(struct tdb_context *tdb)
/* all done */
return 0;
}
/* Any I/O failures we say "needs recovery". */
bool tdb_needs_recovery(struct tdb_context *tdb)
{
tdb_off_t recovery_head;
struct tdb_record rec;
/* find the recovery area */
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
return true;
}
if (recovery_head == 0) {
/* we have never allocated a recovery record */
return false;
}
/* read the recovery record */
if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
sizeof(rec), DOCONV()) == -1) {
return true;
}
return (rec.magic == TDB_RECOVERY_MAGIC);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment