Commit dc251c89 authored by Rusty Russell's avatar Rusty Russell

First attempt at transactions & traverse (deadlocks still).

parent 2885bab4
......@@ -19,7 +19,10 @@
/* Avoid mod by zero */
static unsigned int total_keys = 1;
/* #define DEBUG_DEPS 1 */
#define DEBUG_DEPS 1
/* Traversals block transactions in the current implementation. */
#define TRAVERSALS_TAKE_TRANSACTION_LOCK 1
struct pipe {
int fd[2];
......@@ -123,15 +126,21 @@ struct op {
TDB_DATA key;
TDB_DATA data;
int ret;
/* Who is waiting for us? */
struct list_head post;
/* How many are we waiting for? */
unsigned int pre;
/* If I'm part of a group (traverse/transaction) where is
* start? (Otherwise, 0) */
unsigned int group_start;
union {
int flag; /* open and store */
struct traverse *trav; /* traverse start */
TDB_DATA post_append; /* append */
unsigned int transaction_end; /* transaction start */
};
};
......@@ -180,6 +189,7 @@ static void add_op(const char *filename, struct op **op, unsigned int i,
new->serial = serial;
new->pre = 0;
new->ret = 0;
new->group_start = 0;
}
static void op_add_nothing(const char *filename,
......@@ -278,6 +288,44 @@ static void op_add_traverse(const char *filename,
op[op_num].trav = NULL;
}
static void op_add_transaction(const char *filename, struct op op[],
unsigned int op_num, char *words[])
{
if (words[2])
fail(filename, op_num+1, "Expect no arguments");
op[op_num].key = tdb_null;
op[op_num].transaction_end = 0;
}
static void op_analyze_transaction(const char *filename,
struct op op[], unsigned int op_num,
char *words[])
{
int i, start;
op[op_num].key = tdb_null;
if (words[2])
fail(filename, op_num+1, "Expect no arguments");
for (i = op_num-1; i >= 0; i--) {
if (op[i].op == OP_TDB_TRANSACTION_START &&
!op[i].transaction_end)
break;
}
if (i < 0)
fail(filename, op_num+1, "no transaction start found");
start = i;
op[start].transaction_end = op_num;
/* This rolls in nested transactions. I think that's right. */
for (i++; i <= op_num; i++)
op[i].group_start = start;
}
struct traverse_hash {
TDB_DATA key;
unsigned int index;
......@@ -325,7 +373,7 @@ static void op_analyze_traverse(const char *filename,
struct op op[], unsigned int op_num,
char *words[])
{
int i;
int i, start;
struct traverse *trav = talloc(op, struct traverse);
op[op_num].key = tdb_null;
......@@ -354,14 +402,18 @@ static void op_analyze_traverse(const char *filename,
if (i < 0)
fail(filename, op_num+1, "no traversal start found");
op[i].trav = trav;
start = i;
op[start].trav = trav;
for (i = start; i <= op_num; i++)
op[i].group_start = start;
if (is_trivial_traverse(op+i, op_num-i)) {
/* Fill in a plentiful hash table. */
op[i].trav->hash = talloc_zero_array(op[i].trav,
struct traverse_hash,
trav->num * 2);
for (; i < op_num; i++) {
op[start].trav->hash = talloc_zero_array(op[i].trav,
struct traverse_hash,
trav->num * 2);
for (i = start; i < op_num; i++) {
unsigned int h;
if (op[i].op != OP_TDB_TRAVERSE)
continue;
......@@ -500,18 +552,23 @@ static void do_pre(const char *filename, int pre_fd,
#if DEBUG_DEPS
printf("%s:%u:waiting for pre\n", filename, i+1);
fflush(stdout);
#endif
if (read(pre_fd, &opnum, sizeof(opnum)) != sizeof(opnum))
errx(1, "Reading from pipe");
#if DEBUG_DEPS
printf("%s:%u:got pre %u\n",
filename, i+1, opnum);
printf("%s:%u:got pre %u (%u)\n", filename, i+1, opnum+1,
op[opnum].pre);
fflush(stdout);
#endif
/* This could be any op, not just this one. */
if (op[opnum].pre == 0)
errx(1, "Got unexpected notification for op line %u",
errx(1, "Got extra notification for op line %u",
opnum + 1);
if (opnum < i)
errx(1, "%s:%u: got late notification for line %u",
filename, i + 1, opnum + 1);
op[opnum].pre--;
}
}
......@@ -523,7 +580,7 @@ static void do_post(const char *filename, const struct op op[], unsigned int i)
list_for_each(&op[i].post, dep, list) {
#if DEBUG_DEPS
printf("%s:%u:sending %u to file %u\n", filename, i+1,
dep->op, dep->file);
dep->op+1, dep->file);
#endif
if (write(pipes[dep->file].fd[1], &dep->op, sizeof(dep->op))
!= sizeof(dep->op))
......@@ -821,6 +878,43 @@ static void add_dependency(void *ctx,
unsigned int satisfies_opnum)
{
struct depend *post;
unsigned int needs_start, sat_start;
needs_start = op[needs_file][needs_opnum].group_start;
sat_start = op[satisfies_file][satisfies_opnum].group_start;
/* If needs is in a transaction, we need it before start. */
if (needs_start) {
switch (op[needs_file][needs_start].op) {
case OP_TDB_TRANSACTION_START:
#if TRAVERSALS_TAKE_TRANSACTION_LOCK
case OP_TDB_TRAVERSE_START:
case OP_TDB_TRAVERSE_READ_START:
#endif
needs_opnum = needs_start;
#ifdef DEBUG_DEPS
printf(" -> Back to %u\n", needs_start+1);
fflush(stdout);
#endif
break;
default:
break;
}
}
/* If satisfies is in a transaction, we wait until after commit. */
/* FIXME: If transaction is cancelled, don't need dependency. */
if (sat_start) {
if (op[satisfies_file][sat_start].op
== OP_TDB_TRANSACTION_START) {
satisfies_opnum
= op[satisfies_file][sat_start].transaction_end;
#ifdef DEBUG_DEPS
printf(" -> Depends on %u\n", satisfies_opnum+1);
fflush(stdout);
#endif
}
}
post = talloc(ctx, struct depend);
post->file = needs_file;
......@@ -828,6 +922,14 @@ static void add_dependency(void *ctx,
list_add(&op[satisfies_file][satisfies_opnum].post, &post->list);
op[needs_file][needs_opnum].pre++;
#ifdef DEBUG_DEPS
if (op[needs_file][needs_opnum].pre > 1) {
printf(" (File %u opnum %u hash %u needs)\n",
needs_file, needs_opnum+1,
op[needs_file][needs_opnum].pre);
fflush(stdout);
}
#endif
}
static void derive_dependencies(char *filename[],
......@@ -855,6 +957,7 @@ static void derive_dependencies(char *filename[],
hash[i].user[j].op_num+1,
filename[hash[i].user[j-1].file],
hash[i].user[j-1].op_num+1);
fflush(stdout);
#endif
add_dependency(hash, op,
hash[i].user[j].file,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment