Commit 03c18761 authored by Rusty Russell

Import from SAMBA's tdb:

commit b90863c0b7b860b006ac49c9396711ff351f777f
Author: Howard Chu <hyc@highlandsun.com>
Date:   Tue Mar 31 13:15:54 2009 +1100

    Add tdb_transaction_prepare_commit()
    
    Using tdb_transaction_prepare_commit() gives us 2-phase commits. This
    allows us to safely commit across multiple tdb databases at once, with
    reasonable transaction semantics.
    
    Signed-off-by: tridge@samba.org
parent f83c45bf
...@@ -898,6 +898,16 @@ void tdb_trace(struct tdb_context *tdb, const char *op) ...@@ -898,6 +898,16 @@ void tdb_trace(struct tdb_context *tdb, const char *op)
tdb_trace_end(tdb); tdb_trace_end(tdb);
} }
/*
  Emit a trace record for an operation together with a sequence number:
  writes the decimal seqnum followed by a single space, then the
  operation name, then calls tdb_trace_end().
*/
void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
{
	/* Largest possible text is "4294967295 " plus the terminating NUL;
	   size the buffer from that rather than from an unrelated type. */
	char msg[sizeof("4294967295 ")];

	/* unsigned long is at least 32 bits wide, so the cast is lossless
	   and matches the format specifier on every platform. */
	snprintf(msg, sizeof(msg), "%lu ", (unsigned long)seqnum);
	tdb_trace_write(tdb, msg);
	tdb_trace_write(tdb, op);
	tdb_trace_end(tdb);
}
void tdb_trace_open(struct tdb_context *tdb, const char *op, void tdb_trace_open(struct tdb_context *tdb, const char *op,
unsigned hash_size, unsigned tdb_flags, unsigned open_flags) unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
{ {
......
...@@ -137,6 +137,7 @@ int tdb_fd(struct tdb_context *tdb); ...@@ -137,6 +137,7 @@ int tdb_fd(struct tdb_context *tdb);
tdb_log_func tdb_log_fn(struct tdb_context *tdb); tdb_log_func tdb_log_fn(struct tdb_context *tdb);
void *tdb_get_logging_private(struct tdb_context *tdb); void *tdb_get_logging_private(struct tdb_context *tdb);
int tdb_transaction_start(struct tdb_context *tdb); int tdb_transaction_start(struct tdb_context *tdb);
int tdb_transaction_prepare_commit(struct tdb_context *tdb);
int tdb_transaction_commit(struct tdb_context *tdb); int tdb_transaction_commit(struct tdb_context *tdb);
int tdb_transaction_cancel(struct tdb_context *tdb); int tdb_transaction_cancel(struct tdb_context *tdb);
int tdb_transaction_recover(struct tdb_context *tdb); int tdb_transaction_recover(struct tdb_context *tdb);
......
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
#endif #endif
#include "tdb.h" #include "tdb.h"
/* #define TDB_TRACE 1 */ #define TDB_TRACE 1
#if HAVE_GETPAGESIZE #if HAVE_GETPAGESIZE
#define getpagesize() 0x2000 #define getpagesize() 0x2000
...@@ -90,6 +90,7 @@ typedef uint32_t tdb_off_t; ...@@ -90,6 +90,7 @@ typedef uint32_t tdb_off_t;
#ifdef TDB_TRACE #ifdef TDB_TRACE
void tdb_trace(struct tdb_context *tdb, const char *op); void tdb_trace(struct tdb_context *tdb, const char *op);
void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op);
void tdb_trace_open(struct tdb_context *tdb, const char *op, void tdb_trace_open(struct tdb_context *tdb, const char *op,
unsigned hash_size, unsigned tdb_flags, unsigned open_flags); unsigned hash_size, unsigned tdb_flags, unsigned open_flags);
void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret); void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret);
...@@ -107,6 +108,7 @@ void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op, ...@@ -107,6 +108,7 @@ void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret); TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret);
#else #else
#define tdb_trace(tdb, op) #define tdb_trace(tdb, op)
#define tdb_trace_seqnum(tdb, seqnum, op)
#define tdb_trace_open(tdb, op, hash_size, tdb_flags, open_flags) #define tdb_trace_open(tdb, op, hash_size, tdb_flags, open_flags)
#define tdb_trace_ret(tdb, op, ret) #define tdb_trace_ret(tdb, op, ret)
#define tdb_trace_retrec(tdb, op, ret) #define tdb_trace_retrec(tdb, op, ret)
...@@ -216,7 +218,10 @@ struct tdb_context { ...@@ -216,7 +218,10 @@ struct tdb_context {
int page_size; int page_size;
int max_dead_records; int max_dead_records;
int transaction_lock_count; int transaction_lock_count;
#ifdef TDB_TRACE
int tracefd; int tracefd;
uint32_t transaction_prepare_seqnum;
#endif
volatile sig_atomic_t *interrupt_sig_ptr; volatile sig_atomic_t *interrupt_sig_ptr;
}; };
......
...@@ -38,7 +38,7 @@ struct op_table { ...@@ -38,7 +38,7 @@ struct op_table {
void (*enhance_op)(const char *filename, void (*enhance_op)(const char *filename,
struct op op[], unsigned int op_num, char *words[]); struct op op[], unsigned int op_num, char *words[]);
}; };
/* maximum key range = 43, duplicates = 0 */ /* maximum key range = 48, duplicates = 0 */
#ifdef __GNUC__ #ifdef __GNUC__
__inline __inline
...@@ -52,32 +52,32 @@ hash_keyword (register const char *str, register unsigned int len) ...@@ -52,32 +52,32 @@ hash_keyword (register const char *str, register unsigned int len)
{ {
static const unsigned char asso_values[] = static const unsigned char asso_values[] =
{ {
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 20, 51, 25, 56, 56, 56, 56, 56, 56, 56, 20, 56, 30,
5, 0, 0, 5, 5, 51, 51, 0, 0, 0, 5, 0, 0, 5, 5, 56, 56, 0, 0, 0,
20, 51, 20, 51, 51, 0, 5, 0, 51, 0, 20, 56, 20, 56, 56, 0, 5, 0, 56, 0,
51, 5, 51, 51, 51, 51, 51, 51, 51, 51, 56, 5, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51, 51, 51, 51, 51, 56, 56, 56, 56, 56, 56, 56, 56, 56, 56,
51, 51, 51, 51, 51, 51 56, 56, 56, 56, 56, 56
}; };
return len + asso_values[(unsigned char)str[4]] + asso_values[(unsigned char)str[len - 1]]; return len + asso_values[(unsigned char)str[4]] + asso_values[(unsigned char)str[len - 1]];
} }
...@@ -93,17 +93,17 @@ find_keyword (register const char *str, register unsigned int len) ...@@ -93,17 +93,17 @@ find_keyword (register const char *str, register unsigned int len)
{ {
enum enum
{ {
TOTAL_KEYWORDS = 33, TOTAL_KEYWORDS = 34,
MIN_WORD_LENGTH = 8, MIN_WORD_LENGTH = 8,
MAX_WORD_LENGTH = 25, MAX_WORD_LENGTH = 30,
MIN_HASH_VALUE = 8, MIN_HASH_VALUE = 8,
MAX_HASH_VALUE = 50 MAX_HASH_VALUE = 55
}; };
static const struct op_table wordlist[] = static const struct op_table wordlist[] =
{ {
{""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""}, {""},
#line 43 "keywords.gperf" #line 44 "keywords.gperf"
{"traverse", OP_TDB_TRAVERSE, op_add_traverse,}, {"traverse", OP_TDB_TRAVERSE, op_add_traverse,},
#line 33 "keywords.gperf" #line 33 "keywords.gperf"
{"tdb_store", OP_TDB_STORE, op_add_store,}, {"tdb_store", OP_TDB_STORE, op_add_store,},
...@@ -115,13 +115,13 @@ find_keyword (register const char *str, register unsigned int len) ...@@ -115,13 +115,13 @@ find_keyword (register const char *str, register unsigned int len)
{"tdb_wipe_all", OP_TDB_WIPE_ALL, op_add_nothing,}, {"tdb_wipe_all", OP_TDB_WIPE_ALL, op_add_nothing,},
#line 20 "keywords.gperf" #line 20 "keywords.gperf"
{"tdb_unlockall", OP_TDB_UNLOCKALL, op_add_nothing,}, {"tdb_unlockall", OP_TDB_UNLOCKALL, op_add_nothing,},
#line 47 "keywords.gperf"
{"tdb_fetch", OP_TDB_FETCH, op_add_key_data,},
#line 48 "keywords.gperf" #line 48 "keywords.gperf"
{"tdb_fetch", OP_TDB_FETCH, op_add_key_data,},
#line 49 "keywords.gperf"
{"tdb_delete", OP_TDB_DELETE, op_add_key_ret,}, {"tdb_delete", OP_TDB_DELETE, op_add_key_ret,},
#line 17 "keywords.gperf" #line 17 "keywords.gperf"
{"tdb_lockall_mark", OP_TDB_LOCKALL_MARK, op_add_nothing,}, {"tdb_lockall_mark", OP_TDB_LOCKALL_MARK, op_add_nothing,},
#line 45 "keywords.gperf" #line 46 "keywords.gperf"
{"tdb_firstkey", OP_TDB_FIRSTKEY, op_add_key,}, {"tdb_firstkey", OP_TDB_FIRSTKEY, op_add_key,},
#line 18 "keywords.gperf" #line 18 "keywords.gperf"
{"tdb_lockall_unmark", OP_TDB_LOCKALL_UNMARK, op_add_nothing,}, {"tdb_lockall_unmark", OP_TDB_LOCKALL_UNMARK, op_add_nothing,},
...@@ -137,35 +137,38 @@ find_keyword (register const char *str, register unsigned int len) ...@@ -137,35 +137,38 @@ find_keyword (register const char *str, register unsigned int len)
{""}, {""},
#line 22 "keywords.gperf" #line 22 "keywords.gperf"
{"tdb_lockall_read_nonblock", OP_TDB_LOCKALL_READ_NONBLOCK, op_add_nothing,}, {"tdb_lockall_read_nonblock", OP_TDB_LOCKALL_READ_NONBLOCK, op_add_nothing,},
#line 42 "keywords.gperf" #line 43 "keywords.gperf"
{"tdb_traverse_end", OP_TDB_TRAVERSE_END, op_analyze_traverse,}, {"tdb_traverse_end", OP_TDB_TRAVERSE_END, op_analyze_traverse,},
#line 38 "keywords.gperf" #line 38 "keywords.gperf"
{"tdb_transaction_cancel", OP_TDB_TRANSACTION_CANCEL, op_analyze_transaction,}, {"tdb_transaction_cancel", OP_TDB_TRANSACTION_CANCEL, op_analyze_transaction,},
#line 41 "keywords.gperf" #line 42 "keywords.gperf"
{"tdb_traverse_start", OP_TDB_TRAVERSE_START, op_add_traverse_start,}, {"tdb_traverse_start", OP_TDB_TRAVERSE_START, op_add_traverse_start,},
{""}, {""},
#line 44 "keywords.gperf" #line 45 "keywords.gperf"
{"traversefn", OP_TDB_TRAVERSE, op_add_traversefn,}, {"traversefn", OP_TDB_TRAVERSE, op_add_traversefn,},
#line 37 "keywords.gperf" #line 37 "keywords.gperf"
{"tdb_transaction_start", OP_TDB_TRANSACTION_START, op_add_transaction,}, {"tdb_transaction_start", OP_TDB_TRANSACTION_START, op_add_transaction,},
#line 39 "keywords.gperf" #line 39 "keywords.gperf"
{"tdb_transaction_commit", OP_TDB_TRANSACTION_COMMIT, op_analyze_transaction,}, {"tdb_transaction_commit", OP_TDB_TRANSACTION_COMMIT, op_analyze_transaction,},
#line 40 "keywords.gperf" #line 41 "keywords.gperf"
{"tdb_traverse_read_start", OP_TDB_TRAVERSE_READ_START, op_add_traverse_start,}, {"tdb_traverse_read_start", OP_TDB_TRAVERSE_READ_START, op_add_traverse_start,},
{""}, {""},
#line 34 "keywords.gperf" #line 34 "keywords.gperf"
{"tdb_append", OP_TDB_APPEND, op_add_append,}, {"tdb_append", OP_TDB_APPEND, op_add_append,},
#line 46 "keywords.gperf" #line 47 "keywords.gperf"
{"tdb_nextkey", OP_TDB_NEXTKEY, op_add_key_data,}, {"tdb_nextkey", OP_TDB_NEXTKEY, op_add_key_data,},
{""}, {""}, {""},
#line 40 "keywords.gperf"
{"tdb_transaction_prepare_commit", OP_TDB_TRANSACTION_PREPARE_COMMIT, op_add_nothing,},
#line 31 "keywords.gperf"
{"tdb_parse_record", OP_TDB_PARSE_RECORD, op_add_key_ret,},
{""}, {""},
#line 24 "keywords.gperf" #line 24 "keywords.gperf"
{"tdb_chainlock", OP_TDB_CHAINLOCK, op_add_chainlock,}, {"tdb_chainlock", OP_TDB_CHAINLOCK, op_add_chainlock,},
{""}, {""},
#line 28 "keywords.gperf" #line 28 "keywords.gperf"
{"tdb_chainunlock", OP_TDB_CHAINUNLOCK, op_analyze_chainlock,}, {"tdb_chainunlock", OP_TDB_CHAINUNLOCK, op_analyze_chainlock,},
#line 31 "keywords.gperf" {""}, {""},
{"tdb_parse_record", OP_TDB_PARSE_RECORD, op_add_key_ret,},
{""},
#line 26 "keywords.gperf" #line 26 "keywords.gperf"
{"tdb_chainlock_mark", OP_TDB_CHAINLOCK_MARK, op_add_key,}, {"tdb_chainlock_mark", OP_TDB_CHAINLOCK_MARK, op_add_key,},
{""}, {""},
......
...@@ -37,6 +37,7 @@ tdb_wipe_all, OP_TDB_WIPE_ALL, op_add_nothing, ...@@ -37,6 +37,7 @@ tdb_wipe_all, OP_TDB_WIPE_ALL, op_add_nothing,
tdb_transaction_start, OP_TDB_TRANSACTION_START, op_add_transaction, tdb_transaction_start, OP_TDB_TRANSACTION_START, op_add_transaction,
tdb_transaction_cancel, OP_TDB_TRANSACTION_CANCEL, op_analyze_transaction, tdb_transaction_cancel, OP_TDB_TRANSACTION_CANCEL, op_analyze_transaction,
tdb_transaction_commit, OP_TDB_TRANSACTION_COMMIT, op_analyze_transaction, tdb_transaction_commit, OP_TDB_TRANSACTION_COMMIT, op_analyze_transaction,
tdb_transaction_prepare_commit, OP_TDB_TRANSACTION_PREPARE_COMMIT, op_add_nothing,
tdb_traverse_read_start, OP_TDB_TRAVERSE_READ_START, op_add_traverse_start, tdb_traverse_read_start, OP_TDB_TRAVERSE_READ_START, op_add_traverse_start,
tdb_traverse_start, OP_TDB_TRAVERSE_START, op_add_traverse_start, tdb_traverse_start, OP_TDB_TRAVERSE_START, op_add_traverse_start,
tdb_traverse_end, OP_TDB_TRAVERSE_END, op_analyze_traverse, tdb_traverse_end, OP_TDB_TRAVERSE_END, op_analyze_traverse,
......
...@@ -116,6 +116,7 @@ enum op_type { ...@@ -116,6 +116,7 @@ enum op_type {
OP_TDB_WIPE_ALL, OP_TDB_WIPE_ALL,
OP_TDB_TRANSACTION_START, OP_TDB_TRANSACTION_START,
OP_TDB_TRANSACTION_CANCEL, OP_TDB_TRANSACTION_CANCEL,
OP_TDB_TRANSACTION_PREPARE_COMMIT,
OP_TDB_TRANSACTION_COMMIT, OP_TDB_TRANSACTION_COMMIT,
OP_TDB_TRAVERSE_READ_START, OP_TDB_TRAVERSE_READ_START,
OP_TDB_TRAVERSE_START, OP_TDB_TRAVERSE_START,
...@@ -764,6 +765,10 @@ unsigned run_ops(struct tdb_context *tdb, ...@@ -764,6 +765,10 @@ unsigned run_ops(struct tdb_context *tdb,
case OP_TDB_TRANSACTION_CANCEL: case OP_TDB_TRANSACTION_CANCEL:
try(tdb_transaction_cancel(tdb), op[file][i].ret); try(tdb_transaction_cancel(tdb), op[file][i].ret);
break; break;
case OP_TDB_TRANSACTION_PREPARE_COMMIT:
try(tdb_transaction_prepare_commit(tdb),
op[file][i].ret);
break;
case OP_TDB_TRANSACTION_COMMIT: case OP_TDB_TRANSACTION_COMMIT:
try(tdb_transaction_commit(tdb), op[file][i].ret); try(tdb_transaction_commit(tdb), op[file][i].ret);
break; break;
......
...@@ -129,13 +129,11 @@ static void addrec_db(void) ...@@ -129,13 +129,11 @@ static void addrec_db(void)
goto next; goto next;
} }
if (in_traverse == 0 && in_transaction && random() % TRANSACTION_PROB == 0) { if (in_traverse == 0 && in_transaction && random() % TRANSACTION_PROB == 0) {
#if 0
if (random() % TRANSACTION_PREPARE_PROB == 0) { if (random() % TRANSACTION_PREPARE_PROB == 0) {
if (tdb_transaction_prepare_commit(db) != 0) { if (tdb_transaction_prepare_commit(db) != 0) {
fatal("tdb_transaction_prepare_commit failed"); fatal("tdb_transaction_prepare_commit failed");
} }
} }
#endif
if (tdb_transaction_commit(db) != 0) { if (tdb_transaction_commit(db) != 0) {
fatal("tdb_transaction_commit failed"); fatal("tdb_transaction_commit failed");
} }
......
...@@ -121,6 +121,10 @@ struct tdb_transaction { ...@@ -121,6 +121,10 @@ struct tdb_transaction {
but don't create a new transaction */ but don't create a new transaction */
int nesting; int nesting;
/* set when a prepare has already occurred */
bool prepared;
tdb_off_t magic_offset;
/* old file size before transaction */ /* old file size before transaction */
tdb_len_t old_map_size; tdb_len_t old_map_size;
}; };
...@@ -135,6 +139,14 @@ static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, ...@@ -135,6 +139,14 @@ static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
{ {
uint32_t blk; uint32_t blk;
/* Only a commit is allowed on a prepared transaction */
if (tdb->transaction->prepared) {
tdb->ecode = TDB_ERR_EINVAL;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
tdb->transaction->transaction_error = 1;
return -1;
}
/* break it down into block sized ops */ /* break it down into block sized ops */
while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) { while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size); tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
...@@ -192,6 +204,14 @@ static int transaction_write(struct tdb_context *tdb, tdb_off_t off, ...@@ -192,6 +204,14 @@ static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
{ {
uint32_t blk; uint32_t blk;
/* Only a commit is allowed on a prepared transaction */
if (tdb->transaction->prepared) {
tdb->ecode = TDB_ERR_EINVAL;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
tdb->transaction->transaction_error = 1;
return -1;
}
/* if the write is to a hash head, then update the transaction /* if the write is to a hash head, then update the transaction
hash heads */ hash heads */
if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP && if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
...@@ -398,9 +418,34 @@ static const struct tdb_methods transaction_methods = { ...@@ -398,9 +418,34 @@ static const struct tdb_methods transaction_methods = {
transaction_brlock transaction_brlock
}; };
/*
  Push pending data out to disk.

  First fsync()s the database file descriptor. Then, where MS_SYNC is
  available and the file is mapped, msync()s the part of the mapping
  that covers [offset, offset + length).

  Returns 0 on success. On failure sets tdb->ecode to TDB_ERR_IO, logs
  the error and returns -1.
*/
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
{
	if (fsync(tdb->fd) != 0) {
		tdb->ecode = TDB_ERR_IO;
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
		return -1;
	}

#ifdef MS_SYNC
	if (tdb->map_ptr) {
		/* round the start down to a page_size boundary (assumes
		   page_size is a power of two) and grow the length by the
		   amount we moved back, so the caller's range stays covered */
		tdb_off_t aligned_start = offset & ~(tdb->page_size-1);
		char *sync_from = (char *)tdb->map_ptr + aligned_start;

		if (msync(sync_from, length + (offset - aligned_start), MS_SYNC) != 0) {
			tdb->ecode = TDB_ERR_IO;
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
				 strerror(errno)));
			return -1;
		}
	}
#endif

	return 0;
}
int tdb_transaction_cancel_internal(struct tdb_context *tdb) int tdb_transaction_cancel_internal(struct tdb_context *tdb)
{ {
int i; int i, ret = 0;
if (tdb->transaction == NULL) { if (tdb->transaction == NULL) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
...@@ -423,6 +468,18 @@ int tdb_transaction_cancel_internal(struct tdb_context *tdb) ...@@ -423,6 +468,18 @@ int tdb_transaction_cancel_internal(struct tdb_context *tdb)
} }
SAFE_FREE(tdb->transaction->blocks); SAFE_FREE(tdb->transaction->blocks);
if (tdb->transaction->magic_offset) {
const struct tdb_methods *methods = tdb->transaction->io_methods;
uint32_t zero = 0;
/* remove the recovery marker */
if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
ret = -1;
}
}
/* remove any global lock created during the transaction */ /* remove any global lock created during the transaction */
if (tdb->global_lock.count != 0) { if (tdb->global_lock.count != 0) {
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size); tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
...@@ -448,7 +505,7 @@ int tdb_transaction_cancel_internal(struct tdb_context *tdb) ...@@ -448,7 +505,7 @@ int tdb_transaction_cancel_internal(struct tdb_context *tdb)
SAFE_FREE(tdb->transaction->hash_heads); SAFE_FREE(tdb->transaction->hash_heads);
SAFE_FREE(tdb->transaction); SAFE_FREE(tdb->transaction);
return 0; return ret;
} }
/* /*
...@@ -570,31 +627,6 @@ int tdb_transaction_cancel(struct tdb_context *tdb) ...@@ -570,31 +627,6 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
tdb_trace(tdb, "tdb_transaction_cancel"); tdb_trace(tdb, "tdb_transaction_cancel");
return tdb_transaction_cancel_internal(tdb); return tdb_transaction_cancel_internal(tdb);
} }
/*
  Push pending data out to disk.

  First fsync()s the database file descriptor. Then, where MS_SYNC is
  available and the file is mapped, msync()s the part of the mapping
  that covers [offset, offset + length).

  Returns 0 on success. On failure sets tdb->ecode to TDB_ERR_IO, logs
  the error and returns -1.
*/
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
{
	if (fsync(tdb->fd) != 0) {
		tdb->ecode = TDB_ERR_IO;
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
		return -1;
	}

#ifdef MS_SYNC
	if (tdb->map_ptr) {
		/* round the start down to a page_size boundary (assumes
		   page_size is a power of two) and grow the length by the
		   amount we moved back, so the caller's range stays covered */
		tdb_off_t aligned_start = offset & ~(tdb->page_size-1);
		char *sync_from = (char *)tdb->map_ptr + aligned_start;

		if (msync(sync_from, length + (offset - aligned_start), MS_SYNC) != 0) {
			tdb->ecode = TDB_ERR_IO;
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
				 strerror(errno)));
			return -1;
		}
	}
#endif

	return 0;
}
/* /*
work out how much space the linearised recovery data will consume work out how much space the linearised recovery data will consume
...@@ -842,26 +874,26 @@ static int transaction_setup_recovery(struct tdb_context *tdb, ...@@ -842,26 +874,26 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
return 0; return 0;
} }
/* static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
commit the current transaction
*/
int tdb_transaction_commit(struct tdb_context *tdb)
{ {
const struct tdb_methods *methods; const struct tdb_methods *methods;
tdb_off_t magic_offset = 0;
uint32_t zero = 0;
int i;
tdb_trace(tdb, "tdb_transaction_commit");
if (tdb->transaction == NULL) { if (tdb->transaction == NULL) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
return -1;
}
if (tdb->transaction->prepared) {
tdb->ecode = TDB_ERR_EINVAL;
tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
return -1; return -1;
} }
if (tdb->transaction->transaction_error) { if (tdb->transaction->transaction_error) {
tdb->ecode = TDB_ERR_IO; tdb->ecode = TDB_ERR_IO;
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
return -1; return -1;
} }
...@@ -871,9 +903,13 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -871,9 +903,13 @@ int tdb_transaction_commit(struct tdb_context *tdb)
return 0; return 0;
} }
#ifdef TDB_TRACE
/* store seqnum now, before reading becomes illegal. */
tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
#endif
/* check for a null transaction */ /* check for a null transaction */
if (tdb->transaction->blocks == NULL) { if (tdb->transaction->blocks == NULL) {
tdb_transaction_cancel_internal(tdb);
return 0; return 0;
} }
...@@ -883,14 +919,14 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -883,14 +919,14 @@ int tdb_transaction_commit(struct tdb_context *tdb)
nested their locks properly, so fail the transaction */ nested their locks properly, so fail the transaction */
if (tdb->num_locks || tdb->global_lock.count) { if (tdb->num_locks || tdb->global_lock.count) {
tdb->ecode = TDB_ERR_LOCK; tdb->ecode = TDB_ERR_LOCK;
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
return -1; return -1;
} }
/* upgrade the main transaction lock region to a write lock */ /* upgrade the main transaction lock region to a write lock */
if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) { if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
tdb->ecode = TDB_ERR_LOCK; tdb->ecode = TDB_ERR_LOCK;
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
return -1; return -1;
...@@ -899,7 +935,7 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -899,7 +935,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
/* get the global lock - this prevents new users attaching to the database /* get the global lock - this prevents new users attaching to the database
during the commit */ during the commit */
if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) { if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n")); TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
tdb->ecode = TDB_ERR_LOCK; tdb->ecode = TDB_ERR_LOCK;
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
return -1; return -1;
...@@ -907,21 +943,23 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -907,21 +943,23 @@ int tdb_transaction_commit(struct tdb_context *tdb)
if (!(tdb->flags & TDB_NOSYNC)) { if (!(tdb->flags & TDB_NOSYNC)) {
/* write the recovery data to the end of the file */ /* write the recovery data to the end of the file */
if (transaction_setup_recovery(tdb, &magic_offset) == -1) { if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n")); TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1); tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
return -1; return -1;
} }
} }
tdb->transaction->prepared = true;
/* expand the file to the new size if needed */ /* expand the file to the new size if needed */
if (tdb->map_size != tdb->transaction->old_map_size) { if (tdb->map_size != tdb->transaction->old_map_size) {
if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
tdb->map_size - tdb->map_size -
tdb->transaction->old_map_size) == -1) { tdb->transaction->old_map_size) == -1) {
tdb->ecode = TDB_ERR_IO; tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n")); TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1); tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
tdb_transaction_cancel_internal(tdb); tdb_transaction_cancel_internal(tdb);
return -1; return -1;
...@@ -930,6 +968,68 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -930,6 +968,68 @@ int tdb_transaction_commit(struct tdb_context *tdb)
methods->tdb_oob(tdb, tdb->map_size + 1, 1); methods->tdb_oob(tdb, tdb->map_size + 1, 1);
} }
/* Keep the global lock until the actual commit */
return 0;
}
/*
  prepare to commit the current transaction

  Public entry point: records the call in the trace, then hands over to
  tdb_transaction_prepare_commit_internal() and passes its result back
  unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	ret = tdb_transaction_prepare_commit_internal(tdb);

	return ret;
}
/*
commit the current transaction
*/
int tdb_transaction_commit(struct tdb_context *tdb)
{
const struct tdb_methods *methods;
int i;
if (tdb->transaction == NULL) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
return -1;
}
/* If we've prepared, can't read seqnum. */
if (tdb->transaction->prepared) {
tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
"tdb_transaction_commit");
} else {
tdb_trace(tdb, "tdb_transaction_commit");
}
if (tdb->transaction->transaction_error) {
tdb->ecode = TDB_ERR_IO;
tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
return -1;
}
if (tdb->transaction->nesting != 0) {
tdb->transaction->nesting--;
return 0;
}
/* check for a null transaction */
if (tdb->transaction->blocks == NULL) {
tdb_transaction_cancel_internal(tdb);
return 0;
}
if (!tdb->transaction->prepared) {
int ret = tdb_transaction_prepare_commit_internal(tdb);
if (ret)
return ret;
}
methods = tdb->transaction->io_methods;
/* perform all the writes */ /* perform all the writes */
for (i=0;i<tdb->transaction->num_blocks;i++) { for (i=0;i<tdb->transaction->num_blocks;i++) {
tdb_off_t offset; tdb_off_t offset;
...@@ -971,17 +1071,6 @@ int tdb_transaction_commit(struct tdb_context *tdb) ...@@ -971,17 +1071,6 @@ int tdb_transaction_commit(struct tdb_context *tdb)
if (transaction_sync(tdb, 0, tdb->map_size) == -1) { if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
return -1; return -1;
} }
/* remove the recovery marker */
if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
return -1;
}
/* ensure the recovery marker has been removed on disk */
if (transaction_sync(tdb, magic_offset, 4) == -1) {
return -1;
}
} }
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1); tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment