Commit 555c01c9 authored by Rusty Russell's avatar Rusty Russell

Get more sophisticated with resolving duplicate serial numbers.

Still deadlocks in one case, due to spurious dependencies inside traversals.  See next commit.
parent eb2cbb4b
......@@ -14,6 +14,7 @@
#include <sys/time.h>
#include <errno.h>
#include <signal.h>
#include <assert.h>
#define STRINGIFY2(x) #x
#define STRINGIFY(x) STRINGIFY2(x)
......@@ -143,7 +144,10 @@ struct op {
union {
int flag; /* open and store */
struct traverse *trav; /* traverse start */
TDB_DATA pre_append; /* append */
struct { /* append */
TDB_DATA pre;
TDB_DATA post;
} append;
unsigned int transaction_end; /* transaction start */
};
};
......@@ -259,8 +263,6 @@ static void op_add_store(const char *filename,
static void op_add_append(const char *filename,
struct op op[], unsigned int op_num, char *words[])
{
TDB_DATA post_append;
if (!words[2] || !words[3] || !words[4] || !words[5] || words[6]
|| !streq(words[4], "="))
fail(filename, op_num+1, "Expect <key> <data> = <rec>");
......@@ -268,11 +270,13 @@ static void op_add_append(const char *filename,
op[op_num].key = make_tdb_data(op, filename, op_num+1, words[2]);
op[op_num].data = make_tdb_data(op, filename, op_num+1, words[3]);
post_append = make_tdb_data(op, filename, op_num+1, words[5]);
op[op_num].append.post
= make_tdb_data(op, filename, op_num+1, words[5]);
/* By subtraction, figure out what previous data was. */
op[op_num].pre_append.dptr = post_append.dptr;
op[op_num].pre_append.dsize = post_append.dsize - op[op_num].data.dsize;
op[op_num].append.pre.dptr = op[op_num].append.post.dptr;
op[op_num].append.pre.dsize
= op[op_num].append.post.dsize - op[op_num].data.dsize;
total_keys++;
}
......@@ -737,11 +741,10 @@ unsigned run_ops(struct tdb_context *tdb,
break;
case OP_TDB_STORE:
try(tdb_store(tdb, op[i].key, op[i].data, op[i].flag),
op[i].ret < 0 ? op[i].ret : 0);
op[i].ret);
break;
case OP_TDB_APPEND:
try(tdb_append(tdb, op[i].key, op[i].data),
op[i].ret < 0 ? op[i].ret : 0);
try(tdb_append(tdb, op[i].key, op[i].data), op[i].ret);
break;
case OP_TDB_GET_SEQNUM:
try(tdb_get_seqnum(tdb), op[i].ret);
......@@ -888,9 +891,9 @@ static const TDB_DATA *needs(const struct op *op)
return NULL;
case OP_TDB_APPEND:
if (op->pre_append.dsize == 0)
if (op->append.pre.dsize == 0)
return &not_exists_or_empty;
return &op->pre_append;
return &op->append.pre;
case OP_TDB_STORE:
if (op->flag == TDB_INSERT) {
......@@ -945,74 +948,23 @@ static const TDB_DATA *needs(const struct op *op)
}
enum satisfaction {
/* This op makes the other one possible. */
SATISFIES,
/* This op makes the other one impossible. */
DISSATISFIES,
/* This op makes no difference. */
NO_CHANGE
};
static enum satisfaction satisfies(const struct op *op, const TDB_DATA *need)
/* What's the data after this op? pre if nothing changed. */
static const TDB_DATA *gives(const struct op *op, const TDB_DATA *pre)
{
bool deletes, creates;
/* Failed ops don't change state of db. */
if (op->ret < 0)
return NO_CHANGE;
deletes = (op->op == OP_TDB_DELETE || op->op == OP_TDB_WIPE_ALL);
/* Append/store is creating the record if ret == 0 (1 means replaced) */
if (op->op == OP_TDB_APPEND || op->op == OP_TDB_STORE)
creates = (op->ret == 0);
else
creates = false;
if (need == &must_not_exist) {
if (deletes)
return SATISFIES;
if (creates)
return DISSATISFIES;
return NO_CHANGE;
}
return pre;
if (need == &must_exist) {
if (deletes)
return DISSATISFIES;
if (creates)
return SATISFIES;
return NO_CHANGE;
}
if (op->op == OP_TDB_DELETE || op->op == OP_TDB_WIPE_ALL)
return &tdb_null;
if (need == &not_exists_or_empty) {
if (deletes)
return SATISFIES;
if (!creates)
return NO_CHANGE;
}
if (op->op == OP_TDB_APPEND)
return &op->append.post;
/* OK, we need an exact match. */
if (deletes)
return DISSATISFIES;
/* An append which results in the wrong data dissatisfies. */
if (op->op == OP_TDB_APPEND) {
if (op->pre_append.dsize + op->data.dsize != need->dsize)
return DISSATISFIES;
if (memcmp(op->pre_append.dptr, need->dptr,
op->pre_append.dsize) != 0)
return DISSATISFIES;
if (memcmp(op->data.dptr, need->dptr + op->pre_append.dsize,
op->data.dsize) != 0)
return DISSATISFIES;
return SATISFIES;
} else if (op->op == OP_TDB_STORE) {
if (key_eq(op->data, *need))
return SATISFIES;
return DISSATISFIES;
}
return NO_CHANGE;
if (op->op == OP_TDB_STORE)
return &op->data;
return pre;
}
static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
......@@ -1021,46 +973,6 @@ static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
unsigned int i, j, h;
struct keyinfo *hash;
/* Gcc nexted function extension. How cool is this? */
int compare_user_serial(const void *_a, const void *_b)
{
const struct key_user *a = _a, *b = _b;
int ret = op[a->file][a->op_num].serial
- op[b->file][b->op_num].serial;
/* Serial is not completely reliable. First, fetches don't
* inc serial, second we don't lock to get seq number.
* This smooths things a little for simple cases. */
if (ret == 0) {
const TDB_DATA *a_needs, *b_needs;
b_needs = needs(&op[b->file][b->op_num]);
switch (satisfies(&op[a->file][a->op_num], b_needs)) {
case SATISFIES:
/* A comes first: it satisfies B. */
return -1;
case DISSATISFIES:
/* A doesn't come first: it messes up B. */
return 1;
default:
break;
}
a_needs = needs(&op[a->file][a->op_num]);
switch (satisfies(&op[b->file][b->op_num], a_needs)) {
case SATISFIES:
/* B comes first: it satisfies A. */
return 1;
case DISSATISFIES:
/* B doesn't come first: it messes up A. */
return -1;
default:
break;
}
}
return ret;
}
hash = talloc_zero_array(op[0], struct keyinfo, total_keys*2);
for (i = 0; i < num; i++) {
for (j = 1; j < num_ops[i]; j++) {
......@@ -1098,12 +1010,127 @@ static struct keyinfo *hash_ops(struct op *op[], unsigned int num_ops[],
}
}
return hash;
}
static bool satisfies(const TDB_DATA *data, const TDB_DATA *need)
{
/* Don't need anything? Cool. */
if (!need)
return true;
/* This should be tdb_null or a real value. */
assert(data != &must_exist);
assert(data != &must_not_exist);
assert(data != &not_exists_or_empty);
/* must_not_exist == must_not_exist, must_exist == must_exist, or
not_exists_or_empty == not_exists_or_empty. */
if (data->dsize == need->dsize && data->dptr == need->dptr)
return true;
/* Must not exist? data must not exist. */
if (need == &must_not_exist)
return data->dptr == NULL;
/* Must exist? */
if (need == &must_exist)
return data->dptr != NULL;
/* Either noexist or empty. */
if (need == &not_exists_or_empty)
return data->dsize == 0;
/* Needs something specific. */
return key_eq(*data, *need);
}
static bool sort_deps(char *filename[], struct op *op[],
struct key_user res[], unsigned num,
const TDB_DATA *data)
{
unsigned int i;
/* Nothing left? We're sorted. */
if (num == 0)
return true;
for (i = 0; i < num; i++) {
struct op *this_op = &op[res[i].file][res[i].op_num];
/* Is what we have good enough for this op? */
if (satisfies(data, needs(this_op))) {
/* Try this one next. */
struct key_user tmp = res[0];
res[0] = res[i];
res[i] = tmp;
if (sort_deps(filename, op, res+1, num-1,
gives(this_op, data)))
return true;
}
}
/* No combination worked. */
return false;
}
/* All these ops have the same serial number. Which comes first?
*
* This can happen both because read ops or failed write ops don't
* change serial number, and also due to race since we access the
* number unlocked (the race can cause less detectable ordering problems,
* in which case we'll deadlock and report: fix manually in that case).
*/
static void figure_deps(char *filename[], struct op *op[],
struct key_user user[], unsigned start, unsigned end)
{
unsigned int i;
/* We assume database starts empty. */
const struct TDB_DATA *data = &tdb_null;
/* What do we have to start with? */
for (i = 0; i < start; i++)
data = gives(&op[user[i].file][user[i].op_num], data);
if (!sort_deps(filename, op, user + start, end - start, data))
fail(filename[user[start].file], user[start].op_num+1,
"Could not resolve inter-dependencies");
}
static void sort_ops(struct keyinfo hash[], char *filename[], struct op *op[])
{
unsigned int h;
/* Gcc nexted function extension. How cool is this? */
int compare_serial(const void *_a, const void *_b)
{
const struct key_user *a = _a, *b = _b;
return op[a->file][a->op_num].serial
- op[b->file][b->op_num].serial;
}
/* Now sort into seqnum order. */
for (h = 0; h < total_keys * 2; h++)
qsort(hash[h].user, hash[h].num_users, sizeof(hash[h].user[0]),
compare_user_serial);
for (h = 0; h < total_keys * 2; h++) {
unsigned int i, same;
struct key_user *user = hash[h].user;
return hash;
qsort(user, hash[h].num_users, sizeof(user[0]), compare_serial);
/* Try to deal with same serial numbers. */
for (i = 1, same = 0; i < hash[h].num_users; i++) {
if (op[user[i].file][user[i].op_num].serial
== op[user[i-1].file][user[i-1].op_num].serial) {
same++;
continue;
}
if (same) {
figure_deps(filename, op, user, i-same-1, i);
same = 0;
}
}
if (same)
figure_deps(filename, op, user, i-same-1, i);
}
}
static void add_dependency(void *ctx,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment