Commit ab3e7f9f authored by Michael Widenius's avatar Michael Widenius

Fixed bug that another thread used handler->s->id before it was recorded in the log.

This fixed an assert in recovert in mi_recovery.c "cmp_translog_addr(rec->lsn, checkpoint_start) < 0"

storage/maria/ma_loghandler.c:
  Don't assign share->id until it's recorded in the log.
  Had to do an extra test in translog_write_record() to not call translog_assign_id_to_share() for LOGREC_FILE_ID (which sets share->id)
storage/maria/ma_recovery.c:
  Print comment in log for checkpoints
parent 70575456
...@@ -6102,7 +6102,7 @@ my_bool translog_write_record(LSN *lsn, ...@@ -6102,7 +6102,7 @@ my_bool translog_write_record(LSN *lsn,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
if (tbl_info) if (tbl_info && type != LOGREC_FILE_ID)
{ {
MARIA_SHARE *share= tbl_info->s; MARIA_SHARE *share= tbl_info->s;
DBUG_ASSERT(share->now_transactional); DBUG_ASSERT(share->now_transactional);
...@@ -7754,6 +7754,7 @@ out: ...@@ -7754,6 +7754,7 @@ out:
int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn) int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
{ {
uint16 id;
MARIA_SHARE *share= tbl_info->s; MARIA_SHARE *share= tbl_info->s;
/* /*
If you give an id to a non-BLOCK_RECORD table, you also need to release If you give an id to a non-BLOCK_RECORD table, you also need to release
...@@ -7762,13 +7763,14 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn) ...@@ -7762,13 +7763,14 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
DBUG_ASSERT(share->data_file_type == BLOCK_RECORD); DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
/* re-check under mutex to avoid having 2 ids for the same share */ /* re-check under mutex to avoid having 2 ids for the same share */
pthread_mutex_lock(&share->intern_lock); pthread_mutex_lock(&share->intern_lock);
if (unlikely(share->id == 0)) if (likely(share->id == 0))
{ {
LSN lsn; LSN lsn;
LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
uchar log_data[FILEID_STORE_SIZE]; uchar log_data[FILEID_STORE_SIZE];
/* Inspired by set_short_trid() of trnman.c */ /* Inspired by set_short_trid() of trnman.c */
uint i= share->kfile.file % SHARE_ID_MAX + 1; uint i= share->kfile.file % SHARE_ID_MAX + 1;
id= 0;
do do
{ {
my_atomic_rwlock_wrlock(&LOCK_id_to_share); my_atomic_rwlock_wrlock(&LOCK_id_to_share);
...@@ -7778,14 +7780,15 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn) ...@@ -7778,14 +7780,15 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
if (id_to_share[i] == NULL && if (id_to_share[i] == NULL &&
my_atomic_casptr((void **)&id_to_share[i], &tmp, share)) my_atomic_casptr((void **)&id_to_share[i], &tmp, share))
{ {
share->id= (uint16)i; id= (uint16) i;
break; break;
} }
} }
my_atomic_rwlock_wrunlock(&LOCK_id_to_share); my_atomic_rwlock_wrunlock(&LOCK_id_to_share);
i= 1; /* scan the whole array */ i= 1; /* scan the whole array */
} while (share->id == 0); } while (id == 0);
DBUG_PRINT("info", ("id_to_share: 0x%lx -> %u", (ulong)share, share->id)); DBUG_PRINT("info", ("id_to_share: 0x%lx -> %u", (ulong)share, id));
fileid_store(log_data, id);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
/* /*
...@@ -7807,11 +7810,17 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn) ...@@ -7807,11 +7810,17 @@ int translog_assign_id_to_share(MARIA_HA *tbl_info, TRN *trn)
log_array[TRANSLOG_INTERNAL_PARTS + log_array[TRANSLOG_INTERNAL_PARTS +
1].length), 1].length),
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data, NULL))) log_array, NULL, NULL)))
{ {
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
return 1; return 1;
} }
/*
Now when translog record is done, we can set share->id.
If we set it before, then translog_write_record may pick up the id
before it's written to the log.
*/
share->id= id;
} }
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
return 0; return 0;
......
...@@ -642,6 +642,7 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn, ...@@ -642,6 +642,7 @@ static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
prototype_redo_exec_hook_dummy(CHECKPOINT) prototype_redo_exec_hook_dummy(CHECKPOINT)
{ {
/* the only checkpoint we care about was found via control file, ignore */ /* the only checkpoint we care about was found via control file, ignore */
tprint(tracef, "CHECKPOINT found\n");
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment