Commit ceb2f3ea authored by Rusty Russell's avatar Rusty Russell

tdb2: unify tdb1_chainlock et al. into tdb_chainlock

Switch on the TDB_VERSION1 flag.
parent cc2d609d
...@@ -852,6 +852,11 @@ static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key, ...@@ -852,6 +852,11 @@ static enum TDB_ERROR chainlock(struct tdb_context *tdb, const TDB_DATA *key,
contention - it cannot guarantee how many records will be locked */ contention - it cannot guarantee how many records will be locked */
enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key) enum TDB_ERROR tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{ {
if (tdb->flags & TDB_VERSION1) {
if (tdb1_chainlock(tdb, key) == -1)
return tdb->last_error;
return TDB_SUCCESS;
}
return tdb->last_error = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT, return tdb->last_error = chainlock(tdb, &key, F_WRLCK, TDB_LOCK_WAIT,
"tdb_chainlock"); "tdb_chainlock");
} }
...@@ -862,6 +867,11 @@ void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) ...@@ -862,6 +867,11 @@ void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
tdb_off_t lockstart, locksize; tdb_off_t lockstart, locksize;
unsigned int group, gbits; unsigned int group, gbits;
if (tdb->flags & TDB_VERSION1) {
tdb1_chainunlock(tdb, key);
return;
}
gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS; gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
group = bits_from(h, 64 - gbits, gbits); group = bits_from(h, 64 - gbits, gbits);
...@@ -873,6 +883,11 @@ void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key) ...@@ -873,6 +883,11 @@ void tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
enum TDB_ERROR tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key) enum TDB_ERROR tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
{ {
if (tdb->flags & TDB_VERSION1) {
if (tdb1_chainlock_read(tdb, key) == -1)
return tdb->last_error;
return TDB_SUCCESS;
}
return tdb->last_error = chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT, return tdb->last_error = chainlock(tdb, &key, F_RDLCK, TDB_LOCK_WAIT,
"tdb_chainlock_read"); "tdb_chainlock_read");
} }
...@@ -883,6 +898,10 @@ void tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key) ...@@ -883,6 +898,10 @@ void tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
tdb_off_t lockstart, locksize; tdb_off_t lockstart, locksize;
unsigned int group, gbits; unsigned int group, gbits;
if (tdb->flags & TDB_VERSION1) {
tdb1_chainunlock_read(tdb, key);
return;
}
gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS; gbits = TDB_TOPLEVEL_HASH_BITS - TDB_HASH_GROUP_BITS;
group = bits_from(h, 64 - gbits, gbits); group = bits_from(h, 64 - gbits, gbits);
......
...@@ -534,6 +534,12 @@ enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype, ...@@ -534,6 +534,12 @@ enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
tdb_bool_err berr; tdb_bool_err berr;
if (tdb->flags & TDB_VERSION1) {
if (tdb1_allrecord_lock(tdb, ltype, flags, upgradable) == -1)
return tdb->last_error;
return TDB_SUCCESS;
}
if (tdb->flags & TDB_NOLOCK) if (tdb->flags & TDB_NOLOCK)
return TDB_SUCCESS; return TDB_SUCCESS;
...@@ -648,6 +654,11 @@ void tdb_unlock_expand(struct tdb_context *tdb, int ltype) ...@@ -648,6 +654,11 @@ void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
/* unlock entire db */ /* unlock entire db */
void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype) void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
{ {
if (tdb->flags & TDB_VERSION1) {
tdb1_allrecord_unlock(tdb, ltype);
return;
}
if (tdb->flags & TDB_NOLOCK) if (tdb->flags & TDB_NOLOCK)
return; return;
...@@ -862,12 +873,7 @@ void tdb_lock_cleanup(struct tdb_context *tdb) ...@@ -862,12 +873,7 @@ void tdb_lock_cleanup(struct tdb_context *tdb)
while (tdb->file->allrecord_lock.count while (tdb->file->allrecord_lock.count
&& tdb->file->allrecord_lock.owner == tdb) { && tdb->file->allrecord_lock.owner == tdb) {
if (tdb->flags & TDB_VERSION1) tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
tdb1_allrecord_unlock(tdb,
tdb->file->allrecord_lock.ltype);
else
tdb_allrecord_unlock(tdb,
tdb->file->allrecord_lock.ltype);
} }
for (i=0; i<tdb->file->num_lockrecs; i++) { for (i=0; i<tdb->file->num_lockrecs; i++) {
......
...@@ -640,8 +640,15 @@ enum TDB_ERROR tdb1_open(struct tdb_context *tdb); ...@@ -640,8 +640,15 @@ enum TDB_ERROR tdb1_open(struct tdb_context *tdb);
enum TDB_ERROR tdb1_probe_length(struct tdb_context *tdb); enum TDB_ERROR tdb1_probe_length(struct tdb_context *tdb);
/* tdb1_lock.c: */ /* tdb1_lock.c: */
int tdb1_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable);
int tdb1_allrecord_unlock(struct tdb_context *tdb, int ltype); int tdb1_allrecord_unlock(struct tdb_context *tdb, int ltype);
int tdb1_chainlock(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainunlock(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainlock_read(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainunlock_read(struct tdb_context *tdb, TDB_DATA key);
/* tdb1_transaction.c: */ /* tdb1_transaction.c: */
int tdb1_transaction_recover(struct tdb_context *tdb); int tdb1_transaction_recover(struct tdb_context *tdb);
int tdb1_transaction_cancel(struct tdb_context *tdb); int tdb1_transaction_cancel(struct tdb_context *tdb);
......
...@@ -47,14 +47,6 @@ TDB_DATA tdb1_firstkey(struct tdb_context *tdb); ...@@ -47,14 +47,6 @@ TDB_DATA tdb1_firstkey(struct tdb_context *tdb);
TDB_DATA tdb1_nextkey(struct tdb_context *tdb, TDB_DATA key); TDB_DATA tdb1_nextkey(struct tdb_context *tdb, TDB_DATA key);
int tdb1_lockall(struct tdb_context *tdb);
int tdb1_unlockall(struct tdb_context *tdb);
int tdb1_lockall_read(struct tdb_context *tdb);
int tdb1_unlockall_read(struct tdb_context *tdb);
int tdb1_transaction_start(struct tdb_context *tdb); int tdb1_transaction_start(struct tdb_context *tdb);
int tdb1_transaction_prepare_commit(struct tdb_context *tdb); int tdb1_transaction_prepare_commit(struct tdb_context *tdb);
...@@ -73,13 +65,6 @@ int tdb1_check(struct tdb_context *tdb, ...@@ -73,13 +65,6 @@ int tdb1_check(struct tdb_context *tdb,
/* @} ******************************************************************/ /* @} ******************************************************************/
/* Low level locking functions: use with care */
int tdb1_chainlock(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainunlock(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainlock_read(struct tdb_context *tdb, TDB_DATA key);
int tdb1_chainunlock_read(struct tdb_context *tdb, TDB_DATA key);
/* wipe and repack */ /* wipe and repack */
int tdb1_wipe_all(struct tdb_context *tdb); int tdb1_wipe_all(struct tdb_context *tdb);
int tdb1_repack(struct tdb_context *tdb); int tdb1_repack(struct tdb_context *tdb);
......
...@@ -338,7 +338,7 @@ int tdb1_check(struct tdb_context *tdb, ...@@ -338,7 +338,7 @@ int tdb1_check(struct tdb_context *tdb,
if (tdb->file->allrecord_lock.count != 0) { if (tdb->file->allrecord_lock.count != 0) {
locked = false; locked = false;
} else { } else {
if (tdb1_lockall_read(tdb) == -1) if (tdb_lockall_read(tdb) != TDB_SUCCESS)
return -1; return -1;
locked = true; locked = true;
} }
...@@ -455,7 +455,7 @@ int tdb1_check(struct tdb_context *tdb, ...@@ -455,7 +455,7 @@ int tdb1_check(struct tdb_context *tdb,
free(hashes); free(hashes);
if (locked) { if (locked) {
tdb1_unlockall_read(tdb); tdb_unlockall_read(tdb);
} }
return 0; return 0;
...@@ -463,7 +463,7 @@ free: ...@@ -463,7 +463,7 @@ free:
free(hashes); free(hashes);
unlock: unlock:
if (locked) { if (locked) {
tdb1_unlockall_read(tdb); tdb_unlockall_read(tdb);
} }
return -1; return -1;
} }
...@@ -306,7 +306,7 @@ int tdb1_allrecord_lock(struct tdb_context *tdb, int ltype, ...@@ -306,7 +306,7 @@ int tdb1_allrecord_lock(struct tdb_context *tdb, int ltype,
} }
/* FIXME: Temporary cast. */ /* FIXME: Temporary cast. */
tdb->file->allrecord_lock.owner = (void *)(struct tdb1_context *)tdb; tdb->file->allrecord_lock.owner = (void *)(struct tdb_context *)tdb;
tdb->file->allrecord_lock.count = 1; tdb->file->allrecord_lock.count = 1;
/* If it's upgradable, it's actually exclusive so we can treat /* If it's upgradable, it's actually exclusive so we can treat
* it as a write lock. */ * it as a write lock. */
...@@ -364,30 +364,6 @@ int tdb1_allrecord_unlock(struct tdb_context *tdb, int ltype) ...@@ -364,30 +364,6 @@ int tdb1_allrecord_unlock(struct tdb_context *tdb, int ltype)
return 0; return 0;
} }
/* lock entire database with write lock */
int tdb1_lockall(struct tdb_context *tdb)
{
return tdb1_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
}
/* unlock entire database with write lock */
int tdb1_unlockall(struct tdb_context *tdb)
{
return tdb1_allrecord_unlock(tdb, F_WRLCK);
}
/* lock entire database with read lock */
int tdb1_lockall_read(struct tdb_context *tdb)
{
return tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false);
}
/* unlock entire database with read lock */
int tdb1_unlockall_read(struct tdb_context *tdb)
{
return tdb1_allrecord_unlock(tdb, F_RDLCK);
}
/* lock/unlock one hash chain. This is meant to be used to reduce /* lock/unlock one hash chain. This is meant to be used to reduce
contention - it cannot guarantee how many records will be locked */ contention - it cannot guarantee how many records will be locked */
int tdb1_chainlock(struct tdb_context *tdb, TDB_DATA key) int tdb1_chainlock(struct tdb_context *tdb, TDB_DATA key)
......
...@@ -42,7 +42,7 @@ void tdb1_header_hash(struct tdb_context *tdb, ...@@ -42,7 +42,7 @@ void tdb1_header_hash(struct tdb_context *tdb,
*magic1_hash = 1; *magic1_hash = 1;
} }
static void tdb1_context_init(struct tdb_context *tdb) static void tdb_context_init(struct tdb_context *tdb)
{ {
assert(tdb->flags & TDB_VERSION1); assert(tdb->flags & TDB_VERSION1);
...@@ -71,7 +71,7 @@ enum TDB_ERROR tdb1_new_database(struct tdb_context *tdb, ...@@ -71,7 +71,7 @@ enum TDB_ERROR tdb1_new_database(struct tdb_context *tdb,
int hash_size = TDB1_DEFAULT_HASH_SIZE; int hash_size = TDB1_DEFAULT_HASH_SIZE;
enum TDB_ERROR ret = TDB_ERR_IO; enum TDB_ERROR ret = TDB_ERR_IO;
tdb1_context_init(tdb); tdb_context_init(tdb);
/* Default TDB2 hash becomes default TDB1 hash. */ /* Default TDB2 hash becomes default TDB1 hash. */
if (tdb->hash_fn == tdb_jenkins_hash) if (tdb->hash_fn == tdb_jenkins_hash)
...@@ -171,7 +171,7 @@ enum TDB_ERROR tdb1_open(struct tdb_context *tdb) ...@@ -171,7 +171,7 @@ enum TDB_ERROR tdb1_open(struct tdb_context *tdb)
tdb->flags |= TDB_VERSION1; tdb->flags |= TDB_VERSION1;
tdb1_context_init(tdb); tdb_context_init(tdb);
/* Default TDB2 hash becomes default TDB1 hash. */ /* Default TDB2 hash becomes default TDB1 hash. */
if (tdb->hash_fn == tdb_jenkins_hash) { if (tdb->hash_fn == tdb_jenkins_hash) {
......
...@@ -142,8 +142,6 @@ int tdb1_recovery_area(struct tdb_context *tdb, ...@@ -142,8 +142,6 @@ int tdb1_recovery_area(struct tdb_context *tdb,
const struct tdb1_methods *methods, const struct tdb1_methods *methods,
tdb1_off_t *recovery_offset, tdb1_off_t *recovery_offset,
struct tdb1_record *rec); struct tdb1_record *rec);
int tdb1_allrecord_lock(struct tdb_context *tdb, int ltype,
enum tdb_lock_flags flags, bool upgradable);
int tdb1_allrecord_upgrade(struct tdb_context *tdb); int tdb1_allrecord_upgrade(struct tdb_context *tdb);
int tdb1_write_lock_record(struct tdb_context *tdb, tdb1_off_t off); int tdb1_write_lock_record(struct tdb_context *tdb, tdb1_off_t off);
int tdb1_write_unlock_record(struct tdb_context *tdb, tdb1_off_t off); int tdb1_write_unlock_record(struct tdb_context *tdb, tdb1_off_t off);
......
...@@ -98,7 +98,7 @@ char *tdb1_summary(struct tdb_context *tdb) ...@@ -98,7 +98,7 @@ char *tdb1_summary(struct tdb_context *tdb)
if (tdb->file->allrecord_lock.count != 0) { if (tdb->file->allrecord_lock.count != 0) {
locked = false; locked = false;
} else { } else {
if (tdb1_lockall_read(tdb) == -1) if (tdb_lockall_read(tdb) != TDB_SUCCESS)
return NULL; return NULL;
locked = true; locked = true;
} }
...@@ -196,7 +196,7 @@ char *tdb1_summary(struct tdb_context *tdb) ...@@ -196,7 +196,7 @@ char *tdb1_summary(struct tdb_context *tdb)
unlock: unlock:
if (locked) { if (locked) {
tdb1_unlockall_read(tdb); tdb_unlockall_read(tdb);
} }
return ret; return ret;
} }
...@@ -732,7 +732,7 @@ int tdb1_wipe_all(struct tdb_context *tdb) ...@@ -732,7 +732,7 @@ int tdb1_wipe_all(struct tdb_context *tdb)
tdb1_off_t recovery_head; tdb1_off_t recovery_head;
tdb1_len_t recovery_size = 0; tdb1_len_t recovery_size = 0;
if (tdb1_lockall(tdb) != 0) { if (tdb_lockall(tdb) != TDB_SUCCESS) {
return -1; return -1;
} }
...@@ -801,16 +801,11 @@ int tdb1_wipe_all(struct tdb_context *tdb) ...@@ -801,16 +801,11 @@ int tdb1_wipe_all(struct tdb_context *tdb)
} }
} }
if (tdb1_unlockall(tdb) != 0) { tdb_unlockall(tdb);
tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
"tdb1_wipe_all: failed to unlock");
goto failed;
}
return 0; return 0;
failed: failed:
tdb1_unlockall(tdb); tdb_unlockall(tdb);
return -1; return -1;
} }
......
...@@ -74,7 +74,7 @@ int main(int argc, char *argv[]) ...@@ -74,7 +74,7 @@ int main(int argc, char *argv[])
hsize.base.next = &tap_log_attr; hsize.base.next = &tap_log_attr;
hsize.tdb1_hashsize.hsize = 1024; hsize.tdb1_hashsize.hsize = 1024;
plan_tests(43); plan_tests(40);
tdb = tdb_open("run-no-lock-during-traverse.tdb1", tdb = tdb_open("run-no-lock-during-traverse.tdb1",
TDB_VERSION1, O_CREAT|O_TRUNC|O_RDWR, TDB_VERSION1, O_CREAT|O_TRUNC|O_RDWR,
0600, &hsize); 0600, &hsize);
...@@ -82,28 +82,28 @@ int main(int argc, char *argv[]) ...@@ -82,28 +82,28 @@ int main(int argc, char *argv[])
ok1(tdb); ok1(tdb);
ok1(prepare_entries(tdb)); ok1(prepare_entries(tdb));
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_lockall(tdb) == 0); ok1(tdb_lockall(tdb) == 0);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb_traverse(tdb, delete_other, &errors) >= 0); ok1(tdb_traverse(tdb, delete_other, &errors) >= 0);
ok1(errors == 0); ok1(errors == 0);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_unlockall(tdb) == 0); tdb_unlockall(tdb);
ok1(prepare_entries(tdb)); ok1(prepare_entries(tdb));
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_lockall(tdb) == 0); ok1(tdb_lockall(tdb) == 0);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb_traverse(tdb, delete_self, NULL) == NUM_ENTRIES); ok1(tdb_traverse(tdb, delete_self, NULL) == NUM_ENTRIES);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_unlockall(tdb) == 0); tdb_unlockall(tdb);
ok1(prepare_entries(tdb)); ok1(prepare_entries(tdb));
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_lockall(tdb) == 0); ok1(tdb_lockall(tdb) == 0);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
delete_entries(tdb); delete_entries(tdb);
ok1(locking_errors1 == 0); ok1(locking_errors1 == 0);
ok1(tdb1_unlockall(tdb) == 0); tdb_unlockall(tdb);
ok1(tdb_close(tdb) == 0); ok1(tdb_close(tdb) == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment