Commit 91df7abd authored by Marko Mäkelä's avatar Marko Mäkelä

Bug #55284 Double BLOB free due to lock wait while updating PRIMARY KEY

This bug fix requires that Bug #58912 be fixed as well (bzr revision id
marko.makela@oracle.com-20101221093919-mcmmgd4zpse9567d). Otherwise,
another double BLOB free could occur when InnoDB would try to perform
an update-in-place as delete-and-insert-by-update-in-place.

row_upd_clust_rec_by_insert(): Do not disown the externally stored
columns from the old record (btr_cur_mark_extern_inherited_fields())
until after checking the foreign key constraints and successfully
inserting the updated record. If a lock wait timeout occurs between
the delete-marking of the old record and the insertion of the updated
record, mark the columns inherited before retrying the insert.
Distinguish the state UPD_NODE_INSERT_BLOB from
UPD_NODE_INSERT_CLUSTERED.

btr_cur_del_mark_set_clust_rec(): Replace the cursor with
block,rec,index,offsets so that the offsets need not be recalculated.
Assert that rec is on a clustered index leaf page.

btr_cur_disown_inherited_fields(): Renamed from
btr_cur_mark_extern_inherited_fields(). Use
upd_get_field_by_field_no(). Assert that there are externally stored
columns. Assert that a mini-transaction is passed. Remove the return
status. (The only caller, row_upd_clust_rec_by_insert(), will have
determined that some fields have changed ownership.)

btr_cur_mark_dtuple_inherited_extern(): Rename to
row_upd_clust_rec_by_insert_inherit_func() and declare as static. Add
the debug parameters rec, offsets. When rec is given, assert that the
off-page columns match those in the inesrt tuple and that the off-page
columns are owned by the record. Assert that the non-updated off-page
columns in the insert tuple are owned, and mark them inherited.

row_upd_clust_rec_by_insert_inherit(): A wrapper macro for
row_upd_clust_rec_by_insert_inherit_func().

row_undo_mod_upd_exist_sec(): Adjust a comment about
row_upd_clust_rec_by_insert().

rb:508 approved by Jimmy Yang
parent 2317da0d
2010-12-21 The InnoDB Team
* include/btr0cur.h, include/row0upd.h, btr/btr0cur.c,
row/row0umod.c, row/row0upd.c:
Fix Bug#55284 Double free of off-page columns due to lock wait
while updating PRIMARY KEY
2010-12-21 The InnoDB Team
* include/data0data.h, include/data0data.ic, include/row0upd.h,
......
......@@ -2509,27 +2509,24 @@ ulint
btr_cur_del_mark_set_clust_rec(
/*===========================*/
ulint flags, /*!< in: undo logging and locking flags */
btr_cur_t* cursor, /*!< in: cursor */
buf_block_t* block, /*!< in/out: buffer block of the record */
rec_t* rec, /*!< in/out: record */
dict_index_t* index, /*!< in: clustered index of the record */
const ulint* offsets,/*!< in: rec_get_offsets(rec) */
ibool val, /*!< in: value to set */
que_thr_t* thr, /*!< in: query thread */
mtr_t* mtr) /*!< in: mtr */
{
dict_index_t* index;
buf_block_t* block;
roll_ptr_t roll_ptr;
ulint err;
rec_t* rec;
page_zip_des_t* page_zip;
trx_t* trx;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
rec_offs_init(offsets_);
rec = btr_cur_get_rec(cursor);
index = cursor->index;
ut_ad(dict_index_is_clust(index));
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
ut_ad(buf_block_get_frame(block) == page_align(rec));
ut_ad(page_is_leaf(page_align(rec)));
#ifdef UNIV_DEBUG
if (btr_cur_print_record_ops && thr) {
......@@ -2541,13 +2538,12 @@ btr_cur_del_mark_set_clust_rec(
ut_ad(dict_index_is_clust(index));
ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
err = lock_clust_rec_modify_check_and_lock(flags,
btr_cur_get_block(cursor),
err = lock_clust_rec_modify_check_and_lock(flags, block,
rec, index, offsets, thr);
if (err != DB_SUCCESS) {
goto func_exit;
return(err);
}
err = trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr,
......@@ -2555,11 +2551,9 @@ btr_cur_del_mark_set_clust_rec(
&roll_ptr);
if (err != DB_SUCCESS) {
goto func_exit;
return(err);
}
block = btr_cur_get_block(cursor);
if (block->is_hashed) {
rw_lock_x_lock(&btr_search_latch);
}
......@@ -2582,10 +2576,6 @@ btr_cur_del_mark_set_clust_rec(
btr_cur_del_mark_set_clust_rec_log(flags, rec, index, val, trx,
roll_ptr, mtr);
func_exit:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(err);
}
......@@ -3477,107 +3467,35 @@ btr_cur_set_ownership_of_extern_field(
}
/*******************************************************************//**
Marks not updated extern fields as not-owned by this record. The ownership
is transferred to the updated record which is inserted elsewhere in the
Marks non-updated off-page fields as disowned by this record. The ownership
must be transferred to the updated record which is inserted elsewhere in the
index tree. In purge only the owner of externally stored field is allowed
to free the field.
@return TRUE if BLOB ownership was transferred */
to free the field. */
UNIV_INTERN
ibool
btr_cur_mark_extern_inherited_fields(
/*=================================*/
void
btr_cur_disown_inherited_fields(
/*============================*/
page_zip_des_t* page_zip,/*!< in/out: compressed page whose uncompressed
part will be updated, or NULL */
rec_t* rec, /*!< in/out: record in a clustered index */
dict_index_t* index, /*!< in: index of the page */
const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
const upd_t* update, /*!< in: update vector */
mtr_t* mtr) /*!< in: mtr, or NULL if not logged */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
ulint n;
ulint j;
ulint i;
ibool change_ownership = FALSE;
ut_ad(rec_offs_validate(rec, NULL, offsets));
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
ut_ad(rec_offs_any_extern(offsets));
ut_ad(mtr);
if (!rec_offs_any_extern(offsets)) {
return(FALSE);
}
n = rec_offs_n_fields(offsets);
for (i = 0; i < n; i++) {
if (rec_offs_nth_extern(offsets, i)) {
/* Check it is not in updated fields */
if (update) {
for (j = 0; j < upd_get_n_fields(update);
j++) {
if (upd_get_nth_field(update, j)
->field_no == i) {
goto updated;
}
}
}
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
if (rec_offs_nth_extern(offsets, i)
&& !upd_get_field_by_field_no(update, i)) {
btr_cur_set_ownership_of_extern_field(
page_zip, rec, index, offsets, i, FALSE, mtr);
change_ownership = TRUE;
updated:
;
}
}
return(change_ownership);
}
/*******************************************************************//**
The complement of the previous function: in an update entry may inherit
some externally stored fields from a record. We must mark them as inherited
in entry, so that they are not freed in a rollback. */
UNIV_INTERN
void
btr_cur_mark_dtuple_inherited_extern(
/*=================================*/
dtuple_t* entry, /*!< in/out: updated entry to be
inserted to clustered index */
const upd_t* update) /*!< in: update vector */
{
ulint i;
for (i = 0; i < dtuple_get_n_fields(entry); i++) {
dfield_t* dfield = dtuple_get_nth_field(entry, i);
byte* data;
ulint len;
ulint j;
if (!dfield_is_ext(dfield)) {
continue;
}
/* Check if it is in updated fields */
for (j = 0; j < upd_get_n_fields(update); j++) {
if (upd_get_nth_field(update, j)->field_no == i) {
goto is_updated;
}
}
data = dfield_get_data(dfield);
len = dfield_get_len(dfield);
data[len - BTR_EXTERN_FIELD_REF_SIZE + BTR_EXTERN_LEN]
|= BTR_EXTERN_INHERITED_FLAG;
is_updated:
;
}
}
......@@ -3616,29 +3534,6 @@ btr_cur_unmark_extern_fields(
}
}
/*******************************************************************//**
Marks all extern fields in a dtuple as owned by the record. */
UNIV_INTERN
void
btr_cur_unmark_dtuple_extern_fields(
/*================================*/
dtuple_t* entry) /*!< in/out: clustered index entry */
{
ulint i;
for (i = 0; i < dtuple_get_n_fields(entry); i++) {
dfield_t* dfield = dtuple_get_nth_field(entry, i);
if (dfield_is_ext(dfield)) {
byte* data = dfield_get_data(dfield);
ulint len = dfield_get_len(dfield);
data[len - BTR_EXTERN_FIELD_REF_SIZE + BTR_EXTERN_LEN]
&= ~BTR_EXTERN_OWNER_FLAG;
}
}
}
/*******************************************************************//**
Flags the data tuple fields that are marked as extern storage in the
update vector. We use this function to remember which fields we must
......
......@@ -332,10 +332,14 @@ ulint
btr_cur_del_mark_set_clust_rec(
/*===========================*/
ulint flags, /*!< in: undo logging and locking flags */
btr_cur_t* cursor, /*!< in: cursor */
buf_block_t* block, /*!< in/out: buffer block of the record */
rec_t* rec, /*!< in/out: record */
dict_index_t* index, /*!< in: clustered index of the record */
const ulint* offsets,/*!< in: rec_get_offsets(rec) */
ibool val, /*!< in: value to set */
que_thr_t* thr, /*!< in: query thread */
mtr_t* mtr); /*!< in: mtr */
mtr_t* mtr) /*!< in: mtr */
__attribute__((nonnull));
/***********************************************************//**
Sets a secondary index record delete mark to TRUE or FALSE.
@return DB_SUCCESS, DB_LOCK_WAIT, or error number */
......@@ -481,40 +485,22 @@ btr_estimate_number_of_different_key_vals(
/*======================================*/
dict_index_t* index); /*!< in: index */
/*******************************************************************//**
Marks not updated extern fields as not-owned by this record. The ownership
is transferred to the updated record which is inserted elsewhere in the
Marks non-updated off-page fields as disowned by this record. The ownership
must be transferred to the updated record which is inserted elsewhere in the
index tree. In purge only the owner of externally stored field is allowed
to free the field.
@return TRUE if BLOB ownership was transferred */
to free the field. */
UNIV_INTERN
ibool
btr_cur_mark_extern_inherited_fields(
/*=================================*/
void
btr_cur_disown_inherited_fields(
/*============================*/
page_zip_des_t* page_zip,/*!< in/out: compressed page whose uncompressed
part will be updated, or NULL */
rec_t* rec, /*!< in/out: record in a clustered index */
dict_index_t* index, /*!< in: index of the page */
const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
const upd_t* update, /*!< in: update vector */
mtr_t* mtr); /*!< in: mtr, or NULL if not logged */
/*******************************************************************//**
The complement of the previous function: in an update entry may inherit
some externally stored fields from a record. We must mark them as inherited
in entry, so that they are not freed in a rollback. */
UNIV_INTERN
void
btr_cur_mark_dtuple_inherited_extern(
/*=================================*/
dtuple_t* entry, /*!< in/out: updated entry to be
inserted to clustered index */
const upd_t* update); /*!< in: update vector */
/*******************************************************************//**
Marks all extern fields in a dtuple as owned by the record. */
UNIV_INTERN
void
btr_cur_unmark_dtuple_extern_fields(
/*================================*/
dtuple_t* entry); /*!< in/out: clustered index entry */
mtr_t* mtr) /*!< in/out: mini-transaction */
__attribute__((nonnull(2,3,4,5,6)));
/*******************************************************************//**
Stores the fields in big_rec_vec to the tablespace and puts pointers to
them in rec. The extern flags in rec will have to be set beforehand.
......
......@@ -465,11 +465,16 @@ struct upd_node_struct{
#define UPD_NODE_INSERT_CLUSTERED 3 /* clustered index record should be
inserted, old record is already delete
marked */
#define UPD_NODE_UPDATE_ALL_SEC 4 /* an ordering field of the clustered
#define UPD_NODE_INSERT_BLOB 4 /* clustered index record should be
inserted, old record is already
delete-marked; non-updated BLOBs
should be inherited by the new record
and disowned by the old record */
#define UPD_NODE_UPDATE_ALL_SEC 5 /* an ordering field of the clustered
index record was changed, or this is
a delete operation: should update
all the secondary index records */
#define UPD_NODE_UPDATE_SOME_SEC 5 /* secondary index entries should be
#define UPD_NODE_UPDATE_SOME_SEC 6 /* secondary index entries should be
looked at and updated if an ordering
field changed */
......
......@@ -676,11 +676,10 @@ row_undo_mod_upd_exist_sec(
index, heap);
if (UNIV_UNLIKELY(!entry)) {
/* The server must have crashed in
row_upd_clust_rec_by_insert(), in
row_ins_index_entry_low() before
btr_store_big_rec_extern_fields()
has written the externally stored columns
(BLOBs) of the new clustered index entry. */
row_upd_clust_rec_by_insert() before
the updated externally stored columns (BLOBs)
of the new clustered index entry were
written. */
/* The table must be in DYNAMIC or COMPRESSED
format. REDUNDANT and COMPACT formats
......
......@@ -1616,6 +1616,91 @@ row_upd_sec_step(
return(DB_SUCCESS);
}
#ifdef UNIV_DEBUG
# define row_upd_clust_rec_by_insert_inherit(rec,offsets,entry,update) \
row_upd_clust_rec_by_insert_inherit_func(rec,offsets,entry,update)
#else /* UNIV_DEBUG */
# define row_upd_clust_rec_by_insert_inherit(rec,offsets,entry,update) \
row_upd_clust_rec_by_insert_inherit_func(entry,update)
#endif /* UNIV_DEBUG */
/*******************************************************************//**
Mark non-updated off-page columns inherited when the primary key is
updated. We must mark them as inherited in entry, so that they are not
freed in a rollback. A limited version of this function used to be
called btr_cur_mark_dtuple_inherited_extern().
@return TRUE if any columns were inherited */
static __attribute__((warn_unused_result))
ibool
row_upd_clust_rec_by_insert_inherit_func(
/*=====================================*/
#ifdef UNIV_DEBUG
const rec_t* rec, /*!< in: old record, or NULL */
const ulint* offsets,/*!< in: rec_get_offsets(rec), or NULL */
#endif /* UNIV_DEBUG */
dtuple_t* entry, /*!< in/out: updated entry to be
inserted into the clustered index */
const upd_t* update) /*!< in: update vector */
{
ibool inherit = FALSE;
ulint i;
ut_ad(!rec == !offsets);
ut_ad(!rec || rec_offs_any_extern(offsets));
for (i = 0; i < dtuple_get_n_fields(entry); i++) {
dfield_t* dfield = dtuple_get_nth_field(entry, i);
byte* data;
ulint len;
ut_ad(!offsets
|| !rec_offs_nth_extern(offsets, i)
== !dfield_is_ext(dfield)
|| upd_get_field_by_field_no(update, i));
if (!dfield_is_ext(dfield)
|| upd_get_field_by_field_no(update, i)) {
continue;
}
#ifdef UNIV_DEBUG
if (UNIV_LIKELY(rec != NULL)) {
const byte* rec_data
= rec_get_nth_field(rec, offsets, i, &len);
ut_ad(len == dfield_get_len(dfield));
ut_ad(len != UNIV_SQL_NULL);
ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
rec_data += len - BTR_EXTERN_FIELD_REF_SIZE;
/* The pointer must not be zero. */
ut_ad(memcmp(rec_data, field_ref_zero,
BTR_EXTERN_FIELD_REF_SIZE));
/* The BLOB must be owned. */
ut_ad(!(rec_data[BTR_EXTERN_LEN]
& BTR_EXTERN_OWNER_FLAG));
}
#endif /* UNIV_DEBUG */
len = dfield_get_len(dfield);
ut_a(len != UNIV_SQL_NULL);
ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
data = dfield_get_data(dfield) + len
- BTR_EXTERN_FIELD_REF_SIZE;
/* The pointer must not be zero. */
ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
/* The BLOB must be owned. */
ut_a(!(data[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
data[BTR_EXTERN_LEN] |= BTR_EXTERN_INHERITED_FLAG;
/* The BTR_EXTERN_INHERITED_FLAG only matters in
rollback. Purge will always free the extern fields of
a delete-marked row. */
inherit = TRUE;
}
return(inherit);
}
/***********************************************************//**
Marks the clustered index record deleted and inserts the updated version
of the record to the index. This function should be used when the ordering
......@@ -1634,14 +1719,16 @@ row_upd_clust_rec_by_insert(
a foreign key constraint */
mtr_t* mtr) /*!< in/out: mtr; gets committed here */
{
mem_heap_t* heap = NULL;
mem_heap_t* heap;
btr_pcur_t* pcur;
btr_cur_t* btr_cur;
trx_t* trx;
dict_table_t* table;
dtuple_t* entry;
ulint err;
ibool change_ownership = FALSE;
ibool change_ownership = FALSE;
rec_t* rec;
ulint* offsets = NULL;
ut_ad(node);
ut_ad(dict_index_is_clust(index));
......@@ -1651,77 +1738,112 @@ row_upd_clust_rec_by_insert(
pcur = node->pcur;
btr_cur = btr_pcur_get_btr_cur(pcur);
if (node->state != UPD_NODE_INSERT_CLUSTERED) {
rec_t* rec;
dict_index_t* index;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets;
rec_offs_init(offsets_);
heap = mem_heap_create(1000);
err = btr_cur_del_mark_set_clust_rec(BTR_NO_LOCKING_FLAG,
btr_cur, TRUE, thr, mtr);
entry = row_build_index_entry(node->upd_row, node->upd_ext,
index, heap);
ut_a(entry);
row_upd_index_entry_sys_field(entry, index, DATA_TRX_ID, trx->id);
switch (node->state) {
default:
ut_error;
case UPD_NODE_INSERT_BLOB:
/* A lock wait occurred in row_ins_index_entry() in
the previous invocation of this function. Mark the
off-page columns in the entry inherited. */
change_ownership = row_upd_clust_rec_by_insert_inherit(
NULL, NULL, entry, node->update);
ut_a(change_ownership);
/* fall through */
case UPD_NODE_INSERT_CLUSTERED:
/* A lock wait occurred in row_ins_index_entry() in
the previous invocation of this function. */
break;
case UPD_NODE_UPDATE_CLUSTERED:
/* This is the first invocation of the function where
we update the primary key. Delete-mark the old record
in the clustered index and prepare to insert a new entry. */
rec = btr_cur_get_rec(btr_cur);
offsets = rec_get_offsets(rec, index, NULL,
ULINT_UNDEFINED, &heap);
ut_ad(page_rec_is_user_rec(rec));
err = btr_cur_del_mark_set_clust_rec(
BTR_NO_LOCKING_FLAG, btr_cur_get_block(btr_cur),
rec, index, offsets, TRUE, thr, mtr);
if (err != DB_SUCCESS) {
err_exit:
mtr_commit(mtr);
mem_heap_free(heap);
return(err);
}
/* Mark as not-owned the externally stored fields which the new
row inherits from the delete marked record: purge should not
free those externally stored fields even if the delete marked
record is removed from the index tree, or updated. */
/* If the the new row inherits externally stored
fields (off-page columns a.k.a. BLOBs) from the
delete-marked old record, mark them disowned by the
old record and owned by the new entry. */
if (rec_offs_any_extern(offsets)) {
change_ownership = row_upd_clust_rec_by_insert_inherit(
rec, offsets, entry, node->update);
if (change_ownership) {
btr_pcur_store_position(pcur, mtr);
}
}
rec = btr_cur_get_rec(btr_cur);
index = dict_table_get_first_index(table);
offsets = rec_get_offsets(rec, index, offsets_,
ULINT_UNDEFINED, &heap);
change_ownership = btr_cur_mark_extern_inherited_fields(
btr_cur_get_page_zip(btr_cur), rec, index, offsets,
node->update, mtr);
if (check_ref) {
/* NOTE that the following call loses
the position of pcur ! */
err = row_upd_check_references_constraints(
node, pcur, table, index, offsets, thr, mtr);
if (err != DB_SUCCESS) {
mtr_commit(mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(err);
goto err_exit;
}
}
}
mtr_commit(mtr);
if (!heap) {
heap = mem_heap_create(500);
}
node->state = UPD_NODE_INSERT_CLUSTERED;
err = row_ins_index_entry(index, entry,
node->upd_ext ? node->upd_ext->n_ext : 0,
TRUE, thr);
node->state = change_ownership
? UPD_NODE_INSERT_BLOB
: UPD_NODE_INSERT_CLUSTERED;
entry = row_build_index_entry(node->upd_row, node->upd_ext,
index, heap);
ut_a(entry);
if (err == DB_SUCCESS && change_ownership) {
/* Mark the non-updated fields disowned by the old record. */
row_upd_index_entry_sys_field(entry, index, DATA_TRX_ID, trx->id);
/* NOTE: this transaction has an x-lock on the record
and therefore other transactions cannot modify the
record when we have no latch on the page. In addition,
we assume that other query threads of the same
transaction do not modify the record in the meantime.
Therefore we can assert that the restoration of the
cursor succeeds. */
if (change_ownership) {
/* If we return from a lock wait, for example, we may have
extern fields marked as not-owned in entry (marked in the
if-branch above). We must unmark them, take the ownership
back. */
mtr_start(mtr);
btr_cur_unmark_dtuple_extern_fields(entry);
if (!btr_pcur_restore_position(BTR_MODIFY_LEAF, pcur, mtr)) {
ut_error;
}
/* We must mark non-updated extern fields in entry as
inherited, so that a possible rollback will not free them. */
rec = btr_cur_get_rec(btr_cur);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
ut_ad(page_rec_is_user_rec(rec));
btr_cur_mark_dtuple_inherited_extern(entry, node->update);
btr_cur_disown_inherited_fields(
btr_cur_get_page_zip(btr_cur),
rec, index, offsets, node->update, mtr);
mtr_commit(mtr);
}
err = row_ins_index_entry(index, entry,
node->upd_ext ? node->upd_ext->n_ext : 0,
TRUE, thr);
mem_heap_free(heap);
return(err);
......@@ -1865,8 +1987,9 @@ row_upd_del_mark_clust_rec(
/* Mark the clustered index record deleted; we do not have to check
locks, because we assume that we have an x-lock on the record */
err = btr_cur_del_mark_set_clust_rec(BTR_NO_LOCKING_FLAG,
btr_cur, TRUE, thr, mtr);
err = btr_cur_del_mark_set_clust_rec(
BTR_NO_LOCKING_FLAG, btr_cur_get_block(btr_cur),
btr_cur_get_rec(btr_cur), index, offsets, TRUE, thr, mtr);
if (err == DB_SUCCESS && check_ref) {
/* NOTE that the following call loses the position of pcur ! */
......@@ -2083,7 +2206,8 @@ row_upd(
}
if (node->state == UPD_NODE_UPDATE_CLUSTERED
|| node->state == UPD_NODE_INSERT_CLUSTERED) {
|| node->state == UPD_NODE_INSERT_CLUSTERED
|| node->state == UPD_NODE_INSERT_BLOB) {
log_free_check();
err = row_upd_clust_step(node, thr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment