Commit f2c17cc9 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-29911 InnoDB recovery and mariadb-backup --prepare fail to report detailed progress

This is a 10.6 port of commit 2f9e2647
from MariaDB Server 10.9 that is missing some optimization due to a
more complex redo log format and recovery logic
(which was simplified in commit 685d958e).

The progress reporting of InnoDB crash recovery was rather intermittent.
Nothing was reported during the single-threaded log record parsing, which
could consume minutes when parsing a large log. During log application,
there only was progress reporting in background threads that would be
invoked on data page read completion.

The progress reporting here will be detailed like this:

InnoDB: Starting crash recovery from checkpoint LSN=628599973,5653727799
InnoDB: Read redo log up to LSN=1963895808
InnoDB: Multi-batch recovery needed at LSN 2534560930
InnoDB: Read redo log up to LSN=3312233472
InnoDB: Read redo log up to LSN=1599646720
InnoDB: Read redo log up to LSN=2160831488
InnoDB: To recover: LSN 2806789376/2806819840; 195082 pages
InnoDB: To recover: LSN 2806789376/2806819840; 63507 pages
InnoDB: Read redo log up to LSN=3195776000
InnoDB: Read redo log up to LSN=3687099392
InnoDB: Read redo log up to LSN=4165315584
InnoDB: To recover: LSN 4374395699/4374440960; 241454 pages
InnoDB: To recover: LSN 4374395699/4374440960; 123701 pages
InnoDB: Read redo log up to LSN=4508724224
InnoDB: Read redo log up to LSN=5094550528
InnoDB: To recover: 205230 pages

The previous messages "Starting a batch to recover" or
"Starting a final batch to recover" will be replaced by
"To recover: ... pages" messages.

If a batch lasts longer than 15 seconds, then there will be
progress reports every 15 seconds, showing the number of remaining pages.
For the non-final batch, the "To recover:" message includes two end LSN:
that of the batch, and of the recovered log. This is the primary measure
of progress. The batch will end once the number of pages to recover
reaches 0.

If recovery is possible in a single batch, the output will look like this,
with a shorter "To recover:" message that counts only the remaining pages:

InnoDB: Starting crash recovery from checkpoint LSN=628599973,5653727799
InnoDB: Read redo log up to LSN=1984539648
InnoDB: Read redo log up to LSN=2710875136
InnoDB: Read redo log up to LSN=3358895104
InnoDB: Read redo log up to LSN=3965299712
InnoDB: Read redo log up to LSN=4557417472
InnoDB: Read redo log up to LSN=5219527680
InnoDB: To recover: 450915 pages

We will also speed up recovery by improving the memory management and
implementing multi-threaded recovery of data pages that will not need
to be read into the buffer pool ("fake read"). Log application in the
"fake read" threads will be protected by an atomic being_recovered field
and exclusive buf_page_t::lock.

Recovery will reserve for data pages two thirds of the buffer pool,
or 256 pages, whichever is smaller. Previously, we could only use at most
one third of the buffer pool for buffered log records. This would typically
mean that with large buffer pools, recovery unnecessary consisted of
multiple batches.

If recovery runs out of memory, it will "roll back" or "rewind" the current
mini-transaction. The recv_sys.recovered_lsn and recv_sys.pages
will correspond to the "out of memory LSN", at the end of the previous
complete mini-transaction.

If recovery runs out of memory while executing the final recovery batch,
we can simply invoke recv_sys.apply(false) to make room, and resume
parsing.

If recovery runs out of memory before the final batch, we will
scan the redo log to the end and check for any missing or inconsistent
files. In this version of the patch, we will throw away any previously
buffered recv_sys.pages and rescan the log from the checkpoint onwards.

recv_sys_t::pages_it: A cached iterator to recv_sys.pages.

recv_sys_t::is_memory_exhausted(): Remove. We will have out-of-memory
handling deep inside recv_sys_t::parse().

recv_sys_t::rewind(), page_recv_t::recs_t::rewind():
Remove all log starting with a specific LSN.

IORequest::write_complete(), IORequest::read_complete():
Replaces fil_aio_callback().

read_io_callback(), write_io_callback(): Replaces io_callback().

IORequest::fake_read_complete(), fake_io_callback(), os_fake_read():
Process a "fake read" request for concurrent recovery.

recv_sys_t::apply_batch(): Choose a number of successive pages
for a recovery batch.

recv_sys_t::erase(recv_sys_t::map::iterator): Remove log records for a
page whose recovery is not in progress. Log application threads
will not invoke this; they will only set being_recovered=-1 to indicate
that the entry is no longer needed.

recv_sys_t::garbage_collect(): Remove all being_recovered=-1 entries.

recv_sys_t::wait_for_pool(): Wait for some space to become available
in the buffer pool.

mlog_init_t::mark_ibuf_exist(): Avoid calls to
recv_sys::recover_low() via ibuf_page_exists() and buf_page_get_low().
Such calls would lead to double locking of recv_sys.mutex, which
depending on implementation could cause a deadlock. We will use
lower-level calls to look up index pages.

buf_LRU_block_remove_hashed(): Disable consistency checks for freed
ROW_FORMAT=COMPRESSED pages. Their contents could be uninitialized garbage.
This fixes an occasional failure of the test
innodb.innodb_bulk_create_index_debug.

Tested by: Matthias Leich
parent 1fe830b5
...@@ -2483,6 +2483,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init() ...@@ -2483,6 +2483,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
/** Flush the buffer pool on shutdown. */ /** Flush the buffer pool on shutdown. */
ATTRIBUTE_COLD void buf_flush_buffer_pool() ATTRIBUTE_COLD void buf_flush_buffer_pool()
{ {
ut_ad(!os_aio_pending_reads());
ut_ad(!buf_page_cleaner_is_active); ut_ad(!buf_page_cleaner_is_active);
ut_ad(!buf_flush_sync_lsn); ut_ad(!buf_flush_sync_lsn);
......
...@@ -1095,7 +1095,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id, ...@@ -1095,7 +1095,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id,
ut_a(!zip || !bpage->oldest_modification()); ut_a(!zip || !bpage->oldest_modification());
ut_ad(bpage->zip_size()); ut_ad(bpage->zip_size());
/* Skip consistency checks if the page was freed.
In recovery, we could get a sole FREE_PAGE record
and nothing else, for a ROW_FORMAT=COMPRESSED page.
Its contents would be garbage. */
if (!bpage->is_freed())
switch (fil_page_get_type(page)) { switch (fil_page_get_type(page)) {
case FIL_PAGE_TYPE_ALLOCATED: case FIL_PAGE_TYPE_ALLOCATED:
case FIL_PAGE_INODE: case FIL_PAGE_INODE:
...@@ -1226,6 +1230,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) ...@@ -1226,6 +1230,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold()); buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain); page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
recv_sys.free_corrupted_page(id);
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
hash_lock.lock(); hash_lock.lock();
...@@ -1250,8 +1255,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) ...@@ -1250,8 +1255,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage)); buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage));
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
recv_sys.free_corrupted_page(id);
} }
/** Update buf_pool.LRU_old_ratio. /** Update buf_pool.LRU_old_ratio.
......
...@@ -655,61 +655,35 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf) ...@@ -655,61 +655,35 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf)
return count; return count;
} }
/** @return whether a page has been freed */ /** Schedule a page for recovery.
inline bool fil_space_t::is_freed(uint32_t page) @param space tablespace
@param page_id page identifier
@param recs log records
@param init page initialization, or nullptr if the page needs to be read */
void buf_read_recover(fil_space_t *space, const page_id_t page_id,
page_recv_t &recs, recv_init *init)
{ {
std::lock_guard<std::mutex> freed_lock(freed_range_mutex); ut_ad(space->id == page_id.space());
return freed_ranges.contains(page); space->reacquire();
} const ulint zip_size= space->zip_size();
/** Issues read requests for pages which recovery wants to read in.
@param[in] space_id tablespace id
@param[in] page_nos array of page numbers to read, with the
highest page number the last in the array
@param[in] n number of page numbers in the array */
void buf_read_recv_pages(ulint space_id, const uint32_t* page_nos, ulint n)
{
fil_space_t* space = fil_space_t::get(space_id);
if (!space) {
/* The tablespace is missing or unreadable: do nothing */
return;
}
const ulint zip_size = space->zip_size();
for (ulint i = 0; i < n; i++) {
/* Ignore if the page already present in freed ranges. */
if (space->is_freed(page_nos[i])) {
continue;
}
const page_id_t cur_page_id(space_id, page_nos[i]);
ulint limit = 0;
for (ulint j = 0; j < buf_pool.n_chunks; j++) {
limit += buf_pool.chunks[j].size / 2;
}
if (os_aio_pending_reads() >= limit) { if (init)
os_aio_wait_until_no_pending_reads(false); {
} if (buf_page_t *bpage= buf_page_init_for_read(BUF_READ_ANY_PAGE, page_id,
zip_size, true))
space->reacquire(); {
switch (buf_read_page_low(space, false, BUF_READ_ANY_PAGE, ut_ad(bpage->in_file());
cur_page_id, zip_size, true)) { os_fake_read(IORequest{bpage, (buf_tmp_buffer_t*) &recs,
case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC: UT_LIST_GET_FIRST(space->chain),
break; IORequest::READ_ASYNC}, ptrdiff_t(init));
default: }
sql_print_error("InnoDB: Recovery failed to read page " }
UINT32PF " from %s", else if (dberr_t err= buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
cur_page_id.page_no(), page_id, zip_size, true))
space->chain.start->name); {
} if (err != DB_SUCCESS_LOCKED_REC)
} sql_print_error("InnoDB: Recovery failed to read page "
UINT32PF " from %s",
DBUG_PRINT("ib_buf", ("recovery read (%u pages) for %s", n, page_id.page_no(), space->chain.start->name);
space->chain.start->name)); }
space->release();
} }
...@@ -2823,53 +2823,55 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len, ...@@ -2823,53 +2823,55 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
#include <tpool.h> #include <tpool.h>
/** Callback for AIO completion */ void IORequest::write_complete() const
void fil_aio_callback(const IORequest &request)
{ {
ut_ad(fil_validate_skip()); ut_ad(fil_validate_skip());
ut_ad(request.node); ut_ad(node);
ut_ad(is_write());
if (!request.bpage) if (!bpage)
{ {
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (request.type == IORequest::DBLWR_BATCH) if (type == IORequest::DBLWR_BATCH)
buf_dblwr.flush_buffered_writes_completed(request); buf_dblwr.flush_buffered_writes_completed(*this);
else else
ut_ad(request.type == IORequest::WRITE_ASYNC); ut_ad(type == IORequest::WRITE_ASYNC);
write_completed:
request.node->complete_write();
}
else if (request.is_write())
{
buf_page_write_complete(request);
goto write_completed;
} }
else else
{ buf_page_write_complete(*this);
ut_ad(request.is_read());
/* IMPORTANT: since i/o handling for reads will read also the insert node->complete_write();
buffer in fil_system.sys_space, we have to be very careful not to node->space->release();
introduce deadlocks. We never close fil_system.sys_space data }
files and never issue asynchronous reads of change buffer pages. */
const page_id_t id(request.bpage->id());
if (dberr_t err= request.bpage->read_complete(*request.node)) void IORequest::read_complete() const
{ {
if (recv_recovery_is_on() && !srv_force_recovery) ut_ad(fil_validate_skip());
{ ut_ad(node);
mysql_mutex_lock(&recv_sys.mutex); ut_ad(is_read());
recv_sys.set_corrupt_fs(); ut_ad(bpage);
mysql_mutex_unlock(&recv_sys.mutex);
} /* IMPORTANT: since i/o handling for reads will read also the insert
buffer in fil_system.sys_space, we have to be very careful not to
introduce deadlocks. We never close fil_system.sys_space data files
and never issue asynchronous reads of change buffer pages. */
const page_id_t id(bpage->id());
if (err != DB_FAIL) if (dberr_t err= bpage->read_complete(*node))
ib::error() << "Failed to read page " << id.page_no() {
<< " from file '" << request.node->name << "': " << err; if (recv_recovery_is_on() && !srv_force_recovery)
{
mysql_mutex_lock(&recv_sys.mutex);
recv_sys.set_corrupt_fs();
mysql_mutex_unlock(&recv_sys.mutex);
} }
if (err != DB_FAIL)
ib::error() << "Failed to read page " << id.page_no()
<< " from file '" << node->name << "': " << err;
} }
request.node->space->release(); node->space->release();
} }
/** Flush to disk the writes in file spaces of the given type /** Flush to disk the writes in file spaces of the given type
......
...@@ -75,8 +75,7 @@ struct buf_pool_info_t ...@@ -75,8 +75,7 @@ struct buf_pool_info_t
ulint flush_list_len; /*!< Length of buf_pool.flush_list */ ulint flush_list_len; /*!< Length of buf_pool.flush_list */
ulint n_pend_unzip; /*!< buf_pool.n_pend_unzip, pages ulint n_pend_unzip; /*!< buf_pool.n_pend_unzip, pages
pending decompress */ pending decompress */
ulint n_pend_reads; /*!< buf_pool.n_pend_reads, pages ulint n_pend_reads; /*!< os_aio_pending_reads() */
pending read */
ulint n_pending_flush_lru; /*!< Pages pending flush in LRU */ ulint n_pending_flush_lru; /*!< Pages pending flush in LRU */
ulint n_pending_flush_list; /*!< Pages pending flush in FLUSH ulint n_pending_flush_list; /*!< Pages pending flush in FLUSH
LIST */ LIST */
......
...@@ -102,12 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io. ...@@ -102,12 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io.
ulint ulint
buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf); buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf);
/** Issues read requests for pages which recovery wants to read in. /** Schedule a page for recovery.
@param[in] space_id tablespace id @param space tablespace
@param[in] page_nos array of page numbers to read, with the @param page_id page identifier
highest page number the last in the array @param recs log records
@param[in] n number of page numbers in the array */ @param init page initialization, or nullptr if the page needs to be read */
void buf_read_recv_pages(ulint space_id, const uint32_t* page_nos, ulint n); void buf_read_recover(fil_space_t *space, const page_id_t page_id,
page_recv_t &recs, recv_init *init);
/** @name Modes used in read-ahead @{ */ /** @name Modes used in read-ahead @{ */
/** read only pages belonging to the insert buffer tree */ /** read only pages belonging to the insert buffer tree */
......
...@@ -45,9 +45,9 @@ recv_find_max_checkpoint(ulint* max_field) ...@@ -45,9 +45,9 @@ recv_find_max_checkpoint(ulint* max_field)
MY_ATTRIBUTE((nonnull, warn_unused_result)); MY_ATTRIBUTE((nonnull, warn_unused_result));
ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result)) ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result))
/** Apply any buffered redo log to a page that was just read from a data file. /** Apply any buffered redo log to a page.
@param[in,out] space tablespace @param space tablespace
@param[in,out] bpage buffer pool page @param bpage buffer pool page
@return whether the page was recovered correctly */ @return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage); bool recv_recover_page(fil_space_t* space, buf_page_t* bpage);
...@@ -146,21 +146,15 @@ struct recv_dblwr_t ...@@ -146,21 +146,15 @@ struct recv_dblwr_t
list pages; list pages;
}; };
/** the recovery state and buffered records for a page */ /** recv_sys.pages entry; protected by recv_sys.mutex */
struct page_recv_t struct page_recv_t
{ {
/** Recovery state; protected by recv_sys.mutex */ /** Recovery status: 0=not in progress, 1=log is being applied,
enum -1=log has been applied and the entry may be erased.
{ Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */
/** not yet processed */ Atomic_relaxed<int8_t> being_processed{0};
RECV_NOT_PROCESSED, /** Whether reading the page will be skipped */
/** not processed; the page will be reinitialized */ bool skip_read= false;
RECV_WILL_NOT_READ,
/** page is being read */
RECV_BEING_READ,
/** log records are being applied on the page */
RECV_BEING_PROCESSED
} state= RECV_NOT_PROCESSED;
/** Latest written byte offset when applying the log records. /** Latest written byte offset when applying the log records.
@see mtr_t::m_last_offset */ @see mtr_t::m_last_offset */
uint16_t last_offset= 1; uint16_t last_offset= 1;
...@@ -183,6 +177,9 @@ struct page_recv_t ...@@ -183,6 +177,9 @@ struct page_recv_t
head= recs; head= recs;
tail= recs; tail= recs;
} }
/** Remove the last records for the page
@param start_lsn start of the removed log */
ATTRIBUTE_COLD void rewind(lsn_t start_lsn);
/** @return the last log snippet */ /** @return the last log snippet */
const log_rec_t* last() const { return tail; } const log_rec_t* last() const { return tail; }
...@@ -201,8 +198,8 @@ struct page_recv_t ...@@ -201,8 +198,8 @@ struct page_recv_t
iterator begin() { return head; } iterator begin() { return head; }
iterator end() { return NULL; } iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; } bool empty() const { ut_ad(!head == !tail); return !head; }
/** Clear and free the records; @see recv_sys_t::alloc() */ /** Clear and free the records; @see recv_sys_t::add() */
inline void clear(); void clear();
} log; } log;
/** Trim old log records for a page. /** Trim old log records for a page.
...@@ -211,21 +208,27 @@ struct page_recv_t ...@@ -211,21 +208,27 @@ struct page_recv_t
inline bool trim(lsn_t start_lsn); inline bool trim(lsn_t start_lsn);
/** Ignore any earlier redo log records for this page. */ /** Ignore any earlier redo log records for this page. */
inline void will_not_read(); inline void will_not_read();
/** @return whether the log records for the page are being processed */ };
bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
/** A page initialization operation that was parsed from the redo log */
struct recv_init
{
/** log sequence number of the page initialization */
lsn_t lsn;
/** Whether btr_page_create() avoided a read of the page.
At the end of the last recovery batch, mark_ibuf_exist()
will mark pages for which this flag is set. */
bool created;
}; };
/** Recovery system data structure */ /** Recovery system data structure */
struct recv_sys_t struct recv_sys_t
{ {
/** mutex protecting apply_log_recs and page_recv_t::state */ using init= recv_init;
mysql_mutex_t mutex;
/** mutex protecting this as well as some of page_recv_t */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex;
private: private:
/** condition variable for
!apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */
pthread_cond_t cond;
/** whether recv_apply_hashed_log_recs() is running */
bool apply_batch_on;
/** set when finding a corrupt log block or record, or there is a /** set when finding a corrupt log block or record, or there is a
log parsing buffer overflow */ log parsing buffer overflow */
bool found_corrupt_log; bool found_corrupt_log;
...@@ -270,6 +273,9 @@ struct recv_sys_t ...@@ -270,6 +273,9 @@ struct recv_sys_t
map pages; map pages;
private: private:
/** iterator to pages, used by parse() */
map::iterator pages_it;
/** Process a record that indicates that a tablespace size is being shrunk. /** Process a record that indicates that a tablespace size is being shrunk.
@param page_id first page that is not in the file @param page_id first page that is not in the file
@param lsn log sequence number of the shrink operation */ @param lsn log sequence number of the shrink operation */
...@@ -296,23 +302,38 @@ struct recv_sys_t ...@@ -296,23 +302,38 @@ struct recv_sys_t
inline size_t files_size(); inline size_t files_size();
void close_files() { files.clear(); files.shrink_to_fit(); } void close_files() { files.clear(); files.shrink_to_fit(); }
/** Advance pages_it if it matches the iterator */
void pages_it_invalidate(const map::iterator &p)
{
mysql_mutex_assert_owner(&mutex);
if (pages_it == p)
pages_it++;
}
/** Invalidate pages_it if it points to the given tablespace */
void pages_it_invalidate(uint32_t space_id)
{
mysql_mutex_assert_owner(&mutex);
if (pages_it != pages.end() && pages_it->first.space() == space_id)
pages_it= pages.end();
}
private: private:
/** Attempt to initialize a page based on redo log records. /** Attempt to initialize a page based on redo log records.
@param page_id page identifier @param p iterator
@param p iterator pointing to page_id
@param mtr mini-transaction @param mtr mini-transaction
@param b pre-allocated buffer pool block @param b pre-allocated buffer pool block
@param init page initialization
@return the recovered block @return the recovered block
@retval nullptr if the page cannot be initialized based on log records @retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */ @retval -1 if the page cannot be recovered due to corruption */
inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p, inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr,
mtr_t &mtr, buf_block_t *b); buf_block_t *b, init &init);
/** Attempt to initialize a page based on redo log records. /** Attempt to initialize a page based on redo log records.
@param page_id page identifier @param page_id page identifier
@return the recovered block @return the recovered block
@retval nullptr if the page cannot be initialized based on log records @retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */ @retval -1 if the page cannot be recovered due to corruption */
buf_block_t *recover_low(const page_id_t page_id); ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id);
/** All found log files (multiple ones are possible if we are upgrading /** All found log files (multiple ones are possible if we are upgrading
from before MariaDB Server 10.5.1) */ from before MariaDB Server 10.5.1) */
...@@ -323,12 +344,27 @@ struct recv_sys_t ...@@ -323,12 +344,27 @@ struct recv_sys_t
/** Base node of the redo block list. /** Base node of the redo block list.
List elements are linked via buf_block_t::unzip_LRU. */ List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) blocks; UT_LIST_BASE_NODE_T(buf_block_t) blocks;
/** Allocate a block from the buffer pool for recv_sys.pages */
ATTRIBUTE_COLD buf_block_t *add_block();
/** Wait for buffer pool to become available.
@param pages number of buffer pool pages needed */
ATTRIBUTE_COLD void wait_for_pool(size_t pages);
/** Free log for processed pages. */
void garbage_collect();
/** Apply a recovery batch.
@param space_id current tablespace identifier
@param space current tablespace
@param free_block spare buffer block
@param last_batch whether it is possible to write more redo log
@return whether the caller must provide a new free_block */
bool apply_batch(uint32_t space_id, fil_space_t *&space,
buf_block_t *&free_block, bool last_batch);
public: public:
/** Check whether the number of read redo log blocks exceeds the maximum.
Store last_stored_lsn if the recovery is not in the last phase.
@param[in,out] store whether to store page operations
@return whether the memory is exhausted */
inline bool is_memory_exhausted(store_t *store);
/** Apply buffered log to persistent data pages. /** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */ @param last_batch whether it is possible to write more redo log */
void apply(bool last_batch); void apply(bool last_batch);
...@@ -353,9 +389,10 @@ struct recv_sys_t ...@@ -353,9 +389,10 @@ struct recv_sys_t
@param start_lsn start LSN of the mini-transaction @param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn() @param lsn @see mtr_t::commit_lsn()
@param l redo log snippet @see log_t::FORMAT_10_5 @param l redo log snippet @see log_t::FORMAT_10_5
@param len length of l, in bytes */ @param len length of l, in bytes
inline void add(map::iterator it, lsn_t start_lsn, lsn_t lsn, @return whether we ran out of memory */
const byte *l, size_t len); bool add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
const byte *l, size_t len);
/** Parse and register one mini-transaction in log_t::FORMAT_10_5. /** Parse and register one mini-transaction in log_t::FORMAT_10_5.
@param checkpoint_lsn the log sequence number of the latest checkpoint @param checkpoint_lsn the log sequence number of the latest checkpoint
...@@ -365,32 +402,31 @@ struct recv_sys_t ...@@ -365,32 +402,31 @@ struct recv_sys_t
or corruption was noticed */ or corruption was noticed */
bool parse(lsn_t checkpoint_lsn, store_t *store, bool apply); bool parse(lsn_t checkpoint_lsn, store_t *store, bool apply);
/** Erase log records for a page. */
void erase(map::iterator p);
/** Clear a fully processed set of stored redo log records. */ /** Clear a fully processed set of stored redo log records. */
inline void clear(); void clear();
private:
/** Rewind a mini-transaction when parse() runs out of memory.
@param end current position of the mini-transaction
@param begin start of the mini-transaction */
ATTRIBUTE_COLD void rewind(const byte *end, const byte *begin) noexcept;
/** Report progress in terms of LSN or pages remaining */
ATTRIBUTE_COLD void report_progress() const;
public:
/** Determine whether redo log recovery progress should be reported. /** Determine whether redo log recovery progress should be reported.
@param time the current time @param time the current time
@return whether progress should be reported @return whether progress should be reported
(the last report was at least 15 seconds ago) */ (the last report was at least 15 seconds ago) */
bool report(time_t time) bool report(time_t time);
{
if (time - progress_time < 15)
return false;
progress_time= time;
return true;
}
/** The alloc() memory alignment, in bytes */ /** The alloc() memory alignment, in bytes */
static constexpr size_t ALIGNMENT= sizeof(size_t); static constexpr size_t ALIGNMENT= sizeof(size_t);
/** Allocate memory for log_rec_t
@param len allocation size, in bytes
@return pointer to len bytes of memory (never NULL) */
inline void *alloc(size_t len);
/** Free a redo log snippet. /** Free a redo log snippet.
@param data buffer returned by alloc() */ @param data buffer allocated in add() */
inline void free(const void *data); inline void free(const void *data);
/** Remove records for a corrupted page. /** Remove records for a corrupted page.
...@@ -402,8 +438,6 @@ struct recv_sys_t ...@@ -402,8 +438,6 @@ struct recv_sys_t
ATTRIBUTE_COLD void set_corrupt_fs(); ATTRIBUTE_COLD void set_corrupt_fs();
/** Flag log file corruption during recovery. */ /** Flag log file corruption during recovery. */
ATTRIBUTE_COLD void set_corrupt_log(); ATTRIBUTE_COLD void set_corrupt_log();
/** Possibly finish a recovery batch. */
inline void maybe_finish_batch();
/** @return whether data file corruption was found */ /** @return whether data file corruption was found */
bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); } bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); }
...@@ -421,13 +455,14 @@ struct recv_sys_t ...@@ -421,13 +455,14 @@ struct recv_sys_t
} }
/** Try to recover a tablespace that was not readable earlier /** Try to recover a tablespace that was not readable earlier
@param p iterator, initially pointing to page_id_t{space_id,0}; @param p iterator
the records will be freed and the iterator advanced
@param name tablespace file name @param name tablespace file name
@param free_block spare buffer block @param free_block spare buffer block
@return whether recovery failed */ @return recovered tablespace
bool recover_deferred(map::iterator &p, const std::string &name, @retval nullptr if recovery failed */
buf_block_t *&free_block); fil_space_t *recover_deferred(const map::iterator &p,
const std::string &name,
buf_block_t *&free_block);
}; };
/** The recovery system */ /** The recovery system */
......
...@@ -221,6 +221,10 @@ class IORequest ...@@ -221,6 +221,10 @@ class IORequest
bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; } bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; } bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; }
void write_complete() const;
void read_complete() const;
void fake_read_complete(os_offset_t offset) const;
/** If requested, free storage space associated with a section of the file. /** If requested, free storage space associated with a section of the file.
@param off byte offset from the start (SEEK_SET) @param off byte offset from the start (SEEK_SET)
@param len size of the hole in bytes @param len size of the hole in bytes
...@@ -1050,6 +1054,11 @@ int os_aio_init(); ...@@ -1050,6 +1054,11 @@ int os_aio_init();
Frees the asynchronous io system. */ Frees the asynchronous io system. */
void os_aio_free(); void os_aio_free();
/** Submit a fake read request during crash recovery.
@param type fake read request
@param offset additional context */
void os_fake_read(const IORequest &type, os_offset_t offset);
/** Request a read or write. /** Request a read or write.
@param type I/O request @param type I/O request
@param buf buffer @param buf buffer
......
...@@ -748,7 +748,7 @@ static struct ...@@ -748,7 +748,7 @@ static struct
{ {
retry: retry:
mysql_mutex_unlock(&log_sys.mutex); mysql_mutex_unlock(&log_sys.mutex);
bool fail= false; fil_space_t *space= fil_system.sys_space;
buf_block_t *free_block= buf_LRU_get_free_block(false); buf_block_t *free_block= buf_LRU_get_free_block(false);
mysql_mutex_lock(&log_sys.mutex); mysql_mutex_lock(&log_sys.mutex);
mysql_mutex_lock(&recv_sys.mutex); mysql_mutex_lock(&recv_sys.mutex);
...@@ -765,11 +765,12 @@ static struct ...@@ -765,11 +765,12 @@ static struct
there were no buffered records. Either way, we must create a there were no buffered records. Either way, we must create a
dummy tablespace with the latest known name, dummy tablespace with the latest known name,
for dict_drop_index_tree(). */ for dict_drop_index_tree(). */
recv_sys.pages_it_invalidate(space_id);
while (p != recv_sys.pages.end() && p->first.space() == space_id) while (p != recv_sys.pages.end() && p->first.space() == space_id)
{ {
ut_ad(!p->second.being_processed);
recv_sys_t::map::iterator r= p++; recv_sys_t::map::iterator r= p++;
r->second.log.clear(); recv_sys.erase(r);
recv_sys.pages.erase(r);
} }
recv_spaces_t::iterator it{recv_spaces.find(space_id)}; recv_spaces_t::iterator it{recv_spaces.find(space_id)};
if (it != recv_spaces.end()) if (it != recv_spaces.end())
...@@ -792,11 +793,14 @@ static struct ...@@ -792,11 +793,14 @@ static struct
} }
} }
else else
fail= recv_sys.recover_deferred(p, d->second.file_name, free_block); space= recv_sys.recover_deferred(p, d->second.file_name, free_block);
processed: processed:
defers.erase(d++); auto e= d++;
if (fail) defers.erase(e);
if (!space)
break; break;
if (space != fil_system.sys_space)
space->release();
if (free_block) if (free_block)
continue; continue;
mysql_mutex_unlock(&recv_sys.mutex); mysql_mutex_unlock(&recv_sys.mutex);
...@@ -807,7 +811,7 @@ static struct ...@@ -807,7 +811,7 @@ static struct
mysql_mutex_unlock(&recv_sys.mutex); mysql_mutex_unlock(&recv_sys.mutex);
if (free_block) if (free_block)
buf_pool.free_block(free_block); buf_pool.free_block(free_block);
return fail; return !space;
} }
/** Create tablespace metadata for a data file that was initially /** Create tablespace metadata for a data file that was initially
...@@ -927,28 +931,191 @@ static struct ...@@ -927,28 +931,191 @@ static struct
} }
deferred_spaces; deferred_spaces;
/** Report an operation to create, delete, or rename a file during backup.
@param[in] space_id tablespace identifier
@param[in] type redo log type
@param[in] name file name (not NUL-terminated)
@param[in] len length of name, in bytes
@param[in] new_name new file name (NULL if not rename)
@param[in] new_len length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, int type,
const byte* name, ulint len,
const byte* new_name, ulint new_len);
void (*undo_space_trunc)(uint32_t space_id);
void (*first_page_init)(ulint space_id);
/** Information about initializing page contents during redo log processing.
FIXME: Rely on recv_sys.pages! */
class mlog_init_t
{
using map= std::map<const page_id_t, recv_init,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t, recv_init>>>;
/** Map of page initialization operations.
FIXME: Merge this to recv_sys.pages! */
map inits;
/** Iterator to the last add() or will_avoid_read(), for speeding up
will_avoid_read(). */
map::iterator i;
public:
/** Constructor */
mlog_init_t() : i(inits.end()) {}
/** Record that a page will be initialized by the redo log.
@param page_id page identifier
@param lsn log sequence number
@return whether the state was changed */
bool add(const page_id_t page_id, lsn_t lsn)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
const recv_init init = { lsn, false };
std::pair<map::iterator, bool> p=
inits.insert(map::value_type(page_id, init));
ut_ad(!p.first->second.created);
if (p.second) return true;
if (p.first->second.lsn >= lsn) return false;
p.first->second = init;
i = p.first;
return true;
}
/** Get the last stored lsn of the page id and its respective
init/load operation.
@param page_id page identifier
@return the latest page initialization;
not valid after releasing recv_sys.mutex. */
recv_init &last(page_id_t page_id)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
return inits.find(page_id)->second;
}
/** Determine if a page will be initialized or freed after a time.
@param page_id page identifier
@param lsn log sequence number
@return whether page_id will be freed or initialized after lsn */
bool will_avoid_read(page_id_t page_id, lsn_t lsn)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
if (i != inits.end() && i->first == page_id)
return i->second.lsn > lsn;
i = inits.lower_bound(page_id);
return i != inits.end() && i->first == page_id && i->second.lsn > lsn;
}
/** At the end of each recovery batch, reset the 'created' flags. */
void reset()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
ut_ad(recv_no_ibuf_operations);
for (map::value_type &i : inits)
i.second.created= false;
}
/** During the last recovery batch, mark whether there exist
buffered changes for the pages that were initialized
by buf_page_create() and still reside in the buffer pool. */
void mark_ibuf_exist()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
for (const map::value_type &i : inits)
if (i.second.created)
{
auto &chain= buf_pool.page_hash.cell_get(i.first.fold());
page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
hash_lock.lock_shared();
buf_block_t *block= reinterpret_cast<buf_block_t*>
(buf_pool.page_hash.get(i.first, chain));
bool got_latch= block && block->page.lock.x_lock_try();
hash_lock.unlock_shared();
if (!block)
continue;
uint32_t state;
if (!got_latch)
{
mysql_mutex_lock(&buf_pool.mutex);
block= reinterpret_cast<buf_block_t*>
(buf_pool.page_hash.get(i.first, chain));
if (!block)
{
mysql_mutex_unlock(&buf_pool.mutex);
continue;
}
state= block->page.fix();
mysql_mutex_unlock(&buf_pool.mutex);
if (state < buf_page_t::UNFIXED)
{
block->page.unfix();
continue;
}
block->page.lock.x_lock();
state= block->page.unfix();
ut_ad(state < buf_page_t::READ_FIX);
if (state >= buf_page_t::UNFIXED && block->page.id() == i.first)
goto check_ibuf;
}
else
{
state= block->page.state();
ut_ad(state >= buf_page_t::FREED);
ut_ad(state < buf_page_t::READ_FIX);
if (state >= buf_page_t::UNFIXED)
{
check_ibuf:
mysql_mutex_unlock(&recv_sys.mutex);
if (ibuf_page_exists(block->page.id(), block->zip_size()))
block->page.set_ibuf_exist();
mysql_mutex_lock(&recv_sys.mutex);
}
}
block->page.lock.x_unlock();
}
}
/** Clear the data structure */
void clear() { inits.clear(); i = inits.end(); }
};
static mlog_init_t mlog_init;
/** Try to recover a tablespace that was not readable earlier /** Try to recover a tablespace that was not readable earlier
@param p iterator, initially pointing to page_id_t{space_id,0}; @param p iterator to the page
the records will be freed and the iterator advanced
@param name tablespace file name @param name tablespace file name
@param free_block spare buffer block @param free_block spare buffer block
@return whether recovery failed */ @return recovered tablespace
bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, @retval nullptr if recovery failed */
const std::string &name, fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p,
buf_block_t *&free_block) const std::string &name,
buf_block_t *&free_block)
{ {
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_owner(&mutex);
const page_id_t first{p->first}; ut_ad(p->first.space());
ut_ad(first.space());
recv_spaces_t::iterator it{recv_spaces.find(first.space())}; recv_spaces_t::iterator it{recv_spaces.find(p->first.space())};
ut_ad(it != recv_spaces.end()); ut_ad(it != recv_spaces.end());
if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) if (!p->first.page_no() && p->second.skip_read)
{ {
mtr_t mtr; mtr_t mtr;
buf_block_t *block= recover_low(first, p, mtr, free_block); ut_ad(!p->second.being_processed);
p->second.being_processed= 1;
init &init= mlog_init.last(p->first);
mysql_mutex_unlock(&mutex);
buf_block_t *block= recover_low(p, mtr, free_block, init);
mysql_mutex_lock(&mutex);
p->second.being_processed= -1;
ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1)); ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1));
free_block= nullptr; free_block= nullptr;
if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1))) if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1)))
...@@ -961,10 +1128,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, ...@@ -961,10 +1128,7 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET); const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
const uint32_t size= fsp_header_get_field(page, FSP_SIZE); const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
ut_ad(it != recv_spaces.end()); if (page_id_t{space_id, page_no} == p->first && size >= 4 &&
if (page_id_t{space_id, page_no} == first && size >= 4 &&
it != recv_spaces.end() &&
fil_space_t::is_valid_flags(flags, space_id) && fil_space_t::is_valid_flags(flags, space_id) &&
fil_space_t::logical_size(flags) == srv_page_size) fil_space_t::logical_size(flags) == srv_page_size)
{ {
...@@ -1018,10 +1182,10 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, ...@@ -1018,10 +1182,10 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
} }
size_set: size_set:
node->deferred= false; node->deferred= false;
space->release();
it->second.space= space; it->second.space= space;
block->page.lock.x_unlock(); block->page.lock.x_unlock();
return false; p->second.being_processed= -1;
return space;
} }
release_and_fail: release_and_fail:
...@@ -1029,179 +1193,34 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, ...@@ -1029,179 +1193,34 @@ bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
} }
fail: fail:
ib::error() << "Cannot apply log to " << first ib::error() << "Cannot apply log to " << p->first
<< " of corrupted file '" << name << "'"; << " of corrupted file '" << name << "'";
return true; return nullptr;
} }
/** Report an operation to create, delete, or rename a file during backup.
@param[in] space_id tablespace identifier
@param[in] type redo log type
@param[in] name file name (not NUL-terminated)
@param[in] len length of name, in bytes
@param[in] new_name new file name (NULL if not rename)
@param[in] new_len length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, int type,
const byte* name, ulint len,
const byte* new_name, ulint new_len);
void (*undo_space_trunc)(uint32_t space_id);
void (*first_page_init)(ulint space_id);
/** Information about initializing page contents during redo log processing.
FIXME: Rely on recv_sys.pages! */
class mlog_init_t
{
public:
/** A page initialization operation that was parsed from
the redo log */
struct init {
/** log sequence number of the page initialization */
lsn_t lsn;
/** Whether btr_page_create() avoided a read of the page.
At the end of the last recovery batch, mark_ibuf_exist()
will mark pages for which this flag is set. */
bool created;
};
private:
typedef std::map<const page_id_t, init,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t, init> > >
map;
/** Map of page initialization operations.
FIXME: Merge this to recv_sys.pages! */
map inits;
public:
/** Record that a page will be initialized by the redo log.
@param[in] page_id page identifier
@param[in] lsn log sequence number
@return whether the state was changed */
bool add(const page_id_t page_id, lsn_t lsn)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
const init init = { lsn, false };
std::pair<map::iterator, bool> p = inits.insert(
map::value_type(page_id, init));
ut_ad(!p.first->second.created);
if (p.second) return true;
if (p.first->second.lsn >= init.lsn) return false;
p.first->second = init;
return true;
}
/** Get the last stored lsn of the page id and its respective
init/load operation.
@param[in] page_id page id
@param[in,out] init initialize log or load log
@return the latest page initialization;
not valid after releasing recv_sys.mutex. */
init& last(page_id_t page_id)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
return inits.find(page_id)->second;
}
/** Determine if a page will be initialized or freed after a time.
@param page_id page identifier
@param lsn log sequence number
@return whether page_id will be freed or initialized after lsn */
bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
{
mysql_mutex_assert_owner(&recv_sys.mutex);
auto i= inits.find(page_id);
return i != inits.end() && i->second.lsn > lsn;
}
/** At the end of each recovery batch, reset the 'created' flags. */
void reset()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
ut_ad(recv_no_ibuf_operations);
for (map::value_type& i : inits) {
i.second.created = false;
}
}
/** On the last recovery batch, mark whether there exist
buffered changes for the pages that were initialized
by buf_page_create() and still reside in the buffer pool.
@param[in,out] mtr dummy mini-transaction */
void mark_ibuf_exist(mtr_t& mtr)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
mtr.start();
for (const map::value_type& i : inits) {
if (!i.second.created) {
continue;
}
if (buf_block_t* block = buf_page_get_low(
i.first, 0, RW_X_LATCH, nullptr,
BUF_GET_IF_IN_POOL,
&mtr, nullptr, false)) {
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
switch (fil_page_get_type(
block->page.zip.data)) {
case FIL_PAGE_INDEX:
case FIL_PAGE_RTREE:
if (page_zip_decompress(
&block->page.zip,
block->page.frame,
true)) {
break;
}
ib::error() << "corrupted "
<< block->page.id();
}
}
if (recv_no_ibuf_operations) {
mtr.commit();
mtr.start();
continue;
}
mysql_mutex_unlock(&recv_sys.mutex);
if (ibuf_page_exists(block->page.id(),
block->zip_size())) {
block->page.set_ibuf_exist();
}
mtr.commit();
mtr.start();
mysql_mutex_lock(&recv_sys.mutex);
}
}
mtr.commit();
clear();
}
/** Clear the data structure */
void clear() { inits.clear(); }
};
static mlog_init_t mlog_init;
/** Process a record that indicates that a tablespace is /** Process a record that indicates that a tablespace is
being shrunk in size. being shrunk in size.
@param page_id first page identifier that is not in the file @param page_id first page identifier that is not in the file
@param lsn log sequence number of the shrink operation */ @param lsn log sequence number of the shrink operation */
inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn) inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
{ {
DBUG_ENTER("recv_sys_t::trim"); DBUG_ENTER("recv_sys_t::trim");
DBUG_LOG("ib_log", DBUG_LOG("ib_log", "discarding log beyond end of tablespace "
"discarding log beyond end of tablespace " << page_id << " before LSN " << lsn);
<< page_id << " before LSN " << lsn); mysql_mutex_assert_owner(&mutex);
mysql_mutex_assert_owner(&mutex); if (pages_it != pages.end() && pages_it->first.space() == page_id.space())
for (recv_sys_t::map::iterator p = pages.lower_bound(page_id); pages_it= pages.end();
p != pages.end() && p->first.space() == page_id.space();) { for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
recv_sys_t::map::iterator r = p++; p != pages.end() && p->first.space() == page_id.space();)
if (r->second.trim(lsn)) { {
pages.erase(r); recv_sys_t::map::iterator r = p++;
} if (r->second.trim(lsn))
} {
DBUG_VOID_RETURN; ut_ad(!r->second.being_processed);
pages.erase(r);
}
}
DBUG_VOID_RETURN;
} }
void recv_sys_t::open_log_files_if_needed() void recv_sys_t::open_log_files_if_needed()
...@@ -1400,7 +1419,6 @@ void recv_sys_t::close() ...@@ -1400,7 +1419,6 @@ void recv_sys_t::close()
last_stored_lsn= 0; last_stored_lsn= 0;
mysql_mutex_destroy(&mutex); mysql_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
} }
recv_spaces.clear(); recv_spaces.clear();
...@@ -1415,10 +1433,8 @@ void recv_sys_t::create() ...@@ -1415,10 +1433,8 @@ void recv_sys_t::create()
ut_ad(this == &recv_sys); ut_ad(this == &recv_sys);
ut_ad(!is_initialised()); ut_ad(!is_initialised());
mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr); mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr);
pthread_cond_init(&cond, nullptr);
apply_log_recs = false; apply_log_recs = false;
apply_batch_on = false;
buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE, buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE,
PSI_INSTRUMENT_ME)); PSI_INSTRUMENT_ME));
...@@ -1433,6 +1449,8 @@ void recv_sys_t::create() ...@@ -1433,6 +1449,8 @@ void recv_sys_t::create()
mlog_checkpoint_lsn = 0; mlog_checkpoint_lsn = 0;
progress_time = time(NULL); progress_time = time(NULL);
ut_ad(pages.empty());
pages_it = pages.end();
recv_max_page_lsn = 0; recv_max_page_lsn = 0;
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces); memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
...@@ -1441,13 +1459,13 @@ void recv_sys_t::create() ...@@ -1441,13 +1459,13 @@ void recv_sys_t::create()
} }
/** Clear a fully processed set of stored redo log records. */ /** Clear a fully processed set of stored redo log records. */
inline void recv_sys_t::clear() void recv_sys_t::clear()
{ {
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_owner(&mutex);
apply_log_recs= false; apply_log_recs= false;
apply_batch_on= false;
ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks)); ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks));
pages.clear(); pages.clear();
pages_it= pages.end();
for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; ) for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
{ {
...@@ -1458,8 +1476,6 @@ inline void recv_sys_t::clear() ...@@ -1458,8 +1476,6 @@ inline void recv_sys_t::clear()
buf_block_free(block); buf_block_free(block);
block= prev_block; block= prev_block;
} }
pthread_cond_broadcast(&cond);
} }
/** Free most recovery data structures. */ /** Free most recovery data structures. */
...@@ -1471,6 +1487,7 @@ void recv_sys_t::debug_free() ...@@ -1471,6 +1487,7 @@ void recv_sys_t::debug_free()
recovery_on= false; recovery_on= false;
pages.clear(); pages.clear();
pages_it= pages.end();
ut_free_dodump(buf, RECV_PARSING_BUF_SIZE); ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
buf= nullptr; buf= nullptr;
...@@ -1478,48 +1495,9 @@ void recv_sys_t::debug_free() ...@@ -1478,48 +1495,9 @@ void recv_sys_t::debug_free()
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
} }
inline void *recv_sys_t::alloc(size_t len)
{
mysql_mutex_assert_owner(&mutex);
ut_ad(len);
ut_ad(len <= srv_page_size);
buf_block_t *block= UT_LIST_GET_FIRST(blocks);
if (UNIV_UNLIKELY(!block))
{
create_block:
block= buf_block_alloc();
block->page.access_time= 1U << 16 |
ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT);
static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
UT_LIST_ADD_FIRST(blocks, block);
MEM_MAKE_ADDRESSABLE(block->page.frame, len);
MEM_NOACCESS(block->page.frame + len, srv_page_size - len);
return my_assume_aligned<ALIGNMENT>(block->page.frame);
}
size_t free_offset= static_cast<uint16_t>(block->page.access_time);
ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
if (UNIV_UNLIKELY(!free_offset))
{
ut_ad(srv_page_size == 65536);
goto create_block;
}
ut_ad(free_offset <= srv_page_size);
free_offset+= len;
if (free_offset > srv_page_size)
goto create_block;
block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - len, len);
return my_assume_aligned<ALIGNMENT>(block->page.frame + free_offset - len);
}
/** Free a redo log snippet. /** Free a redo log snippet.
@param data buffer returned by alloc() */ @param data buffer allocated in add() */
inline void recv_sys_t::free(const void *data) inline void recv_sys_t::free(const void *data)
{ {
ut_ad(!ut_align_offset(data, ALIGNMENT)); ut_ad(!ut_align_offset(data, ALIGNMENT));
...@@ -1544,8 +1522,11 @@ inline void recv_sys_t::free(const void *data) ...@@ -1544,8 +1522,11 @@ inline void recv_sys_t::free(const void *data)
ut_ad(block->page.state() == buf_page_t::MEMORY); ut_ad(block->page.state() == buf_page_t::MEMORY);
ut_ad(static_cast<uint16_t>(block->page.access_time - 1) < ut_ad(static_cast<uint16_t>(block->page.access_time - 1) <
srv_page_size); srv_page_size);
ut_ad(block->page.access_time >= 1U << 16); unsigned a= block->page.access_time;
if (!((block->page.access_time -= 1U << 16) >> 16)) ut_ad(a >= 1U << 16);
a-= 1U << 16;
block->page.access_time= a;
if (!(a >> 16))
{ {
UT_LIST_REMOVE(blocks, block); UT_LIST_REMOVE(blocks, block);
MEM_MAKE_ADDRESSABLE(block->page.frame, srv_page_size); MEM_MAKE_ADDRESSABLE(block->page.frame, srv_page_size);
...@@ -2109,7 +2090,31 @@ inline bool page_recv_t::trim(lsn_t start_lsn) ...@@ -2109,7 +2090,31 @@ inline bool page_recv_t::trim(lsn_t start_lsn)
} }
inline void page_recv_t::recs_t::clear() void page_recv_t::recs_t::rewind(lsn_t start_lsn)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
log_phys_t *trim= static_cast<log_phys_t*>(head);
ut_ad(trim);
while (log_phys_t *next= static_cast<log_phys_t*>(trim->next))
{
ut_ad(trim->start_lsn < start_lsn);
if (next->start_lsn == start_lsn)
break;
trim= next;
}
tail= trim;
log_rec_t *l= tail->next;
tail->next= nullptr;
while (l)
{
log_rec_t *next= l->next;
recv_sys.free(l);
l= next;
}
}
void page_recv_t::recs_t::clear()
{ {
mysql_mutex_assert_owner(&recv_sys.mutex); mysql_mutex_assert_owner(&recv_sys.mutex);
for (const log_rec_t *l= head; l; ) for (const log_rec_t *l= head; l; )
...@@ -2121,33 +2126,99 @@ inline void page_recv_t::recs_t::clear() ...@@ -2121,33 +2126,99 @@ inline void page_recv_t::recs_t::clear()
head= tail= nullptr; head= tail= nullptr;
} }
/** Ignore any earlier redo log records for this page. */ /** Ignore any earlier redo log records for this page. */
inline void page_recv_t::will_not_read() inline void page_recv_t::will_not_read()
{ {
ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ); ut_ad(!being_processed);
state= RECV_WILL_NOT_READ; skip_read= true;
log.clear(); log.clear();
} }
void recv_sys_t::erase(map::iterator p)
{
ut_ad(p->second.being_processed <= 0);
p->second.log.clear();
pages.erase(p);
}
/** Free log for processed pages. */
void recv_sys_t::garbage_collect()
{
mysql_mutex_assert_owner(&mutex);
if (pages_it != pages.end() && pages_it->second.being_processed < 0)
pages_it= pages.end();
for (map::iterator p= pages.begin(); p != pages.end(); )
{
if (p->second.being_processed < 0)
{
map::iterator r= p++;
erase(r);
}
else
p++;
}
}
/** Allocate a block from the buffer pool for recv_sys.pages */
ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block()
{
for (bool freed= false;;)
{
const auto rs= UT_LIST_GET_LEN(blocks) * 2;
mysql_mutex_lock(&buf_pool.mutex);
const auto bs=
UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU);
if (UNIV_LIKELY(bs > BUF_LRU_MIN_LEN || rs < bs))
{
buf_block_t *block= buf_LRU_get_free_block(true);
mysql_mutex_unlock(&buf_pool.mutex);
return block;
}
/* out of memory: redo log occupies more than 1/3 of buf_pool
and there are fewer than BUF_LRU_MIN_LEN pages left */
mysql_mutex_unlock(&buf_pool.mutex);
if (freed)
return nullptr;
freed= true;
garbage_collect();
}
}
/** Wait for buffer pool to become available. */
ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages)
{
mysql_mutex_unlock(&mutex);
os_aio_wait_until_no_pending_reads(false);
mysql_mutex_lock(&mutex);
garbage_collect();
mysql_mutex_lock(&buf_pool.mutex);
bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages;
mysql_mutex_unlock(&buf_pool.mutex);
if (need_more)
buf_flush_sync_batch(recovered_lsn);
}
/** Register a redo log snippet for a page. /** Register a redo log snippet for a page.
@param it page iterator @param it page iterator
@param start_lsn start LSN of the mini-transaction @param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn() @param lsn @see mtr_t::commit_lsn()
@param recs redo log snippet @see log_t::FORMAT_10_5 @param l redo log snippet
@param len length of l, in bytes */ @param len length of l, in bytes
inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, @return whether we ran out of memory */
const byte *l, size_t len) ATTRIBUTE_NOINLINE
bool recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
const byte *l, size_t len)
{ {
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_owner(&mutex);
page_id_t page_id = it->first;
page_recv_t &recs= it->second; page_recv_t &recs= it->second;
buf_block_t *block;
switch (*l & 0x70) { switch (*l & 0x70) {
case FREE_PAGE: case INIT_PAGE: case FREE_PAGE: case INIT_PAGE:
recs.will_not_read(); recs.will_not_read();
mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */ mlog_init.add(it->first, start_lsn); /* FIXME: remove this! */
/* fall through */ /* fall through */
default: default:
log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last()); log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last());
...@@ -2156,7 +2227,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, ...@@ -2156,7 +2227,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
if (tail->start_lsn != start_lsn) if (tail->start_lsn != start_lsn)
break; break;
ut_ad(tail->lsn == lsn); ut_ad(tail->lsn == lsn);
buf_block_t *block= UT_LIST_GET_LAST(blocks); block= UT_LIST_GET_LAST(blocks);
ut_ad(block); ut_ad(block);
const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1; const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1;
ut_ad(used >= ALIGNMENT); ut_ad(used >= ALIGNMENT);
...@@ -2169,7 +2240,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, ...@@ -2169,7 +2240,7 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
MEM_MAKE_ADDRESSABLE(end + 1, len); MEM_MAKE_ADDRESSABLE(end + 1, len);
/* Append to the preceding record for the page */ /* Append to the preceding record for the page */
tail->append(l, len); tail->append(l, len);
return; return false;
} }
if (end <= &block->page.frame[used - ALIGNMENT] || if (end <= &block->page.frame[used - ALIGNMENT] ||
&block->page.frame[used] >= end) &block->page.frame[used] >= end)
...@@ -2183,8 +2254,49 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn, ...@@ -2183,8 +2254,49 @@ inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT); ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT);
goto append; goto append;
} }
recs.log.append(new (alloc(log_phys_t::alloc_size(len)))
log_phys_t(start_lsn, lsn, l, len)); const size_t size{log_phys_t::alloc_size(len)};
ut_ad(size <= srv_page_size);
void *buf;
block= UT_LIST_GET_FIRST(blocks);
if (UNIV_UNLIKELY(!block))
{
create_block:
block= add_block();
if (UNIV_UNLIKELY(!block))
return true;
block->page.access_time= 1U << 16 |
ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT);
static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
UT_LIST_ADD_FIRST(blocks, block);
MEM_MAKE_ADDRESSABLE(block->page.frame, size);
MEM_NOACCESS(block->page.frame + size, srv_page_size - size);
buf= block->page.frame;
}
else
{
size_t free_offset= static_cast<uint16_t>(block->page.access_time);
ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
if (UNIV_UNLIKELY(!free_offset))
{
ut_ad(srv_page_size == 65536);
goto create_block;
}
ut_ad(free_offset <= srv_page_size);
free_offset+= size;
if (free_offset > srv_page_size)
goto create_block;
block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
MEM_MAKE_ADDRESSABLE(block->page.frame + free_offset - size, size);
buf= block->page.frame + free_offset - size;
}
recs.log.append(new (my_assume_aligned<ALIGNMENT>(buf))
log_phys_t{start_lsn, lsn, l, len});
return false;
} }
/** Store/remove the freed pages in fil_name_t of recv_spaces. /** Store/remove the freed pages in fil_name_t of recv_spaces.
...@@ -2220,6 +2332,70 @@ static void store_freed_or_init_rec(page_id_t page_id, bool freed) ...@@ -2220,6 +2332,70 @@ static void store_freed_or_init_rec(page_id_t page_id, bool freed)
} }
} }
ATTRIBUTE_COLD
void recv_sys_t::rewind(const byte *end, const byte *begin) noexcept
{
ut_ad(srv_operation != SRV_OPERATION_BACKUP);
mysql_mutex_assert_owner(&mutex);
uint32_t rlen;
for (const byte *l= begin; !(l == end); l+= rlen)
{
const byte b= *l++;
ut_ad(UNIV_LIKELY((b & 0x70) != RESERVED) || srv_force_recovery);
rlen= b & 0xf;
if (!rlen)
{
if (!b)
continue;
const uint32_t lenlen= mlog_decode_varint_length(*l);
const uint32_t addlen= mlog_decode_varint(l);
ut_ad(addlen != MLOG_DECODE_ERROR);
rlen= addlen + 15 - lenlen;
l+= lenlen;
}
ut_ad(l + rlen <= end);
if (b & 0x80)
continue;
uint32_t idlen= mlog_decode_varint_length(*l);
if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen))
continue;
const uint32_t space_id= mlog_decode_varint(l);
if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR))
continue;
l+= idlen;
rlen-= idlen;
idlen= mlog_decode_varint_length(*l);
if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen))
continue;
const uint32_t page_no= mlog_decode_varint(l);
if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR))
continue;
const page_id_t id{space_id, page_no};
if (pages_it == pages.end() || pages_it->first != id)
{
pages_it= pages.find(id);
if (pages_it == pages.end())
continue;
}
ut_ad(!pages_it->second.being_processed);
const log_phys_t *head=
static_cast<log_phys_t*>(*pages_it->second.log.begin());
if (!head || head->start_lsn == recovered_lsn)
{
erase(pages_it);
pages_it= pages.end();
}
else
pages_it->second.log.rewind(recovered_lsn);
}
pages_it= pages.end();
}
/** Parse and register one mini-transaction in log_t::FORMAT_10_5. /** Parse and register one mini-transaction in log_t::FORMAT_10_5.
@param checkpoint_lsn the log sequence number of the latest checkpoint @param checkpoint_lsn the log sequence number of the latest checkpoint
@param store whether to store the records @param store whether to store the records
...@@ -2228,17 +2404,16 @@ static void store_freed_or_init_rec(page_id_t page_id, bool freed) ...@@ -2228,17 +2404,16 @@ static void store_freed_or_init_rec(page_id_t page_id, bool freed)
or corruption was noticed */ or corruption was noticed */
bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
{ {
restart:
mysql_mutex_assert_owner(&log_sys.mutex); mysql_mutex_assert_owner(&log_sys.mutex);
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_owner(&mutex);
ut_ad(parse_start_lsn); ut_ad(parse_start_lsn);
ut_ad(log_sys.is_physical()); ut_ad(log_sys.is_physical());
bool last_phase= (*store == STORE_IF_EXISTS);
const byte *const end= buf + len; const byte *const end= buf + len;
loop: loop:
const byte *const log= buf + recovered_offset; const byte *const log= buf + recovered_offset;
const lsn_t start_lsn= recovered_lsn; const lsn_t start_lsn= recovered_lsn;
map::iterator cached_pages_it = pages.end();
/* Check that the entire mini-transaction is included within the buffer */ /* Check that the entire mini-transaction is included within the buffer */
const byte *l; const byte *l;
...@@ -2554,7 +2729,6 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) ...@@ -2554,7 +2729,6 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE); ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
} }
#endif #endif
const bool is_init= (b & 0x70) <= INIT_PAGE;
switch (*store) { switch (*store) {
case STORE_IF_EXISTS: case STORE_IF_EXISTS:
if (fil_space_t *space= fil_space_t::get(space_id)) if (fil_space_t *space= fil_space_t::get(space_id))
...@@ -2568,23 +2742,48 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) ...@@ -2568,23 +2742,48 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
continue; continue;
/* fall through */ /* fall through */
case STORE_YES: case STORE_YES:
if (!mlog_init.will_avoid_read(id, start_lsn)) if (mlog_init.will_avoid_read(id, start_lsn))
continue;
if (pages_it == pages.end() || pages_it->first != id)
pages_it= pages.emplace(id, page_recv_t{}).first;
if (UNIV_UNLIKELY(add(pages_it, start_lsn, end_lsn, recs,
l - recs + rlen)))
{ {
if (cached_pages_it == pages.end() || cached_pages_it->first != id) recovered_lsn= start_lsn;
cached_pages_it= pages.emplace(id, page_recv_t()).first; recovered_offset= log - buf;
add(cached_pages_it, start_lsn, end_lsn, recs, rewind(l + rlen, log);
static_cast<size_t>(l + rlen - recs)); if (*store == STORE_IF_EXISTS)
{
log_sys.set_lsn(recovered_lsn);
log_sys.set_flushed_lsn(recovered_lsn);
mysql_mutex_unlock(&mutex);
this->apply(false);
mysql_mutex_lock(&mutex);
if (is_corrupt_fs())
return true;
}
else
{
last_stored_lsn= recovered_lsn;
sql_print_information("InnoDB: Multi-batch recovery needed at LSN "
LSN_PF, recovered_lsn);
*store= STORE_NO;
}
goto restart;
} }
continue; continue;
case STORE_NO: case STORE_NO:
if (!is_init) if ((b & 0x70) > INIT_PAGE)
continue; continue;
mlog_init.add(id, start_lsn); mlog_init.add(id, start_lsn);
map::iterator i= pages.find(id); if (pages_it == pages.end() || pages_it->first != id)
if (i == pages.end()) {
continue; pages_it= pages.find(id);
i->second.log.clear(); if (pages_it == pages.end())
pages.erase(i); continue;
}
map::iterator r= pages_it++;
erase(r);
} }
} }
else if (rlen) else if (rlen)
...@@ -2706,8 +2905,6 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) ...@@ -2706,8 +2905,6 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
ut_ad(l == el); ut_ad(l == el);
recovered_offset= l - buf; recovered_offset= l - buf;
recovered_lsn= end_lsn; recovered_lsn= end_lsn;
if (is_memory_exhausted(store) && last_phase)
return false;
goto loop; goto loop;
} }
...@@ -2715,23 +2912,22 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply) ...@@ -2715,23 +2912,22 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
lsn of a log record. lsn of a log record.
@param[in,out] block buffer pool page @param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction @param[in,out] mtr mini-transaction
@param[in,out] p recovery address @param[in,out] recs log records to apply
@param[in,out] space tablespace, or NULL if not looked up yet @param[in,out] space tablespace, or NULL if not looked up yet
@param[in,out] init page initialization operation, or NULL @param[in,out] init page initialization operation, or NULL
@return the recovered page @return the recovered page
@retval nullptr on failure */ @retval nullptr on failure */
static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
const recv_sys_t::map::iterator &p, page_recv_t &recs,
fil_space_t *space= nullptr, fil_space_t *space,
mlog_init_t::init *init= nullptr) recv_init *init)
{ {
mysql_mutex_assert_owner(&recv_sys.mutex); mysql_mutex_assert_not_owner(&recv_sys.mutex);
ut_ad(recv_sys.apply_log_recs); ut_ad(recv_sys.apply_log_recs);
ut_ad(recv_needed_recovery); ut_ad(recv_needed_recovery);
ut_ad(!init || init->created); ut_ad(!init || init->created);
ut_ad(!init || init->lsn); ut_ad(!init || init->lsn);
ut_ad(block->page.id() == p->first); ut_ad(recs.being_processed == 1);
ut_ad(!p->second.is_being_processed());
ut_ad(!space || space->id == block->page.id().space()); ut_ad(!space || space->id == block->page.id().space());
ut_ad(log_sys.is_physical()); ut_ad(log_sys.is_physical());
...@@ -2743,10 +2939,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, ...@@ -2743,10 +2939,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(), block->page.id().space(),
block->page.id().page_no())); block->page.id().page_no()));
p->second.state = page_recv_t::RECV_BEING_PROCESSED;
mysql_mutex_unlock(&recv_sys.mutex);
byte *frame = UNIV_LIKELY_NULL(block->page.zip.data) byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
? block->page.zip.data ? block->page.zip.data
: block->page.frame; : block->page.frame;
...@@ -2760,7 +2952,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, ...@@ -2760,7 +2952,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
bool skipped_after_init = false; bool skipped_after_init = false;
for (const log_rec_t* recv : p->second.log) { for (const log_rec_t* recv : recs.log) {
const log_phys_t* l = static_cast<const log_phys_t*>(recv); const log_phys_t* l = static_cast<const log_phys_t*>(recv);
ut_ad(l->lsn); ut_ad(l->lsn);
ut_ad(end_lsn <= l->lsn); ut_ad(end_lsn <= l->lsn);
...@@ -2817,8 +3009,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, ...@@ -2817,8 +3009,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(), block->page.id().space(),
block->page.id().page_no())); block->page.id().page_no()));
log_phys_t::apply_status a= l->apply(*block, log_phys_t::apply_status a= l->apply(*block, recs.last_offset);
p->second.last_offset);
switch (a) { switch (a) {
case log_phys_t::APPLIED_NO: case log_phys_t::APPLIED_NO:
...@@ -2937,24 +3128,11 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, ...@@ -2937,24 +3128,11 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
mtr.commit(); mtr.commit();
done: done:
time_t now = time(NULL); /* FIXME: do this in page read, protected with recv_sys.mutex! */
mysql_mutex_lock(&recv_sys.mutex);
if (recv_max_page_lsn < page_lsn) { if (recv_max_page_lsn < page_lsn) {
recv_max_page_lsn = page_lsn; recv_max_page_lsn = page_lsn;
} }
ut_ad(!block || p->second.is_being_processed());
ut_ad(!block || !recv_sys.pages.empty());
if (recv_sys.report(now)) {
const ulint n = recv_sys.pages.size();
ib::info() << "To recover: " << n << " pages from log";
service_manager_extend_timeout(
INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
}
return block; return block;
} }
...@@ -2968,146 +3146,350 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id) ...@@ -2968,146 +3146,350 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id); map::iterator p= pages.find(page_id);
if (p != pages.end()) if (p == pages.end())
{ {
p->second.log.clear(); mysql_mutex_unlock(&mutex);
pages.erase(p); return;
if (!srv_force_recovery)
{
set_corrupt_fs();
ib::error() << "Unable to apply log to corrupted page " << page_id
<< "; set innodb_force_recovery to ignore";
}
else
ib::warn() << "Discarding log for corrupted page " << page_id;
} }
if (pages.empty()) p->second.being_processed= -1;
pthread_cond_broadcast(&cond); if (!srv_force_recovery)
set_corrupt_fs();
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
}
/** Possibly finish a recovery batch. */ ib::error_or_warn(!srv_force_recovery)
inline void recv_sys_t::maybe_finish_batch() << "Unable to apply log to corrupted page " << page_id;
{
mysql_mutex_assert_owner(&mutex);
ut_ad(recovery_on);
if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs())
pthread_cond_broadcast(&cond);
} }
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log() ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log()
{ {
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
found_corrupt_log= true; found_corrupt_log= true;
pthread_cond_broadcast(&cond);
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
} }
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs() ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs()
{ {
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_owner(&mutex);
if (!srv_force_recovery)
sql_print_information("InnoDB: Set innodb_force_recovery=1"
" to ignore corrupted pages.");
found_corrupt_fs= true; found_corrupt_fs= true;
pthread_cond_broadcast(&cond);
} }
/** Apply any buffered redo log to a page that was just read from a data file. /** Apply any buffered redo log to a page.
@param[in,out] space tablespace @param space tablespace
@param[in,out] bpage buffer pool page @param bpage buffer pool page
@return whether the page was recovered correctly */ @return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage) bool recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{ {
mtr_t mtr; mtr_t mtr;
mtr.start(); mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO); mtr.set_log_mode(MTR_LOG_NO_REDO);
ut_ad(bpage->frame);
/* Move the ownership of the x-latch on the page to
this OS thread, so that we can acquire a second
x-latch on it. This is needed for the operations to
the page to pass the debug checks. */
bpage->lock.claim_ownership();
bpage->lock.x_lock_recursive();
bpage->fix_on_recovery();
mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage),
MTR_MEMO_PAGE_X_FIX);
buf_block_t* success = reinterpret_cast<buf_block_t*>(bpage);
mysql_mutex_lock(&recv_sys.mutex); ut_ad(bpage->frame);
if (recv_sys.apply_log_recs) { /* Move the ownership of the x-latch on the page to this OS thread,
recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id()); so that we can acquire a second x-latch on it. This is needed for
if (p != recv_sys.pages.end() the operations to the page to pass the debug checks. */
&& !p->second.is_being_processed()) { bpage->lock.claim_ownership();
success = recv_recover_page(success, mtr, p, space); bpage->lock.x_lock_recursive();
if (UNIV_LIKELY(!!success)) { bpage->fix_on_recovery();
p->second.log.clear(); mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
recv_sys.pages.erase(p);
}
recv_sys.maybe_finish_batch();
goto func_exit;
}
}
mtr.commit(); buf_block_t *success= reinterpret_cast<buf_block_t*>(bpage);
mysql_mutex_lock(&recv_sys.mutex);
if (recv_sys.apply_log_recs)
{
const page_id_t id{bpage->id()};
recv_sys_t::map::iterator p= recv_sys.pages.find(id);
if (p == recv_sys.pages.end());
else if (p->second.being_processed < 0)
{
recv_sys.pages_it_invalidate(p);
recv_sys.erase(p);
}
else
{
p->second.being_processed= 1;
recv_sys_t::init *init= nullptr;
if (p->second.skip_read)
(init= &mlog_init.last(id))->created= true;
mysql_mutex_unlock(&recv_sys.mutex);
success= recv_recover_page(success, mtr, p->second, space, init);
p->second.being_processed= -1;
goto func_exit;
}
}
mysql_mutex_unlock(&recv_sys.mutex);
mtr.commit();
func_exit: func_exit:
mysql_mutex_unlock(&recv_sys.mutex); ut_ad(mtr.has_committed());
ut_ad(mtr.has_committed()); return success;
return success; }
void IORequest::fake_read_complete(os_offset_t offset) const
{
ut_ad(node);
ut_ad(is_read());
ut_ad(bpage);
ut_ad(bpage->frame);
ut_ad(recv_recovery_is_on());
ut_ad(offset);
mtr_t mtr;
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
ut_ad(bpage->frame);
/* Move the ownership of the x-latch on the page to this OS thread,
so that we can acquire a second x-latch on it. This is needed for
the operations to the page to pass the debug checks. */
bpage->lock.claim_ownership();
bpage->lock.x_lock_recursive();
bpage->fix_on_recovery();
mtr.memo_push(reinterpret_cast<buf_block_t*>(bpage), MTR_MEMO_PAGE_X_FIX);
page_recv_t &recs= *reinterpret_cast<page_recv_t*>(slot);
ut_ad(recs.being_processed == 1);
recv_init &init= *reinterpret_cast<recv_init*>(offset);
ut_ad(init.lsn > 1);
init.created= true;
if (recv_recover_page(reinterpret_cast<buf_block_t*>(bpage),
mtr, recs, node->space, &init))
{
ut_ad(bpage->oldest_modification() || bpage->is_freed());
bpage->lock.x_unlock(true);
}
recs.being_processed= -1;
ut_ad(mtr.has_committed());
node->space->release();
}
/** @return whether a page has been freed */
inline bool fil_space_t::is_freed(uint32_t page)
{
std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
return freed_ranges.contains(page);
} }
/** Read pages for which log needs to be applied. bool recv_sys_t::report(time_t time)
@param page_id first page identifier to read
@param i iterator to recv_sys.pages */
TRANSACTIONAL_TARGET
static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
{ {
uint32_t page_nos[32]; if (time - progress_time < 15)
ut_ad(page_id == i->first); return false;
page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U)); progress_time= time;
const page_id_t up_limit{page_id + 31}; return true;
uint32_t* p= page_nos; }
for (; i != recv_sys.pages.end() && i->first <= up_limit; i++) ATTRIBUTE_COLD
void recv_sys_t::report_progress() const
{
mysql_mutex_assert_owner(&mutex);
const size_t n{pages.size()};
if (recv_sys.scanned_lsn == recv_sys.recovered_lsn)
{
sql_print_information("InnoDB: To recover: %zu pages", n);
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"To recover: %zu pages", n);
}
else
{ {
if (i->second.state == page_recv_t::RECV_NOT_PROCESSED) sql_print_information("InnoDB: To recover: LSN " LSN_PF
"/" LSN_PF "; %zu pages",
recv_sys.recovered_lsn, recv_sys.scanned_lsn, n);
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"To recover: LSN " LSN_PF
"/" LSN_PF "; %zu pages",
recv_sys.recovered_lsn,
recv_sys.scanned_lsn, n);
}
}
/** Apply a recovery batch.
@param space_id current tablespace identifier
@param space current tablespace
@param free_block spare buffer block
@param last_batch whether it is possible to write more redo log
@return whether the caller must provide a new free_block */
bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space,
buf_block_t *&free_block, bool last_batch)
{
mysql_mutex_assert_owner(&mutex);
ut_ad(pages_it != pages.end());
ut_ad(!pages_it->second.log.empty());
mysql_mutex_lock(&buf_pool.mutex);
size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN,
UT_LIST_GET_LEN(buf_pool.LRU) +
UT_LIST_GET_LEN(buf_pool.free));
mysql_mutex_unlock(&buf_pool.mutex);
map::iterator begin= pages.end();
page_id_t begin_id{~0ULL};
while (pages_it != pages.end() && n < max_n)
{
ut_ad(!buf_dblwr.is_inside(pages_it->first));
if (!pages_it->second.being_processed)
{ {
i->second.state= page_recv_t::RECV_BEING_READ; if (space_id != pages_it->first.space())
*p++= i->first.page_no(); {
space_id= pages_it->first.space();
if (space)
space->release();
space= fil_space_t::get(space_id);
if (!space)
{
auto d= deferred_spaces.defers.find(space_id);
if (d == deferred_spaces.defers.end() || d->second.deleted)
/* For deleted files we preserve the deferred_spaces entry */;
else if (!free_block)
return true;
else
{
space= recover_deferred(pages_it, d->second.file_name, free_block);
deferred_spaces.defers.erase(d);
if (!space && !srv_force_recovery)
{
set_corrupt_fs();
return false;
}
}
}
}
if (!space || space->is_freed(pages_it->first.page_no()))
pages_it->second.being_processed= -1;
else if (!n++)
{
begin= pages_it;
begin_id= pages_it->first;
}
} }
pages_it++;
} }
if (p != page_nos) if (!last_batch)
mysql_mutex_unlock(&log_sys.mutex);
mysql_mutex_assert_not_owner(&log_sys.mutex);
pages_it= begin;
if (report(time(nullptr)))
report_progress();
if (!n)
goto wait;
mysql_mutex_lock(&buf_pool.mutex);
if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n))
{ {
mysql_mutex_unlock(&recv_sys.mutex); mysql_mutex_unlock(&buf_pool.mutex);
buf_read_recv_pages(page_id.space(), page_nos, ulint(p - page_nos)); wait:
mysql_mutex_lock(&recv_sys.mutex); wait_for_pool(n);
if (n);
else if (!last_batch)
goto unlock_relock;
else
goto get_last;
pages_it= pages.lower_bound(begin_id);
ut_ad(pages_it != pages.end());
} }
else
mysql_mutex_unlock(&buf_pool.mutex);
while (pages_it != pages.end())
{
ut_ad(!buf_dblwr.is_inside(pages_it->first));
if (!pages_it->second.being_processed)
{
const page_id_t id{pages_it->first};
if (space_id != id.space())
{
space_id= id.space();
if (space)
space->release();
space= fil_space_t::get(space_id);
}
if (!space)
{
const auto it= deferred_spaces.defers.find(space_id);
if (it != deferred_spaces.defers.end() && !it->second.deleted)
/* The records must be processed after recover_deferred(). */
goto next;
goto space_not_found;
}
else if (space->is_freed(id.page_no()))
{
space_not_found:
pages_it->second.being_processed= -1;
goto next;
}
else
{
page_recv_t &recs= pages_it->second;
ut_ad(!recs.log.empty());
recs.being_processed= 1;
init *init= recs.skip_read ? &mlog_init.last(id) : nullptr;
mysql_mutex_unlock(&mutex);
buf_read_recover(space, id, recs, init);
}
if (!--n)
{
if (last_batch)
goto relock_last;
goto relock;
}
mysql_mutex_lock(&mutex);
pages_it= pages.lower_bound(id);
}
else
next:
pages_it++;
}
if (!last_batch)
{
unlock_relock:
mysql_mutex_unlock(&mutex);
relock:
mysql_mutex_lock(&log_sys.mutex);
relock_last:
mysql_mutex_lock(&mutex);
get_last:
pages_it= pages.lower_bound(begin_id);
}
return false;
} }
/** Attempt to initialize a page based on redo log records. /** Attempt to initialize a page based on redo log records.
@param page_id page identifier @param p iterator
@param p iterator pointing to page_id
@param mtr mini-transaction @param mtr mini-transaction
@param b pre-allocated buffer pool block @param b pre-allocated buffer pool block
@param init page initialization
@return the recovered block @return the recovered block
@retval nullptr if the page cannot be initialized based on log records @retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */ @retval -1 if the page cannot be recovered due to corruption */
inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr,
map::iterator &p, mtr_t &mtr, buf_block_t *b, init &init)
buf_block_t *b)
{ {
mysql_mutex_assert_owner(&mutex); mysql_mutex_assert_not_owner(&mutex);
ut_ad(p->first == page_id);
page_recv_t &recs= p->second; page_recv_t &recs= p->second;
ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ); ut_ad(recs.skip_read);
ut_ad(recs.being_processed == 1);
buf_block_t* block= nullptr; buf_block_t* block= nullptr;
mlog_init_t::init &i= mlog_init.last(page_id); const lsn_t end_lsn= recs.log.last()->lsn;
const lsn_t end_lsn = recs.log.last()->lsn; if (end_lsn < init.lsn)
if (end_lsn < i.lsn) DBUG_LOG("ib_log", "skip log for page " << p->first
DBUG_LOG("ib_log", "skip log for page " << page_id << " LSN " << end_lsn << " < " << init.lsn);
<< " LSN " << end_lsn << " < " << i.lsn); fil_space_t *space= fil_space_t::get(p->first.space());
fil_space_t *space= fil_space_t::get(page_id.space());
mtr.start(); mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO); mtr.set_log_mode(MTR_LOG_NO_REDO);
...@@ -3116,82 +3498,77 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, ...@@ -3116,82 +3498,77 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
if (!space) if (!space)
{ {
if (page_id.page_no() != 0) if (p->first.page_no() != 0)
{ {
nothing_recoverable: nothing_recoverable:
mtr.commit(); mtr.commit();
return nullptr; return nullptr;
} }
auto it= recv_spaces.find(page_id.space()); auto it= recv_spaces.find(p->first.space());
ut_ad(it != recv_spaces.end()); ut_ad(it != recv_spaces.end());
uint32_t flags= it->second.flags; uint32_t flags= it->second.flags;
zip_size= fil_space_t::zip_size(flags); zip_size= fil_space_t::zip_size(flags);
block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b); block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b);
ut_ad(block == b); ut_ad(block == b);
block->page.lock.x_lock_recursive(); block->page.lock.x_lock_recursive();
} }
else else
{ {
block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b); block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b);
if (UNIV_UNLIKELY(block != b)) if (UNIV_UNLIKELY(block != b))
{ {
/* The page happened to exist in the buffer pool, or it /* The page happened to exist in the buffer pool, or it
was just being read in. Before the exclusive page latch was acquired by was just being read in. Before the exclusive page latch was acquired by
buf_page_create(), all changes to the page must have been applied. */ buf_page_create(), all changes to the page must have been applied. */
ut_ad(pages.find(page_id) == pages.end()); ut_d(mysql_mutex_lock(&mutex));
ut_ad(pages.find(p->first) == pages.end());
ut_d(mysql_mutex_unlock(&mutex));
space->release(); space->release();
goto nothing_recoverable; goto nothing_recoverable;
} }
} }
ut_ad(&recs == &pages.find(page_id)->second); ut_d(mysql_mutex_lock(&mutex));
i.created= true; ut_ad(&recs == &pages.find(p->first)->second);
map::iterator r= p++; ut_d(mysql_mutex_unlock(&mutex));
block= recv_recover_page(block, mtr, r, space, &i); init.created= true;
block= recv_recover_page(block, mtr, recs, space, &init);
ut_ad(mtr.has_committed()); ut_ad(mtr.has_committed());
if (block)
{
recs.log.clear();
pages.erase(r);
}
else
block= reinterpret_cast<buf_block_t*>(-1);
if (pages.empty())
pthread_cond_signal(&cond);
if (space) if (space)
space->release(); space->release();
return block; return block ? block : reinterpret_cast<buf_block_t*>(-1);
} }
/** Attempt to initialize a page based on redo log records. /** Attempt to initialize a page based on redo log records.
@param page_id page identifier @param page_id page identifier
@return recovered block @return recovered block
@retval nullptr if the page cannot be initialized based on log records */ @retval nullptr if the page cannot be initialized based on log records */
buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
{ {
buf_block_t *free_block= buf_LRU_get_free_block(false);
buf_block_t *block= nullptr;
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id); map::iterator p= pages.find(page_id);
if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) if (p != pages.end() && !p->second.being_processed && p->second.skip_read)
{ {
p->second.being_processed= 1;
init &init= mlog_init.last(page_id);
mysql_mutex_unlock(&mutex);
buf_block_t *free_block= buf_LRU_get_free_block(false);
mtr_t mtr; mtr_t mtr;
block= recover_low(page_id, p, mtr, free_block); buf_block_t *block= recover_low(p, mtr, free_block, init);
p->second.being_processed= -1;
ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) || ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) ||
block == free_block); block == free_block);
if (UNIV_UNLIKELY(!block))
buf_pool.free_block(free_block);
return block;
} }
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
if (UNIV_UNLIKELY(!block)) return nullptr;
buf_pool.free_block(free_block);
return block;
} }
inline fil_space_t *fil_system_t::find(const char *path) const inline fil_space_t *fil_system_t::find(const char *path) const
...@@ -3242,46 +3619,18 @@ void recv_sys_t::apply(bool last_batch) ...@@ -3242,46 +3619,18 @@ void recv_sys_t::apply(bool last_batch)
#endif /* SAFE_MUTEX */ #endif /* SAFE_MUTEX */
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
timespec abstime; garbage_collect();
while (apply_batch_on)
{
if (is_corrupt_log())
{
mysql_mutex_unlock(&mutex);
return;
}
if (last_batch)
{
mysql_mutex_assert_not_owner(&log_sys.mutex);
my_cond_wait(&cond, &mutex.m_mutex);
}
else
{
mysql_mutex_unlock(&mutex);
set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
my_cond_timedwait(&cond, &log_sys.mutex.m_mutex, &abstime);
mysql_mutex_lock(&mutex);
}
}
recv_no_ibuf_operations = !last_batch ||
srv_operation == SRV_OPERATION_RESTORE ||
srv_operation == SRV_OPERATION_RESTORE_EXPORT;
mtr_t mtr;
if (!pages.empty()) if (!pages.empty())
{ {
const char *msg= last_batch recv_no_ibuf_operations = !last_batch ||
? "Starting final batch to recover " srv_operation == SRV_OPERATION_RESTORE ||
: "Starting a batch to recover "; srv_operation == SRV_OPERATION_RESTORE_EXPORT;
const ulint n= pages.size(); ut_ad(!last_batch || recovered_lsn == scanned_lsn);
ib::info() << msg << n << " pages from redo log."; progress_time= time(nullptr);
sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log", msg, n); report_progress();
apply_log_recs= true; apply_log_recs= true;
apply_batch_on= true;
for (auto id= srv_undo_tablespaces_open; id--;) for (auto id= srv_undo_tablespaces_open; id--;)
{ {
...@@ -3307,130 +3656,70 @@ void recv_sys_t::apply(bool last_batch) ...@@ -3307,130 +3656,70 @@ void recv_sys_t::apply(bool last_batch)
fil_system.extend_to_recv_size(); fil_system.extend_to_recv_size();
/* We must release log_sys.mutex and recv_sys.mutex before fil_space_t *space= nullptr;
invoking buf_LRU_get_free_block(). Allocating a block may initiate uint32_t space_id= ~0;
a redo log write and therefore acquire log_sys.mutex. To avoid buf_block_t *free_block= nullptr;
deadlocks, log_sys.mutex must not be acquired while holding
recv_sys.mutex. */
mysql_mutex_unlock(&mutex);
if (!last_batch)
mysql_mutex_unlock(&log_sys.mutex);
mysql_mutex_assert_not_owner(&log_sys.mutex);
buf_block_t *free_block= buf_LRU_get_free_block(false);
if (!last_batch)
mysql_mutex_lock(&log_sys.mutex);
mysql_mutex_lock(&mutex);
for (map::iterator p= pages.begin(); p != pages.end(); ) for (pages_it= pages.begin(); pages_it != pages.end();
pages_it= pages.begin())
{ {
const page_id_t page_id= p->first; if (!free_block)
ut_ad(!p->second.log.empty()); {
if (!last_batch)
mysql_mutex_unlock(&log_sys.mutex);
wait_for_pool(1);
pages_it= pages.begin();
mysql_mutex_unlock(&mutex);
/* We must release log_sys.mutex and recv_sys.mutex before
invoking buf_LRU_get_free_block(). Allocating a block may initiate
a redo log write and therefore acquire log_sys.mutex. To avoid
deadlocks, log_sys.mutex must not be acquired while holding
recv_sys.mutex. */
free_block= buf_LRU_get_free_block(false);
if (!last_batch)
mysql_mutex_lock(&log_sys.mutex);
mysql_mutex_lock(&mutex);
pages_it= pages.begin();
}
const uint32_t space_id= page_id.space(); while (pages_it != pages.end())
auto d= deferred_spaces.defers.find(space_id);
if (d != deferred_spaces.defers.end())
{ {
if (d->second.deleted) if (is_corrupt_fs() || is_corrupt_log())
{ {
/* For deleted files we must preserve the entry in deferred_spaces */ if (space)
erase_for_space: space->release();
while (p != pages.end() && p->first.space() == space_id) mysql_mutex_unlock(&mutex);
if (free_block)
{ {
map::iterator r= p++; mysql_mutex_lock(&buf_pool.mutex);
r->second.log.clear(); buf_LRU_block_free_non_file_page(free_block);
pages.erase(r); mysql_mutex_unlock(&buf_pool.mutex);
} }
return;
} }
else if (recover_deferred(p, d->second.file_name, free_block)) if (apply_batch(space_id, space, free_block, last_batch))
{
if (!srv_force_recovery)
set_corrupt_fs();
deferred_spaces.defers.erase(d);
goto erase_for_space;
}
else
deferred_spaces.defers.erase(d);
if (!free_block)
goto next_free_block;
p= pages.lower_bound(page_id);
continue;
}
switch (p->second.state) {
case page_recv_t::RECV_BEING_READ:
case page_recv_t::RECV_BEING_PROCESSED:
p++;
continue;
case page_recv_t::RECV_WILL_NOT_READ:
if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block)))
{
next_free_block:
mysql_mutex_unlock(&mutex);
if (!last_batch)
mysql_mutex_unlock(&log_sys.mutex);
mysql_mutex_assert_not_owner(&log_sys.mutex);
free_block= buf_LRU_get_free_block(false);
if (!last_batch)
mysql_mutex_lock(&log_sys.mutex);
mysql_mutex_lock(&mutex);
break; break;
}
ut_ad(p == pages.end() || p->first > page_id);
continue;
case page_recv_t::RECV_NOT_PROCESSED:
recv_read_in_area(page_id, p);
} }
p= pages.lower_bound(page_id);
/* Ensure that progress will be made. */
ut_ad(p == pages.end() || p->first > page_id ||
p->second.state >= page_recv_t::RECV_BEING_READ);
} }
buf_pool.free_block(free_block); if (space)
space->release();
/* Wait until all the pages have been processed */ if (free_block)
for (;;)
{ {
const bool empty= pages.empty(); mysql_mutex_lock(&buf_pool.mutex);
if (empty && !os_aio_pending_reads()) buf_LRU_block_free_non_file_page(free_block);
break; mysql_mutex_unlock(&buf_pool.mutex);
if (!is_corrupt_fs() && !is_corrupt_log())
{
if (last_batch)
{
mysql_mutex_assert_not_owner(&log_sys.mutex);
if (!empty)
my_cond_wait(&cond, &mutex.m_mutex);
else
{
mysql_mutex_unlock(&mutex);
os_aio_wait_until_no_pending_reads(false);
mysql_mutex_lock(&mutex);
ut_ad(pages.empty());
}
}
else
{
mysql_mutex_unlock(&mutex);
set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
my_cond_timedwait(&cond, &log_sys.mutex.m_mutex, &abstime);
mysql_mutex_lock(&mutex);
}
continue;
}
if (is_corrupt_fs() && !srv_force_recovery)
ib::info() << "Set innodb_force_recovery=1 to ignore corrupted pages.";
mysql_mutex_unlock(&mutex);
return;
} }
} }
if (last_batch) if (last_batch)
/* We skipped this in buf_page_create(). */ {
mlog_init.mark_ibuf_exist(mtr); if (!recv_no_ibuf_operations)
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist();
mlog_init.clear();
}
else else
{ {
mlog_init.reset(); mlog_init.reset();
...@@ -3440,21 +3729,19 @@ void recv_sys_t::apply(bool last_batch) ...@@ -3440,21 +3729,19 @@ void recv_sys_t::apply(bool last_batch)
mysql_mutex_assert_not_owner(&log_sys.mutex); mysql_mutex_assert_not_owner(&log_sys.mutex);
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
srv_operation != SRV_OPERATION_RESTORE_EXPORT)
log_sort_flush_list();
else
{
/* Instead of flushing, last_batch could sort the buf_pool.flush_list
in ascending order of buf_page_t::oldest_modification. */
buf_flush_sync_batch(recovered_lsn);
}
if (!last_batch) if (!last_batch)
{ {
buf_flush_sync_batch(recovered_lsn);
buf_pool_invalidate(); buf_pool_invalidate();
mysql_mutex_lock(&log_sys.mutex); mysql_mutex_lock(&log_sys.mutex);
} }
else if (srv_operation == SRV_OPERATION_RESTORE ||
srv_operation == SRV_OPERATION_RESTORE_EXPORT)
buf_flush_sync_batch(recovered_lsn);
else
/* Instead of flushing, last_batch could sort the buf_pool.flush_list
in ascending order of buf_page_t::oldest_modification() */
log_sort_flush_list();
mysql_mutex_lock(&mutex); mysql_mutex_lock(&mutex);
...@@ -3463,24 +3750,6 @@ void recv_sys_t::apply(bool last_batch) ...@@ -3463,24 +3750,6 @@ void recv_sys_t::apply(bool last_batch)
mysql_mutex_unlock(&mutex); mysql_mutex_unlock(&mutex);
} }
/** Check whether the number of read redo log blocks exceeds the maximum.
Store last_stored_lsn if the recovery is not in the last phase.
@param[in,out] store whether to store page operations
@return whether the memory is exhausted */
inline bool recv_sys_t::is_memory_exhausted(store_t *store)
{
if (*store == STORE_NO ||
UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages())
return false;
if (*store == STORE_YES)
last_stored_lsn= recovered_lsn;
*store= STORE_NO;
DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
" last stored offset " ULINTPF "\n",
recovered_lsn, recovered_offset));
return true;
}
/** Adds data from a new log block to the parsing buffer of recv_sys if /** Adds data from a new log block to the parsing buffer of recv_sys if
recv_sys.parse_start_lsn is non-zero. recv_sys.parse_start_lsn is non-zero.
@param[in] log_block log block to add @param[in] log_block log block to add
...@@ -3588,7 +3857,7 @@ static bool recv_scan_log_recs( ...@@ -3588,7 +3857,7 @@ static bool recv_scan_log_recs(
bool more_data = false; bool more_data = false;
bool apply = recv_sys.mlog_checkpoint_lsn != 0; bool apply = recv_sys.mlog_checkpoint_lsn != 0;
ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE; ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
const bool last_phase = (*store == STORE_IF_EXISTS); const store_t old_store = *store;
ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE); ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
...@@ -3710,8 +3979,8 @@ static bool recv_scan_log_recs( ...@@ -3710,8 +3979,8 @@ static bool recv_scan_log_recs(
} }
/* During last phase of scanning, there can be redo logs /* During last phase of scanning, there can be redo logs
left in recv_sys.buf to parse & store it in recv_sys.heap */ left in recv_sys.buf to parse & store it in recv_sys.pages */
if (last_phase if (old_store == STORE_IF_EXISTS
&& recv_sys.recovered_lsn < recv_sys.scanned_lsn) { && recv_sys.recovered_lsn < recv_sys.scanned_lsn) {
more_data = true; more_data = true;
} }
...@@ -3732,33 +4001,21 @@ static bool recv_scan_log_recs( ...@@ -3732,33 +4001,21 @@ static bool recv_scan_log_recs(
if (more_data && !recv_sys.is_corrupt_log()) { if (more_data && !recv_sys.is_corrupt_log()) {
/* Try to parse more log records */ /* Try to parse more log records */
if (recv_sys.parse(checkpoint_lsn, store, apply)) { if (recv_sys.parse(checkpoint_lsn, store, apply)) {
finished = true;
ut_ad(recv_sys.is_corrupt_log() ut_ad(recv_sys.is_corrupt_log()
|| recv_sys.is_corrupt_fs() || recv_sys.is_corrupt_fs()
|| recv_sys.mlog_checkpoint_lsn || recv_sys.mlog_checkpoint_lsn
== recv_sys.recovered_lsn); == recv_sys.recovered_lsn);
finished = true; } else if (recv_sys.recovered_offset
goto func_exit; > recv_parsing_buf_size / 4
} || (recv_sys.recovered_offset
&& recv_sys.len
recv_sys.is_memory_exhausted(store); >= recv_parsing_buf_size - RECV_SCAN_SIZE)) {
if (recv_sys.recovered_offset > recv_parsing_buf_size / 4
|| (recv_sys.recovered_offset
&& recv_sys.len
>= recv_parsing_buf_size - RECV_SCAN_SIZE)) {
/* Move parsing buffer data to the buffer start */ /* Move parsing buffer data to the buffer start */
recv_sys_justify_left_parsing_buf(); recv_sys_justify_left_parsing_buf();
} }
/* Need to re-parse the redo log which're stored
in recv_sys.buf */
if (last_phase && *store == STORE_NO) {
finished = false;
}
} }
func_exit:
recv_sys.maybe_finish_batch();
mysql_mutex_unlock(&recv_sys.mutex); mysql_mutex_unlock(&recv_sys.mutex);
return(finished); return(finished);
} }
...@@ -3802,13 +4059,6 @@ recv_group_scan_log_recs( ...@@ -3802,13 +4059,6 @@ recv_group_scan_log_recs(
ut_d(recv_sys.after_apply = last_phase); ut_d(recv_sys.after_apply = last_phase);
do { do {
if (last_phase && store == STORE_NO) {
store = STORE_IF_EXISTS;
recv_sys.apply(false);
/* Rescan the redo logs from last stored lsn */
end_lsn = recv_sys.recovered_lsn;
}
start_lsn = ut_uint64_align_down(end_lsn, start_lsn = ut_uint64_align_down(end_lsn,
OS_FILE_LOG_BLOCK_SIZE); OS_FILE_LOG_BLOCK_SIZE);
end_lsn = start_lsn; end_lsn = start_lsn;
...@@ -3913,8 +4163,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3913,8 +4163,8 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
/* fall through */ /* fall through */
case file_name_t::DELETED: case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++; recv_sys_t::map::iterator r = p++;
r->second.log.clear(); recv_sys.pages_it_invalidate(r);
recv_sys.pages.erase(r); recv_sys.erase(r);
continue; continue;
} }
ut_ad(0); ut_ad(0);
...@@ -3938,8 +4188,6 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3938,8 +4188,6 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
continue; continue;
} }
missing_tablespace = true;
if (srv_force_recovery > 0) { if (srv_force_recovery > 0) {
ib::warn() << "Tablespace " << rs.first ib::warn() << "Tablespace " << rs.first
<<" was not found at " << rs.second.name <<" was not found at " << rs.second.name
...@@ -3954,14 +4202,11 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace) ...@@ -3954,14 +4202,11 @@ recv_validate_tablespace(bool rescan, bool& missing_tablespace)
<< " was not found at '" << " was not found at '"
<< rs.second.name << "', but there" << rs.second.name << "', but there"
<<" were no modifications either."; <<" were no modifications either.";
} else {
missing_tablespace = true;
} }
} }
if (!rescan || srv_force_recovery > 0) {
missing_tablespace = false;
}
err = DB_SUCCESS;
goto func_exit; goto func_exit;
} }
...@@ -4344,6 +4589,12 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn) ...@@ -4344,6 +4589,12 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
return(DB_ERROR); return(DB_ERROR);
} }
ut_ad(contiguous_lsn <= recv_sys.recovered_lsn);
ut_ad(recv_sys.scanned_lsn == recv_sys.scanned_lsn);
log_sys.set_lsn(recv_sys.recovered_lsn);
log_sys.set_flushed_lsn(recv_sys.recovered_lsn);
/* In case of multi-batch recovery, /* In case of multi-batch recovery,
redo log for the last batch is not redo log for the last batch is not
applied yet. */ applied yet. */
......
...@@ -3431,15 +3431,12 @@ os_file_get_status( ...@@ -3431,15 +3431,12 @@ os_file_get_status(
return(ret); return(ret);
} }
static void io_callback_errorcheck(const tpool::aiocb *cb)
extern void fil_aio_callback(const IORequest &request);
static void io_callback(tpool::aiocb *cb)
{ {
const IORequest &request= *static_cast<const IORequest*>
(static_cast<const void*>(cb->m_userdata));
if (cb->m_err != DB_SUCCESS) if (cb->m_err != DB_SUCCESS)
{ {
const IORequest &request= *static_cast<const IORequest*>
(static_cast<const void*>(cb->m_userdata));
ib::fatal() << "IO Error: " << cb->m_err << " during " << ib::fatal() << "IO Error: " << cb->m_err << " during " <<
(request.is_async() ? "async " : "sync ") << (request.is_async() ? "async " : "sync ") <<
(request.is_LRU() ? "lru " : "") << (request.is_LRU() ? "lru " : "") <<
...@@ -3447,19 +3444,36 @@ static void io_callback(tpool::aiocb *cb) ...@@ -3447,19 +3444,36 @@ static void io_callback(tpool::aiocb *cb)
" of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " << " of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " <<
cb->m_ret_len; cb->m_ret_len;
} }
/* Return cb back to cache*/ }
if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
{ static void fake_io_callback(void *c)
ut_ad(read_slots->contains(cb)); {
fil_aio_callback(request); tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
read_slots->release(cb); ut_ad(read_slots->contains(cb));
} static_cast<const IORequest*>(static_cast<const void*>(cb->m_userdata))->
else fake_read_complete(cb->m_offset);
{ read_slots->release(cb);
ut_ad(write_slots->contains(cb)); }
fil_aio_callback(request);
write_slots->release(cb); static void read_io_callback(void *c)
} {
tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PREAD);
io_callback_errorcheck(cb);
ut_ad(read_slots->contains(cb));
static_cast<const IORequest*>
(static_cast<const void*>(cb->m_userdata))->read_complete();
read_slots->release(cb);
}
static void write_io_callback(void *c)
{
tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
ut_ad(write_slots->contains(cb));
static_cast<const IORequest*>
(static_cast<const void*>(cb->m_userdata))->write_complete();
write_slots->release(cb);
} }
#ifdef LINUX_NATIVE_AIO #ifdef LINUX_NATIVE_AIO
...@@ -3704,6 +3718,28 @@ void os_aio_wait_until_no_pending_reads(bool declare) ...@@ -3704,6 +3718,28 @@ void os_aio_wait_until_no_pending_reads(bool declare)
tpool::tpool_wait_end(); tpool::tpool_wait_end();
} }
/** Submit a fake read request during crash recovery.
@param type fake read request
@param offset additional context */
void os_fake_read(const IORequest &type, os_offset_t offset)
{
tpool::aiocb *cb= read_slots->acquire();
cb->m_group= read_slots->get_task_group();
cb->m_fh= type.node->handle.m_file;
cb->m_buffer= nullptr;
cb->m_len= 0;
cb->m_offset= offset;
cb->m_opcode= tpool::aio_opcode::AIO_PREAD;
new (cb->m_userdata) IORequest{type};
cb->m_internal_task.m_func= fake_io_callback;
cb->m_internal_task.m_arg= cb;
cb->m_internal_task.m_group= cb->m_group;
srv_thread_pool->submit_task(&cb->m_internal_task);
}
/** Request a read or write. /** Request a read or write.
@param type I/O request @param type I/O request
@param buf buffer @param buf buffer
...@@ -3748,23 +3784,32 @@ dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n) ...@@ -3748,23 +3784,32 @@ dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n)
return err; return err;
} }
io_slots* slots;
tpool::callback_func callback;
tpool::aio_opcode opcode;
if (type.is_read()) { if (type.is_read()) {
++os_n_file_reads; ++os_n_file_reads;
slots = read_slots;
callback = read_io_callback;
opcode = tpool::aio_opcode::AIO_PREAD;
} else { } else {
++os_n_file_writes; ++os_n_file_writes;
slots = write_slots;
callback = write_io_callback;
opcode = tpool::aio_opcode::AIO_PWRITE;
} }
compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN); compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN);
io_slots* slots= type.is_read() ? read_slots : write_slots;
tpool::aiocb* cb = slots->acquire(); tpool::aiocb* cb = slots->acquire();
cb->m_buffer = buf; cb->m_buffer = buf;
cb->m_callback = (tpool::callback_func)io_callback; cb->m_callback = callback;
cb->m_group = slots->get_task_group(); cb->m_group = slots->get_task_group();
cb->m_fh = type.node->handle.m_file; cb->m_fh = type.node->handle.m_file;
cb->m_len = (int)n; cb->m_len = (int)n;
cb->m_offset = offset; cb->m_offset = offset;
cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE; cb->m_opcode = opcode;
new (cb->m_userdata) IORequest{type}; new (cb->m_userdata) IORequest{type};
ut_a(reinterpret_cast<size_t>(cb->m_buffer) % OS_FILE_LOG_BLOCK_SIZE ut_a(reinterpret_cast<size_t>(cb->m_buffer) % OS_FILE_LOG_BLOCK_SIZE
...@@ -3777,6 +3822,7 @@ dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n) ...@@ -3777,6 +3822,7 @@ dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n)
os_file_handle_error(type.node->name, type.is_read() os_file_handle_error(type.node->name, type.is_read()
? "aio read" : "aio write"); ? "aio read" : "aio write");
err = DB_IO_ERROR; err = DB_IO_ERROR;
type.node->space->release();
} }
goto func_exit; goto func_exit;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment