From c84c4b107c7d471588433a4d1fa1c3b8b37dccf0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= <marko.makela@oracle.com>
Date: Wed, 2 Jun 2010 13:37:14 +0300
Subject: [PATCH] Bug#53674: InnoDB: Error: unlock row could not find a 4 mode
 lock on the record

In semi-consistent read, only unlock freshly locked non-matching records.

lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.

enum db_err: Add DB_SUCCESS_LOCKED_REC for indicating a successful
operation where a record lock was created.

lock_sec_rec_read_check_and_lock(),
lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
new record lock was created. Adjust callers.

row_unlock_for_mysql(): Correct the function documentation.

row_prebuilt_t::new_rec_locks: Correct the documentation.
---
 .../innodb_plugin/r/innodb_bug53674.result    |  11 ++
 .../t/innodb_bug53674-master.opt              |   1 +
 .../innodb_plugin/t/innodb_bug53674.test      |   8 +
 storage/innodb_plugin/include/db0err.h        |   2 +
 storage/innodb_plugin/include/lock0lock.h     |  12 +-
 storage/innodb_plugin/include/row0mysql.h     |  54 ++++---
 storage/innodb_plugin/lock/lock0lock.c        | 138 +++++++++++-------
 storage/innodb_plugin/row/row0ins.c           | 130 +++++++++--------
 storage/innodb_plugin/row/row0mysql.c         |  31 ++--
 storage/innodb_plugin/row/row0sel.c           | 132 +++++++++++------
 10 files changed, 308 insertions(+), 211 deletions(-)
 create mode 100644 mysql-test/suite/innodb_plugin/r/innodb_bug53674.result
 create mode 100644 mysql-test/suite/innodb_plugin/t/innodb_bug53674-master.opt
 create mode 100644 mysql-test/suite/innodb_plugin/t/innodb_bug53674.test

diff --git a/mysql-test/suite/innodb_plugin/r/innodb_bug53674.result b/mysql-test/suite/innodb_plugin/r/innodb_bug53674.result
new file mode 100644
index 0000000000..c4021c2e7c
--- /dev/null
+++ b/mysql-test/suite/innodb_plugin/r/innodb_bug53674.result
@@ -0,0 +1,11 @@
+create table bug53674(a int)engine=innodb;
+insert into bug53674 values (1),(2);
+start transaction;
+select * from bug53674 for update;
+a
+1
+2
+select * from bug53674 where a=(select a from bug53674 where a > 1);
+a
+2
+drop table bug53674;
diff --git a/mysql-test/suite/innodb_plugin/t/innodb_bug53674-master.opt b/mysql-test/suite/innodb_plugin/t/innodb_bug53674-master.opt
new file mode 100644
index 0000000000..f1cfd7ab6c
--- /dev/null
+++ b/mysql-test/suite/innodb_plugin/t/innodb_bug53674-master.opt
@@ -0,0 +1 @@
+--log-bin --innodb-locks-unsafe-for-binlog --binlog-format=mixed
diff --git a/mysql-test/suite/innodb_plugin/t/innodb_bug53674.test b/mysql-test/suite/innodb_plugin/t/innodb_bug53674.test
new file mode 100644
index 0000000000..e3cbf4466a
--- /dev/null
+++ b/mysql-test/suite/innodb_plugin/t/innodb_bug53674.test
@@ -0,0 +1,8 @@
+-- source include/have_innodb_plugin.inc
+
+create table bug53674(a int)engine=innodb;
+insert into bug53674 values (1),(2);
+start transaction;
+select * from bug53674 for update;
+select * from bug53674 where a=(select a from bug53674 where a > 1);
+drop table bug53674;
diff --git a/storage/innodb_plugin/include/db0err.h b/storage/innodb_plugin/include/db0err.h
index 747e9b5364..c841c2b4af 100644
--- a/storage/innodb_plugin/include/db0err.h
+++ b/storage/innodb_plugin/include/db0err.h
@@ -28,6 +28,8 @@ Created 5/24/1996 Heikki Tuuri
 
 
 enum db_err {
+	DB_SUCCESS_LOCKED_REC = 9,	/*!< like DB_SUCCESS, but a new
+					explicit record lock was created */
 	DB_SUCCESS = 10,
 
 	/* The following are error codes */
diff --git a/storage/innodb_plugin/include/lock0lock.h b/storage/innodb_plugin/include/lock0lock.h
index 7d76cbe3c7..b3e1e5c453 100644
--- a/storage/innodb_plugin/include/lock0lock.h
+++ b/storage/innodb_plugin/include/lock0lock.h
@@ -340,11 +340,12 @@ lock_sec_rec_modify_check_and_lock(
 	que_thr_t*	thr,	/*!< in: query thread */
 	mtr_t*		mtr);	/*!< in/out: mini-transaction */
 /*********************************************************************//**
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
 secondary index record.
-@return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 UNIV_INTERN
-ulint
+enum db_err
 lock_sec_rec_read_check_and_lock(
 /*=============================*/
 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
@@ -371,9 +372,10 @@ if the query thread should anyway be suspended for some reason; if not, then
 puts the transaction and the query thread to the lock wait state and inserts a
 waiting request for a record lock to the lock queue. Sets the requested mode
 lock on the record.
-@return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 UNIV_INTERN
-ulint
+enum db_err
 lock_clust_rec_read_check_and_lock(
 /*===============================*/
 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
diff --git a/storage/innodb_plugin/include/row0mysql.h b/storage/innodb_plugin/include/row0mysql.h
index e90742abe7..39ea240772 100644
--- a/storage/innodb_plugin/include/row0mysql.h
+++ b/storage/innodb_plugin/include/row0mysql.h
@@ -264,27 +264,26 @@ row_update_for_mysql(
 	row_prebuilt_t*	prebuilt);	/*!< in: prebuilt struct in MySQL
 					handle */
 /*********************************************************************//**
-This can only be used when srv_locks_unsafe_for_binlog is TRUE or
-session is using a READ COMMITTED isolation level. Before
-calling this function we must use trx_reset_new_rec_lock_info() and
-trx_register_new_rec_lock() to store the information which new record locks
-really were set. This function removes a newly set lock under prebuilt->pcur,
-and also under prebuilt->clust_pcur. Currently, this is only used and tested
-in the case of an UPDATE or a DELETE statement, where the row lock is of the
-LOCK_X type.
-Thus, this implements a 'mini-rollback' that releases the latest record
-locks we set.
-@return	error code or DB_SUCCESS */
+This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
+session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
+Before calling this function row_search_for_mysql() must have
+initialized prebuilt->new_rec_locks to store the information which new
+record locks really were set. This function removes a newly set
+clustered index record lock under prebuilt->pcur or
+prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
+releases the latest clustered index record lock we set.
+@return error code or DB_SUCCESS */
 UNIV_INTERN
 int
 row_unlock_for_mysql(
 /*=================*/
-	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct in MySQL
+	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt struct in MySQL
 					handle */
-	ibool		has_latches_on_recs);/*!< TRUE if called so that we have
-					the latches on the records under pcur
-					and clust_pcur, and we do not need to
-					reposition the cursors. */
+	ibool		has_latches_on_recs);/*!< in: TRUE if called
+					so that we have the latches on
+					the records under pcur and
+					clust_pcur, and we do not need
+					to reposition the cursors. */
 /*********************************************************************//**
 Creates an query graph node of 'update' type to be used in the MySQL
 interface.
@@ -702,18 +701,17 @@ struct row_prebuilt_struct {
 	ulint		new_rec_locks;	/*!< normally 0; if
 					srv_locks_unsafe_for_binlog is
 					TRUE or session is using READ
-					COMMITTED isolation level, in a
-					cursor search, if we set a new
-					record lock on an index, this is
-					incremented; this is used in
-					releasing the locks under the
-					cursors if we are performing an
-					UPDATE and we determine after
-					retrieving the row that it does
-					not need to be locked; thus,
-					these can be used to implement a
-					'mini-rollback' that releases
-					the latest record locks */
+					COMMITTED or READ UNCOMMITTED
+					isolation level, set in
+					row_search_for_mysql() if we set a new
+					record lock on the secondary
+					or clustered index; this is
+					used in row_unlock_for_mysql()
+					when releasing the lock under
+					the cursor if we determine
+					after retrieving the row that
+					it does not need to be locked
+					('mini-rollback') */
 	ulint		mysql_prefix_len;/*!< byte offset of the end of
 					the last requested column */
 	ulint		mysql_row_len;	/*!< length in bytes of a row in the
diff --git a/storage/innodb_plugin/lock/lock0lock.c b/storage/innodb_plugin/lock/lock0lock.c
index d6d9f6668d..77d69d11a2 100644
--- a/storage/innodb_plugin/lock/lock0lock.c
+++ b/storage/innodb_plugin/lock/lock0lock.c
@@ -1733,11 +1733,11 @@ lock_rec_create(
 Enqueues a waiting request for a lock which cannot be granted immediately.
 Checks for deadlocks.
 @return DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED, or
-DB_SUCCESS; DB_SUCCESS means that there was a deadlock, but another
-transaction was chosen as a victim, and we got the lock immediately:
-no need to wait then */
+DB_SUCCESS_LOCKED_REC; DB_SUCCESS_LOCKED_REC means that
+there was a deadlock, but another transaction was chosen as a victim,
+and we got the lock immediately: no need to wait then */
 static
-ulint
+enum db_err
 lock_rec_enqueue_waiting(
 /*=====================*/
 	ulint			type_mode,/*!< in: lock mode this
@@ -1809,7 +1809,7 @@ lock_rec_enqueue_waiting(
 
 	if (trx->wait_lock == NULL) {
 
-		return(DB_SUCCESS);
+		return(DB_SUCCESS_LOCKED_REC);
 	}
 
 	trx->que_state = TRX_QUE_LOCK_WAIT;
@@ -1925,6 +1925,16 @@ somebody_waits:
 	return(lock_rec_create(type_mode, block, heap_no, index, trx));
 }
 
+/** Record locking request status */
+enum lock_rec_req_status {
+	/** Failed to acquire a lock */
+	LOCK_REC_FAIL,
+	/** Succeeded in acquiring a lock (implicit or already acquired) */
+	LOCK_REC_SUCCESS,
+	/** Explicitly created a new lock */
+	LOCK_REC_SUCCESS_CREATED
+};
+
 /*********************************************************************//**
 This is a fast routine for locking a record in the most common cases:
 there are no explicit locks on the page, or there is just one lock, owned
@@ -1932,9 +1942,9 @@ by this transaction, and of the right type_mode. This is a low-level function
 which does NOT look at implicit locks! Checks lock compatibility within
 explicit locks. This function sets a normal next-key lock, or in the case of
 a page supremum record, a gap type lock.
-@return	TRUE if locking succeeded */
+@return whether the locking succeeded */
 UNIV_INLINE
-ibool
+enum lock_rec_req_status
 lock_rec_lock_fast(
 /*===============*/
 	ibool			impl,	/*!< in: if TRUE, no lock is set
@@ -1973,19 +1983,19 @@ lock_rec_lock_fast(
 			lock_rec_create(mode, block, heap_no, index, trx);
 		}
 
-		return(TRUE);
+		return(LOCK_REC_SUCCESS_CREATED);
 	}
 
 	if (lock_rec_get_next_on_page(lock)) {
 
-		return(FALSE);
+		return(LOCK_REC_FAIL);
 	}
 
 	if (lock->trx != trx
 	    || lock->type_mode != (mode | LOCK_REC)
 	    || lock_rec_get_n_bits(lock) <= heap_no) {
 
-		return(FALSE);
+		return(LOCK_REC_FAIL);
 	}
 
 	if (!impl) {
@@ -1994,10 +2004,11 @@ lock_rec_lock_fast(
 
 		if (!lock_rec_get_nth_bit(lock, heap_no)) {
 			lock_rec_set_nth_bit(lock, heap_no);
+			return(LOCK_REC_SUCCESS_CREATED);
 		}
 	}
 
-	return(TRUE);
+	return(LOCK_REC_SUCCESS);
 }
 
 /*********************************************************************//**
@@ -2005,9 +2016,10 @@ This is the general, and slower, routine for locking a record. This is a
 low-level function which does NOT look at implicit locks! Checks lock
 compatibility within explicit locks. This function sets a normal next-key
 lock, or in the case of a page supremum record, a gap type lock.
-@return	DB_SUCCESS, DB_LOCK_WAIT, or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 static
-ulint
+enum db_err
 lock_rec_lock_slow(
 /*===============*/
 	ibool			impl,	/*!< in: if TRUE, no lock is set
@@ -2024,7 +2036,6 @@ lock_rec_lock_slow(
 	que_thr_t*		thr)	/*!< in: query thread */
 {
 	trx_t*	trx;
-	ulint	err;
 
 	ut_ad(mutex_own(&kernel_mutex));
 	ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@@ -2043,27 +2054,23 @@ lock_rec_lock_slow(
 		/* The trx already has a strong enough lock on rec: do
 		nothing */
 
-		err = DB_SUCCESS;
 	} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
 
 		/* If another transaction has a non-gap conflicting request in
 		the queue, as this transaction does not have a lock strong
 		enough already granted on the record, we have to wait. */
 
-		err = lock_rec_enqueue_waiting(mode, block, heap_no,
-					       index, thr);
-	} else {
-		if (!impl) {
-			/* Set the requested lock on the record */
+		return(lock_rec_enqueue_waiting(mode, block, heap_no,
+						index, thr));
+	} else if (!impl) {
+		/* Set the requested lock on the record */
 
-			lock_rec_add_to_queue(LOCK_REC | mode, block,
-					      heap_no, index, trx);
-		}
-
-		err = DB_SUCCESS;
+		lock_rec_add_to_queue(LOCK_REC | mode, block,
+				      heap_no, index, trx);
+		return(DB_SUCCESS_LOCKED_REC);
 	}
 
-	return(err);
+	return(DB_SUCCESS);
 }
 
 /*********************************************************************//**
@@ -2072,9 +2079,10 @@ possible, enqueues a waiting lock request. This is a low-level function
 which does NOT look at implicit locks! Checks lock compatibility within
 explicit locks. This function sets a normal next-key lock, or in the case
 of a page supremum record, a gap type lock.
-@return	DB_SUCCESS, DB_LOCK_WAIT, or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 static
-ulint
+enum db_err
 lock_rec_lock(
 /*==========*/
 	ibool			impl,	/*!< in: if TRUE, no lock is set
@@ -2090,8 +2098,6 @@ lock_rec_lock(
 	dict_index_t*		index,	/*!< in: index of record */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	ulint	err;
-
 	ut_ad(mutex_own(&kernel_mutex));
 	ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
 	      || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
@@ -2103,18 +2109,20 @@ lock_rec_lock(
 	      || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
 	      || mode - (LOCK_MODE_MASK & mode) == 0);
 
-	if (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
-
-		/* We try a simplified and faster subroutine for the most
-		common cases */
-
-		err = DB_SUCCESS;
-	} else {
-		err = lock_rec_lock_slow(impl, mode, block,
-					 heap_no, index, thr);
+	/* We try a simplified and faster subroutine for the most
+	common cases */
+	switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
+	case LOCK_REC_SUCCESS:
+		return(DB_SUCCESS);
+	case LOCK_REC_SUCCESS_CREATED:
+		return(DB_SUCCESS_LOCKED_REC);
+	case LOCK_REC_FAIL:
+		return(lock_rec_lock_slow(impl, mode, block,
+					  heap_no, index, thr));
 	}
 
-	return(err);
+	ut_error;
+	return(DB_ERROR);
 }
 
 /*********************************************************************//**
@@ -5072,7 +5080,14 @@ lock_rec_insert_check_and_lock(
 
 	lock_mutex_exit_kernel();
 
-	if ((err == DB_SUCCESS) && !dict_index_is_clust(index)) {
+	switch (err) {
+	case DB_SUCCESS_LOCKED_REC:
+		err = DB_SUCCESS;
+		/* fall through */
+	case DB_SUCCESS:
+		if (dict_index_is_clust(index)) {
+			break;
+		}
 		/* Update the page max trx id field */
 		page_update_max_trx_id(block,
 				       buf_block_get_page_zip(block),
@@ -5195,6 +5210,10 @@ lock_clust_rec_modify_check_and_lock(
 
 	ut_ad(lock_rec_queue_validate(block, rec, index, offsets));
 
+	if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+		err = DB_SUCCESS;
+	}
+
 	return(err);
 }
 
@@ -5261,22 +5280,27 @@ lock_sec_rec_modify_check_and_lock(
 	}
 #endif /* UNIV_DEBUG */
 
-	if (err == DB_SUCCESS) {
+	if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
 		/* Update the page max trx id field */
+		/* It might not be necessary to do this if
+		err == DB_SUCCESS (no new lock created),
+		but it should not cost too much performance. */
 		page_update_max_trx_id(block,
 				       buf_block_get_page_zip(block),
 				       thr_get_trx(thr)->id, mtr);
+		err = DB_SUCCESS;
 	}
 
 	return(err);
 }
 
 /*********************************************************************//**
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
 secondary index record.
-@return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 UNIV_INTERN
-ulint
+enum db_err
 lock_sec_rec_read_check_and_lock(
 /*=============================*/
 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
@@ -5297,8 +5321,8 @@ lock_sec_rec_read_check_and_lock(
 					LOCK_REC_NOT_GAP */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	ulint	err;
-	ulint	heap_no;
+	enum db_err	err;
+	ulint		heap_no;
 
 	ut_ad(!dict_index_is_clust(index));
 	ut_ad(block->frame == page_align(rec));
@@ -5349,9 +5373,10 @@ if the query thread should anyway be suspended for some reason; if not, then
 puts the transaction and the query thread to the lock wait state and inserts a
 waiting request for a record lock to the lock queue. Sets the requested mode
 lock on the record.
-@return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
 UNIV_INTERN
-ulint
+enum db_err
 lock_clust_rec_read_check_and_lock(
 /*===============================*/
 	ulint			flags,	/*!< in: if BTR_NO_LOCKING_FLAG
@@ -5372,8 +5397,8 @@ lock_clust_rec_read_check_and_lock(
 					LOCK_REC_NOT_GAP */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	ulint	err;
-	ulint	heap_no;
+	enum db_err	err;
+	ulint		heap_no;
 
 	ut_ad(dict_index_is_clust(index));
 	ut_ad(block->frame == page_align(rec));
@@ -5444,17 +5469,22 @@ lock_clust_rec_read_check_and_lock_alt(
 	mem_heap_t*	tmp_heap	= NULL;
 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
 	ulint*		offsets		= offsets_;
-	ulint		ret;
+	ulint		err;
 	rec_offs_init(offsets_);
 
 	offsets = rec_get_offsets(rec, index, offsets,
 				  ULINT_UNDEFINED, &tmp_heap);
-	ret = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
+	err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
 						 offsets, mode, gap_mode, thr);
 	if (tmp_heap) {
 		mem_heap_free(tmp_heap);
 	}
-	return(ret);
+
+	if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+		err = DB_SUCCESS;
+	}
+
+	return(err);
 }
 
 /*******************************************************************//**
diff --git a/storage/innodb_plugin/row/row0ins.c b/storage/innodb_plugin/row/row0ins.c
index 230dc45dad..09d2ffc743 100644
--- a/storage/innodb_plugin/row/row0ins.c
+++ b/storage/innodb_plugin/row/row0ins.c
@@ -1121,9 +1121,9 @@ nonstandard_exit_func:
 /*********************************************************************//**
 Sets a shared lock on a record. Used in locking possible duplicate key
 records and also in checking foreign key constraints.
-@return	DB_SUCCESS or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 static
-ulint
+enum db_err
 row_ins_set_shared_rec_lock(
 /*========================*/
 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
@@ -1134,7 +1134,7 @@ row_ins_set_shared_rec_lock(
 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	ulint	err;
+	enum db_err	err;
 
 	ut_ad(rec_offs_validate(rec, index, offsets));
 
@@ -1152,9 +1152,9 @@ row_ins_set_shared_rec_lock(
 /*********************************************************************//**
 Sets a exclusive lock on a record. Used in locking possible duplicate key
 records
-@return	DB_SUCCESS or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 static
-ulint
+enum db_err
 row_ins_set_exclusive_rec_lock(
 /*===========================*/
 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
@@ -1165,7 +1165,7 @@ row_ins_set_exclusive_rec_lock(
 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	ulint	err;
+	enum db_err	err;
 
 	ut_ad(rec_offs_validate(rec, index, offsets));
 
@@ -1205,7 +1205,6 @@ row_ins_check_foreign_constraint(
 	dict_index_t*	check_index;
 	ulint		n_fields_cmp;
 	btr_pcur_t	pcur;
-	ibool		moved;
 	int		cmp;
 	ulint		err;
 	ulint		i;
@@ -1336,13 +1335,13 @@ run_again:
 
 	/* Scan index records and check if there is a matching record */
 
-	for (;;) {
+	do {
 		const rec_t*		rec = btr_pcur_get_rec(&pcur);
 		const buf_block_t*	block = btr_pcur_get_block(&pcur);
 
 		if (page_rec_is_infimum(rec)) {
 
-			goto next_rec;
+			continue;
 		}
 
 		offsets = rec_get_offsets(rec, check_index,
@@ -1353,12 +1352,13 @@ run_again:
 			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
 							  rec, check_index,
 							  offsets, thr);
-			if (err != DB_SUCCESS) {
-
-				break;
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				continue;
+			default:
+				goto end_scan;
 			}
-
-			goto next_rec;
 		}
 
 		cmp = cmp_dtuple_rec(entry, rec, offsets);
@@ -1369,9 +1369,12 @@ run_again:
 				err = row_ins_set_shared_rec_lock(
 					LOCK_ORDINARY, block,
 					rec, check_index, offsets, thr);
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
 					break;
+				default:
+					goto end_scan;
 				}
 			} else {
 				/* Found a matching record. Lock only
@@ -1382,15 +1385,18 @@ run_again:
 					LOCK_REC_NOT_GAP, block,
 					rec, check_index, offsets, thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
 					break;
+				default:
+					goto end_scan;
 				}
 
 				if (check_ref) {
 					err = DB_SUCCESS;
 
-					break;
+					goto end_scan;
 				} else if (foreign->type != 0) {
 					/* There is an ON UPDATE or ON DELETE
 					condition: check them in a separate
@@ -1416,7 +1422,7 @@ run_again:
 							err = DB_FOREIGN_DUPLICATE_KEY;
 						}
 
-						break;
+						goto end_scan;
 					}
 
 					/* row_ins_foreign_check_on_constraint
@@ -1429,49 +1435,41 @@ run_again:
 						thr, foreign, rec, entry);
 
 					err = DB_ROW_IS_REFERENCED;
-					break;
+					goto end_scan;
 				}
 			}
-		}
+		} else {
+			ut_a(cmp < 0);
 
-		if (cmp < 0) {
 			err = row_ins_set_shared_rec_lock(
 				LOCK_GAP, block,
 				rec, check_index, offsets, thr);
-			if (err != DB_SUCCESS) {
 
-				break;
-			}
-
-			if (check_ref) {
-				err = DB_NO_REFERENCED_ROW;
-				row_ins_foreign_report_add_err(
-					trx, foreign, rec, entry);
-			} else {
-				err = DB_SUCCESS;
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				if (check_ref) {
+					err = DB_NO_REFERENCED_ROW;
+					row_ins_foreign_report_add_err(
+						trx, foreign, rec, entry);
+				} else {
+					err = DB_SUCCESS;
+				}
 			}
 
-			break;
+			goto end_scan;
 		}
+	} while (btr_pcur_move_to_next(&pcur, &mtr));
 
-		ut_a(cmp == 0);
-next_rec:
-		moved = btr_pcur_move_to_next(&pcur, &mtr);
-
-		if (!moved) {
-			if (check_ref) {
-				rec = btr_pcur_get_rec(&pcur);
-				row_ins_foreign_report_add_err(
-					trx, foreign, rec, entry);
-				err = DB_NO_REFERENCED_ROW;
-			} else {
-				err = DB_SUCCESS;
-			}
-
-			break;
-		}
+	if (check_ref) {
+		row_ins_foreign_report_add_err(
+			trx, foreign, btr_pcur_get_rec(&pcur), entry);
+		err = DB_NO_REFERENCED_ROW;
+	} else {
+		err = DB_SUCCESS;
 	}
 
+end_scan:
 	btr_pcur_close(&pcur);
 
 	mtr_commit(&mtr);
@@ -1719,9 +1717,13 @@ row_ins_scan_sec_index_for_duplicate(
 				rec, index, offsets, thr);
 		}
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS_LOCKED_REC:
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
 			break;
+		default:
+			goto end_scan;
 		}
 
 		if (page_rec_is_supremum(rec)) {
@@ -1738,17 +1740,15 @@ row_ins_scan_sec_index_for_duplicate(
 
 				thr_get_trx(thr)->error_info = index;
 
-				break;
+				goto end_scan;
 			}
+		} else {
+			ut_a(cmp < 0);
+			goto end_scan;
 		}
-
-		if (cmp < 0) {
-			break;
-		}
-
-		ut_a(cmp == 0);
 	} while (btr_pcur_move_to_next(&pcur, &mtr));
 
+end_scan:
 	if (UNIV_LIKELY_NULL(heap)) {
 		mem_heap_free(heap);
 	}
@@ -1837,7 +1837,11 @@ row_ins_duplicate_error_in_clust(
 					cursor->index, offsets, thr);
 			}
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				break;
+			default:
 				goto func_exit;
 			}
 
@@ -1877,7 +1881,11 @@ row_ins_duplicate_error_in_clust(
 					rec, cursor->index, offsets, thr);
 			}
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+			case DB_SUCCESS:
+				break;
+			default:
 				goto func_exit;
 			}
 
diff --git a/storage/innodb_plugin/row/row0mysql.c b/storage/innodb_plugin/row/row0mysql.c
index c093925fc7..feeb7fc80b 100644
--- a/storage/innodb_plugin/row/row0mysql.c
+++ b/storage/innodb_plugin/row/row0mysql.c
@@ -1430,27 +1430,26 @@ run_again:
 }
 
 /*********************************************************************//**
-This can only be used when srv_locks_unsafe_for_binlog is TRUE or
-this session is using a READ COMMITTED isolation level. Before
-calling this function we must use trx_reset_new_rec_lock_info() and
-trx_register_new_rec_lock() to store the information which new record locks
-really were set. This function removes a newly set lock under prebuilt->pcur,
-and also under prebuilt->clust_pcur. Currently, this is only used and tested
-in the case of an UPDATE or a DELETE statement, where the row lock is of the
-LOCK_X type.
-Thus, this implements a 'mini-rollback' that releases the latest record
-locks we set.
-@return	error code or DB_SUCCESS */
+This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
+session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
+Before calling this function row_search_for_mysql() must have
+initialized prebuilt->new_rec_locks to store the information which new
+record locks really were set. This function removes a newly set
+clustered index record lock under prebuilt->pcur or
+prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
+releases the latest clustered index record lock we set.
+@return error code or DB_SUCCESS */
 UNIV_INTERN
 int
 row_unlock_for_mysql(
 /*=================*/
-	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct in MySQL
+	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt struct in MySQL
 					handle */
-	ibool		has_latches_on_recs)/*!< TRUE if called so that we have
-					the latches on the records under pcur
-					and clust_pcur, and we do not need to
-					reposition the cursors. */
+	ibool		has_latches_on_recs)/*!< in: TRUE if called so
+					that we have the latches on
+					the records under pcur and
+					clust_pcur, and we do not need
+					to reposition the cursors. */
 {
 	btr_pcur_t*	pcur		= prebuilt->pcur;
 	btr_pcur_t*	clust_pcur	= prebuilt->clust_pcur;
diff --git a/storage/innodb_plugin/row/row0sel.c b/storage/innodb_plugin/row/row0sel.c
index 4d19ed93a4..a5bf361661 100644
--- a/storage/innodb_plugin/row/row0sel.c
+++ b/storage/innodb_plugin/row/row0sel.c
@@ -863,8 +863,14 @@ row_sel_get_clust_rec(
 			clust_rec, index, offsets,
 			node->row_lock_mode, lock_type, thr);
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
+			/* Declare the variable uninitialized in Valgrind.
+			It should be set to DB_SUCCESS at func_exit. */
+			UNIV_MEM_INVALID(&err, sizeof err);
+			break;
+		default:
 			goto err_exit;
 		}
 	} else {
@@ -934,9 +940,9 @@ err_exit:
 
 /*********************************************************************//**
 Sets a lock on a record.
-@return	DB_SUCCESS or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 UNIV_INLINE
-ulint
+enum db_err
 sel_set_rec_lock(
 /*=============*/
 	const buf_block_t*	block,	/*!< in: buffer block of rec */
@@ -948,8 +954,8 @@ sel_set_rec_lock(
 					LOC_REC_NOT_GAP */
 	que_thr_t*		thr)	/*!< in: query thread */
 {
-	trx_t*	trx;
-	ulint	err;
+	trx_t*		trx;
+	enum db_err	err;
 
 	trx = thr_get_trx(thr);
 
@@ -1482,11 +1488,15 @@ rec_loop:
 					       node->row_lock_mode,
 					       lock_type, thr);
 
-			if (err != DB_SUCCESS) {
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				/* Note that in this case we will store in pcur
 				the PREDECESSOR of the record we are waiting
 				the lock for */
-
 				goto lock_wait_or_error;
 			}
 		}
@@ -1538,8 +1548,12 @@ skip_lock:
 				       rec, index, offsets,
 				       node->row_lock_mode, lock_type, thr);
 
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS_LOCKED_REC:
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
+			break;
+		default:
 			goto lock_wait_or_error;
 		}
 	}
@@ -2801,9 +2815,9 @@ row_sel_build_prev_vers_for_mysql(
 Retrieves the clustered index record corresponding to a record in a
 non-clustered index. Does the necessary locking. Used in the MySQL
 interface.
-@return	DB_SUCCESS or error code */
+@return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
 static
-ulint
+enum db_err
 row_sel_get_clust_rec_for_mysql(
 /*============================*/
 	row_prebuilt_t*	prebuilt,/*!< in: prebuilt struct in the handle */
@@ -2830,7 +2844,7 @@ row_sel_get_clust_rec_for_mysql(
 	dict_index_t*	clust_index;
 	const rec_t*	clust_rec;
 	rec_t*		old_vers;
-	ulint		err;
+	enum db_err	err;
 	trx_t*		trx;
 
 	*out_rec = NULL;
@@ -2889,6 +2903,7 @@ row_sel_get_clust_rec_for_mysql(
 
 		clust_rec = NULL;
 
+		err = DB_SUCCESS;
 		goto func_exit;
 	}
 
@@ -2904,8 +2919,11 @@ row_sel_get_clust_rec_for_mysql(
 			0, btr_pcur_get_block(prebuilt->clust_pcur),
 			clust_rec, clust_index, *offsets,
 			prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr);
-		if (err != DB_SUCCESS) {
-
+		switch (err) {
+		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
+			break;
+		default:
 			goto err_exit;
 		}
 	} else {
@@ -2965,6 +2983,8 @@ row_sel_get_clust_rec_for_mysql(
 				     rec, sec_index, clust_rec, clust_index));
 #endif
 		}
+
+		err = DB_SUCCESS;
 	}
 
 func_exit:
@@ -2977,7 +2997,6 @@ func_exit:
 		btr_pcur_store_position(prebuilt->clust_pcur, mtr);
 	}
 
-	err = DB_SUCCESS;
 err_exit:
 	return(err);
 }
@@ -3702,8 +3721,12 @@ shortcut_fails_too_big_rec:
 					       prebuilt->select_lock_type,
 					       LOCK_GAP, thr);
 
-			if (err != DB_SUCCESS) {
-
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				goto lock_wait_or_error;
 			}
 		}
@@ -3801,8 +3824,12 @@ rec_loop:
 					       prebuilt->select_lock_type,
 					       LOCK_ORDINARY, thr);
 
-			if (err != DB_SUCCESS) {
-
+			switch (err) {
+			case DB_SUCCESS_LOCKED_REC:
+				err = DB_SUCCESS;
+			case DB_SUCCESS:
+				break;
+			default:
 				goto lock_wait_or_error;
 			}
 		}
@@ -3932,8 +3959,11 @@ wrong_offs:
 					prebuilt->select_lock_type, LOCK_GAP,
 					thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
+					break;
+				default:
 					goto lock_wait_or_error;
 				}
 			}
@@ -3968,8 +3998,11 @@ wrong_offs:
 					prebuilt->select_lock_type, LOCK_GAP,
 					thr);
 
-				if (err != DB_SUCCESS) {
-
+				switch (err) {
+				case DB_SUCCESS_LOCKED_REC:
+				case DB_SUCCESS:
+					break;
+				default:
 					goto lock_wait_or_error;
 				}
 			}
@@ -4039,15 +4072,21 @@ no_gap_lock:
 
 		switch (err) {
 			const rec_t*	old_vers;
-		case DB_SUCCESS:
+		case DB_SUCCESS_LOCKED_REC:
 			if (srv_locks_unsafe_for_binlog
-			    || trx->isolation_level <= TRX_ISO_READ_COMMITTED) {
+			    || trx->isolation_level
+			    <= TRX_ISO_READ_COMMITTED) {
 				/* Note that a record of
 				prebuilt->index was locked. */
 				prebuilt->new_rec_locks = 1;
 			}
+			err = DB_SUCCESS;
+		case DB_SUCCESS:
 			break;
 		case DB_LOCK_WAIT:
+			/* Never unlock rows that were part of a conflict. */
+			prebuilt->new_rec_locks = 0;
+
 			if (UNIV_LIKELY(prebuilt->row_read_type
 					!= ROW_READ_TRY_SEMI_CONSISTENT)
 			    || unique_search
@@ -4077,7 +4116,6 @@ no_gap_lock:
 			if (UNIV_LIKELY(trx->wait_lock != NULL)) {
 				lock_cancel_waiting_and_release(
 					trx->wait_lock);
-				prebuilt->new_rec_locks = 0;
 			} else {
 				mutex_exit(&kernel_mutex);
 
@@ -4089,9 +4127,6 @@ no_gap_lock:
 							  ULINT_UNDEFINED,
 							  &heap);
 				err = DB_SUCCESS;
-				/* Note that a record of
-				prebuilt->index was locked. */
-				prebuilt->new_rec_locks = 1;
 				break;
 			}
 			mutex_exit(&kernel_mutex);
@@ -4228,27 +4263,30 @@ requires_clust_rec:
 		err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec,
 						      thr, &clust_rec,
 						      &offsets, &heap, &mtr);
-		if (err != DB_SUCCESS) {
+		switch (err) {
+		case DB_SUCCESS:
+			if (clust_rec == NULL) {
+				/* The record did not exist in the read view */
+				ut_ad(prebuilt->select_lock_type == LOCK_NONE);
 
+				goto next_rec;
+			}
+			break;
+		case DB_SUCCESS_LOCKED_REC:
+			ut_a(clust_rec != NULL);
+			if (srv_locks_unsafe_for_binlog
+			     || trx->isolation_level
+			    <= TRX_ISO_READ_COMMITTED) {
+				/* Note that the clustered index record
+				was locked. */
+				prebuilt->new_rec_locks = 2;
+			}
+			err = DB_SUCCESS;
+			break;
+		default:
 			goto lock_wait_or_error;
 		}
 
-		if (clust_rec == NULL) {
-			/* The record did not exist in the read view */
-			ut_ad(prebuilt->select_lock_type == LOCK_NONE);
-
-			goto next_rec;
-		}
-
-		if ((srv_locks_unsafe_for_binlog
-		     || trx->isolation_level <= TRX_ISO_READ_COMMITTED)
-		    && prebuilt->select_lock_type != LOCK_NONE) {
-			/* Note that both the secondary index record
-			and the clustered index record were locked. */
-			ut_ad(prebuilt->new_rec_locks == 1);
-			prebuilt->new_rec_locks = 2;
-		}
-
 		if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) {
 
 			/* The record is delete marked: we can skip it */
-- 
2.30.9