Commit 3971e262 authored by Sergei Golubchik

maria: deadlock detection when waiting on unique key (useless until we can rollback)

include/my_pthread.h:
  cleanup
include/waiting_threads.h:
  header guard
mysys/waiting_threads.c:
  bug - kill strategy was not applied to deadlocks of length 1.
  cast timeout to ulonglong.
storage/maria/ma_static.c:
  declare WT_RESOURCE_TYPE ma_rc_dup_unique
storage/maria/ma_write.c:
  deadlock detection when waiting on unique key (useless until we can rollback)
storage/maria/maria_def.h:
  deadlock detection when waiting on unique key (useless until we can rollback)
storage/maria/trnman.c:
  use deadlock detector.
  protect state transitions of a TRN with a mutex.
  trnman_trid_to_trn() function.
storage/maria/trnman.h:
  trnman_trid_to_trn() function
  protect state transitions of a TRN with a mutex
  use deadlock detector.
storage/maria/trnman_public.h:
  trnman_trid_to_trn()
parent 7ca3fc4f
...@@ -437,9 +437,10 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex); ...@@ -437,9 +437,10 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex);
#ifndef set_timespec_time_nsec #ifndef set_timespec_time_nsec
#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \ #define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \
ulonglong now= (TIME) + (NSEC/100); \ ulonglong nsec= (NSEC); \
ulonglong now= (TIME) + (nsec/100); \
(ABSTIME).TV_sec= (now / ULL(10000000)); \ (ABSTIME).TV_sec= (now / ULL(10000000)); \
(ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \ (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + (nsec % 100)); \
} while(0) } while(0)
#endif /* !set_timespec_time_nsec */ #endif /* !set_timespec_time_nsec */
......
...@@ -13,6 +13,9 @@ ...@@ -13,6 +13,9 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef _waiting_threads_h
#define _waiting_threads_h
#include <my_global.h> #include <my_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <lf.h> #include <lf.h>
...@@ -152,3 +155,4 @@ void wt_thd_release(WT_THD *, WT_RESOURCE_ID *); ...@@ -152,3 +155,4 @@ void wt_thd_release(WT_THD *, WT_RESOURCE_ID *);
#define wt_thd_release_all(THD) wt_thd_release((THD), 0) #define wt_thd_release_all(THD) wt_thd_release((THD), 0)
int wt_resource_id_memcmp(void *, void *); int wt_resource_id_memcmp(void *, void *);
#endif
...@@ -227,6 +227,20 @@ struct deadlock_arg { ...@@ -227,6 +227,20 @@ struct deadlock_arg {
WT_RESOURCE *rc; WT_RESOURCE *rc;
}; };
/*
  Consider 'found' as a replacement for the currently selected victim.

  'found' becomes the new victim only if its weight is strictly lower
  than the weight of arg->victim; otherwise nothing is changed.

  When the victim being replaced is not arg->thd itself, rc_unlock() is
  called on the resource that the replaced victim is waiting for.
  After a replacement arg->rc is reset to 0.
*/
static void change_victim(WT_THD* found, struct deadlock_arg *arg)
{
  WT_THD *previous= arg->victim;

  if (!(found->weight < previous->weight))
    return;                           /* the current victim stays */

  if (previous != arg->thd)
  {
    rc_unlock(previous->waiting_for); /* release the previous victim */
    DBUG_ASSERT(arg->rc == found->waiting_for);
  }

  arg->victim= found;
  arg->rc= 0;
}
/* /*
loop detection in a wait-for graph with a limited search depth. loop detection in a wait-for graph with a limited search depth.
*/ */
...@@ -294,16 +308,8 @@ retry: ...@@ -294,16 +308,8 @@ retry:
break; break;
case WT_DEADLOCK: case WT_DEADLOCK:
ret= WT_DEADLOCK; ret= WT_DEADLOCK;
if (cursor->weight < arg->victim->weight) change_victim(cursor, arg);
{ if (arg->rc)
if (arg->victim != arg->thd)
{
rc_unlock(arg->victim->waiting_for); /* release the previous victim */
DBUG_ASSERT(arg->rc == cursor->waiting_for);
}
arg->victim= cursor;
}
else if (arg->rc)
rc_unlock(arg->rc); rc_unlock(arg->rc);
goto end; goto end;
case WT_OK: case WT_OK:
...@@ -329,13 +335,15 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, ...@@ -329,13 +335,15 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
int ret; int ret;
DBUG_ENTER("deadlock"); DBUG_ENTER("deadlock");
ret= deadlock_search(&arg, blocker, depth); ret= deadlock_search(&arg, blocker, depth);
if (arg.rc)
rc_unlock(arg.rc);
if (ret == WT_DEPTH_EXCEEDED) if (ret == WT_DEPTH_EXCEEDED)
{ {
increment_cycle_stats(WT_CYCLE_STATS, max_depth); increment_cycle_stats(WT_CYCLE_STATS, max_depth);
ret= WT_OK; ret= WT_OK;
} }
if (ret == WT_DEADLOCK && depth)
change_victim(blocker, &arg);
if (arg.rc)
rc_unlock(arg.rc);
if (ret == WT_DEADLOCK && arg.victim != thd) if (ret == WT_DEADLOCK && arg.victim != thd)
{ {
DBUG_PRINT("wt", ("killing %s", arg.victim->name)); DBUG_PRINT("wt", ("killing %s", arg.victim->name));
...@@ -570,7 +578,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ...@@ -570,7 +578,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_OK; ret= WT_OK;
rc_unlock(rc); rc_unlock(rc);
set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000); set_timespec_time_nsec(timeout, starttime, wt_timeout_short*ULL(1000));
if (ret == WT_TIMEOUT) if (ret == WT_TIMEOUT)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
if (ret == WT_TIMEOUT) if (ret == WT_TIMEOUT)
...@@ -579,7 +587,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) ...@@ -579,7 +587,7 @@ int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
ret= WT_DEADLOCK; ret= WT_DEADLOCK;
else if (wt_timeout_long > wt_timeout_short) else if (wt_timeout_long > wt_timeout_short)
{ {
set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000); set_timespec_time_nsec(timeout, starttime, wt_timeout_long*ULL(1000));
if (!thd->killed) if (!thd->killed)
ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
} }
......
...@@ -88,7 +88,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info) ...@@ -88,7 +88,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
It's enough to compare trids here (instead of calling It's enough to compare trids here (instead of calling
tranman_can_read_from) as history->trid is a commit_trid tranman_can_read_from) as history->trid is a commit_trid
*/ */
while (trn->trid < history->trid) while (trn->trid < history->trid && history->trid != ~(TrID)0)
history= history->next; history= history->next;
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
/* The current item can't be deleted as it's the first one visible for us */ /* The current item can't be deleted as it's the first one visible for us */
......
...@@ -64,6 +64,9 @@ HASH maria_stored_state; ...@@ -64,6 +64,9 @@ HASH maria_stored_state;
*/ */
TRN dummy_transaction_object; TRN dummy_transaction_object;
/* a WT_RESOURCE_TYPE for transactions waiting on a unique key conflict */
WT_RESOURCE_TYPE ma_rc_dup_unique={ wt_resource_id_memcmp, 0};
/* Enough for comparing if number is zero */ /* Enough for comparing if number is zero */
uchar maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; uchar maria_zero_string[]= {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
......
...@@ -180,15 +180,47 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -180,15 +180,47 @@ int maria_write(MARIA_HA *info, uchar *record)
} }
else else
{ {
if (keyinfo->ck_insert(info, while (keyinfo->ck_insert(info,
(*keyinfo->make_key)(info, &int_key, i, (*keyinfo->make_key)(info, &int_key, i,
buff, record, filepos, buff, record, filepos,
info->trn->trid))) info->trn->trid)))
{ {
TRN *blocker=trnman_trid_to_trn(info->trn, info->dup_key_trid);
DBUG_PRINT("error",("Got error: %d on write",my_errno));
/*
if blocker TRN was not found, it means that the conflicting
transaction was committed long time ago. It could not be
aborted, as it would have to wait on the key tree lock
to remove the conflicting key it has inserted.
*/
if (local_lock_tree) if (local_lock_tree)
rw_unlock(&keyinfo->root_lock); rw_unlock(&keyinfo->root_lock);
DBUG_PRINT("error",("Got error: %d on write",my_errno)); if (!blocker)
goto err; goto err;
if (blocker->commit_trid != ~(TrID)0)
{ /* committed, albeit recently */
pthread_mutex_unlock(& blocker->state_lock);
goto err;
}
{ /* running. now we wait */
WT_RESOURCE_ID rc;
int res;
rc.type= &ma_rc_dup_unique;
rc.value.ptr= blocker; /* TODO savepoint id when we'll have them */
res= wt_thd_will_wait_for(& info->trn->wt, & blocker->wt, & rc);
if (res != WT_OK)
{
pthread_mutex_unlock(& blocker->state_lock);
goto err;
}
res=wt_thd_cond_timedwait(& info->trn->wt, & blocker->state_lock);
pthread_mutex_unlock(& blocker->state_lock);
if (res != WT_OK)
goto err;
}
if (local_lock_tree)
rw_wrlock(&keyinfo->root_lock);
} }
} }
...@@ -597,9 +629,22 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key, ...@@ -597,9 +629,22 @@ static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key,
else /* not HA_FULLTEXT, normal HA_NOSAME key */ else /* not HA_FULLTEXT, normal HA_NOSAME key */
{ {
DBUG_PRINT("warning", ("Duplicate key")); DBUG_PRINT("warning", ("Duplicate key"));
/*
FIXME
When the index will support true versioning - with multiple
identical values in the UNIQUE index, invisible to each other -
the following should be changed to "continue inserting keys, at the
end (of the row or statement) wait". Until it's done we cannot properly
support deadlock timeouts.
*/
/*
transaction that has inserted the conflicting key is in progress.
wait for it to be committed or aborted.
*/
info->dup_key_trid= _ma_trid_from_key(&tmp_key);
info->dup_key_pos= dup_key_pos; info->dup_key_pos= dup_key_pos;
my_afree((uchar*) temp_buff); my_afree((uchar*) temp_buff);
my_errno=HA_ERR_FOUND_DUPP_KEY; my_errno= HA_ERR_FOUND_DUPP_KEY;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
} }
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "ma_loghandler.h" #include "ma_loghandler.h"
#include "ma_control_file.h" #include "ma_control_file.h"
#include "ma_state.h" #include "ma_state.h"
#include <waiting_threads.h>
/* For testing recovery */ /* For testing recovery */
#ifdef TO_BE_REMOVED #ifdef TO_BE_REMOVED
...@@ -492,13 +493,14 @@ struct st_maria_handler ...@@ -492,13 +493,14 @@ struct st_maria_handler
uint32 int_keytree_version; /* -""- */ uint32 int_keytree_version; /* -""- */
int (*read_record)(MARIA_HA *, uchar*, MARIA_RECORD_POS); int (*read_record)(MARIA_HA *, uchar*, MARIA_RECORD_POS);
invalidator_by_filename invalidator; /* query cache invalidator */ invalidator_by_filename invalidator; /* query cache invalidator */
ulonglong last_auto_increment; /* auto value at start of statement */ ulonglong last_auto_increment; /* auto value at start of statement */
ulong this_unique; /* uniq filenumber or thread */ ulong this_unique; /* uniq filenumber or thread */
ulong last_unique; /* last unique number */ ulong last_unique; /* last unique number */
ulong this_loop; /* counter for this open */ ulong this_loop; /* counter for this open */
ulong last_loop; /* last used counter */ ulong last_loop; /* last used counter */
MARIA_RECORD_POS save_lastpos; MARIA_RECORD_POS save_lastpos;
MARIA_RECORD_POS dup_key_pos; MARIA_RECORD_POS dup_key_pos;
TrID dup_key_trid;
my_off_t pos; /* Intern variable */ my_off_t pos; /* Intern variable */
my_off_t last_keypage; /* Last key page read */ my_off_t last_keypage; /* Last key page read */
my_off_t last_search_keypage; /* Last keypage when searching */ my_off_t last_search_keypage; /* Last keypage when searching */
...@@ -759,6 +761,7 @@ extern char *maria_data_root; ...@@ -759,6 +761,7 @@ extern char *maria_data_root;
extern uchar maria_zero_string[]; extern uchar maria_zero_string[];
extern my_bool maria_inited, maria_in_ha_maria; extern my_bool maria_inited, maria_in_ha_maria;
extern HASH maria_stored_state; extern HASH maria_stored_state;
extern WT_RESOURCE_TYPE ma_rc_dup_unique;
/* This is used by _ma_calc_xxx_key_length och _ma_store_key */ /* This is used by _ma_calc_xxx_key_length och _ma_store_key */
typedef struct st_maria_s_param typedef struct st_maria_s_param
...@@ -782,7 +785,6 @@ typedef struct st_pinned_page ...@@ -782,7 +785,6 @@ typedef struct st_pinned_page
my_bool changed; my_bool changed;
} MARIA_PINNED_PAGE; } MARIA_PINNED_PAGE;
/* Prototypes for intern functions */ /* Prototypes for intern functions */
extern int _ma_read_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS); extern int _ma_read_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS);
extern int _ma_read_rnd_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS, extern int _ma_read_rnd_dynamic_record(MARIA_HA *, uchar *, MARIA_RECORD_POS,
......
...@@ -46,7 +46,7 @@ static TRN *pool; ...@@ -46,7 +46,7 @@ static TRN *pool;
/* a hash for committed transactions that maps trid to a TRN structure */ /* a hash for committed transactions that maps trid to a TRN structure */
static LF_HASH trid_to_trn; static LF_HASH trid_to_trn;
/* an array that maps short_trid of an active transaction to a TRN structure */ /* an array that maps short_id of an active transaction to a TRN structure */
static TRN **short_trid_to_active_trn; static TRN **short_trid_to_active_trn;
/* locks for short_trid_to_active_trn and pool */ /* locks for short_trid_to_active_trn and pool */
...@@ -114,11 +114,13 @@ int trnman_init(TrID initial_trid) ...@@ -114,11 +114,13 @@ int trnman_init(TrID initial_trid)
{ {
DBUG_ENTER("trnman_init"); DBUG_ENTER("trnman_init");
wt_init(); /* FIXME this should be done in the server, not in the engine! */
short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*), short_trid_to_active_trn= (TRN **)my_malloc(SHORT_TRID_MAX*sizeof(TRN*),
MYF(MY_WME|MY_ZEROFILL)); MYF(MY_WME|MY_ZEROFILL));
if (unlikely(!short_trid_to_active_trn)) if (unlikely(!short_trid_to_active_trn))
DBUG_RETURN(1); DBUG_RETURN(1);
short_trid_to_active_trn--; /* min short_trid is 1 */ short_trid_to_active_trn--; /* min short_id is 1 */
/* /*
Initialize lists. Initialize lists.
...@@ -179,6 +181,8 @@ void trnman_destroy() ...@@ -179,6 +181,8 @@ void trnman_destroy()
{ {
TRN *trn= pool; TRN *trn= pool;
pool= pool->next; pool= pool->next;
pthread_mutex_destroy(&trn->state_lock);
wt_thd_destroy(&trn->wt);
my_free((void *)trn, MYF(0)); my_free((void *)trn, MYF(0));
} }
lf_hash_destroy(&trid_to_trn); lf_hash_destroy(&trid_to_trn);
...@@ -188,6 +192,9 @@ void trnman_destroy() ...@@ -188,6 +192,9 @@ void trnman_destroy()
my_atomic_rwlock_destroy(&LOCK_pool); my_atomic_rwlock_destroy(&LOCK_pool);
my_free((void *)(short_trid_to_active_trn+1), MYF(0)); my_free((void *)(short_trid_to_active_trn+1), MYF(0));
short_trid_to_active_trn= NULL; short_trid_to_active_trn= NULL;
wt_end();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -206,11 +213,13 @@ static TrID new_trid() ...@@ -206,11 +213,13 @@ static TrID new_trid()
DBUG_RETURN(++global_trid_generator); DBUG_RETURN(++global_trid_generator);
} }
static void set_short_trid(TRN *trn) static uint get_short_trid(TRN *trn)
{ {
int i= (int) ((global_trid_generator + (intptr)trn) * 312089 % int i= (int) ((global_trid_generator + (intptr)trn) * 312089 %
SHORT_TRID_MAX + 1); SHORT_TRID_MAX + 1);
for ( ; !trn->short_id ; i= 1) uint res=0;
for ( ; !res ; i= 1)
{ {
my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_wrlock(&LOCK_short_trid_to_trn);
for ( ; i <= SHORT_TRID_MAX; i++) /* the range is [1..SHORT_TRID_MAX] */ for ( ; i <= SHORT_TRID_MAX; i++) /* the range is [1..SHORT_TRID_MAX] */
...@@ -219,12 +228,13 @@ static void set_short_trid(TRN *trn) ...@@ -219,12 +228,13 @@ static void set_short_trid(TRN *trn)
if (short_trid_to_active_trn[i] == NULL && if (short_trid_to_active_trn[i] == NULL &&
my_atomic_casptr((void **)&short_trid_to_active_trn[i], &tmp, trn)) my_atomic_casptr((void **)&short_trid_to_active_trn[i], &tmp, trn))
{ {
trn->short_id= i; res= i;
break; break;
} }
} }
my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn); my_atomic_rwlock_wrunlock(&LOCK_short_trid_to_trn);
} }
return res;
} }
/* /*
...@@ -243,7 +253,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -243,7 +253,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
we have a mutex, to do simple things under it - allocate a TRN, we have a mutex, to do simple things under it - allocate a TRN,
increment trnman_active_transactions, set trn->min_read_from. increment trnman_active_transactions, set trn->min_read_from.
Note that all the above is fast. generating short_trid may be slow, Note that all the above is fast. generating short_id may be slow,
as it involves scanning a large array - so it's done outside of the as it involves scanning a large array - so it's done outside of the
mutex. mutex.
*/ */
...@@ -280,6 +290,8 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -280,6 +290,8 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
return 0; return 0;
} }
trnman_allocated_transactions++; trnman_allocated_transactions++;
pthread_mutex_init(&trn->state_lock, MY_MUTEX_INIT_FAST);
wt_thd_init(&trn->wt);
} }
trn->pins= lf_hash_get_pins(&trid_to_trn); trn->pins= lf_hash_get_pins(&trid_to_trn);
if (!trn->pins) if (!trn->pins)
...@@ -293,7 +305,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -293,7 +305,6 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
trn->min_read_from= active_list_min.next->trid; trn->min_read_from= active_list_min.next->trid;
trn->trid= new_trid(); trn->trid= new_trid();
trn->short_id= 0;
trn->next= &active_list_max; trn->next= &active_list_max;
trn->prev= active_list_max.prev; trn->prev= active_list_max.prev;
...@@ -320,7 +331,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) ...@@ -320,7 +331,9 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond)
only after the following function TRN is considered initialized, only after the following function TRN is considered initialized,
so it must be done the last so it must be done the last
*/ */
set_short_trid(trn); pthread_mutex_lock(&trn->state_lock);
trn->short_id= get_short_trid(trn);
pthread_mutex_unlock(&trn->state_lock);
res= lf_hash_insert(&trid_to_trn, trn->pins, &trn); res= lf_hash_insert(&trid_to_trn, trn->pins, &trn);
DBUG_ASSERT(res <= 0); DBUG_ASSERT(res <= 0);
...@@ -364,6 +377,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -364,6 +377,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
/* if a rollback, all UNDO records should have been executed */ /* if a rollback, all UNDO records should have been executed */
DBUG_ASSERT(commit || trn->undo_lsn == 0); DBUG_ASSERT(commit || trn->undo_lsn == 0);
DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list")); DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list"));
pthread_mutex_lock(&LOCK_trn_list); pthread_mutex_lock(&LOCK_trn_list);
/* remove from active list */ /* remove from active list */
...@@ -402,7 +416,11 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -402,7 +416,11 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
*/ */
if (commit && active_list_min.next != &active_list_max) if (commit && active_list_min.next != &active_list_max)
{ {
pthread_mutex_lock(&trn->state_lock);
trn->commit_trid= global_trid_generator; trn->commit_trid= global_trid_generator;
wt_thd_release_all(& trn->wt);
pthread_mutex_unlock(&trn->state_lock);
trn->next= &committed_list_max; trn->next= &committed_list_max;
trn->prev= committed_list_max.prev; trn->prev= committed_list_max.prev;
trnman_committed_transactions++; trnman_committed_transactions++;
...@@ -436,11 +454,14 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -436,11 +454,14 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
TRN *t= free_me; TRN *t= free_me;
free_me= free_me->next; free_me= free_me->next;
/* /* ignore OOM. it's harmless, and we can do nothing here anyway */
ignore OOM here. it's harmless, and there's nothing we could do, anyway
*/
(void)lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID)); (void)lf_hash_delete(&trid_to_trn, pins, &t->trid, sizeof(TrID));
pthread_mutex_lock(&trn->state_lock);
trn->short_id= 0;
wt_thd_release_all(& trn->wt);
pthread_mutex_unlock(&trn->state_lock);
trnman_free_trn(t); trnman_free_trn(t);
} }
...@@ -533,6 +554,33 @@ int trnman_can_read_from(TRN *trn, TrID trid) ...@@ -533,6 +554,33 @@ int trnman_can_read_from(TRN *trn, TrID trid)
return can; return can;
} }
/*
  Find the TRN of a transaction by its trid.

  @param trn   the calling transaction; its pins are used for the hash
               lookup and its min_read_from is used as a cut-off
  @param trid  identifier of the transaction to look for

  @return
    0 when trid is below trn->min_read_from, when the lookup in
    trid_to_trn finds nothing (or fails), or when the TRN that was found
    has short_id == 0.
    Otherwise a pointer to the TRN. Note that in this case the TRN is
    returned with its state_lock mutex LOCKED, the caller must unlock it.
*/
TRN *trnman_trid_to_trn(TRN *trn, TrID trid)
{
  TRN **found;
  TRN *candidate;
  LF_REQUIRE_PINS(3);

  if (trid < trn->min_read_from)
    return 0; /* it's committed eons ago */

  found= lf_hash_search(&trid_to_trn, trn->pins, &trid, sizeof(trid));
  if (found == NULL || found == MY_ERRPTR)
    return 0; /* no luck */

  /*
    we've found something.
    Copy the TRN pointer out of the hash element while the element is
    still pinned, so that 'found' is never dereferenced after
    lf_hash_search_unpin().
  */
  candidate= *found;
  pthread_mutex_lock(&candidate->state_lock);

  if (candidate->short_id == 0)
  {
    pthread_mutex_unlock(&candidate->state_lock);
    lf_hash_search_unpin(trn->pins);
    return 0; /* but it was a ghost */
  }
  lf_hash_search_unpin(trn->pins);

  /* Gotcha! */
  return candidate; /* note that TRN is returned locked !!! */
}
/* TODO: the stubs below are waiting for savepoints to be implemented */ /* TODO: the stubs below are waiting for savepoints to be implemented */
void trnman_new_statement(TRN *trn __attribute__ ((unused))) void trnman_new_statement(TRN *trn __attribute__ ((unused)))
......
...@@ -21,19 +21,32 @@ C_MODE_START ...@@ -21,19 +21,32 @@ C_MODE_START
#include <lf.h> #include <lf.h>
#include "trnman_public.h" #include "trnman_public.h"
#include "ma_loghandler_lsn.h" #include "ma_loghandler_lsn.h"
#include <waiting_threads.h>
/* /*
trid - 6 uchar transaction identifier. Assigned when a transaction trid - 6 uchar transaction identifier. Assigned when a transaction
is created. Transaction can always be identified by its trid, is created. Transaction can always be identified by its trid,
even after transaction has ended. even after transaction has ended.
short_trid - 2-byte transaction identifier, identifies a running short_id - 2-byte transaction identifier, identifies a running
transaction, is reassigned when transaction ends. transaction, is reassigned when transaction ends.
when short_id is 0, TRN is not initialized, for all practical purposes
it could be considered unused.
when commit_trid is ~(TrID)0 the transaction is running, otherwise it's
committed.
state_lock mutex protects the state of a TRN, that is whether a TRN
is committed/running/unused. Meaning that modifications of short_id and
commit_trid happen under this mutex.
*/ */
struct st_transaction struct st_transaction
{ {
LF_PINS *pins; LF_PINS *pins;
WT_THD wt;
pthread_mutex_t state_lock;
void *used_tables; /* Tables used by transaction */ void *used_tables; /* Tables used by transaction */
TRN *next, *prev; TRN *next, *prev;
TrID trid, min_read_from, commit_trid; TrID trid, min_read_from, commit_trid;
...@@ -41,7 +54,6 @@ struct st_transaction ...@@ -41,7 +54,6 @@ struct st_transaction
LSN_WITH_FLAGS first_undo_lsn; LSN_WITH_FLAGS first_undo_lsn;
uint locked_tables; uint locked_tables;
uint16 short_id; uint16 short_id;
/* Note! if short_id is 0, trn is NOT initialized */
}; };
#define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000) #define TRANSACTION_LOGGED_LONG_ID ULL(0x8000000000000000)
......
...@@ -45,6 +45,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit); ...@@ -45,6 +45,7 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit);
#define trnman_rollback_trn(T) trnman_end_trn(T, FALSE) #define trnman_rollback_trn(T) trnman_end_trn(T, FALSE)
void trnman_free_trn(TRN *trn); void trnman_free_trn(TRN *trn);
int trnman_can_read_from(TRN *trn, TrID trid); int trnman_can_read_from(TRN *trn, TrID trid);
TRN *trnman_trid_to_trn(TRN *trn, TrID trid);
void trnman_new_statement(TRN *trn); void trnman_new_statement(TRN *trn);
void trnman_rollback_statement(TRN *trn); void trnman_rollback_statement(TRN *trn);
my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment