Commit 0412d652 authored by unknown's avatar unknown

fixed bug in parallel repair

parent 3229ddaa
......@@ -324,7 +324,7 @@ typedef struct st_io_cache_share
/* to sync on reads into buffer */
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
int count, total;
/* actual IO_CACHE that filled the buffer */
struct st_io_cache *active;
#ifdef NOT_YET_IMPLEMENTED
......
......@@ -2129,7 +2129,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick)
{
int got_error;
uint i,key, total_key_length;
uint i,key, total_key_length, istep;
ulong rec_length;
ha_rows start_records;
my_off_t new_header_length,del;
......@@ -2264,8 +2264,8 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
info->state->records=info->state->del=share->state.split=0;
info->state->empty=0;
for (i=key=0 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i++, key++)
for (i=key=0, istep=1 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
{
sort_param[i].key=key;
sort_param[i].keyinfo=share->keyinfo+key;
......@@ -2276,9 +2276,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
(char*) (share->state.rec_per_key_part+
(uint) (rec_per_key_part - param->rec_per_key_part)),
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
i--;
istep=0;
continue;
}
istep=1;
if ((!(param->testflag & T_SILENT)))
printf ("- Fixing index %d\n",key+1);
sort_param[i].key_read= ((sort_param[i].keyinfo->flag & HA_FULLTEXT) ?
......
......@@ -441,7 +441,7 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
DBUG_ASSERT(info->type == READ_CACHE);
pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init (&s->cond, 0);
s->count=num_threads-1;
s->total=s->count=num_threads-1;
s->active=0; /* to catch errors */
info->share=s;
info->read_function=_my_b_read_r;
......@@ -456,32 +456,36 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
*/
void remove_io_thread(IO_CACHE *info)
{
pthread_mutex_lock(&info->share->mutex);
if (! info->share->count--)
pthread_cond_signal(&info->share->cond);
pthread_mutex_unlock(&info->share->mutex);
IO_CACHE_SHARE *s=info->share;
pthread_mutex_lock(&s->mutex);
s->total--;
if (! s->count--)
pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->mutex);
}
static int lock_io_cache(IO_CACHE *info)
{
pthread_mutex_lock(&info->share->mutex);
if (!info->share->count)
return 1;
int total;
IO_CACHE_SHARE *s=info->share;
--(info->share->count);
pthread_cond_wait(&info->share->cond, &info->share->mutex);
/*
count can be -1 here, if one thread was removed (remove_io_thread)
while all others were locked (lock_io_cache).
If this is the case, this thread behaves as if count was 0 from the
very beginning, that is returns 1 and does not unlock the mutex.
*/
if (++(info->share->count))
pthread_mutex_lock(&s->mutex);
if (!s->count)
{
pthread_mutex_unlock(&info->share->mutex);
return 0;
s->count=s->total;
return 1;
}
return 1;
total=s->total;
s->count--;
pthread_cond_wait(&s->cond, &s->mutex);
if (s->total < total)
return 1;
pthread_mutex_unlock(&s->mutex);
return 0;
}
static void unlock_io_cache(IO_CACHE *info)
......@@ -1132,19 +1136,15 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER("end_io_cache");
#ifdef THREAD
/*
if IO_CACHE is shared between several threads, only one
thread needs to call end_io_cache() - just as init_io_cache()
should be called only once and then memcopy'ed
*/
if (info->share)
{
#ifdef SAFE_MUTEX
/* simple protection against multi-close: destroying share first */
if (pthread_cond_destroy (&info->share->cond) |
pthread_mutex_destroy(&info->share->mutex))
{
DBUG_RETURN(1);
}
#else
pthread_cond_destroy (&info->share->cond);
pthread_mutex_destroy(&info->share->mutex);
#endif
info->share=0;
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment