Commit f12bdc75 authored by John Esmet

Pass down txn manager state to message application, which it can use to

run full garbage collection when a leafentry has > 1 committed entry.
parent f7323d26
...@@ -1065,6 +1065,10 @@ typedef enum { ...@@ -1065,6 +1065,10 @@ typedef enum {
LE_MAX_PROVISIONAL_XR, LE_MAX_PROVISIONAL_XR,
LE_EXPANDED, LE_EXPANDED,
LE_MAX_MEMSIZE, LE_MAX_MEMSIZE,
LE_APPLY_GC_BYTES_IN,
LE_APPLY_GC_BYTES_OUT,
LE_NORMAL_GC_BYTES_IN,
LE_NORMAL_GC_BYTES_OUT,
LE_STATUS_NUM_ROWS LE_STATUS_NUM_ROWS
} le_status_entry; } le_status_entry;
...@@ -1196,6 +1200,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1196,6 +1200,7 @@ toku_ft_bn_apply_cmd_once (
LEAFENTRY le, LEAFENTRY le,
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdonep, uint64_t *workdonep,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1209,6 +1214,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1209,6 +1214,7 @@ toku_ft_bn_apply_cmd (
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1222,6 +1228,7 @@ toku_ft_leaf_apply_cmd ( ...@@ -1222,6 +1228,7 @@ toku_ft_leaf_apply_cmd (
int target_childnum, int target_childnum,
FT_MSG cmd, FT_MSG cmd,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1236,6 +1243,7 @@ toku_ft_node_put_cmd ( ...@@ -1236,6 +1243,7 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
size_t flow_deltas[], size_t flow_deltas[],
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
......
...@@ -1692,6 +1692,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1692,6 +1692,7 @@ toku_ft_bn_apply_cmd_once (
LEAFENTRY le, LEAFENTRY le,
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -1719,6 +1720,7 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1719,6 +1720,7 @@ toku_ft_bn_apply_cmd_once (
idx, idx,
oldest_referenced_xid, oldest_referenced_xid,
gc_info, gc_info,
txn_state_for_gc,
&new_le, &new_le,
&numbytes_delta &numbytes_delta
); );
...@@ -1768,6 +1770,7 @@ struct setval_extra_s { ...@@ -1768,6 +1770,7 @@ struct setval_extra_s {
LEAFENTRY le; LEAFENTRY le;
TXNID oldest_referenced_xid; TXNID oldest_referenced_xid;
GC_INFO gc_info; GC_INFO gc_info;
txn_manager_state *txn_state_for_gc;
uint64_t * workdone; // set by toku_ft_bn_apply_cmd_once() uint64_t * workdone; // set by toku_ft_bn_apply_cmd_once()
STAT64INFO stats_to_update; STAT64INFO stats_to_update;
}; };
...@@ -1801,6 +1804,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { ...@@ -1801,6 +1804,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
toku_ft_bn_apply_cmd_once(svextra->bn, &msg, toku_ft_bn_apply_cmd_once(svextra->bn, &msg,
svextra->idx, svextra->le, svextra->idx, svextra->le,
svextra->oldest_referenced_xid, svextra->gc_info, svextra->oldest_referenced_xid, svextra->gc_info,
svextra->txn_state_for_gc,
svextra->workdone, svextra->stats_to_update); svextra->workdone, svextra->stats_to_update);
svextra->setval_r = 0; svextra->setval_r = 0;
} }
...@@ -1816,6 +1820,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn ...@@ -1816,6 +1820,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
uint32_t keylen, uint32_t keylen,
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t * workdone, uint64_t * workdone,
STAT64INFO stats_to_update) { STAT64INFO stats_to_update) {
LEAFENTRY le_for_update; LEAFENTRY le_for_update;
...@@ -1860,7 +1865,8 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn ...@@ -1860,7 +1865,8 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
le_for_update = le; le_for_update = le;
struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids, struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids,
keyp, idx, le_for_update, oldest_referenced_xid, gc_info, workdone, stats_to_update}; keyp, idx, le_for_update, oldest_referenced_xid, gc_info,
txn_state_for_gc, workdone, stats_to_update};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun() // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc); FAKE_DB(db, desc);
int r = update_fun( int r = update_fun(
...@@ -1885,6 +1891,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1885,6 +1891,7 @@ toku_ft_bn_apply_cmd (
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid_known, TXNID oldest_referenced_xid_known,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -1931,7 +1938,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1931,7 +1938,7 @@ toku_ft_bn_apply_cmd (
} else { } else {
assert_zero(r); assert_zero(r);
} }
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of // if the insertion point is within a window of the right edge of
// the leaf then it is sequential // the leaf then it is sequential
...@@ -1963,7 +1970,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1963,7 +1970,7 @@ toku_ft_bn_apply_cmd (
); );
if (r == DB_NOTFOUND) break; if (r == DB_NOTFOUND) break;
assert_zero(r); assert_zero(r);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
break; break;
} }
...@@ -1985,7 +1992,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1985,7 +1992,7 @@ toku_ft_bn_apply_cmd (
cmd->u.id.key = &curr_keydbt; cmd->u.id.key = &curr_keydbt;
int deleted = 0; int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do. if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
uint32_t new_omt_size = bn->data_buffer.omt_size(); uint32_t new_omt_size = bn->data_buffer.omt_size();
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -2017,7 +2024,7 @@ toku_ft_bn_apply_cmd ( ...@@ -2017,7 +2024,7 @@ toku_ft_bn_apply_cmd (
cmd->u.id.key = &curr_keydbt; cmd->u.id.key = &curr_keydbt;
int deleted = 0; int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) { if (le_has_xids(storeddata, cmd->xids)) {
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
uint32_t new_omt_size = bn->data_buffer.omt_size(); uint32_t new_omt_size = bn->data_buffer.omt_size();
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -2049,9 +2056,9 @@ toku_ft_bn_apply_cmd ( ...@@ -2049,9 +2056,9 @@ toku_ft_bn_apply_cmd (
key = cmd->u.id.key->data; key = cmd->u.id.key->data;
keylen = cmd->u.id.key->size; keylen = cmd->u.id.key->size;
} }
r = do_update(update_fun, desc, bn, cmd, idx, NULL, NULL, 0, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, NULL, NULL, 0, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
} else if (r==0) { } else if (r==0) {
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, key, keylen, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, key, keylen, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
} // otherwise, a worse error, just return it } // otherwise, a worse error, just return it
break; break;
} }
...@@ -2074,7 +2081,7 @@ toku_ft_bn_apply_cmd ( ...@@ -2074,7 +2081,7 @@ toku_ft_bn_apply_cmd (
// This is broken below. Have a compilation error checked // This is broken below. Have a compilation error checked
// in as a reminder // in as a reminder
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, curr_key, curr_keylen, oldest_referenced_xid_known, gc_info, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, curr_key, curr_keylen, oldest_referenced_xid_known, gc_info, txn_state_for_gc, workdone, stats_to_update);
assert_zero(r); assert_zero(r);
if (num_leafentries_before == bn->data_buffer.omt_size()) { if (num_leafentries_before == bn->data_buffer.omt_size()) {
...@@ -2442,29 +2449,24 @@ static void ...@@ -2442,29 +2449,24 @@ static void
ft_leaf_run_gc(FTNODE node, FT ft) { ft_leaf_run_gc(FTNODE node, FT ft) {
TOKULOGGER logger = toku_cachefile_logger(ft->cf); TOKULOGGER logger = toku_cachefile_logger(ft->cf);
if (logger) { if (logger) {
xid_omt_t snapshot_txnids; TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
rx_omt_t referenced_xids; txn_manager_state txn_state_for_gc;
xid_omt_t live_root_txns; txn_state_for_gc.init(txn_manager);
toku_txn_manager_clone_state_for_gc(
logger->txn_manager,
&snapshot_txnids,
&referenced_xids,
&live_root_txns
);
// Perform garbage collection. Provide a full snapshot of the transaction // Perform full garbage collection. Provide a fresh snapshot of the transaction
// system plus the oldest known referenced xid that could have had messages // system plus the oldest known referenced xid that could have had messages
// applied to this leaf. // applied to this leaf (which comes from the node, NOT the txn_manager_state,
// which has a value only suitable for simple garbage collection).
// //
// Using the oldest xid in either the referenced_xids or live_root_txns // Using the oldest xid in either the referenced_xids or live_root_txns
// snapshots is not sufficient, because there could be something older that is neither // snapshots is not sufficient, because there could be something older that is neither
// live nor referenced, but instead aborted somewhere above us as a message in the tree. // live nor referenced, but instead aborted somewhere above us as a message in the tree.
ft_leaf_gc_all_les(node, ft, snapshot_txnids, referenced_xids, live_root_txns, node->oldest_referenced_xid_known); ft_leaf_gc_all_les(node, ft,
txn_state_for_gc.snapshot_xids,
// Free the OMT's we used for garbage collecting. txn_state_for_gc.referenced_xids,
snapshot_txnids.destroy(); txn_state_for_gc.live_root_txns,
referenced_xids.destroy(); node->oldest_referenced_xid_known);
live_root_txns.destroy(); txn_state_for_gc.destroy();
} }
} }
...@@ -2478,6 +2480,14 @@ void toku_bnc_flush_to_child( ...@@ -2478,6 +2480,14 @@ void toku_bnc_flush_to_child(
paranoid_invariant(bnc); paranoid_invariant(bnc);
STAT64INFO_S stats_delta = {0,0}; STAT64INFO_S stats_delta = {0,0};
size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer); size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer);
txn_manager_state txn_state_for_gc;
bool do_garbage_collection = child->height == 0 && toku_cachefile_logger(ft->cf) != nullptr;
if (do_garbage_collection) {
TOKULOGGER logger = toku_cachefile_logger(ft->cf);
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
txn_state_for_gc.init(txn_manager);
}
FIFO_ITERATE( FIFO_ITERATE(
bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
({ ({
...@@ -2503,6 +2513,7 @@ void toku_bnc_flush_to_child( ...@@ -2503,6 +2513,7 @@ void toku_bnc_flush_to_child(
&ftcmd, &ftcmd,
is_fresh, is_fresh,
make_gc_info(true), // mvcc_needed make_gc_info(true), // mvcc_needed
&txn_state_for_gc,
flow_deltas, flow_deltas,
&stats_delta &stats_delta
); );
...@@ -2514,8 +2525,8 @@ void toku_bnc_flush_to_child( ...@@ -2514,8 +2525,8 @@ void toku_bnc_flush_to_child(
if (stats_delta.numbytes || stats_delta.numrows) { if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, stats_delta); toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
} }
if (child->height == 0) { if (do_garbage_collection) {
ft_leaf_run_gc(child, ft); txn_state_for_gc.destroy();
size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer); size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
STATUS_INC(FT_MSG_BYTES_OUT, buffsize); STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there // may be misleading if there's a broadcast message in there
...@@ -2539,6 +2550,7 @@ toku_ft_node_put_cmd ( ...@@ -2539,6 +2550,7 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
size_t flow_deltas[], size_t flow_deltas[],
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -2556,7 +2568,7 @@ toku_ft_node_put_cmd ( ...@@ -2556,7 +2568,7 @@ toku_ft_node_put_cmd (
// and instead defer to these functions // and instead defer to these functions
// //
if (node->height==0) { if (node->height==0) {
toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, gc_info, nullptr, stats_to_update); toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, gc_info, txn_state_for_gc, nullptr, stats_to_update);
} else { } else {
ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas); ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
} }
...@@ -2577,6 +2589,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2577,6 +2589,7 @@ void toku_ft_leaf_apply_cmd(
int target_childnum, // which child to inject to, or -1 if unknown int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd, FT_MSG cmd,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -2627,6 +2640,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2627,6 +2640,7 @@ void toku_ft_leaf_apply_cmd(
cmd, cmd,
oldest_referenced_xid_known, oldest_referenced_xid_known,
gc_info, gc_info,
txn_state_for_gc,
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
...@@ -2638,14 +2652,15 @@ void toku_ft_leaf_apply_cmd( ...@@ -2638,14 +2652,15 @@ void toku_ft_leaf_apply_cmd(
if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) { if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) {
BLB(node, childnum)->max_msn_applied = cmd->msn; BLB(node, childnum)->max_msn_applied = cmd->msn;
toku_ft_bn_apply_cmd(compare_fun, toku_ft_bn_apply_cmd(compare_fun,
update_fun, update_fun,
desc, desc,
BLB(node, childnum), BLB(node, childnum),
cmd, cmd,
oldest_referenced_xid_known, oldest_referenced_xid_known,
gc_info, gc_info,
workdone, txn_state_for_gc,
stats_to_update); workdone,
stats_to_update);
} else { } else {
STATUS_INC(FT_MSN_DISCARDS, 1); STATUS_INC(FT_MSN_DISCARDS, 1);
} }
...@@ -2696,6 +2711,7 @@ static void inject_message_in_locked_node( ...@@ -2696,6 +2711,7 @@ static void inject_message_in_locked_node(
cmd, cmd,
true, true,
gc_info, gc_info,
nullptr,
flow_deltas, flow_deltas,
&stats_delta &stats_delta
); );
...@@ -4331,6 +4347,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, TXNID ol ...@@ -4331,6 +4347,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, TXNID ol
&ftcmd, &ftcmd,
oldest_referenced_xid, oldest_referenced_xid,
make_gc_info(true), //mvcc is needed make_gc_info(true), //mvcc is needed
nullptr,
workdone, workdone,
stats_to_update stats_to_update
); );
......
...@@ -230,6 +230,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char ...@@ -230,6 +230,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
&cmd, &cmd,
true, true,
make_gc_info(true), make_gc_info(true),
nullptr,
zero_flow_deltas, zero_flow_deltas,
NULL NULL
); );
......
...@@ -2925,7 +2925,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2925,7 +2925,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
.xids = lbuf->xids, .xids = lbuf->xids,
.u = { .id = { &thekey, &theval } } }; .u = { .id = { &thekey, &theval } } };
uint64_t workdone=0; uint64_t workdone=0;
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(true), &workdone, stats_to_update); toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(true), nullptr, &workdone, stats_to_update);
} }
static int write_literal(struct dbout *out, void*data, size_t len) { static int write_literal(struct dbout *out, void*data, size_t len) {
......
...@@ -249,6 +249,7 @@ toku_le_apply_msg(FT_MSG msg, ...@@ -249,6 +249,7 @@ toku_le_apply_msg(FT_MSG msg,
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
LEAFENTRY *new_leafentry_p, LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p); int64_t * numbytes_delta_p);
......
...@@ -291,6 +291,7 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) { ...@@ -291,6 +291,7 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
txn_manager->last_xid = 0; txn_manager->last_xid = 0;
txn_manager->last_xid_seen_for_recover = TXNID_NONE; txn_manager->last_xid_seen_for_recover = TXNID_NONE;
txn_manager->last_calculated_oldest_referenced_xid = TXNID_NONE;
*txn_managerp = txn_manager; *txn_managerp = txn_manager;
} }
...@@ -324,6 +325,10 @@ toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) { ...@@ -324,6 +325,10 @@ toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) {
return rval; return rval;
} }
// Returns the value currently stored in
// txn_manager->last_calculated_oldest_referenced_xid.
// No lock is taken in this function, hence "estimate".
TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager) {
    const TXNID estimate = txn_manager->last_calculated_oldest_referenced_xid;
    return estimate;
}
int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids); int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids);
int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){ int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){
(*referenced_xids)[index] = live_xid->txnid.parent_id64; (*referenced_xids)[index] = live_xid->txnid.parent_id64;
...@@ -371,7 +376,7 @@ max_xid(TXNID a, TXNID b) { ...@@ -371,7 +376,7 @@ max_xid(TXNID a, TXNID b) {
return a < b ? b : a; return a < b ? b : a;
} }
static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) { static void set_oldest_referenced_xid(TXN_MANAGER txn_manager) {
TXNID oldest_referenced_xid = TXNID_MAX; TXNID oldest_referenced_xid = TXNID_MAX;
int r; int r;
if (txn_manager->live_root_ids.size() > 0) { if (txn_manager->live_root_ids.size() > 0) {
...@@ -397,8 +402,8 @@ static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) { ...@@ -397,8 +402,8 @@ static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) {
if (txn_manager->last_xid < oldest_referenced_xid) { if (txn_manager->last_xid < oldest_referenced_xid) {
oldest_referenced_xid = txn_manager->last_xid; oldest_referenced_xid = txn_manager->last_xid;
} }
paranoid_invariant(oldest_referenced_xid != TXNID_MAX); invariant(oldest_referenced_xid != TXNID_MAX);
return oldest_referenced_xid; txn_manager->last_calculated_oldest_referenced_xid = oldest_referenced_xid;
} }
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index) //Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
...@@ -672,7 +677,7 @@ void toku_txn_manager_start_txn( ...@@ -672,7 +677,7 @@ void toku_txn_manager_start_txn(
r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx); r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
invariant_zero(r); invariant_zero(r);
} }
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager); set_oldest_referenced_xid(txn_manager);
if (needs_snapshot) { if (needs_snapshot) {
txn_manager_create_snapshot_unlocked( txn_manager_create_snapshot_unlocked(
...@@ -825,7 +830,22 @@ void toku_txn_manager_clone_state_for_gc( ...@@ -825,7 +830,22 @@ void toku_txn_manager_clone_state_for_gc(
txn_manager_unlock(txn_manager); txn_manager_unlock(txn_manager);
} }
// Fill this state from txn_manager (which must not be null):
//  - the three omts are populated by toku_txn_manager_clone_state_for_gc()
//  - oldest_referenced_xid_for_simple_gc is copied from the manager's
//    last calculated oldest referenced xid, read after the clone returns
void txn_manager_state::init(TXN_MANAGER txn_manager) {
    invariant_notnull(txn_manager);
    toku_txn_manager_clone_state_for_gc(txn_manager,
                                        &this->snapshot_xids,
                                        &this->referenced_xids,
                                        &this->live_root_txns);
    this->oldest_referenced_xid_for_simple_gc =
        txn_manager->last_calculated_oldest_referenced_xid;
}
void txn_manager_state::destroy() {
snapshot_xids.destroy();
referenced_xids.destroy();
live_root_txns.destroy();
}
void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) { void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) {
TOKUTXN txn; TOKUTXN txn;
......
...@@ -121,6 +121,28 @@ struct txn_manager { ...@@ -121,6 +121,28 @@ struct txn_manager {
TXNID last_xid; TXNID last_xid;
TXNID last_xid_seen_for_recover; TXNID last_xid_seen_for_recover;
TXNID last_calculated_oldest_referenced_xid;
};
// State captured from a txn manager for use by garbage collection.
// init() fills the members from a TXN_MANAGER and destroy() releases the
// three omts. The constructor body is empty, so init() must run before use.
struct txn_manager_state {
// a snapshot of the txn manager's mvcc state
xid_omt_t snapshot_xids;
rx_omt_t referenced_xids;
xid_omt_t live_root_txns;
// the oldest xid in any live list
//
// suitable for simple garbage collection that cleans up multiple committed
// transaction records into one. not suitable for implicit promotions, which
// must be correct in the face of abort messages - see ftnode->oldest_referenced_xid
TXNID oldest_referenced_xid_for_simple_gc;
// empty body: does not assign oldest_referenced_xid_for_simple_gc
txn_manager_state() { }
// clones the omts out of txn_manager and records its last calculated
// oldest referenced xid
void init(TXN_MANAGER txn_manager);
// destroys the three omts
void destroy();
private:
txn_manager_state(txn_manager_state &rhs); // shouldn't need to copy construct
}; };
...@@ -129,6 +151,8 @@ void toku_txn_manager_destroy(TXN_MANAGER txn_manager); ...@@ -129,6 +151,8 @@ void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager); TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager);
void toku_txn_manager_handle_snapshot_create_for_child_txn( void toku_txn_manager_handle_snapshot_create_for_child_txn(
TOKUTXN txn, TOKUTXN txn,
TXN_MANAGER txn_manager, TXN_MANAGER txn_manager,
......
...@@ -116,7 +116,7 @@ PATENT RIGHTS GRANT: ...@@ -116,7 +116,7 @@ PATENT RIGHTS GRANT:
#include "ule-internal.h" #include "ule-internal.h"
#include <util/status.h> #include <util/status.h>
#include <util/scoped_malloc.h> #include <util/scoped_malloc.h>
#include <util/partitioned_counter.h>
#define ULE_DEBUG 0 #define ULE_DEBUG 0
...@@ -141,6 +141,10 @@ status_init(void) { ...@@ -141,6 +141,10 @@ status_init(void) {
STATUS_INIT(LE_MAX_PROVISIONAL_XR, nullptr, UINT64, "max provisional xr", TOKU_ENGINE_STATUS); STATUS_INIT(LE_MAX_PROVISIONAL_XR, nullptr, UINT64, "max provisional xr", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_EXPANDED, nullptr, UINT64, "expanded", TOKU_ENGINE_STATUS); STATUS_INIT(LE_EXPANDED, nullptr, UINT64, "expanded", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_MAX_MEMSIZE, nullptr, UINT64, "max memsize", TOKU_ENGINE_STATUS); STATUS_INIT(LE_MAX_MEMSIZE, nullptr, UINT64, "max memsize", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_APPLY_GC_BYTES_IN, nullptr, PARCOUNT, "size of leafentries before garbage collection (during message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_APPLY_GC_BYTES_OUT, nullptr, PARCOUNT, "size of leafentries after garbage collection (during message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_NORMAL_GC_BYTES_IN, nullptr, PARCOUNT, "size of leafentries before garbage collection (outside message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_NORMAL_GC_BYTES_OUT,nullptr, PARCOUNT, "size of leafentries after garbage collection (outside message application)", TOKU_ENGINE_STATUS);
le_status.initialized = true; le_status.initialized = true;
} }
#undef STATUS_INIT #undef STATUS_INIT
...@@ -153,6 +157,14 @@ toku_le_get_status(LE_STATUS statp) { ...@@ -153,6 +157,14 @@ toku_le_get_status(LE_STATUS statp) {
} }
#define STATUS_VALUE(x) le_status.status[x].value.num #define STATUS_VALUE(x) le_status.status[x].value.num
// Add d to leafentry status row x.
// Rows whose type is PARCOUNT are bumped with increment_partitioned_counter();
// every other row has d added to its numeric value with toku_sync_fetch_and_add().
// x appears more than once in the expansion, so it should have no side effects.
#define STATUS_INC(x, d) \
do { \
if (le_status.status[x].type == PARCOUNT) { \
increment_partitioned_counter(le_status.status[x].value.parcount, d); \
} else { \
toku_sync_fetch_and_add(&le_status.status[x].value.num, d); \
} \
} while (0)
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
...@@ -441,6 +453,18 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref ...@@ -441,6 +453,18 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
done:; done:;
} }
// Returns: the size 'ule' would occupy once packed into a leafentry.
// The one special case is a ule holding exactly one committed record and no
// provisional records whose innermost record is a delete: that reports 0.
static size_t ule_packed_memsize(ULE ule) {
    // Anything other than "one committed, zero provisional" is sized normally.
    if (ule->num_cuxrs != 1 || ule->num_puxrs != 0) {
        return le_memsize_from_ule(ule);
    }
    if (uxr_is_delete(ule_get_innermost_uxr(ule))) {
        return 0;
    }
    return le_memsize_from_ule(ule);
}
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction // This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes // has no knowledge of the inner structure of either leafentry or msg. It makes
...@@ -462,6 +486,7 @@ toku_le_apply_msg(FT_MSG msg, ...@@ -462,6 +486,7 @@ toku_le_apply_msg(FT_MSG msg,
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
GC_INFO gc_info, GC_INFO gc_info,
txn_manager_state *txn_state_for_gc,
LEAFENTRY *new_leafentry_p, LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
paranoid_invariant_notnull(new_leafentry_p); paranoid_invariant_notnull(new_leafentry_p);
...@@ -486,7 +511,27 @@ toku_le_apply_msg(FT_MSG msg, ...@@ -486,7 +511,27 @@ toku_le_apply_msg(FT_MSG msg,
oldnumbytes = ule_get_innermost_numbytes(&ule, keylen); oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
} }
msg_modify_ule(&ule, msg); // modify unpacked leafentry msg_modify_ule(&ule, msg); // modify unpacked leafentry
ule_simple_garbage_collection(&ule, oldest_referenced_xid, gc_info);
// - we may be able to immediately promote the newly-applied outermost provisional uxr
// - either way, run simple gc first, and then full gc if there are still some committed uxrs.
ule_try_promote_provisional_outermost(&ule, oldest_referenced_xid);
ule_simple_garbage_collection(&ule,
txn_state_for_gc != nullptr ?
txn_state_for_gc->oldest_referenced_xid_for_simple_gc :
oldest_referenced_xid,
gc_info);
if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr) {
size_t size_before_gc = ule_packed_memsize(&ule);
ule_garbage_collect(&ule,
txn_state_for_gc->snapshot_xids,
txn_state_for_gc->referenced_xids,
txn_state_for_gc->live_root_txns
);
size_t size_after_gc = ule_packed_memsize(&ule);
STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
}
int rval = le_pack( int rval = le_pack(
&ule, // create packed leafentry &ule, // create packed leafentry
data_buffer, data_buffer,
...@@ -578,7 +623,18 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry, ...@@ -578,7 +623,18 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
// garbage in leafentries. // garbage in leafentries.
TXNID oldest_possible_live_xid = oldest_referenced_xid_known; TXNID oldest_possible_live_xid = oldest_referenced_xid_known;
ule_try_promote_provisional_outermost(&ule, oldest_possible_live_xid); ule_try_promote_provisional_outermost(&ule, oldest_possible_live_xid);
ule_garbage_collect(&ule, snapshot_xids, referenced_xids, live_root_txns); // No need to run simple gc here if we're going straight for full gc.
// Run full gc only when more than one committed record remains.
// This is the garbage collection path outside message application, so the
// before/after sizes go to the LE_NORMAL_GC_* counters. The LE_APPLY_GC_*
// counters are the ones updated by the gc inside toku_le_apply_msg().
if (ule.num_cuxrs > 1) {
    size_t size_before_gc = ule_packed_memsize(&ule);
    ule_garbage_collect(&ule,
                        snapshot_xids,
                        referenced_xids,
                        live_root_txns);
    size_t size_after_gc = ule_packed_memsize(&ule);
    STATUS_INC(LE_NORMAL_GC_BYTES_IN, size_before_gc);
    STATUS_INC(LE_NORMAL_GC_BYTES_OUT, size_after_gc);
}
int r = le_pack( int r = le_pack(
&ule, &ule,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment