/******************************************************
Purge old versions

(c) 1996 Innobase Oy

Created 3/26/1996 Heikki Tuuri
*******************************************************/

#include "trx0purge.h"

#ifdef UNIV_NONINL
#include "trx0purge.ic"
#endif

#include "fsp0fsp.h"
#include "mach0data.h"
#include "trx0rseg.h"
#include "trx0trx.h"
#include "trx0roll.h"
#include "read0read.h"
#include "fut0fut.h"
#include "que0que.h"
#include "row0purge.h"
#include "row0upd.h"
#include "trx0rec.h"
#include "srv0que.h"
#include "os0thread.h"

/* The global data structure coordinating a purge */
trx_purge_t*	purge_sys = NULL;

/* A dummy undo record used as a return value when we have a whole undo log
which needs no purge */
trx_undo_rec_t	trx_purge_dummy_rec;

/*********************************************************************
Checks if trx_id is >= purge_view: then it is guaranteed that its update
undo log still exists in the system. In a UNIV_SYNC_DEBUG build the function
asserts that the caller holds purge_sys->latch in shared mode. */

ibool
trx_purge_update_undo_must_exist(
/*=============================*/
			/* out: TRUE if is sure that it is preserved, also
			if the function returns FALSE, it is possible that
			the undo log still exists in the system */
	dulint	trx_id)	/* in: transaction id */
{
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&(purge_sys->latch), RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */

	/* Only when the purge view does not see trx_id can we be sure
	that the update undo log is preserved */

	if (read_view_sees_trx_id(purge_sys->view, trx_id)) {

		return(FALSE);
	}

	return(TRUE);
}

/*=================== PURGE RECORD ARRAY =============================*/

/***********************************************************************
Stores info of an undo log record during a purge. The info is written to
the first cell of purge_sys->arr which is not in use, and the used-cell
count of the array is incremented. */
static
trx_undo_inf_t*
trx_purge_arr_store_info(
/*=====================*/
			/* out: pointer to the storage cell */
	dulint	trx_no,	/* in: transaction number */
	dulint	undo_no)/* in: undo number */
{
	trx_undo_arr_t*	arr	= purge_sys->arr;
	trx_undo_inf_t*	cell;
	ulint		i;

	/* NOTE(review): the scan below has no upper bound on i; it
	presumably relies on the array always containing a free cell.
	Confirm against the array size before changing the callers. */

	for (i = 0;; i++) {
		cell = trx_undo_arr_get_nth_info(arr, i);

		if (cell->in_use) {

			continue;
		}

		/* Not in use, we may store here */
		cell->undo_no = undo_no;
		cell->trx_no = trx_no;
		cell->in_use = TRUE;

		arr->n_used++;

		return(cell);
	}
}

/***********************************************************************
Removes info of an undo log record during a purge: marks the cell as not
in use and decrements the used-cell count of purge_sys->arr. */
UNIV_INLINE
void
trx_purge_arr_remove_info(
/*======================*/
	trx_undo_inf_t*	cell)	/* in: pointer to the storage cell */
{
	trx_undo_arr_t*	arr	= purge_sys->arr;

	cell->in_use = FALSE;

	/* The count must not underflow */
	ut_ad(arr->n_used > 0);

	arr->n_used--;
}

/***********************************************************************
Gets the biggest pair of a trx number and an undo number in a purge array.
Pairs are ordered first by trx number and then by undo number. */
static
void
trx_purge_arr_get_biggest(
/*======================*/
	trx_undo_arr_t*	arr,	/* in: purge array */
	dulint*		trx_no,	/* out: transaction number: ut_dulint_zero
				if array is empty */
	dulint*		undo_no)/* out: undo number */
{
	trx_undo_inf_t*	cell;
	dulint		pair_trx_no;
	dulint		pair_undo_no;
	int		trx_cmp;
	ulint		n_used;
	ulint		i;
	ulint		n;

	n = 0;
	n_used = arr->n_used;
	pair_trx_no = ut_dulint_zero;
	pair_undo_no = ut_dulint_zero;

	/* Scan the cells until we have seen as many in-use cells as the
	array reports; the biggest pair met so far is kept in pair_trx_no
	and pair_undo_no */

	for (i = 0;; i++) {
		cell = trx_undo_arr_get_nth_info(arr, i);

		if (cell->in_use) {
			n++;
			trx_cmp = ut_dulint_cmp(cell->trx_no, pair_trx_no);

			if ((trx_cmp > 0)
			    || ((trx_cmp == 0)
				&& (ut_dulint_cmp(cell->undo_no,
						  pair_undo_no) >= 0))) {

				pair_trx_no = cell->trx_no;
				pair_undo_no = cell->undo_no;
			}
		}

		if (n == n_used) {
			*trx_no = pair_trx_no;
			*undo_no = pair_undo_no;

			return;
		}
	}
}

/********************************************************************
Builds a purge 'query' graph. The actual purge is performed by executing
this query graph. */
static
que_t*
trx_purge_graph_build(void)
/*=======================*/
				/* out, own: the query graph */
{
	mem_heap_t*	heap;
	que_fork_t*	fork;
	que_thr_t*	thr;
unknown's avatar
unknown committed
175
	/*	que_thr_t*	thr2; */
176

177 178 179
	heap = mem_heap_create(512);
	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
	fork->trx = purge_sys->trx;
180

181 182
	thr = que_thr_create(fork, heap);

183
	thr->child = row_purge_node_create(thr, heap);
184

unknown's avatar
unknown committed
185
	/*	thr2 = que_thr_create(fork, fork, heap);
186

187
	thr2->child = row_purge_node_create(fork, thr2, heap);	 */
188 189 190 191 192 193 194 195 196 197 198 199

	return(fork);
}

/************************************************************************
Creates the global purge system control structure and inits the history
mutex. */

void
trx_purge_sys_create(void)
/*======================*/
{
200
#ifdef UNIV_SYNC_DEBUG
201
	ut_ad(mutex_own(&kernel_mutex));
202
#endif /* UNIV_SYNC_DEBUG */
203 204 205 206 207 208 209 210 211 212

	purge_sys = mem_alloc(sizeof(trx_purge_t));

	purge_sys->state = TRX_STOP_PURGE;

	purge_sys->n_pages_handled = 0;

	purge_sys->purge_trx_no = ut_dulint_zero;
	purge_sys->purge_undo_no = ut_dulint_zero;
	purge_sys->next_stored = FALSE;
213

unknown's avatar
unknown committed
214
	rw_lock_create(&purge_sys->latch, SYNC_PURGE_LATCH);
215

unknown's avatar
unknown committed
216
	mutex_create(&purge_sys->mutex, SYNC_PURGE_SYS);
217 218 219 220 221

	purge_sys->heap = mem_heap_create(256);

	purge_sys->arr = trx_undo_arr_create();

unknown's avatar
unknown committed
222
	purge_sys->sess = sess_open();
223

unknown's avatar
unknown committed
224
	purge_sys->trx = purge_sys->sess->trx;
225

unknown's avatar
unknown committed
226
	purge_sys->trx->type = TRX_PURGE;
227 228 229 230

	ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED));

	purge_sys->query = trx_purge_graph_build();
231

unknown's avatar
unknown committed
232 233
	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
							    purge_sys->heap);
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
}

/*================ UNDO LOG HISTORY LIST =============================*/

/************************************************************************
Adds the update undo log as the first log in the history list. Removes the
update undo log segment from the rseg slot if it is too big for reuse.
In a UNIV_SYNC_DEBUG build the function asserts that the caller owns the
mutex of the rollback segment of the undo log. */

void
trx_purge_add_update_undo_to_history(
/*=================================*/
	trx_t*	trx,		/* in: transaction */
	page_t*	undo_page,	/* in: update undo log header page,
				x-latched */
	mtr_t*	mtr)		/* in: mtr */
{
	trx_undo_t*	undo;
	trx_rseg_t*	rseg;
	trx_rsegf_t*	rseg_header;
	trx_usegf_t*	seg_header;
	trx_ulogf_t*	undo_header;
	ulint		hist_size;

	undo = trx->update_undo;

	ut_ad(undo);

	rseg = undo->rseg;
#ifdef UNIV_SYNC_DEBUG
	ut_ad(mutex_own(&(rseg->mutex)));
#endif /* UNIV_SYNC_DEBUG */

	rseg_header = trx_rsegf_get(rseg->space, rseg->page_no, mtr);

	undo_header = undo_page + undo->hdr_offset;
	seg_header  = undo_page + TRX_UNDO_SEG_HDR;

	if (undo->state != TRX_UNDO_CACHED) {
		/* The undo log segment will not be reused */

		if (undo->id >= TRX_RSEG_N_SLOTS) {
			fprintf(stderr,
				"InnoDB: Error: undo->id is %lu\n",
				(ulong) undo->id);
			ut_error;
		}

		/* Release the slot in the rollback segment header */
		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);

		/* Add the size of the segment to the history size stored
		in the rollback segment header */
		hist_size = mtr_read_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE,
					   MLOG_4BYTES, mtr);
		ut_ad(undo->size == flst_get_len
		      (seg_header + TRX_UNDO_PAGE_LIST, mtr));

		mlog_write_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE,
				 hist_size + undo->size, MLOG_4BYTES, mtr);
	}

	/* Add the log as the first in the history list */
	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);

	/* The in-memory history length counter is updated under the
	kernel mutex */
	mutex_enter(&kernel_mutex);
	trx_sys->rseg_history_len++;
	mutex_exit(&kernel_mutex);

	/* Write the trx number to the undo log header */
	mlog_write_dulint(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);

	/* Write information about delete markings to the undo log header */

	if (!undo->del_marks) {
		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, mtr);
	}

	if (rseg->last_page_no == FIL_NULL) {
		/* The rseg has no last log recorded: record this one */

		rseg->last_page_no = undo->hdr_page_no;
		rseg->last_offset = undo->hdr_offset;
		rseg->last_trx_no = trx->no;
		rseg->last_del_marks = undo->del_marks;
	}
}

/**************************************************************************
Frees an undo log segment which is in the history list. Cuts the end of the
history list at the youngest undo log in this segment. The non-header pages
are freed one step per mini-transaction; the header page is freed in the
same mini-transaction in which the history list is cut. In a UNIV_SYNC_DEBUG
build the function asserts that the caller owns purge_sys->mutex. */
static
void
trx_purge_free_segment(
/*===================*/
	trx_rseg_t*	rseg,		/* in: rollback segment */
	fil_addr_t	hdr_addr,	/* in: the file address of log_hdr */
	ulint		n_removed_logs)	/* in: count of how many undo logs we
					will cut off from the end of the
					history list */
{
	page_t*		undo_page;
	trx_rsegf_t*	rseg_hdr;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	ibool		freed;
	ulint		seg_size;
	ulint		hist_size;
	ibool		marked		= FALSE;
	mtr_t		mtr;

#ifdef UNIV_SYNC_DEBUG
	ut_ad(mutex_own(&(purge_sys->mutex)));
#endif /* UNIV_SYNC_DEBUG */
loop:
	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	undo_page = trx_undo_page_get(rseg->space, hdr_addr.page, &mtr);
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
	log_hdr = undo_page + hdr_addr.boffset;

	/* Mark the last undo log totally purged, so that if the system
	crashes, the tail of the undo log will not get accessed again. The
	list of pages in the undo log tail gets inconsistent during the
	freeing of the segment, and therefore purge should not try to access
	them again. */

	if (!marked) {
		mlog_write_ulint(log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
				 MLOG_2BYTES, &mtr);
		marked = TRUE;
	}

	freed = fseg_free_step_not_header(seg_hdr + TRX_UNDO_FSEG_HEADER,
					  &mtr);
	if (!freed) {
		/* More non-header pages remain: commit this step and
		continue in a fresh mini-transaction */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		goto loop;
	}

	/* The page list may now be inconsistent, but the length field
	stored in the list base node tells us how big it was before we
	started the freeing. */

	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST, &mtr);

	/* We may free the undo log segment header page; it must be freed
	within the same mtr as the undo log header is removed from the
	history list: otherwise, in case of a database crash, the segment
	could become inaccessible garbage in the file space. */

	flst_cut_end(rseg_hdr + TRX_RSEG_HISTORY,
		     log_hdr + TRX_UNDO_HISTORY_NODE, n_removed_logs, &mtr);

	mutex_enter(&kernel_mutex);
	ut_ad(trx_sys->rseg_history_len >= n_removed_logs);
	trx_sys->rseg_history_len -= n_removed_logs;
	mutex_exit(&kernel_mutex);

	freed = FALSE;

	while (!freed) {
		/* Here we assume that a file segment with just the header
		page can be freed in a few steps, so that the buffer pool
		is not flooded with bufferfixed pages: see the note in
		fsp0fsp.c. */

		freed = fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER,
				       &mtr);
	}

	/* Subtract the freed segment from the history size stored in the
	rollback segment header and from the in-memory rseg size */

	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
				   MLOG_4BYTES, &mtr);
	ut_ad(hist_size >= seg_size);

	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
			 hist_size - seg_size, MLOG_4BYTES, &mtr);

	ut_ad(rseg->curr_size >= seg_size);

	rseg->curr_size -= seg_size;

	mutex_exit(&(rseg->mutex));

	mtr_commit(&mtr);
}

/************************************************************************
Removes unnecessary history data from a rollback segment. The history list
is walked from its last log towards the first; the walk stops at the first
log whose trx number is >= limit_trx_no. In a UNIV_SYNC_DEBUG build the
function asserts that the caller owns purge_sys->mutex. */
static
void
trx_purge_truncate_rseg_history(
/*============================*/
	trx_rseg_t*	rseg,		/* in: rollback segment */
	dulint		limit_trx_no,	/* in: remove update undo logs whose
					trx number is < limit_trx_no */
	dulint		limit_undo_no)	/* in: if transaction number is equal
					to limit_trx_no, truncate undo records
					with undo number < limit_undo_no */
{
	fil_addr_t	hdr_addr;
	fil_addr_t	prev_hdr_addr;
	trx_rsegf_t*	rseg_hdr;
	page_t*		undo_page;
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	int		cmp;
	ulint		n_removed_logs	= 0;
	mtr_t		mtr;

#ifdef UNIV_SYNC_DEBUG
	ut_ad(mutex_own(&(purge_sys->mutex)));
#endif /* UNIV_SYNC_DEBUG */

	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	hdr_addr = trx_purge_get_log_from_hist
		(flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
	if (hdr_addr.page == FIL_NULL) {
		/* No more logs to look at */

		mutex_exit(&(rseg->mutex));

		mtr_commit(&mtr);

		return;
	}

	undo_page = trx_undo_page_get(rseg->space, hdr_addr.page, &mtr);

	log_hdr = undo_page + hdr_addr.boffset;

	cmp = ut_dulint_cmp(mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO),
			    limit_trx_no);
	if (cmp == 0) {
		/* The log of the limit transaction itself: truncate only
		the start of it */

		trx_undo_truncate_start(rseg, rseg->space, hdr_addr.page,
					hdr_addr.boffset, limit_undo_no);
	}

	if (cmp >= 0) {
		/* This log must be kept: remove from the end of the list
		the logs counted so far and stop */

		mutex_enter(&kernel_mutex);
		ut_a(trx_sys->rseg_history_len >= n_removed_logs);
		trx_sys->rseg_history_len -= n_removed_logs;
		mutex_exit(&kernel_mutex);

		flst_truncate_end(rseg_hdr + TRX_RSEG_HISTORY,
				  log_hdr + TRX_UNDO_HISTORY_NODE,
				  n_removed_logs, &mtr);

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		return;
	}

	/* Read the address of the previous log before the mtr is
	committed below */

	prev_hdr_addr = trx_purge_get_log_from_hist
		(flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
	n_removed_logs++;

	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {

		/* We can free the whole log segment */

		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

		trx_purge_free_segment(rseg, hdr_addr, n_removed_logs);

		/* The call above was passed the count of the logs to cut
		off the history list; start counting afresh */

		n_removed_logs = 0;
	} else {
		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);
	}

	/* Continue from the previous log in a new mini-transaction */

	mtr_start(&mtr);
	mutex_enter(&(rseg->mutex));

	rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);

	hdr_addr = prev_hdr_addr;

	goto loop;
}

/************************************************************************
Removes unnecessary history data from rollback segments. NOTE that when this
function is called, the caller must not have any latches on undo log pages! */
static
void
trx_purge_truncate_history(void)
/*============================*/
{
	trx_rseg_t*	rseg;
	dulint		limit_trx_no;
	dulint		limit_undo_no;

540
#ifdef UNIV_SYNC_DEBUG
541
	ut_ad(mutex_own(&(purge_sys->mutex)));
542
#endif /* UNIV_SYNC_DEBUG */
543 544

	trx_purge_arr_get_biggest(purge_sys->arr, &limit_trx_no,
unknown's avatar
unknown committed
545
				  &limit_undo_no);
546

547
	if (ut_dulint_cmp(limit_trx_no, ut_dulint_zero) == 0) {
548

549 550 551 552 553 554 555
		limit_trx_no = purge_sys->purge_trx_no;
		limit_undo_no = purge_sys->purge_undo_no;
	}

	/* We play safe and set the truncate limit at most to the purge view
	low_limit number, though this is not necessary */

unknown's avatar
Merge  
unknown committed
556 557
	if (ut_dulint_cmp(limit_trx_no, purge_sys->view->low_limit_no) >= 0) {
		limit_trx_no = purge_sys->view->low_limit_no;
558 559 560 561
		limit_undo_no = ut_dulint_zero;
	}

	ut_ad((ut_dulint_cmp(limit_trx_no,
unknown's avatar
unknown committed
562
			     purge_sys->view->low_limit_no) <= 0));
563 564 565 566 567

	rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);

	while (rseg) {
		trx_purge_truncate_rseg_history(rseg, limit_trx_no,
unknown's avatar
unknown committed
568
						limit_undo_no);
569 570 571 572 573 574 575 576 577 578 579 580 581
		rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
	}
}

/************************************************************************
Does a truncate if the purge array is empty. NOTE that when this function is
called, the caller must not have any latches on undo log pages! In a
UNIV_SYNC_DEBUG build the function asserts that the caller owns
purge_sys->mutex. */
UNIV_INLINE
ibool
trx_purge_truncate_if_arr_empty(void)
/*=================================*/
			/* out: TRUE if array empty */
{
#ifdef UNIV_SYNC_DEBUG
	ut_ad(mutex_own(&(purge_sys->mutex)));
#endif /* UNIV_SYNC_DEBUG */

	if (purge_sys->arr->n_used != 0) {
		/* Some cell of the array is still in use */

		return(FALSE);
	}

	trx_purge_truncate_history();

	return(TRUE);
}

/***************************************************************************
Updates the last not yet purged history log info in rseg when we have purged
a whole undo log. Advances also purge_sys->purge_trx_no past the purged log. */
599
static
600 601 602 603 604
void
trx_purge_rseg_get_next_history_log(
/*================================*/
	trx_rseg_t*	rseg)	/* in: rollback segment */
{
605
	page_t*		undo_page;
606 607 608 609 610 611 612
	trx_ulogf_t*	log_hdr;
	trx_usegf_t*	seg_hdr;
	fil_addr_t	prev_log_addr;
	dulint		trx_no;
	ibool		del_marks;
	mtr_t		mtr;

613
#ifdef UNIV_SYNC_DEBUG
614
	ut_ad(mutex_own(&(purge_sys->mutex)));
615
#endif /* UNIV_SYNC_DEBUG */
616 617 618

	mutex_enter(&(rseg->mutex));

unknown's avatar
unknown committed
619
	ut_a(rseg->last_page_no != FIL_NULL);
620 621 622 623

	purge_sys->purge_trx_no = ut_dulint_add(rseg->last_trx_no, 1);
	purge_sys->purge_undo_no = ut_dulint_zero;
	purge_sys->next_stored = FALSE;
624

625
	mtr_start(&mtr);
626

627 628 629 630 631
	undo_page = trx_undo_page_get_s_latched(rseg->space,
						rseg->last_page_no, &mtr);
	log_hdr = undo_page + rseg->last_offset;
	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

unknown's avatar
unknown committed
632
	/* Increase the purge page count by one for every handled log */
633

unknown's avatar
unknown committed
634
	purge_sys->n_pages_handled++;
635

unknown's avatar
unknown committed
636 637
	prev_log_addr = trx_purge_get_log_from_hist
		(flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
638 639 640 641
	if (prev_log_addr.page == FIL_NULL) {
		/* No logs left in the history list */

		rseg->last_page_no = FIL_NULL;
642

643 644 645
		mutex_exit(&(rseg->mutex));
		mtr_commit(&mtr);

unknown's avatar
unknown committed
646
		mutex_enter(&kernel_mutex);
647

unknown's avatar
unknown committed
648 649 650 651 652 653 654
		/* Add debug code to track history list corruption reported
		on the MySQL mailing list on Nov 9, 2004. The fut0lst.c
		file-based list was corrupt. The prev node pointer was
		FIL_NULL, even though the list length was over 8 million nodes!
		We assume that purge truncates the history list in moderate
		size pieces, and if we here reach the head of the list, the
		list cannot be longer than 20 000 undo logs now. */
655

unknown's avatar
unknown committed
656 657 658
		if (trx_sys->rseg_history_len > 20000) {
			ut_print_timestamp(stderr);
			fprintf(stderr,
unknown's avatar
unknown committed
659 660 661 662 663 664 665
				"  InnoDB: Warning: purge reached the"
				" head of the history list,\n"
				"InnoDB: but its length is still"
				" reported as %lu! Make a detailed bug\n"
				"InnoDB: report, and submit it"
				" to http://bugs.mysql.com\n",
				(ulong) trx_sys->rseg_history_len);
unknown's avatar
unknown committed
666 667 668 669
		}

		mutex_exit(&kernel_mutex);

670 671 672 673 674 675 676 677 678 679
		return;
	}

	mutex_exit(&(rseg->mutex));
	mtr_commit(&mtr);

	/* Read the trx number and del marks from the previous log header */
	mtr_start(&mtr);

	log_hdr = trx_undo_page_get_s_latched(rseg->space,
unknown's avatar
unknown committed
680 681
					      prev_log_addr.page, &mtr)
		+ prev_log_addr.boffset;
682 683

	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
684

685 686 687 688 689 690 691 692 693 694 695 696 697
	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);

	mtr_commit(&mtr);

	mutex_enter(&(rseg->mutex));

	rseg->last_page_no = prev_log_addr.page;
	rseg->last_offset = prev_log_addr.boffset;
	rseg->last_trx_no = trx_no;
	rseg->last_del_marks = del_marks;

	mutex_exit(&(rseg->mutex));
}
698

699 700 701 702 703
/***************************************************************************
Chooses the next undo log to purge and updates the info in purge_sys. This
function is used to initialize purge_sys when the next record to purge is
not known, and also to update the purge system info on the next record when
purge has handled the whole undo log for a transaction. */
704
static
705 706 707 708 709 710 711 712
void
trx_purge_choose_next_log(void)
/*===========================*/
{
	trx_undo_rec_t*	rec;
	trx_rseg_t*	rseg;
	trx_rseg_t*	min_rseg;
	dulint		min_trx_no;
unknown's avatar
unknown committed
713 714 715
	ulint		space = 0;   /* remove warning (??? bug ???) */
	ulint		page_no = 0; /* remove warning (??? bug ???) */
	ulint		offset = 0;  /* remove warning (??? bug ???) */
716
	mtr_t		mtr;
717

718
#ifdef UNIV_SYNC_DEBUG
719
	ut_ad(mutex_own(&(purge_sys->mutex)));
720
#endif /* UNIV_SYNC_DEBUG */
721 722 723 724
	ut_ad(purge_sys->next_stored == FALSE);

	rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);

725 726
	min_trx_no = ut_dulint_max;

727
	min_rseg = NULL;
728

729 730
	while (rseg) {
		mutex_enter(&(rseg->mutex));
731

732 733 734
		if (rseg->last_page_no != FIL_NULL) {

			if ((min_rseg == NULL)
unknown's avatar
unknown committed
735 736
			    || (ut_dulint_cmp(min_trx_no,
					      rseg->last_trx_no) > 0)) {
737 738 739 740

				min_rseg = rseg;
				min_trx_no = rseg->last_trx_no;
				space = rseg->space;
741
				ut_a(space == 0); /* We assume in purge of
unknown's avatar
unknown committed
742 743
						  externally stored fields
						  that space id == 0 */
744 745 746 747 748 749 750 751 752
				page_no = rseg->last_page_no;
				offset = rseg->last_offset;
			}
		}

		mutex_exit(&(rseg->mutex));

		rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
	}
753

754 755 756 757 758 759 760 761 762 763 764 765 766
	if (min_rseg == NULL) {

		return;
	}

	mtr_start(&mtr);

	if (!min_rseg->last_del_marks) {
		/* No need to purge this log */

		rec = &trx_purge_dummy_rec;
	} else {
		rec = trx_undo_get_first_rec(space, page_no, offset,
unknown's avatar
unknown committed
767
					     RW_S_LATCH, &mtr);
768 769 770 771 772 773
		if (rec == NULL) {
			/* Undo log empty */

			rec = &trx_purge_dummy_rec;
		}
	}
774

775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
	purge_sys->next_stored = TRUE;
	purge_sys->rseg = min_rseg;

	purge_sys->hdr_page_no = page_no;
	purge_sys->hdr_offset = offset;

	purge_sys->purge_trx_no = min_trx_no;

	if (rec == &trx_purge_dummy_rec) {

		purge_sys->purge_undo_no = ut_dulint_zero;
		purge_sys->page_no = page_no;
		purge_sys->offset = 0;
	} else {
		purge_sys->purge_undo_no = trx_undo_rec_get_undo_no(rec);

		purge_sys->page_no = buf_frame_get_page_no(rec);
		purge_sys->offset = rec - buf_frame_align(rec);
	}

	mtr_commit(&mtr);
}

/***************************************************************************
Gets the next record to purge and updates the info in the purge system.
The record returned is the one at the position stored in purge_sys when the
function is called; the stored position is then advanced to the next record
which requires a purge operation. In a UNIV_SYNC_DEBUG build the function
asserts that the caller owns purge_sys->mutex. */
static
trx_undo_rec_t*
trx_purge_get_next_rec(
/*===================*/
				/* out: copy of an undo log record or
				pointer to the dummy undo log record */
	mem_heap_t*	heap)	/* in: memory heap where copied */
{
	trx_undo_rec_t*	rec;
	trx_undo_rec_t*	rec_copy;
	trx_undo_rec_t*	rec2;
	trx_undo_rec_t*	next_rec;
	page_t*		undo_page;
	page_t*		page;
	ulint		offset;
	ulint		page_no;
	ulint		space;
	ulint		type;
	ulint		cmpl_info;
	mtr_t		mtr;

#ifdef UNIV_SYNC_DEBUG
	ut_ad(mutex_own(&(purge_sys->mutex)));
#endif /* UNIV_SYNC_DEBUG */
	ut_ad(purge_sys->next_stored);

	space = purge_sys->rseg->space;
	page_no = purge_sys->page_no;
	offset = purge_sys->offset;

	if (offset == 0) {
		/* It is the dummy undo log record, which means that there is
		no need to purge this undo log */

		trx_purge_rseg_get_next_history_log(purge_sys->rseg);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		return(&trx_purge_dummy_rec);
	}

	mtr_start(&mtr);

	undo_page = trx_undo_page_get_s_latched(space, page_no, &mtr);
	rec = undo_page + offset;

	rec2 = rec;

	for (;;) {
		/* Try first to find the next record which requires a purge
		operation from the same page of the same undo log */

		next_rec = trx_undo_page_get_next_rec(rec2,
						      purge_sys->hdr_page_no,
						      purge_sys->hdr_offset);
		if (next_rec == NULL) {
			/* Not found on this page: take whatever the
			undo log gives as the next record; rec2 is NULL
			if there is none */

			rec2 = trx_undo_get_next_rec
				(rec2, purge_sys->hdr_page_no,
				 purge_sys->hdr_offset, &mtr);
			break;
		}

		rec2 = next_rec;

		type = trx_undo_rec_get_type(rec2);

		if (type == TRX_UNDO_DEL_MARK_REC) {

			break;
		}

		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);

		if (trx_undo_rec_get_extern_storage(rec2)) {
			break;
		}

		if ((type == TRX_UNDO_UPD_EXIST_REC)
		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
			break;
		}
	}

	if (rec2 == NULL) {
		mtr_commit(&mtr);

		trx_purge_rseg_get_next_history_log(purge_sys->rseg);

		/* Look for the next undo log and record to purge */

		trx_purge_choose_next_log();

		/* Latch again the page of the record to return: space,
		page_no and offset still hold the position read at the
		start of this function */

		mtr_start(&mtr);

		undo_page = trx_undo_page_get_s_latched(space, page_no, &mtr);

		rec = undo_page + offset;
	} else {
		page = buf_frame_align(rec2);

		purge_sys->purge_undo_no = trx_undo_rec_get_undo_no(rec2);
		purge_sys->page_no = buf_frame_get_page_no(page);
		purge_sys->offset = rec2 - page;

		if (undo_page != page) {
			/* We advance to a new page of the undo log: */
			purge_sys->n_pages_handled++;
		}
	}

	rec_copy = trx_undo_rec_copy(rec, heap);

	mtr_commit(&mtr);

	return(rec_copy);
}

/************************************************************************
Fetches the next undo log record from the history list to purge. It must be
released with the corresponding release function. The function acquires and
releases purge_sys->mutex. When it returns NULL, it has also truncated the
history if the purge array was empty. */

trx_undo_rec_t*
trx_purge_fetch_next_rec(
/*=====================*/
				/* out: copy of an undo log record or
				pointer to the dummy undo log record
				&trx_purge_dummy_rec, if the whole undo log
				can be skipped in purge; NULL if none left */
	dulint*		roll_ptr,/* out: roll pointer to undo record */
	trx_undo_inf_t** cell,	/* out: storage cell for the record in the
				purge array */
	mem_heap_t*	heap)	/* in: memory heap where copied */
{
	trx_undo_rec_t*	undo_rec;

	mutex_enter(&(purge_sys->mutex));

	if (purge_sys->state == TRX_STOP_PURGE) {
		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	if (!purge_sys->next_stored) {
		trx_purge_choose_next_log();

		if (!purge_sys->next_stored) {
			/* Nothing could be chosen: stop the purge */

			purge_sys->state = TRX_STOP_PURGE;

			trx_purge_truncate_if_arr_empty();

			if (srv_print_thread_releases) {
				fprintf(stderr,
					"Purge: No logs left in the"
					" history list; pages handled %lu\n",
					(ulong) purge_sys->n_pages_handled);
			}

			mutex_exit(&(purge_sys->mutex));

			return(NULL);
		}
	}

	if (purge_sys->n_pages_handled >= purge_sys->handle_limit) {
		/* The page limit has been reached: stop the purge */

		purge_sys->state = TRX_STOP_PURGE;

		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	if (ut_dulint_cmp(purge_sys->purge_trx_no,
			  purge_sys->view->low_limit_no) >= 0) {
		/* The next log is not below the low limit of the purge
		view: stop the purge */

		purge_sys->state = TRX_STOP_PURGE;

		trx_purge_truncate_if_arr_empty();

		mutex_exit(&(purge_sys->mutex));

		return(NULL);
	}

	*roll_ptr = trx_undo_build_roll_ptr(FALSE, (purge_sys->rseg)->id,
					    purge_sys->page_no,
					    purge_sys->offset);

	*cell = trx_purge_arr_store_info(purge_sys->purge_trx_no,
					 purge_sys->purge_undo_no);

	ut_ad(ut_dulint_cmp(purge_sys->purge_trx_no,
			    (purge_sys->view)->low_limit_no) < 0);

	/* The following call will advance the stored values of purge_trx_no
	and purge_undo_no, therefore we had to store them first */

	undo_rec = trx_purge_get_next_rec(heap);

	mutex_exit(&(purge_sys->mutex));

	return(undo_rec);
}

/***********************************************************************
Releases a reserved purge undo record: frees its storage cell in the purge
array while holding purge_sys->mutex. */

void
trx_purge_rec_release(
/*==================*/
	trx_undo_inf_t*	cell)	/* in: storage cell */
{
	mutex_enter(&(purge_sys->mutex));

	trx_purge_arr_remove_info(cell);

	mutex_exit(&(purge_sys->mutex));
}

/***********************************************************************
This function runs a purge batch. */

ulint
trx_purge(void)
/*===========*/
				/* out: number of undo log pages handled in
				the batch */
{
	que_thr_t*	thr;
unknown's avatar
unknown committed
1044
	/*	que_thr_t*	thr2; */
1045 1046 1047 1048 1049
	ulint		old_pages_handled;

	mutex_enter(&(purge_sys->mutex));

	if (purge_sys->trx->n_active_thrs > 0) {
1050

1051 1052 1053 1054
		mutex_exit(&(purge_sys->mutex));

		/* Should not happen */

1055
		ut_error;
1056

1057
		return(0);
1058
	}
1059 1060 1061 1062 1063

	rw_lock_x_lock(&(purge_sys->latch));

	mutex_enter(&kernel_mutex);

1064
	/* Close and free the old purge view */
1065 1066 1067 1068 1069

	read_view_close(purge_sys->view);
	purge_sys->view = NULL;
	mem_heap_empty(purge_sys->heap);

1070 1071 1072 1073 1074 1075 1076 1077 1078
	/* Determine how much data manipulation language (DML) statements
	need to be delayed in order to reduce the lagging of the purge
	thread. */
	srv_dml_needed_delay = 0; /* in microseconds; default: no delay */

	/* If we cannot advance the 'purge view' because of an old
	'consistent read view', then the DML statements cannot be delayed.
	Also, srv_max_purge_lag <= 0 means 'infinity'. */
	if (srv_max_purge_lag > 0
unknown's avatar
unknown committed
1079
	    && !UT_LIST_GET_LAST(trx_sys->view_list)) {
1080
		float	ratio = (float) trx_sys->rseg_history_len
unknown's avatar
unknown committed
1081
			/ srv_max_purge_lag;
1082 1083 1084 1085
		if (ratio > ULINT_MAX / 10000) {
			/* Avoid overflow: maximum delay is 4295 seconds */
			srv_dml_needed_delay = ULINT_MAX;
		} else if (ratio > 1) {
1086 1087 1088 1089
			/* If the history list length exceeds the
			innodb_max_purge_lag, the
			data manipulation statements are delayed
			by at least 5000 microseconds. */
1090
			srv_dml_needed_delay = (ulint) ((ratio - .5) * 10000);
1091 1092 1093
		}
	}

unknown's avatar
unknown committed
1094 1095
	purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
							    purge_sys->heap);
1096
	mutex_exit(&kernel_mutex);
1097 1098 1099

	rw_lock_x_unlock(&(purge_sys->latch));

1100 1101
	purge_sys->state = TRX_PURGE_ON;

1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
	/* Handle at most 20 undo log pages in one purge batch */

	purge_sys->handle_limit = purge_sys->n_pages_handled + 20;

	old_pages_handled = purge_sys->n_pages_handled;

	mutex_exit(&(purge_sys->mutex));

	mutex_enter(&kernel_mutex);

unknown's avatar
unknown committed
1112
	thr = que_fork_start_command(purge_sys->query);
1113 1114

	ut_ad(thr);
1115

unknown's avatar
unknown committed
1116
	/*	thr2 = que_fork_start_command(purge_sys->query);
1117

1118
	ut_ad(thr2); */
1119

1120 1121 1122

	mutex_exit(&kernel_mutex);

unknown's avatar
unknown committed
1123
	/*	srv_que_task_enqueue(thr2); */
1124

1125
	if (srv_print_thread_releases) {
1126

1127
		fputs("Starting purge\n", stderr);
1128 1129 1130 1131 1132 1133
	}

	que_run_threads(thr);

	if (srv_print_thread_releases) {

1134
		fprintf(stderr,
unknown's avatar
unknown committed
1135 1136
			"Purge ends; pages handled %lu\n",
			(ulong) purge_sys->n_pages_handled);
1137 1138 1139 1140
	}

	return(purge_sys->n_pages_handled - old_pages_handled);
}
unknown's avatar
Merge  
unknown committed
1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152

/**********************************************************************
Prints information of the purge system to stderr. */

void
trx_purge_sys_print(void)
/*=====================*/
{
	fprintf(stderr, "InnoDB: Purge system view:\n");
	read_view_print(purge_sys->view);

	fprintf(stderr, "InnoDB: Purge trx n:o %lu %lu, undo n_o %lu %lu\n",
unknown's avatar
unknown committed
1153 1154 1155 1156
		(ulong) ut_dulint_get_high(purge_sys->purge_trx_no),
		(ulong) ut_dulint_get_low(purge_sys->purge_trx_no),
		(ulong) ut_dulint_get_high(purge_sys->purge_undo_no),
		(ulong) ut_dulint_get_low(purge_sys->purge_undo_no));
unknown's avatar
Merge  
unknown committed
1157
	fprintf(stderr,
unknown's avatar
unknown committed
1158 1159
		"InnoDB: Purge next stored %lu, page_no %lu, offset %lu,\n"
		"InnoDB: Purge hdr_page_no %lu, hdr_offset %lu\n",
1160 1161 1162 1163 1164
		(ulong) purge_sys->next_stored,
		(ulong) purge_sys->page_no,
		(ulong) purge_sys->offset,
		(ulong) purge_sys->hdr_page_no,
		(ulong) purge_sys->hdr_offset);
unknown's avatar
Merge  
unknown committed
1165
}