Commit a202371f authored by Vlad Lesin's avatar Vlad Lesin

MDEV-33757 Get rid of TrxUndoRsegs code

Post-push fix: purge queue array can't be fixed size, because the elements
of the array is the analogue of undo logs, which must be processed in
the order of transaction commits, and the array can contain more
elements, than trx_sys.rseg_array. Also it's necessary to maintain
min-heap property by the trx_no of transaction, which produced the first
non-purged undo log in all rsegs. That's why the element of purge queue
aray must contain not only trx_sys.rseg_array index, but also trx_no of
committed transacion, i.e. the pair (trx_no, trx_sys.rseg_array index),
which is encoded as uint64_t((trx_no << 8) | (trx_sys.rseg_array index)).

Reviewed by: Marko Mäkelä
parent 9a4991a0
......@@ -58,62 +58,47 @@ ulint trx_purge(ulint n_tasks, ulint history_size);
/** The control structure used in the purge operation */
class purge_sys_t
{
/** Min-heap based priority queue over fixed size array */
/** Min-heap based priority queue of (trx_no, trx_sys.rseg_array index)
pairs, ordered on trx_no. The highest 64-TRX_NO_SHIFT bits of each element is
trx_no, the lowest 8 bits is rseg's index in trx_sys.rseg_array. */
class purge_queue
{
/** Array of indexes in trx_sys.rseg_array. */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) byte m_array[TRX_SYS_N_RSEGS];
/** Pointer to the end of m_array. */
byte *m_end= m_array;
public:
struct trx_rseg_cmp
{
/** Compare two trx_rseg_t* based on trx_no.
@param lhs first index in trx_sys.rseg_array to compare
@param rhs second index in trx_sys.rseg_array to compare
@return whether lhs>rhs */
bool operator()(const byte lhs, const byte rhs)
{
ut_ad(lhs < TRX_SYS_N_RSEGS);
ut_ad(rhs < TRX_SYS_N_RSEGS);
/* We can compare without trx_rseg_t::latch, because rseg last
commit is always set before pushing rseg to purge queue. */
return trx_sys.rseg_array[lhs].last_commit_and_offset >
trx_sys.rseg_array[rhs].last_commit_and_offset;
}
};
byte *begin() { return m_array; }
byte *end() { return m_end; }
const byte *c_begin() const { return m_array; }
const byte *c_end() const { return m_end; }
size_t size() const
{
size_t s= c_end() - c_begin();
ut_ad(s <= TRX_SYS_N_RSEGS);
return s;
}
bool empty() const { return !size(); }
void clear() { m_end= m_array; }
typedef std::vector<uint64_t, ut_allocator<uint64_t>> container_type;
/** Number of bits reseved to shift trx_no in purge queue element */
static constexpr unsigned TRX_NO_SHIFT= 8;
bool empty() const { return m_array.empty(); }
void clear() { m_array.clear(); }
/** Push index of trx_sys.rseg_array into min-heap.
@param i index to push */
void push_rseg_index(byte i)
/** Push (trx_no, trx_sys.rseg_array index) into min-heap.
@param trx_no_rseg (trx_no << TRX_NO_SHIFT | (trx_sys.rseg_array index)) */
void push_trx_no_rseg(container_type::value_type trx_no_rseg)
{
ut_ad(i < TRX_SYS_N_RSEGS);
ut_ad(size() + 1 <= TRX_SYS_N_RSEGS);
*m_end++= i;
std::push_heap(begin(), end(), trx_rseg_cmp());
m_array.push_back(trx_no_rseg);
std::push_heap(m_array.begin(), m_array.end(),
std::greater<container_type::value_type>());
}
/** Push rseg to priority queue.
@param rseg trx_rseg_t pointer to push */
void push(const trx_rseg_t *rseg)
@param trx_no trx_no of committed transaction
@param rseg rseg of committed transaction*/
void push(trx_id_t trx_no, const trx_rseg_t &rseg)
{
ut_ad(rseg >= trx_sys.rseg_array);
ut_ad(rseg < trx_sys.rseg_array + TRX_SYS_N_RSEGS);
byte i= byte(rseg - trx_sys.rseg_array);
push_rseg_index(i);
ut_ad(trx_no < 1ULL << (DATA_TRX_ID_LEN * CHAR_BIT));
ut_ad(&rseg >= trx_sys.rseg_array);
ut_ad(&rseg < trx_sys.rseg_array + TRX_SYS_N_RSEGS);
push_trx_no_rseg(trx_no << TRX_NO_SHIFT |
byte(&rseg - trx_sys.rseg_array));
}
/** Extracts rseg from (trx_no, trx_sys.rseg_array index) pair.
@param trx_no_rseg (trx_no << TRX_NO_SHIFT | (trx_sys.rseg_array index)
@return pointer to rseg in trx_sys.rseg_array */
static trx_rseg_t *rseg(container_type::value_type trx_no_rseg) {
byte i= static_cast<byte>(trx_no_rseg);
ut_ad(i < TRX_SYS_N_RSEGS);
return &trx_sys.rseg_array[i];
}
/** Pop rseg from priority queue.
......@@ -121,13 +106,23 @@ class purge_sys_t
trx_rseg_t *pop()
{
ut_ad(!empty());
std::pop_heap(begin(), end(), trx_rseg_cmp());
byte i= *--m_end;
ut_ad(i < TRX_SYS_N_RSEGS);
return &trx_sys.rseg_array[i];
std::pop_heap(m_array.begin(), m_array.end(),
std::greater<container_type::value_type>());
trx_rseg_t *r = rseg(m_array.back());
m_array.pop_back();
return r;
}
/** Clone m_array.
@return m_array clone */
container_type clone_container() const{ return m_array; }
private:
/** Array of (trx_no, trx_sys.rseg_array index) pairs. */
container_type m_array;
};
public:
/** latch protecting view, m_enabled */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock latch;
......@@ -243,20 +238,29 @@ class purge_sys_t
record */
uint16_t hdr_offset; /*!< Header byte offset on the page */
/** Binary min-heap of indexes in trx_sys.rseg_array, ordered on
rseg_t::last_trx_no(). It is protected by the pq_mutex */
/** Binary min-heap of (trx_no, trx_sys.rseg_array index) pairs, ordered on
trx_no. It is protected by the pq_mutex */
purge_queue purge_queue;
/** Mutex protecting purge_queue */
mysql_mutex_t pq_mutex;
public:
void enqueue(trx_id_t trx_no, const trx_rseg_t &rseg) {
mysql_mutex_assert_owner(&pq_mutex);
purge_queue.push(trx_no, rseg);
}
/** Push to purge queue without acquiring pq_mutex.
@param rseg rseg to push */
void enqueue(trx_rseg_t &rseg)
{
void enqueue(const trx_rseg_t &rseg) { enqueue(rseg.last_trx_no(), rseg); }
/** Clone purge queue container.
@return purge queue container clone */
purge_queue::container_type clone_queue_container() const {
mysql_mutex_assert_owner(&pq_mutex);
purge_queue.push(&rseg);
return purge_queue.clone_container();
}
/** Acquare purge_queue_mutex */
......
......@@ -179,7 +179,7 @@ struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) trx_rseg_t
/** @return header offset of the last committed transaction */
uint16_t last_offset() const
{
return static_cast<uint16_t>(last_commit_and_offset & ((1ULL << 16) - 1));
return static_cast<uint16_t>(last_commit_and_offset);
}
void set_last_commit(uint16_t last_offset, trx_id_t trx_no)
......
......@@ -495,14 +495,12 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
void purge_sys_t::cleanse_purge_queue(const fil_space_t &space)
{
byte purge_elem_list[TRX_SYS_N_RSEGS];
mysql_mutex_lock(&pq_mutex);
std::copy(purge_queue.c_begin(), purge_queue.c_end(), purge_elem_list);
byte *purge_list_end = purge_elem_list + purge_queue.size();
auto purge_elem_list= clone_queue_container();
purge_queue.clear();
for (byte *elem = purge_elem_list; elem < purge_list_end; ++elem)
if (trx_sys.rseg_array[*elem].space != &space)
purge_queue.push_rseg_index(*elem);
for (auto elem : purge_elem_list)
if (purge_queue::rseg(elem)->space != &space)
purge_queue.push_trx_no_rseg(elem);
mysql_mutex_unlock(&pq_mutex);
}
......@@ -814,7 +812,7 @@ bool purge_sys_t::rseg_get_next_history_log()
can never produce events from an empty rollback segment. */
mysql_mutex_lock(&pq_mutex);
purge_queue.push(rseg);
enqueue(*rseg);
mysql_mutex_unlock(&pq_mutex);
}
}
......
......@@ -1153,7 +1153,7 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
/* end cannot be less than anything in rseg. User threads only
produce events when a rollback segment is empty. */
rseg->set_last_commit(undo->hdr_offset, end);
purge_sys.enqueue(*rseg);
purge_sys.enqueue(end, *rseg);
purge_sys.queue_unlock();
}
else
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment