Commit 0f3d4b2f authored by Michael Widenius's avatar Michael Widenius

Fixed bug that 'maria_read_log -a' didn't set max_trid when reparing tables.

Fixed bug in Aria when replacing short keys with long keys and a key tree both overflow and underflow at same time.
Fixed several bugs when generating recovery logs when using RGQ with replacing long keys with short keys and vice versa.
Lots of new DBUG_ASSERT()'s
Added more information to recovery log to make it easier to know from where log entry orginated.
Introduced MARIA_PAGE->org_size that tells what the size of the page was in last log entry. This allows us to find out if all key changes for index page was logged.
Small code cleanups:
- Introduced _ma_log_key_changes() to log crc of key page changes
- Added share->max_index_block_size as max size of data one can put in key block (block_size - KEYPAGE_CHECKSUM_SIZE)
  This will later simplify adding a directory to index pages.
- Write page number instead of page postition to DBUG log



mysql-test/lib/v1/mysql-test-run.pl:
  Use --general-log instead of --log to disable warning when using RQG
sql/mysqld.cc:
  If we have already sent ok to client when we get an error, log this to stderr
  Don't disable option --log-output if CSV engine is not supported.
storage/maria/ha_maria.cc:
  Log queries to recovery log also in LOCK TABLES
storage/maria/ma_check.c:
  If param->max_trid is set, use this value instead of max_trid_in_system().
  This is used by recovery to set max_trid to max seen trid so far.
  keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE -> max_index_block_size (Style optimization)
storage/maria/ma_delete.c:
  Mark tables crashed early
  Write page number instead of page position to debug log.
  Added parameter to ma_log_delete() and ma_log_prefix() that is logged so that we can find where wrong log entries where generated.
  Fixed bug where a page was not proplerly written when same key tree had both an overflow and underflow when deleting a key.
  keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE => max_index_block_size (Style optimization)
  ma_log_delete() now has extra parameter of how many bytes from end of page should be appended to log for page (for page overflows)
storage/maria/ma_key_recover.c:
  Added extra parameter to ma_log_prefix() to indicate what caused log entry.
  Update MARIA_PAGE->org_size when logging info about page.
  Much more DBUG_ASSERT()'s.
  Fix some bugs in maria_log_add() to handle page overflows.
  Added _ma_log_key_changes() to log crc of key page changes.
  If EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES is defines, log the resulting pages to log so one can trivally
  see how the resulting page should have looked like (for errors in CRC values)
storage/maria/ma_key_recover.h:
  Added _ma_log_key_changes() which is only called if EXTRA_DEBUG_KEY_CHANGES is defined.
  Updated function prototypes.
storage/maria/ma_loghandler.h:
  Added more values to en_key_debug, to get more exact location where things went wrong when logging to recovery log.
storage/maria/ma_open.c:
  Initialize share->max_index_block_size
storage/maria/ma_page.c:
  Added updating and testing of MARIA_PAGE->org_size
  Write page number instead of page postition to DBUG log
  Generate error if we read page with wrong data.
  Removed wrong assert: key_del_current != share->state.key_del.
  Simplify _ma_log_compact_keypage()
storage/maria/ma_recovery.c:
  Set param.max_trid to max seen trid before running repair table (used for alter table to create index)
storage/maria/ma_rt_key.c:
  Update call to _ma_log_delete()
storage/maria/ma_rt_split.c:
  Use _ma_log_key_changes()
  Update MARIA_PAGE->org_size
storage/maria/ma_unique.c:
  Remove casts
storage/maria/ma_write.c:
  keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE => share->max_index_block_length.
  Updated calls to _ma_log_prefix()
  Changed code to use _ma_log_key_changes()
  Update ma_page->org_size
  Fixed bug in _ma_log_split() for pages that overflow
  Added KEY_OP_DEBUG logging to functions
  Log KEYPAGE_FLAG in all log entries
storage/maria/maria_def.h:
  Added SHARE->max_index_block_size
  Added MARIA_PAGE->org_size
storage/maria/trnman.c:
  Reset flags for new transaction.
parent ad6d95d3
......@@ -2959,7 +2959,7 @@ sub run_benchmarks ($) {
if ( ! $benchmark )
{
mtr_add_arg($args, "--log");
mtr_add_arg($args, "--general-log");
mtr_run("$glob_mysql_bench_dir/run-all-tests", $args, "", "", "", "");
# FIXME check result code?!
}
......
......@@ -3070,7 +3070,19 @@ int my_message_sql(uint error, const char *str, myf MyFlags)
}
else
{
if (! thd->main_da.is_error()) // Return only first message
if (thd->main_da.is_ok())
{
/*
Client has already got ok packet; Write message to error log.
This could happen if we get an error in implicit commit.
This should never happen in normal operation, so lets
assert here in debug builds.
*/
DBUG_ASSERT(0);
func= sql_print_error;
MyFlags|= ME_NOREFRESH;
}
else if (! thd->main_da.is_error()) // Return only first message
{
thd->main_da.set_error_status(thd, error, str);
}
......@@ -4185,7 +4197,6 @@ a file name for --log-bin-index option", opt_binlog_index_name);
unireg_abort(1);
}
#ifdef WITH_CSV_STORAGE_ENGINE
if (opt_bootstrap)
log_output_options= LOG_FILE;
else
......@@ -4219,10 +4230,6 @@ a file name for --log-bin-index option", opt_binlog_index_name);
logger.set_handlers(LOG_FILE, opt_slow_log ? log_output_options:LOG_NONE,
opt_log ? log_output_options:LOG_NONE);
}
#else
logger.set_handlers(LOG_FILE, opt_slow_log ? LOG_FILE:LOG_NONE,
opt_log ? LOG_FILE:LOG_NONE);
#endif
/*
Check that the default storage engine is actually available.
......@@ -6297,13 +6304,11 @@ each time the SQL thread starts.",
"Log some extra information to update log. Please note that this option "
"is deprecated; see --log-short-format option.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifdef WITH_CSV_STORAGE_ENGINE
{"log-output", OPT_LOG_OUTPUT,
"Syntax: log-output[=value[,value...]], where \"value\" could be TABLE, "
"FILE or NONE.",
&log_output_str, &log_output_str, 0,
GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"log-queries-not-using-indexes", OPT_LOG_QUERIES_NOT_USING_INDEXES,
"Log queries that are executed without benefit of any index to the slow log if it is open.",
&opt_log_queries_not_using_indexes, &opt_log_queries_not_using_indexes,
......@@ -8631,7 +8636,6 @@ mysqld_get_one_option(int optid,
WARN_DEPRECATED(NULL, "7.0", "--log_slow_queries", "'--slow_query_log'/'--log-slow-file'");
opt_slow_log= 1;
break;
#ifdef WITH_CSV_STORAGE_ENGINE
case OPT_LOG_OUTPUT:
{
if (!argument || !argument[0])
......@@ -8649,7 +8653,6 @@ mysqld_get_one_option(int optid,
}
break;
}
#endif
case OPT_EVENT_SCHEDULER:
#ifndef HAVE_EVENT_SCHEDULER
sql_perror("Event scheduler is not supported in embedded build.");
......
......@@ -750,8 +750,11 @@ static int maria_create_trn_for_mysql(MARIA_HA *info)
thd->query_length());
}
else
{
DBUG_PRINT("info", ("lock_type: %d trnman_flags: %u",
info->lock_type, trnman_get_flags(trn))); /* QQ */
info->lock_type, trnman_get_flags(trn)));
}
#endif
DBUG_RETURN(0);
}
......@@ -2347,6 +2350,12 @@ int ha_maria::extra(enum ha_extra_function operation)
int ha_maria::reset(void)
{
if (file->trn)
{
TRN *trn= file->trn;
/* Next statement is a new statement. Ensure it's logged */
trnman_set_flags(trn, trnman_get_flags(trn) & ~TRN_STATE_INFO_LOGGED);
}
return maria_reset(file);
}
......
......@@ -136,11 +136,13 @@ void maria_chk_init_for_check(HA_CHECK *param, MARIA_HA *info)
Set up transaction handler so that we can see all rows. When rows is read
we will check the found id against param->max_tried
*/
if (param->max_trid == 0)
{
if (!ma_control_file_inited())
param->max_trid= 0; /* Give warning for first trid found */
else
param->max_trid= max_trid_in_system();
}
maria_ignore_trids(info);
}
......@@ -867,7 +869,7 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
llstr(anc_page->pos, llbuff));
}
if (anc_page->size > (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
if (anc_page->size > share->max_index_block_size)
{
_ma_check_print_error(param,
"Page at %s has impossible (too big) pagelength",
......@@ -2325,11 +2327,13 @@ static int initialize_variables_for_repair(HA_CHECK *param,
}
/* Set up transaction handler so that we can see all rows */
if (param->max_trid == 0)
{
if (!ma_control_file_inited())
param->max_trid= 0; /* Give warning for first trid found */
else
param->max_trid= max_trid_in_system();
}
maria_ignore_trids(info);
/* Don't write transid's during repair */
maria_versioning(info, 0);
......@@ -5609,7 +5613,7 @@ static int sort_insert_key(MARIA_SORT_PARAM *sort_param,
a_length+=t_length;
_ma_store_page_used(share, anc_buff, a_length);
key_block->end_pos+=t_length;
if (a_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
if (a_length <= share->max_index_block_size)
{
MARIA_KEY tmp_key2;
tmp_key2.data= key_block->lastkey;
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
Copyright (C) 2009-2010 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -180,7 +181,11 @@ my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
key->data= key_buff;
}
res= _ma_ck_real_delete(info, key, &new_root);
if ((res= _ma_ck_real_delete(info, key, &new_root)))
{
/* We have to mark the table crashed before unpin_all_pages() */
maria_mark_crashed(info);
}
key->data= save_key_data;
if (!res && share->now_transactional)
......@@ -218,7 +223,8 @@ my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
my_errno=ENOMEM;
DBUG_RETURN(1);
}
DBUG_PRINT("info",("root_page: %ld", (long) old_root));
DBUG_PRINT("info",("root_page: %lu",
(ulong) (old_root / keyinfo->block_length)));
if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
{
......@@ -435,7 +441,8 @@ static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
*/
if (share->now_transactional &&
_ma_log_delete(anc_page, s_temp.key_pos,
s_temp.changed_length, s_temp.move_length))
s_temp.changed_length, s_temp.move_length,
0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
DBUG_RETURN(-1);
if (!nod_flag)
......@@ -458,7 +465,7 @@ static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
}
if (ret_value >0)
{
save_flag=1;
save_flag= 2;
if (ret_value == 1)
ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
else
......@@ -474,17 +481,20 @@ static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
ret_value= _ma_insert(info, key, anc_page, keypos,
last_key.data,
(MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS))
ret_value= -1;
}
}
if (ret_value == 0 && anc_page->size >
(uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
if (ret_value == 0 && anc_page->size > share->max_index_block_size)
{
/* parent buffer got too big ; We have to split the page */
save_flag=1;
save_flag= 3;
ret_value= _ma_split_page(info, key, anc_page,
(uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE),
share->max_index_block_size,
(uchar*) 0, 0, 0, lastkey, 0) | 2;
DBUG_ASSERT(anc_page->org_size == anc_page->size);
}
if (save_flag && ret_value != 1)
{
......@@ -550,7 +560,8 @@ static int del(MARIA_HA *info, MARIA_KEY *key,
MARIA_KEY ret_key;
MARIA_PAGE next_page;
DBUG_ENTER("del");
DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx", (long) leaf_page,
DBUG_PRINT("enter",("leaf_page: %lu keypos: 0x%lx",
(ulong) (leaf_page->pos / share->block_size),
(ulong) keypos));
DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
......@@ -587,11 +598,10 @@ static int del(MARIA_HA *info, MARIA_KEY *key,
ret_value= underflow(info, keyinfo, leaf_page, &next_page,
endpos);
if (ret_value == 0 && leaf_page->size >
(uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
share->max_index_block_size)
{
ret_value= (_ma_split_page(info, key, leaf_page,
(uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE),
share->max_index_block_size,
(uchar*) 0, 0, 0,
ret_key_buff, 0) | 2);
}
......@@ -708,8 +718,7 @@ err:
@fn underflow()
@param anc_buff Anchestor page data
@param leaf_page Page number of leaf page
@param leaf_buff Leaf page (page that underflowed)
@param leaf_page Leaf page (page that underflowed)
@param leaf_page_link Pointer to pin information about leaf page
@param keypos Position after current key in anc_buff
......@@ -743,7 +752,8 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
MARIA_KEY tmp_key, anc_key, leaf_key;
MARIA_PAGE next_page;
DBUG_ENTER("underflow");
DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx",(long) leaf_page->pos,
DBUG_PRINT("enter",("leaf_page: %lu keypos: 0x%lx",
(ulong) (leaf_page->pos / share->block_size),
(ulong) keypos));
DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
......@@ -841,7 +851,7 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (buff_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
if (buff_length <= share->max_index_block_size)
{
/* All keys fitted into one page */
page_mark_changed(info, &next_page);
......@@ -854,10 +864,15 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
if (share->now_transactional)
{
/* Log changes to parent page */
/*
Log changes to parent page. Note that this page may have been
temporarily bigger than block_size.
*/
if (_ma_log_delete(anc_page, key_deleted.key_pos,
key_deleted.changed_length,
key_deleted.move_length))
key_deleted.move_length,
anc_length - anc_page->org_size,
KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
goto err;
/*
Log changes to leaf page. Data for leaf page is in leaf_buff
......@@ -986,7 +1001,8 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
*/
DBUG_ASSERT(new_buff_length <= next_buff_length);
if (_ma_log_prefix(&next_page, key_inserted.changed_length,
(int) (new_buff_length - next_buff_length)))
(int) (new_buff_length - next_buff_length),
KEY_OP_DEBUG_LOG_PREFIX_1))
goto err;
}
page_mark_changed(info, &next_page);
......@@ -1044,11 +1060,19 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
/* Remember for logging how many bytes of leaf_buff that are not changed */
DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
unchanged_leaf_length= leaf_length - (key_inserted.changed_length -
key_inserted.move_length);
unchanged_leaf_length= (leaf_length - p_length -
(key_inserted.changed_length -
key_inserted.move_length));
new_buff_length= buff_length + leaf_length - p_length + t_length;
#ifdef EXTRA_DEBUG
/* Ensure that unchanged_leaf_length is correct */
DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
leaf_buff + leaf_length - unchanged_leaf_length,
unchanged_leaf_length) == 0);
#endif
page_flag= next_page.flag | leaf_page->flag;
if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
......@@ -1069,8 +1093,7 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
anc_page->size= new_anc_length;
page_store_size(share, anc_page);
if (new_buff_length <= (uint) (keyinfo->block_length -
KEYPAGE_CHECKSUM_SIZE))
if (new_buff_length <= share->max_index_block_size)
{
/* All keys fitted into one page */
page_mark_changed(info, leaf_page);
......@@ -1079,10 +1102,14 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
if (share->now_transactional)
{
/* Log changes to parent page */
/*
Log changes to parent page. Note that this page may have been
temporarily bigger than block_size.
*/
if (_ma_log_delete(anc_page, key_deleted.key_pos,
key_deleted.changed_length, key_deleted.move_length))
key_deleted.changed_length, key_deleted.move_length,
anc_length - anc_page->org_size,
KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
goto err;
/*
Log changes to next page. Data for leaf page is in buff
......@@ -1192,8 +1219,10 @@ static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
This contains original data with new data added first
*/
DBUG_ASSERT(leaf_length <= new_leaf_length);
DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
(int) (new_leaf_length - leaf_length)))
(int) (new_leaf_length - leaf_length),
KEY_OP_DEBUG_LOG_PREFIX_2))
goto err;
/*
Log changes to next page
......@@ -1395,7 +1424,9 @@ static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
****************************************************************************/
/**
@brief log entry where some parts are deleted and some things are changed
@brief
log entry where some parts are deleted and some things are changed
and some data could be added last.
@fn _ma_log_delete()
@param info Maria handler
......@@ -1404,74 +1435,148 @@ static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
@param key_pos Start of change area
@param changed_length How many bytes where changed at key_pos
@param move_length How many bytes where deleted at key_pos
@param append_length Length of data added last
This is taken from end of ma_page->buff
This is mainly used when a key is deleted. The append happens
when we delete a key from a page with data > block_size kept in
memory and we have to add back the data that was stored > block_size
*/
my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
uint changed_length, uint move_length)
uint changed_length, uint move_length,
uint append_length,
enum en_key_debug debug_marker __attribute__((unused)))
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
uint translog_parts;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3 + 3 + 6 + 3 + 7];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
uint translog_parts, current_size, extra_length;
uint offset= (uint) (key_pos - ma_page->buff);
MARIA_HA *info= ma_page->info;
MARIA_SHARE *share= info->s;
my_off_t page;
DBUG_ENTER("_ma_log_delete");
DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
(ulong) ma_page->pos, changed_length, move_length));
(ulong) (ma_page->pos / share->block_size),
changed_length, move_length));
DBUG_ASSERT(share->now_transactional && move_length);
DBUG_ASSERT(offset + changed_length <= ma_page->size);
DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
/* Store address of new root page */
page= ma_page->pos / share->block_size;
page_store(log_data + FILEID_STORE_SIZE, page);
log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
current_size= ma_page->org_size;
#ifdef EXTRA_DEBUG_KEY_CHANGES
*log_pos++= KEY_OP_DEBUG;
*log_pos++= debug_marker;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
log_pos[0]= KEY_OP_OFFSET;
int2store(log_pos+1, offset);
log_pos[3]= KEY_OP_SHIFT;
int2store(log_pos+4, -(int) move_length);
log_pos+= 6;
translog_parts= 1;
log_pos+= 3;
translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
extra_length= 0;
if (changed_length)
{
if (offset + changed_length >= share->max_index_block_size)
{
changed_length= share->max_index_block_size - offset;
move_length= 0; /* Nothing to move */
current_size= share->max_index_block_size;
}
log_pos[0]= KEY_OP_CHANGE;
int2store(log_pos+1, changed_length);
log_pos+= 3;
translog_parts= 2;
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + offset;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
log_array[translog_parts].str= ma_page->buff + offset;
log_array[translog_parts].length= changed_length;
translog_parts++;
/* We only have to move things after offset+changed_length */
offset+= changed_length;
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
if (move_length)
{
uint log_length;
if (offset + move_length < share->max_index_block_size)
{
/*
Move down things that is on page.
page_offset in apply_redo_inxed() will be at original offset
+ changed_length.
*/
log_pos[0]= KEY_OP_SHIFT;
int2store(log_pos+1, - (int) move_length);
log_length= 3;
current_size-= move_length;
}
else
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
changed_length+= 7;
/* Delete to end of page */
uint tmp= current_size - offset;
current_size= offset;
log_pos[0]= KEY_OP_DEL_SUFFIX;
int2store(log_pos+1, tmp);
log_length= 3;
}
log_array[translog_parts].str= log_pos;
log_array[translog_parts].length= log_length;
translog_parts++;
log_pos+= log_length;
extra_length+= log_length;
}
#endif
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
if (current_size != ma_page->size &&
current_size != share->max_index_block_size)
{
/* Append data that didn't fit on the page before */
uint length= (min(ma_page->size, share->max_index_block_size) -
current_size);
uchar *data= ma_page->buff + current_size;
DBUG_ASSERT(length <= append_length);
log_pos[0]= KEY_OP_ADD_SUFFIX;
int2store(log_pos+1, length);
log_array[translog_parts].str= log_pos;
log_array[translog_parts].length= 3;
log_array[translog_parts + 1].str= data;
log_array[translog_parts + 1].length= length;
log_pos+= 3;
translog_parts+= 2;
current_size+= length;
extra_length+= 3 + length;
}
_ma_log_key_changes(ma_page,
log_array + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= current_size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t)
log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
changed_length,
TRANSLOG_INTERNAL_PARTS + translog_parts,
log_array[TRANSLOG_INTERNAL_PARTS].length +
changed_length + extra_length, translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
......
......@@ -312,24 +312,33 @@ my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
*/
my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
int move_length)
int move_length,
enum en_key_debug debug_marker __attribute__((unused)))
{
uint translog_parts;
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2], *log_pos;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2 + 2];
uchar *log_pos;
uchar *buff= ma_page->buff;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
pgcache_page_no_t page;
MARIA_HA *info= ma_page->info;
DBUG_ENTER("_ma_log_prefix");
DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
(ulong) ma_page->pos, changed_length, move_length));
DBUG_ASSERT(ma_page->size == ma_page->org_size + move_length);
page= ma_page->pos / info->s->block_size;
log_pos= log_data + FILEID_STORE_SIZE;
page_store(log_pos, page);
log_pos+= PAGE_STORE_SIZE;
#ifdef EXTRA_DEBUG_KEY_CHANGES
(*log_pos++)= KEY_OP_DEBUG;
(*log_pos++)= debug_marker;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
......@@ -373,21 +382,11 @@ my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
translog_parts= 2;
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
changed_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page, log_array + TRANSLOG_INTERNAL_PARTS +
translog_parts, log_pos, &changed_length,
&translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -407,7 +406,7 @@ my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
{
LSN lsn;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 10 + 7 + 2], *log_pos;
uchar *buff= ma_page->buff;
int diff;
......@@ -417,6 +416,8 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
DBUG_ENTER("_ma_log_suffix");
DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
(ulong) ma_page->pos, org_length, new_length));
DBUG_ASSERT(ma_page->size == new_length);
DBUG_ASSERT(ma_page->org_size == org_length);
page= ma_page->pos / info->s->block_size;
......@@ -451,20 +452,11 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
ha_checksum crc;
crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, new_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -481,12 +473,16 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
@param ma_page Changed page
@param org_page_length Length of data in page before key was added
Final length in ma_page->size
@note
If handle_overflow is set, then we have to protect against
logging changes that is outside of the page.
This may happen during underflow() handling where the buffer
in memory temporary contains more data than block_size
ma_page may be a page that was previously logged and cuted down
becasue it's too big. (org_page_length > ma_page->org_size)
*/
my_bool _ma_log_add(MARIA_PAGE *ma_page,
......@@ -496,14 +492,14 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 3 + 3 + 3 + 3 + 7 +
2];
3 + 2];
uchar *log_pos;
uchar *buff= ma_page->buff;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
MARIA_HA *info= ma_page->info;
uint offset= (uint) (key_pos - buff);
uint page_length= info->s->block_size - KEYPAGE_CHECKSUM_SIZE;
uint translog_parts;
uint max_page_size= info->s->max_index_block_size;
uint translog_parts, current_size;
pgcache_page_no_t page_pos;
DBUG_ENTER("_ma_log_add");
DBUG_PRINT("enter", ("page: %lu org_page_length: %u changed_length: %u "
......@@ -512,6 +508,9 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
move_length));
DBUG_ASSERT(info->s->now_transactional);
DBUG_ASSERT(move_length <= (int) changed_length);
DBUG_ASSERT(ma_page->org_size == min(org_page_length, max_page_size));
DBUG_ASSERT(ma_page->size == org_page_length + move_length);
DBUG_ASSERT(offset < max_page_size);
/*
Write REDO entry that contains the logical operations we need
......@@ -520,6 +519,7 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
log_pos= log_data + FILEID_STORE_SIZE;
page_pos= ma_page->pos / info->s->block_size;
page_store(log_pos, page_pos);
current_size= ma_page->org_size;
log_pos+= PAGE_STORE_SIZE;
#ifdef EXTRA_DEBUG_KEY_CHANGES
......@@ -531,40 +531,41 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
if (org_page_length + move_length > page_length)
{
/*
Overflow. Cut either key or data from page end so that key fits
The code that splits the too big page will ignore logging any
data over org_page_length
Don't overwrite page boundary
It's ok to cut this as we will append the data at end of page
in the next log entry
*/
DBUG_ASSERT(handle_overflow);
if (offset + changed_length > page_length)
if (offset + changed_length > max_page_size)
{
/* Log that data changed to end of page */
changed_length= page_length - offset;
move_length= 0;
/* Set page to max length */
org_page_length= page_length;
DBUG_ASSERT(handle_overflow);
changed_length= max_page_size - offset; /* Update to end of page */
move_length= 0; /* Nothing to move */
/* Extend the page to max length on recovery */
*log_pos++= KEY_OP_MAX_PAGELENGTH;
current_size= max_page_size;
}
else
/* Check if adding the key made the page overflow */
if (current_size + move_length > max_page_size)
{
/* They key will not be part of the page ; Don't log it */
uint diff= org_page_length + move_length - page_length;
/*
Adding the key caused an overflow. Cut away the part of the
page that doesn't fit.
*/
uint diff;
DBUG_ASSERT(handle_overflow);
diff= current_size + move_length - max_page_size;
log_pos[0]= KEY_OP_DEL_SUFFIX;
int2store(log_pos+1, diff);
log_pos+= 3;
org_page_length-= diff;
DBUG_ASSERT(org_page_length == page_length - move_length);
}
DBUG_ASSERT(offset != org_page_length);
current_size= max_page_size - move_length;
}
if (offset == org_page_length)
if (offset == current_size)
{
DBUG_ASSERT(move_length == (int) changed_length);
log_pos[0]= KEY_OP_ADD_SUFFIX;
current_size+= changed_length;
}
else
{
......@@ -576,51 +577,103 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
log_pos[0]= KEY_OP_SHIFT;
int2store(log_pos+1, move_length);
log_pos+= 3;
current_size+= move_length;
}
log_pos[0]= KEY_OP_CHANGE;
}
int2store(log_pos+1, changed_length);
log_pos+= 3;
translog_parts= 2;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
translog_parts= TRANSLOG_INTERNAL_PARTS + 2;
#ifdef EXTRA_DEBUG_KEY_CHANGES
/*
If page was originally > block_size before operation and now all data
fits, append the end data that was not part of the previous logged
page to it.
*/
DBUG_ASSERT(current_size <= max_page_size && current_size <= ma_page->size);
if (current_size != ma_page->size && current_size != max_page_size)
{
MARIA_SHARE *share= info->s;
ha_checksum crc;
uint save_page_length= ma_page->size;
uint new_length= org_page_length + move_length;
_ma_store_page_used(share, buff, new_length);
crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, new_length);
int4store(log_pos+3, crc);
uint length= min(ma_page->size, max_page_size) - current_size;
uchar *data= ma_page->buff + current_size;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
changed_length+= 7;
translog_parts++;
_ma_store_page_used(share, buff, save_page_length);
log_pos[0]= KEY_OP_ADD_SUFFIX;
int2store(log_pos+1, length);
log_array[translog_parts].str= log_pos;
log_array[translog_parts].length= 3;
log_array[translog_parts+1].str= data;
log_array[translog_parts+1].length= length;
log_pos+= 3;
translog_parts+= 2;
current_size+= length;
changed_length+= length + 3;
}
#endif
_ma_log_key_changes(ma_page, log_array + translog_parts,
log_pos, &changed_length, &translog_parts);
/*
Remember new page length for future log entries for same page
Note that this can be different from ma_page->size in case of page
overflow!
*/
ma_page->org_size= current_size;
DBUG_ASSERT(ma_page->org_size == min(ma_page->size, max_page_size));
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t)
log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
changed_length,
TRANSLOG_INTERNAL_PARTS + translog_parts,
changed_length, translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
/* Log checksum and optionally key page to log */
void _ma_log_key_changes(MARIA_PAGE *ma_page, LEX_CUSTRING *log_array,
uchar *log_pos, uint *changed_length,
uint *translog_parts)
{
MARIA_SHARE *share= ma_page->info->s;
int page_length= min(ma_page->size, share->max_index_block_size);
uint org_length;
ha_checksum crc;
DBUG_ASSERT(ma_page->flag == (uint) ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET]);
/* We have to change length as the page may have been shortened */
org_length= _ma_get_page_used(share, ma_page->buff);
_ma_store_page_used(share, ma_page->buff, page_length);
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
_ma_store_page_used(share, ma_page->buff, org_length);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[0].str= log_pos;
log_array[0].length= 7;
(*changed_length)+= 7;
(*translog_parts)++;
#ifdef EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES
log_array[1].str= ma_page->buff;
log_array[1].length= page_length;
(*changed_length)+= page_length;
(*translog_parts)++;
#endif
#endif
}
/****************************************************************************
Redo of key pages
****************************************************************************/
......@@ -716,7 +769,7 @@ uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
bzero(buff, LSN_STORE_SIZE);
memcpy(buff + LSN_STORE_SIZE, header, length);
bzero(buff + LSN_STORE_SIZE + length,
share->block_size - LSN_STORE_SIZE - KEYPAGE_CHECKSUM_SIZE - length);
share->max_index_block_size - LSN_STORE_SIZE - length);
bfill(buff + share->block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
......@@ -847,7 +900,9 @@ err:
KEY_OP_ADD_SUFFIX 2 length, data Add data to end of page
KEY_OP_DEL_SUFFIX 2 length Reduce page length with this
Sets position to start of page
KEY_OP_CHECK 6 page_length[2},CRC Used only when debugging
KEY_OP_CHECK 6 page_length[2],CRC Used only when debugging
This may be followed by page_length
of data (until end of log record)
KEY_OP_COMPACT_PAGE 6 transid
KEY_OP_SET_PAGEFLAG 1 flag for page
KEY_OP_MAX_PAGELENGTH 0 Set page to max length
......@@ -870,7 +925,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
const uchar *header_end= header + head_length;
uint page_offset= 0, org_page_length;
uint nod_flag, page_length, keypage_header, keynr;
uint max_page_length= share->block_size - KEYPAGE_CHECKSUM_SIZE;
uint max_page_size= share->max_index_block_size;
int result;
MARIA_PAGE page;
DBUG_ENTER("_ma_apply_redo_index");
......@@ -919,11 +974,14 @@ uint _ma_apply_redo_index(MARIA_HA *info,
header+= 2;
DBUG_PRINT("redo", ("key_op_shift: %d", length));
DBUG_ASSERT(page_offset != 0 && page_offset <= page_length &&
page_length + length <= max_page_length);
page_length + length <= max_page_size);
if (length < 0)
{
DBUG_ASSERT(page_offset - length <= page_length);
bmove(buff + page_offset, buff + page_offset - length,
page_length - page_offset + length);
}
else if (page_length != page_offset)
bmove_upp(buff + page_length + length, buff + page_length,
page_length - page_offset);
......@@ -937,6 +995,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
DBUG_ASSERT(page_offset != 0 && page_offset + length <= page_length);
memcpy(buff + page_offset, header + 2 , length);
page_offset+= length; /* Put offset after changed length */
header+= 2 + length;
break;
}
......@@ -948,7 +1007,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
insert_length, changed_length));
DBUG_ASSERT(insert_length <= changed_length &&
page_length + changed_length <= max_page_length);
page_length + changed_length <= max_page_size);
bmove_upp(buff + page_length + insert_length, buff + page_length,
page_length - keypage_header);
......@@ -974,8 +1033,8 @@ uint _ma_apply_redo_index(MARIA_HA *info,
case KEY_OP_ADD_SUFFIX: /* 6 */
{
uint insert_length= uint2korr(header);
DBUG_PRINT("redo", ("key_op_add_prefix: %u", insert_length));
DBUG_ASSERT(page_length + insert_length <= max_page_length);
DBUG_PRINT("redo", ("key_op_add_suffix: %u", insert_length));
DBUG_ASSERT(page_length + insert_length <= max_page_size);
memcpy(buff + page_length, header+2, insert_length);
page_length+= insert_length;
......@@ -1003,13 +1062,22 @@ uint _ma_apply_redo_index(MARIA_HA *info,
if (crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE))
{
DBUG_PRINT("error", ("page_length %u",page_length));
DBUG_DUMP("KEY_OP_CHECK bad page", buff, max_page_length);
DBUG_ASSERT("crc" == "failure in REDO_INDEX");
DBUG_DUMP("KEY_OP_CHECK bad page", buff, page_length);
if (header + 6 + page_length <= header_end)
{
DBUG_DUMP("KEY_OP_CHECK org page", header + 6, page_length);
}
DBUG_ASSERT("crc failure in REDO_INDEX" == 0);
}
#endif
DBUG_PRINT("redo", ("key_op_check"));
header+= 6;
/*
This is the last entry in the block and it can contain page_length
data or not
*/
DBUG_ASSERT(header + 6 == header_end ||
header + 6 + page_length == header_end);
header= header_end;
break;
}
case KEY_OP_DEBUG:
......@@ -1018,7 +1086,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
break;
case KEY_OP_MAX_PAGELENGTH:
DBUG_PRINT("redo", ("key_op_max_page_length"));
page_length= max_page_length;
page_length= max_page_size;
break;
case KEY_OP_MULTI_COPY: /* 9 */
{
......@@ -1040,7 +1108,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
log_memcpy_length= uint2korr(header);
header+= 2;
log_memcpy_end= header + log_memcpy_length;
DBUG_ASSERT(full_length <= max_page_length);
DBUG_ASSERT(full_length <= max_page_size);
while (header < log_memcpy_end)
{
uint to, from;
......@@ -1049,7 +1117,7 @@ uint _ma_apply_redo_index(MARIA_HA *info,
from= uint2korr(header);
header+= 2;
/* "from" is a place in the existing page */
DBUG_ASSERT(max(from, to) < max_page_length);
DBUG_ASSERT(max(from, to) < max_page_size);
memcpy(buff + to, buff + from, full_length);
}
break;
......
......@@ -64,17 +64,26 @@ extern my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length);
my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length,
enum en_key_debug debug_marker);
my_bool _ma_log_suffix(MARIA_PAGE *page, uint org_length,
uint new_length);
my_bool _ma_log_add(MARIA_PAGE *page, uint buff_length, uchar *key_pos,
uint changed_length, int move_length,
my_bool handle_overflow);
my_bool _ma_log_delete(MARIA_PAGE *page, const uchar *key_pos,
uint changed_length, uint move_length);
uint changed_length, uint move_length,
uint append_length, enum en_key_debug debug_marker);
my_bool _ma_log_change(MARIA_PAGE *page, const uchar *key_pos, uint length,
enum en_key_debug debug_marker);
my_bool _ma_log_new(MARIA_PAGE *page, my_bool root_page);
#ifdef EXTRA_DEBUG_KEY_CHANGES
void _ma_log_key_changes(MARIA_PAGE *ma_page, LEX_CUSTRING *log_array,
uchar *log_pos, uint *changed_length,
uint *translog_parts);
#else
void _ma_log_key_changes(A,B,C,D,E)
#endif
uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
const uchar *header, uint length);
......
......@@ -172,13 +172,24 @@ enum en_key_op
enum en_key_debug
{
KEY_OP_DEBUG_RTREE_COMBINE,
KEY_OP_DEBUG_RTREE_SPLIT,
KEY_OP_DEBUG_RTREE_SET_KEY,
KEY_OP_DEBUG_FATHER_CHANGED_1,
KEY_OP_DEBUG_FATHER_CHANGED_2,
KEY_OP_DEBUG_LOG_SPLIT,
KEY_OP_DEBUG_LOG_ADD
KEY_OP_DEBUG_RTREE_COMBINE, /* 0 */
KEY_OP_DEBUG_RTREE_SPLIT, /* 1 */
KEY_OP_DEBUG_RTREE_SET_KEY, /* 2 */
KEY_OP_DEBUG_FATHER_CHANGED_1, /* 3 */
KEY_OP_DEBUG_FATHER_CHANGED_2, /* 4 */
KEY_OP_DEBUG_LOG_SPLIT, /* 5 */
KEY_OP_DEBUG_LOG_ADD, /* 6 */
KEY_OP_DEBUG_LOG_PREFIX_1, /* 7 */
KEY_OP_DEBUG_LOG_PREFIX_2, /* 8 */
KEY_OP_DEBUG_LOG_PREFIX_3, /* 9 */
KEY_OP_DEBUG_LOG_PREFIX_4, /* 10 */
KEY_OP_DEBUG_LOG_PREFIX_5, /* 11 */
KEY_OP_DEBUG_LOG_DEL_CHANGE_1, /* 12 */
KEY_OP_DEBUG_LOG_DEL_CHANGE_2, /* 13 */
KEY_OP_DEBUG_LOG_DEL_CHANGE_3, /* 14 */
KEY_OP_DEBUG_LOG_DEL_CHANGE_RT, /* 15 */
KEY_OP_DEBUG_LOG_DEL_PREFIX, /* 16 */
KEY_OP_DEBUG_LOG_MIDDLE /* 17 */
};
......
......@@ -550,6 +550,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
strmov(share->open_file_name.str, name);
share->block_size= share->base.block_size; /* Convenience */
share->max_index_block_size= share->block_size - KEYPAGE_CHECKSUM_SIZE;
{
HA_KEYSEG *pos=share->keyparts;
uint32 ftkey_nr= 1;
......
......@@ -59,6 +59,7 @@ void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info,
page->buff= buff;
page->pos= pos;
page->size= _ma_get_page_used(share, buff);
page->org_size= page->size;
page->flag= _ma_get_keypage_flag(share, buff);
page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
share->base.key_reflength : 0);
......@@ -68,7 +69,7 @@ void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info,
void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
{
uint length= page->size;
DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE);
DBUG_ASSERT(length <= share->max_index_block_size);
bzero(page->buff + length, share->block_size - length);
}
#endif
......@@ -103,7 +104,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
MARIA_SHARE *share= info->s;
uint block_size= share->block_size;
DBUG_ENTER("_ma_fetch_keypage");
DBUG_PRINT("enter",("pos: %ld", (long) pos));
DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
tmp= pagecache_read(share->pagecache, &share->kfile,
(pgcache_page_no_t) (pos / block_size), level, buff,
......@@ -142,6 +143,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
page->buff= tmp;
page->pos= pos;
page->size= _ma_get_page_used(share, tmp);
page->org_size= page->size; /* For debugging */
page->flag= _ma_get_keypage_flag(share, tmp);
page->node= ((page->flag & KEYPAGE_FLAG_ISNOD) ?
share->base.key_reflength : 0);
......@@ -149,7 +151,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
#ifdef EXTRA_DEBUG
{
uint page_size= page->size;
if (page_size < 4 || page_size > block_size ||
if (page_size < 4 || page_size > share->max_index_block_size ||
_ma_get_keynr(share, tmp) != keyinfo->key_nr)
{
DBUG_PRINT("error",("page %lu had wrong page length: %u keynr: %u",
......@@ -159,7 +161,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
info->last_keypage = HA_OFFSET_ERROR;
maria_print_error(share, HA_ERR_CRASHED);
my_errno= HA_ERR_CRASHED;
tmp= 0;
DBUG_RETURN(1);
}
}
#endif
......@@ -179,6 +181,13 @@ my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
MARIA_PINNED_PAGE page_link;
DBUG_ENTER("_ma_write_keypage");
/*
The following ensures that for transactional tables we have logged
all changes that changes the page size (as the logging code sets
page->org_size)
*/
DBUG_ASSERT(!share->now_transactional || page->size == page->org_size);
#ifdef EXTRA_DEBUG /* Safety check */
{
uint page_length, nod_flag;
......@@ -193,7 +202,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
(page->pos & (maria_block_size-1)))
{
DBUG_PRINT("error",("Trying to write inside key status region: "
"key_start: %lu length: %lu page: %lu",
"key_start: %lu length: %lu page_pos: %lu",
(long) share->base.keystart,
(long) share->state.state.key_file_length,
(long) page->pos));
......@@ -201,7 +210,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
DBUG_ASSERT(0);
DBUG_RETURN(1);
}
DBUG_PRINT("page",("write page at: %lu",(long) page->pos));
DBUG_PRINT("page",("write page at: %lu",(ulong) (page->pos / block_size)));
DBUG_DUMP("buff", buff, page_length);
DBUG_ASSERT(page_length >= share->keypage_header + nod_flag +
page->keyinfo->minlength || maria_in_recovery);
......@@ -274,7 +283,7 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
enum pagecache_page_lock lock_method;
enum pagecache_page_pin pin_method;
DBUG_ENTER("_ma_dispose");
DBUG_PRINT("enter",("pos: %ld", (long) pos));
DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
DBUG_ASSERT(pos % block_size == 0);
(void) _ma_lock_key_del(info, 0);
......@@ -423,8 +432,7 @@ my_off_t _ma_new(register MARIA_HA *info, int level,
share->key_del_current= mi_sizekorr(buff+share->keypage_header);
#ifndef DBUG_OFF
key_del_current= share->key_del_current;
DBUG_ASSERT(key_del_current != share->state.key_del &&
(key_del_current != 0) &&
DBUG_ASSERT((key_del_current != 0) &&
((key_del_current == HA_OFFSET_ERROR) ||
(key_del_current <=
(share->state.state.key_file_length - block_size))));
......@@ -453,32 +461,48 @@ my_off_t _ma_new(register MARIA_HA *info, int level,
Log compactation of a index page
*/
static my_bool _ma_log_compact_keypage(MARIA_HA *info, my_off_t page,
static my_bool _ma_log_compact_keypage(MARIA_PAGE *ma_page,
TrID min_read_from)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + TRANSID_SIZE];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 7 + TRANSID_SIZE];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
MARIA_HA *info= ma_page->info;
MARIA_SHARE *share= info->s;
uint translog_parts, extra_length;
my_off_t page= ma_page->pos;
DBUG_ENTER("_ma_log_compact_keypage");
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
DBUG_PRINT("enter", ("page: %lu", (ulong) (page / share->block_size)));
/* Store address of new root page */
page/= share->block_size;
page_store(log_data + FILEID_STORE_SIZE, page);
log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE]= KEY_OP_COMPACT_PAGE;
transid_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE +1,
min_read_from);
log_pos= log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
log_pos[0]= KEY_OP_COMPACT_PAGE;
transid_store(log_pos + 1, min_read_from);
log_pos+= 1 + TRANSID_SIZE;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
translog_parts= 1;
extra_length= 0;
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
(translog_size_t) sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data, NULL))
log_array[TRANSLOG_INTERNAL_PARTS +
0].length + extra_length,
TRANSLOG_INTERNAL_PARTS + translog_parts,
log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
......@@ -526,7 +550,7 @@ my_bool _ma_compact_keypage(MARIA_PAGE *ma_page, TrID min_read_from)
{
if (!(page= (*ma_page->keyinfo->skip_key)(&key, 0, 0, page)))
{
DBUG_PRINT("error",("Couldn't find last key: page: 0x%lx",
DBUG_PRINT("error",("Couldn't find last key: page_pos: 0x%lx",
(long) page));
maria_print_error(share, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED;
......@@ -588,7 +612,7 @@ my_bool _ma_compact_keypage(MARIA_PAGE *ma_page, TrID min_read_from)
if (share->now_transactional)
{
if (_ma_log_compact_keypage(info, ma_page->pos, min_read_from))
if (_ma_log_compact_keypage(ma_page, min_read_from))
DBUG_RETURN(1);
}
DBUG_RETURN(0);
......
......@@ -545,7 +545,7 @@ static int display_and_apply_record(const LOG_DESC *log_desc,
if (log_desc->record_execute_in_redo_phase == NULL)
{
/* die on all not-yet-handled records :) */
DBUG_ASSERT("one more hook" == "to write");
DBUG_ASSERT("one more hook to write" == 0);
return 1;
}
if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
......@@ -1063,6 +1063,7 @@ prototype_redo_exec_hook(REDO_REPAIR_TABLE)
param.isam_file_name= name= info->s->open_file_name.str;
param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
param.tmpdir= maria_tmpdir;
param.max_trid= max_long_trid;
DBUG_ASSERT(maria_tmpdir);
info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
......
......@@ -91,7 +91,8 @@ int maria_rtree_delete_key(MARIA_PAGE *page, uchar *key, uint key_length)
page->size-= key_length_with_nod_flag;
page_store_size(share, page);
if (share->now_transactional &&
_ma_log_delete(page, key_start, 0, key_length_with_nod_flag))
_ma_log_delete(page, key_start, 0, key_length_with_nod_flag,
0, KEY_OP_DEBUG_LOG_DEL_CHANGE_RT))
return -1;
return 0;
}
......
......@@ -308,7 +308,7 @@ static my_bool _ma_log_rt_split(MARIA_PAGE *page,
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
*log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
uint translog_parts, extra_length= 0;
my_off_t page_pos;
DBUG_ENTER("_ma_log_rt_split");
......@@ -344,24 +344,11 @@ static my_bool _ma_log_rt_split(MARIA_PAGE *page,
translog_parts+= 2;
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= page->size;
ha_checksum crc;
uchar *check_start= log_pos;
crc= my_checksum(0, page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
log_pos++;
int2store(log_pos, page_length);
log_pos+= 2;
int4store(log_pos, crc);
log_pos+= 4;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= check_start;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
page->org_size= page->size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......
......@@ -68,8 +68,7 @@ my_bool _ma_check_unique(MARIA_HA *info, MARIA_UNIQUEDEF *def, uchar *record,
DBUG_ASSERT(info->last_key.data_length == MARIA_UNIQUE_HASH_LENGTH);
if (_ma_search_next(info, &info->last_key, SEARCH_BIGGER,
info->s->state.key_root[def->key]) ||
bcmp((char*) info->last_key.data, (char*) key_buff,
MARIA_UNIQUE_HASH_LENGTH))
bcmp(info->last_key.data, key_buff, MARIA_UNIQUE_HASH_LENGTH))
{
info->page_changed= 1; /* Can't optimize read next */
info->cur_row.lastpos= lastpos;
......
......@@ -823,9 +823,9 @@ int _ma_insert(register MARIA_HA *info, MARIA_KEY *key,
Check if the new key fits totally into the the page
(anc_buff is big enough to contain a full page + one key)
*/
if (a_length <= (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
if (a_length <= share->max_index_block_size)
{
if (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE - a_length < 32 &&
if (share->max_index_block_size - a_length < 32 &&
(keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
share->base.key_reflength <= share->base.rec_reflength &&
share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
......@@ -887,7 +887,7 @@ ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
{
if (share->now_transactional &&
_ma_log_add(anc_page, org_anc_length,
key_pos, s_temp.changed_length, t_length, 0))
key_pos, s_temp.changed_length, t_length, 1))
DBUG_RETURN(-1);
}
DBUG_RETURN(0); /* There is room on page */
......@@ -1265,7 +1265,7 @@ static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
curr_keylength);
if ((right ? right_length : left_length) + curr_keylength <=
(uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
share->max_index_block_size)
{
/* Enough space to hold all keys in the two buffers ; Balance bufferts */
new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
......@@ -1320,7 +1320,8 @@ static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
start of page
*/
if (_ma_log_prefix(&next_page, 0,
((int) new_right_length - (int) right_length)))
((int) new_right_length - (int) right_length),
KEY_OP_DEBUG_LOG_PREFIX_3))
goto err;
}
else
......@@ -1383,7 +1384,8 @@ static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
*/
if (_ma_log_prefix(&next_page,
(uint) (new_right_length - right_length),
(int) (new_right_length - right_length)))
(int) (new_right_length - right_length),
KEY_OP_DEBUG_LOG_PREFIX_4))
goto err;
}
else
......@@ -1545,7 +1547,8 @@ static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
This contains the last 'extra_buff' from 'buff'
*/
if (_ma_log_prefix(&extra_page,
0, (int) (extra_buff_length - right_length)))
0, (int) (extra_buff_length - right_length),
KEY_OP_DEBUG_LOG_PREFIX_5))
goto err;
/*
......@@ -1891,6 +1894,9 @@ my_bool _ma_log_new(MARIA_PAGE *ma_page, my_bool root_page)
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + LSN_STORE_SIZE;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
info->trn, info,
(translog_size_t)
......@@ -1912,7 +1918,7 @@ my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
uint offset= (uint) (key_pos - ma_page->buff), translog_parts;
my_off_t page;
MARIA_HA *info= ma_page->info;
......@@ -1921,6 +1927,7 @@ my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
DBUG_ASSERT(info->s->now_transactional);
DBUG_ASSERT(offset + length <= ma_page->size);
DBUG_ASSERT(ma_page->org_size == ma_page->size);
/* Store address of new root page */
page= ma_page->pos / info->s->block_size;
......@@ -1944,21 +1951,9 @@ my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
translog_parts= 2;
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
log_pos+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &length, &translog_parts);
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -1994,8 +1989,6 @@ my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length,
- Page is shortened from end
- Data is added to end of page
- Data added at front of page
*/
static my_bool _ma_log_split(MARIA_PAGE *ma_page,
......@@ -2006,9 +1999,9 @@ static my_bool _ma_log_split(MARIA_PAGE *ma_page,
uint changed_length)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 3+3+3+3+3+2 +7];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
uint offset= (uint) (key_pos - ma_page->buff);
uint translog_parts, extra_length;
MARIA_HA *info= ma_page->info;
......@@ -2018,6 +2011,7 @@ static my_bool _ma_log_split(MARIA_PAGE *ma_page,
(ulong) ma_page->pos, org_length, new_length));
DBUG_ASSERT(changed_length >= data_length);
DBUG_ASSERT(org_length <= info->s->max_index_block_size);
log_pos= log_data + FILEID_STORE_SIZE;
page= ma_page->pos / info->s->block_size;
......@@ -2029,6 +2023,10 @@ static my_bool _ma_log_split(MARIA_PAGE *ma_page,
(*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
if (new_length <= offset || !key_pos)
{
/*
......@@ -2053,6 +2051,11 @@ static my_bool _ma_log_split(MARIA_PAGE *ma_page,
*/
max_key_length= new_length - offset;
extra_length= min(key_length, max_key_length);
if (offset + move_length > new_length)
{
/* This is true when move_length includes changes for next packed key */
move_length= new_length - offset;
}
if ((int) new_length < (int) (org_length + move_length + data_length))
{
......@@ -2115,21 +2118,12 @@ static my_bool _ma_log_split(MARIA_PAGE *ma_page,
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -2168,8 +2162,9 @@ static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
int move_length)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 12 + 7], *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
uint offset= (uint) (key_pos - ma_page->buff);
uint diff_length= org_length + move_length - new_length;
uint translog_parts, extra_length;
......@@ -2180,6 +2175,7 @@ static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
(ulong) ma_page->pos, org_length, new_length));
DBUG_ASSERT((int) diff_length > 0);
DBUG_ASSERT(ma_page->size == new_length);
log_pos= log_data + FILEID_STORE_SIZE;
page= ma_page->pos / info->s->block_size;
......@@ -2189,6 +2185,15 @@ static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
translog_parts= 1;
extra_length= 0;
#ifdef EXTRA_DEBUG_KEY_CHANGES
*log_pos++= KEY_OP_DEBUG;
*log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
if (offset < diff_length + info->s->keypage_header)
{
/*
......@@ -2236,21 +2241,11 @@ static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page,
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
log_data);
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -2277,9 +2272,9 @@ static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
uint key_length, int move_length)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+5+3+3+3 + 7];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7];
uchar *log_pos;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
uint key_offset;
uint translog_parts, extra_length;
my_off_t page;
......@@ -2287,6 +2282,8 @@ static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
DBUG_ENTER("_ma_log_key_middle");
DBUG_PRINT("enter", ("page: %lu", (ulong) ma_page->pos));
DBUG_ASSERT(ma_page->size == new_length);
/* new place of key after changes */
key_pos+= data_added_first;
key_offset= (uint) (key_pos - ma_page->buff);
......@@ -2314,6 +2311,15 @@ static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
page_store(log_pos, page);
log_pos+= PAGE_STORE_SIZE;
#ifdef EXTRA_DEBUG_KEY_CHANGES
*log_pos++= KEY_OP_DEBUG;
*log_pos++= KEY_OP_DEBUG_LOG_MIDDLE;
#endif
/* Store keypage_flag */
*log_pos++= KEY_OP_SET_PAGEFLAG;
*log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
log_pos[0]= KEY_OP_DEL_SUFFIX;
int2store(log_pos+1, data_deleted_last);
log_pos+= 3;
......@@ -2362,21 +2368,11 @@ static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page,
key_length);
}
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......@@ -2401,14 +2397,17 @@ static my_bool _ma_log_middle(MARIA_PAGE *ma_page,
uint data_deleted_last)
{
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5], *log_pos;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos;
MARIA_HA *info= ma_page->info;
my_off_t page;
uint translog_parts, extra_length;
DBUG_ENTER("_ma_log_middle");
DBUG_PRINT("enter", ("page: %lu", (ulong) page));
DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last ==
ma_page->size);
page= ma_page->page / info->s->block_size;
log_pos= log_data + FILEID_STORE_SIZE;
......@@ -2434,21 +2433,11 @@ static my_bool _ma_log_middle(MARIA_PAGE *ma_page,
translog_parts= 2;
extra_length= data_changed_first;
#ifdef EXTRA_DEBUG_KEY_CHANGES
{
int page_length= ma_page->size;
ha_checksum crc;
crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
page_length - LSN_STORE_SIZE);
log_pos[0]= KEY_OP_CHECK;
int2store(log_pos+1, page_length);
int4store(log_pos+3, crc);
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
extra_length+= 7;
translog_parts++;
}
#endif
_ma_log_key_changes(ma_page,
log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
log_pos, &extra_length, &translog_parts);
/* Remember new page length for future log entires for same page */
ma_page->org_size= ma_page->size;
DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
......
......@@ -39,6 +39,7 @@
#define SANITY_CHECKS 1
#ifdef EXTRA_DEBUG
#define EXTRA_DEBUG_KEY_CHANGES
#define EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES
#endif
#define MAX_NONMAPPED_INSERTS 1000
......@@ -361,6 +362,7 @@ typedef struct st_maria_share
uint in_trans; /* Number of references by trn */
uint w_locks, r_locks, tot_locks; /* Number of read/write locks */
uint block_size; /* block_size of keyfile & data file*/
uint max_index_block_size; /* block_size - end_of_page_info */
/* Fixed length part of a packed row in BLOCK_RECORD format */
uint base_length;
myf write_flag;
......@@ -833,6 +835,7 @@ typedef struct st_maria_page
uchar *buff; /* Data for page */
my_off_t pos; /* Disk address to page */
uint size; /* Size of data on page */
uint org_size; /* Size of page at read or after log */
uint node; /* 0 or share->base.key_reflength */
uint flag; /* Page flag */
uint link_offset;
......
......@@ -363,6 +363,7 @@ TRN *trnman_new_trn(WT_THD *wt)
trn->used_tables= 0;
trn->locked_tables= 0;
trn->flags= 0;
/*
only after the following function TRN is considered initialized,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment