Commit a9c554b3 authored by Yoni Fogel's avatar Yoni Fogel

[t:2641] Merge DB_READCOMMITTED bugfix (r20839) to main.

git-svn-id: file:///svn/toku/tokudb@20863 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0566a81c
......@@ -183,6 +183,7 @@ struct brt_header {
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
TXNID txnid_that_created_or_locked_when_empty;
TXNID root_that_created_or_locked_when_empty;
TXNID txnid_that_suppressed_recovery_logs;
struct toku_list live_brts;
struct toku_list zombie_brts;
......
......@@ -174,6 +174,18 @@ message are not gorged. (But they may be hungry or too fat or too thin.)
#include "roll.h"
#include "toku_atomic.h"
void
toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn) {
TXNID txnid = toku_txn_get_txnid(txn);
assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
h->txnid_that_created_or_locked_when_empty == txnid);
h->txnid_that_created_or_locked_when_empty = txnid;
TXNID rootid = toku_txn_get_root_txnid(txn);
assert(h->root_that_created_or_locked_when_empty == TXNID_NONE ||
h->root_that_created_or_locked_when_empty == rootid);
h->root_that_created_or_locked_when_empty = rootid;
}
static void brt_cursor_invalidate(BRT_CURSOR brtcursor);
// We invalidate all the OMTCURSORS any time we push into the root of the BRT for that OMT.
......@@ -3330,8 +3342,7 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET
if (t->db) t->db->descriptor = &t->h->descriptor;
if (txn_created) {
assert(txn);
assert(t->h->txnid_that_created_or_locked_when_empty == TXNID_NONE);
t->h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(txn);
toku_brt_header_suppress_rollbacks(t->h, txn);
r = toku_txn_note_brt(txn, t);
assert(r==0);
}
......@@ -3641,9 +3652,8 @@ toku_dictionary_redirect (const char *dst_fname_in_env, BRT old_brt, TOKUTXN txn
r = toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
assert(r==0);
assert(new_h->txnid_that_created_or_locked_when_empty == TXNID_NONE);
TXNID xid = toku_txn_get_txnid(txn);
new_h->txnid_that_created_or_locked_when_empty = xid;
toku_brt_header_suppress_rollbacks(new_h, txn);
r = toku_log_suppress_rollback(txn->logger, NULL, 0, new_filenum, xid);
assert(r==0);
}
......@@ -4068,49 +4078,54 @@ brt_cursor_cleanup_dbts(BRT_CURSOR c) {
}
}
static inline void brt_cursor_extract_key_and_val(
LEAFENTRY le,
BRT_CURSOR cursor,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val
)
{
static inline int brt_cursor_extract_key_and_val(
LEAFENTRY le,
BRT_CURSOR cursor,
u_int32_t *keylen,
void **key,
u_int32_t *vallen,
void **val) {
int r = 0;
if (cursor->is_read_committed) {
TXNID le_anc_id = le_outermost_uncommitted_xid(le);
if (le_anc_id < cursor->logger->oldest_living_xid || //current transaction has inserted this element
le_anc_id == 0 || // le is a committed value with no provisional data
le_anc_id == cursor->ancestor_id || //quick check to avoid more expensive is_txnid_live check
!is_txnid_live(cursor->logger,le_anc_id))
{
*key = le_latest_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
//Maybe fake not finding if some transaction is inserting 'committed elements'
TXNID rootid = cursor->brt->h->root_that_created_or_locked_when_empty;
if (rootid != TXNID_NONE && rootid != cursor->ancestor_id) {
r = DB_NOTFOUND;
}
else {
*key = le_outermost_key_and_len(le, keylen);
*val = le_outermost_val_and_len(le, vallen);
TXNID le_anc_id = le_outermost_uncommitted_xid(le);
if (le_anc_id < cursor->logger->oldest_living_xid || //current transaction has inserted this element
le_anc_id == 0 || // le is a committed value with no provisional data
le_anc_id == cursor->ancestor_id || //quick check to avoid more expensive is_txnid_live check
!is_txnid_live(cursor->logger,le_anc_id))
{
*key = le_latest_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
}
else {
*key = le_outermost_key_and_len(le, keylen);
*val = le_outermost_val_and_len(le, vallen);
}
}
}
else {
*key = le_latest_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
}
return r;
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert(r==0);
brt_cursor_extract_key_and_val(
le,
c,
&key->size,
&key->data,
&val->size,
&val->data
);
r = brt_cursor_extract_key_and_val(le,
c,
&key->size,
&key->data,
&val->size,
&val->data);
assert(r==0);
}
// When an omt cursor is invalidated, this is the brt-level function
......@@ -4368,21 +4383,21 @@ got_a_good_value:
u_int32_t vallen;
void *val;
brt_cursor_extract_key_and_val(
le,
brtcursor,
&keylen,
&key,
&vallen,
&val
);
r = brt_cursor_extract_key_and_val(le,
brtcursor,
&keylen,
&key,
&vallen,
&val);
assert(brtcursor->current_in_omt == FALSE);
r = getf(keylen, key,
vallen, val,
0, NULL, //TODO: Put actual values here.
0, NULL, //TODO: Put actual values here.
getf_v);
if (r==0) {
r = getf(keylen, key,
vallen, val,
0, NULL, //TODO: Put actual values here.
0, NULL, //TODO: Put actual values here.
getf_v);
}
if (r==0) {
// Leave the omtcursor alone above (pass NULL to omt_find/fetch)
// This prevents the omt from calling associate(), which would
......@@ -4815,16 +4830,16 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_
u_int32_t vallen;
void *val;
brt_cursor_extract_key_and_val(
le,
cursor,
&keylen,
&key,
&vallen,
&val
);
r = brt_cursor_extract_key_and_val(le,
cursor,
&keylen,
&key,
&vallen,
&val);
r = getf(keylen, key, vallen, val, getf_v);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
//Update cursor.
cursor->leaf_info.to_be.index = index;
......
......@@ -210,6 +210,9 @@ void toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used);
int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
void toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn);
//Effect: suppresses rollback logs
void toku_brt_suppress_recovery_logs (BRT brt, TOKUTXN txn);
// Effect: suppresses recovery logs
// Requires: this is a (target) redirected brt
......
......@@ -1087,6 +1087,15 @@ int toku_read_logmagic (FILE *f, u_int32_t *versionp) {
return 0;
}
TXNID toku_txn_get_root_txnid (TOKUTXN txn) {
if (txn==0) return 0;
TOKUTXN root = txn;
while (root->parent) {
root = root->parent;
}
return root->txnid64;
}
TXNID toku_txn_get_txnid (TOKUTXN txn) {
if (txn==0) return 0;
else return txn->txnid64;
......
......@@ -84,6 +84,7 @@ int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp);
int toku_read_logmagic (FILE *f, u_int32_t *versionp);
TXNID toku_txn_get_txnid (TOKUTXN txn);
TXNID toku_txn_get_root_txnid (TOKUTXN txn);
LSN toku_logger_last_lsn(TOKULOGGER logger);
TOKULOGGER toku_txn_logger (TOKUTXN txn);
......
......@@ -533,9 +533,7 @@ static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(
assert(r == 0);
assert(txn!=NULL);
struct brt_header *h = tuple->brt->h;
assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
h->txnid_that_created_or_locked_when_empty == l->xid);
h->txnid_that_created_or_locked_when_empty = l->xid;
toku_brt_header_suppress_rollbacks(h, txn);
}
return 0;
}
......
......@@ -574,6 +574,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
assert(r==0);
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
brt->h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment