Commit b28a44e9 authored by ingo@mysql.com's avatar ingo@mysql.com

Bug#10602 - LOAD INDEX INTO CACHE deadlocks

Fixed two deadlocks.
Made minor changes.
parent effd338f
...@@ -184,10 +184,18 @@ static void test_key_cache(KEY_CACHE *keycache, ...@@ -184,10 +184,18 @@ static void test_key_cache(KEY_CACHE *keycache,
static FILE *keycache_debug_log=NULL; static FILE *keycache_debug_log=NULL;
static void keycache_debug_print _VARARGS((const char *fmt,...)); static void keycache_debug_print _VARARGS((const char *fmt,...));
#define KEYCACHE_DEBUG_OPEN \ #define KEYCACHE_DEBUG_OPEN \
if (!keycache_debug_log) keycache_debug_log=fopen(KEYCACHE_DEBUG_LOG, "w") if (!keycache_debug_log) \
{ \
keycache_debug_log= fopen(KEYCACHE_DEBUG_LOG, "w"); \
(void) setvbuf(keycache_debug_log, NULL, _IOLBF, BUFSIZ); \
}
#define KEYCACHE_DEBUG_CLOSE \ #define KEYCACHE_DEBUG_CLOSE \
if (keycache_debug_log) { fclose(keycache_debug_log); keycache_debug_log=0; } if (keycache_debug_log) \
{ \
fclose(keycache_debug_log); \
keycache_debug_log= 0; \
}
#else #else
#define KEYCACHE_DEBUG_OPEN #define KEYCACHE_DEBUG_OPEN
#define KEYCACHE_DEBUG_CLOSE #define KEYCACHE_DEBUG_CLOSE
...@@ -213,7 +221,7 @@ static long keycache_thread_id; ...@@ -213,7 +221,7 @@ static long keycache_thread_id;
#define KEYCACHE_THREAD_TRACE_BEGIN(l) \ #define KEYCACHE_THREAD_TRACE_BEGIN(l) \
{ struct st_my_thread_var *thread_var= my_thread_var; \ { struct st_my_thread_var *thread_var= my_thread_var; \
keycache_thread_id= my_thread_var->id; \ keycache_thread_id= thread_var->id; \
KEYCACHE_DBUG_PRINT(l,("[thread %ld",keycache_thread_id)) } KEYCACHE_DBUG_PRINT(l,("[thread %ld",keycache_thread_id)) }
#define KEYCACHE_THREAD_TRACE_END(l) \ #define KEYCACHE_THREAD_TRACE_END(l) \
...@@ -240,12 +248,10 @@ static int keycache_pthread_cond_wait(pthread_cond_t *cond, ...@@ -240,12 +248,10 @@ static int keycache_pthread_cond_wait(pthread_cond_t *cond,
static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex); static int keycache_pthread_mutex_lock(pthread_mutex_t *mutex);
static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex); static void keycache_pthread_mutex_unlock(pthread_mutex_t *mutex);
static int keycache_pthread_cond_signal(pthread_cond_t *cond); static int keycache_pthread_cond_signal(pthread_cond_t *cond);
static int keycache_pthread_cond_broadcast(pthread_cond_t *cond);
#else #else
#define keycache_pthread_mutex_lock pthread_mutex_lock #define keycache_pthread_mutex_lock pthread_mutex_lock
#define keycache_pthread_mutex_unlock pthread_mutex_unlock #define keycache_pthread_mutex_unlock pthread_mutex_unlock
#define keycache_pthread_cond_signal pthread_cond_signal #define keycache_pthread_cond_signal pthread_cond_signal
#define keycache_pthread_cond_broadcast pthread_cond_broadcast
#endif /* defined(KEYCACHE_DEBUG) */ #endif /* defined(KEYCACHE_DEBUG) */
static uint next_power(uint value) static uint next_power(uint value)
...@@ -508,6 +514,8 @@ int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size, ...@@ -508,6 +514,8 @@ int resize_key_cache(KEY_CACHE *keycache, uint key_cache_block_size,
keycache->can_be_used= 0; keycache->can_be_used= 0;
while (keycache->cnt_for_resize_op) while (keycache->cnt_for_resize_op)
{ {
KEYCACHE_DBUG_PRINT("resize_key_cache: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
} }
...@@ -520,7 +528,11 @@ finish: ...@@ -520,7 +528,11 @@ finish:
unlink_from_queue(wqueue, thread); unlink_from_queue(wqueue, thread);
/* Signal for the next resize request to proceeed if any */ /* Signal for the next resize request to proceeed if any */
if (wqueue->last_thread) if (wqueue->last_thread)
{
KEYCACHE_DBUG_PRINT("resize_key_cache: signal",
("thread %ld", wqueue->last_thread->next->id));
keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend); keycache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
}
keycache_pthread_mutex_unlock(&keycache->cache_lock); keycache_pthread_mutex_unlock(&keycache->cache_lock);
return blocks; return blocks;
} }
...@@ -544,7 +556,11 @@ static inline void dec_counter_for_resize_op(KEY_CACHE *keycache) ...@@ -544,7 +556,11 @@ static inline void dec_counter_for_resize_op(KEY_CACHE *keycache)
struct st_my_thread_var *last_thread; struct st_my_thread_var *last_thread;
if (!--keycache->cnt_for_resize_op && if (!--keycache->cnt_for_resize_op &&
(last_thread= keycache->resize_queue.last_thread)) (last_thread= keycache->resize_queue.last_thread))
{
KEYCACHE_DBUG_PRINT("dec_counter_for_resize_op: signal",
("thread %ld", last_thread->next->id));
keycache_pthread_cond_signal(&last_thread->next->suspend); keycache_pthread_cond_signal(&last_thread->next->suspend);
}
} }
/* /*
...@@ -761,8 +777,8 @@ static void release_queue(KEYCACHE_WQUEUE *wqueue) ...@@ -761,8 +777,8 @@ static void release_queue(KEYCACHE_WQUEUE *wqueue)
do do
{ {
thread=next; thread=next;
keycache_pthread_cond_signal(&thread->suspend);
KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id)); KEYCACHE_DBUG_PRINT("release_queue: signal", ("thread %ld", thread->id));
keycache_pthread_cond_signal(&thread->suspend);
next=thread->next; next=thread->next;
thread->next= NULL; thread->next= NULL;
} }
...@@ -876,7 +892,8 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot, ...@@ -876,7 +892,8 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot,
BLOCK_LINK **pins; BLOCK_LINK **pins;
KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests)); KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
if (!hot && keycache->waiting_for_block.last_thread) { if (!hot && keycache->waiting_for_block.last_thread)
{
/* Signal that in the LRU warm sub-chain an available block has appeared */ /* Signal that in the LRU warm sub-chain an available block has appeared */
struct st_my_thread_var *last_thread= struct st_my_thread_var *last_thread=
keycache->waiting_for_block.last_thread; keycache->waiting_for_block.last_thread;
...@@ -894,6 +911,7 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot, ...@@ -894,6 +911,7 @@ static void link_block(KEY_CACHE *keycache, BLOCK_LINK *block, my_bool hot,
*/ */
if ((HASH_LINK *) thread->opt_info == hash_link) if ((HASH_LINK *) thread->opt_info == hash_link)
{ {
KEYCACHE_DBUG_PRINT("link_block: signal", ("thread %ld", thread->id));
keycache_pthread_cond_signal(&thread->suspend); keycache_pthread_cond_signal(&thread->suspend);
unlink_from_queue(&keycache->waiting_for_block, thread); unlink_from_queue(&keycache->waiting_for_block, thread);
block->requests++; block->requests++;
...@@ -1000,8 +1018,7 @@ static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count) ...@@ -1000,8 +1018,7 @@ static void reg_requests(KEY_CACHE *keycache, BLOCK_LINK *block, int count)
linking it to the LRU chain if it's the last request linking it to the LRU chain if it's the last request
SYNOPSIS SYNOPSIS
unreg_request()
unreg_block()
keycache pointer to a key cache data structure keycache pointer to a key cache data structure
block pointer to the block to link to the LRU chain block pointer to the block to link to the LRU chain
at_end <-> to link the block at the end of the LRU chain at_end <-> to link the block at the end of the LRU chain
...@@ -1086,6 +1103,9 @@ static inline void wait_for_readers(KEY_CACHE *keycache, BLOCK_LINK *block) ...@@ -1086,6 +1103,9 @@ static inline void wait_for_readers(KEY_CACHE *keycache, BLOCK_LINK *block)
struct st_my_thread_var *thread= my_thread_var; struct st_my_thread_var *thread= my_thread_var;
while (block->hash_link->requests) while (block->hash_link->requests)
{ {
KEYCACHE_DBUG_PRINT("wait_for_readers: wait",
("suspend thread %ld block %u",
thread->id, BLOCK_NUMBER(block)));
block->condvar= &thread->suspend; block->condvar= &thread->suspend;
keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock); keycache_pthread_cond_wait(&thread->suspend, &keycache->cache_lock);
block->condvar= NULL; block->condvar= NULL;
...@@ -1143,6 +1163,7 @@ static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link) ...@@ -1143,6 +1163,7 @@ static void unlink_hash(KEY_CACHE *keycache, HASH_LINK *hash_link)
*/ */
if (page->file == hash_link->file && page->filepos == hash_link->diskpos) if (page->file == hash_link->file && page->filepos == hash_link->diskpos)
{ {
KEYCACHE_DBUG_PRINT("unlink_hash: signal", ("thread %ld", thread->id));
keycache_pthread_cond_signal(&thread->suspend); keycache_pthread_cond_signal(&thread->suspend);
unlink_from_queue(&keycache->waiting_for_hash_link, thread); unlink_from_queue(&keycache->waiting_for_hash_link, thread);
} }
...@@ -1225,6 +1246,8 @@ restart: ...@@ -1225,6 +1246,8 @@ restart:
page.filepos= filepos; page.filepos= filepos;
thread->opt_info= (void *) &page; thread->opt_info= (void *) &page;
link_into_queue(&keycache->waiting_for_hash_link, thread); link_into_queue(&keycache->waiting_for_hash_link, thread);
KEYCACHE_DBUG_PRINT("get_hash_link: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
thread->opt_info= NULL; thread->opt_info= NULL;
...@@ -1343,6 +1366,8 @@ restart: ...@@ -1343,6 +1366,8 @@ restart:
add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do do
{ {
KEYCACHE_DBUG_PRINT("find_key_block: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
} }
...@@ -1360,7 +1385,9 @@ restart: ...@@ -1360,7 +1385,9 @@ restart:
/* This is a request for a page to be removed from cache */ /* This is a request for a page to be removed from cache */
KEYCACHE_DBUG_PRINT("find_key_block", KEYCACHE_DBUG_PRINT("find_key_block",
("request for old page in block %u",BLOCK_NUMBER(block))); ("request for old page in block %u "
"wrmode: %d block->status: %d",
BLOCK_NUMBER(block), wrmode, block->status));
/* /*
Only reading requests can proceed until the old dirty page is flushed, Only reading requests can proceed until the old dirty page is flushed,
all others are to be suspended, then resubmitted all others are to be suspended, then resubmitted
...@@ -1379,6 +1406,8 @@ restart: ...@@ -1379,6 +1406,8 @@ restart:
/* Wait until the request can be resubmitted */ /* Wait until the request can be resubmitted */
do do
{ {
KEYCACHE_DBUG_PRINT("find_key_block: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
} }
...@@ -1448,6 +1477,8 @@ restart: ...@@ -1448,6 +1477,8 @@ restart:
link_into_queue(&keycache->waiting_for_block, thread); link_into_queue(&keycache->waiting_for_block, thread);
do do
{ {
KEYCACHE_DBUG_PRINT("find_key_block: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
} }
...@@ -1528,9 +1559,13 @@ restart: ...@@ -1528,9 +1559,13 @@ restart:
else else
{ {
/* This is for secondary requests for a new page only */ /* This is for secondary requests for a new page only */
page_status= block->hash_link == hash_link && KEYCACHE_DBUG_PRINT("find_key_block",
(block->status & BLOCK_READ) ? ("block->hash_link: %p hash_link: %p "
PAGE_READ : PAGE_WAIT_TO_BE_READ; "block->status: %u", block->hash_link,
hash_link, block->status ));
page_status= (((block->hash_link == hash_link) &&
(block->status & BLOCK_READ)) ?
PAGE_READ : PAGE_WAIT_TO_BE_READ);
} }
} }
keycache->global_cache_read++; keycache->global_cache_read++;
...@@ -1538,17 +1573,22 @@ restart: ...@@ -1538,17 +1573,22 @@ restart:
else else
{ {
reg_requests(keycache, block, 1); reg_requests(keycache, block, 1);
page_status = block->hash_link == hash_link && KEYCACHE_DBUG_PRINT("find_key_block",
(block->status & BLOCK_READ) ? ("block->hash_link: %p hash_link: %p "
PAGE_READ : PAGE_WAIT_TO_BE_READ; "block->status: %u", block->hash_link,
hash_link, block->status ));
page_status= (((block->hash_link == hash_link) &&
(block->status & BLOCK_READ)) ?
PAGE_READ : PAGE_WAIT_TO_BE_READ);
} }
} }
KEYCACHE_DBUG_ASSERT(page_status != -1); KEYCACHE_DBUG_ASSERT(page_status != -1);
*page_st=page_status; *page_st=page_status;
KEYCACHE_DBUG_PRINT("find_key_block", KEYCACHE_DBUG_PRINT("find_key_block",
("fd: %u pos %lu page_status %lu", ("fd: %u pos %lu block->status %u page_status %lu",
(uint) file,(ulong) filepos,(uint) page_status)); (uint) file, (ulong) filepos, block->status,
(uint) page_status));
#if !defined(DBUG_OFF) && defined(EXTRA_DEBUG) #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
DBUG_EXECUTE("check_keycache2", DBUG_EXECUTE("check_keycache2",
...@@ -1604,6 +1644,10 @@ static void read_block(KEY_CACHE *keycache, ...@@ -1604,6 +1644,10 @@ static void read_block(KEY_CACHE *keycache,
/* Page is not in buffer yet, is to be read from disk */ /* Page is not in buffer yet, is to be read from disk */
keycache_pthread_mutex_unlock(&keycache->cache_lock); keycache_pthread_mutex_unlock(&keycache->cache_lock);
/*
Here other threads may step in and register as secondary readers.
They will register in block->wqueue[COND_FOR_REQUESTED].
*/
got_length= my_pread(block->hash_link->file, block->buffer, got_length= my_pread(block->hash_link->file, block->buffer,
read_length, block->hash_link->diskpos, MYF(0)); read_length, block->hash_link->diskpos, MYF(0));
keycache_pthread_mutex_lock(&keycache->cache_lock); keycache_pthread_mutex_lock(&keycache->cache_lock);
...@@ -1634,6 +1678,8 @@ static void read_block(KEY_CACHE *keycache, ...@@ -1634,6 +1678,8 @@ static void read_block(KEY_CACHE *keycache,
add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread); add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
do do
{ {
KEYCACHE_DBUG_PRINT("read_block: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
} }
...@@ -1855,6 +1901,10 @@ int key_cache_insert(KEY_CACHE *keycache, ...@@ -1855,6 +1901,10 @@ int key_cache_insert(KEY_CACHE *keycache,
/* The requested page is to be read into the block buffer */ /* The requested page is to be read into the block buffer */
#if !defined(SERIALIZED_READ_FROM_CACHE) #if !defined(SERIALIZED_READ_FROM_CACHE)
keycache_pthread_mutex_unlock(&keycache->cache_lock); keycache_pthread_mutex_unlock(&keycache->cache_lock);
/*
Here other threads may step in and register as secondary readers.
They will register in block->wqueue[COND_FOR_REQUESTED].
*/
#endif #endif
/* Copy data from buff */ /* Copy data from buff */
...@@ -1865,9 +1915,15 @@ int key_cache_insert(KEY_CACHE *keycache, ...@@ -1865,9 +1915,15 @@ int key_cache_insert(KEY_CACHE *keycache,
#if !defined(SERIALIZED_READ_FROM_CACHE) #if !defined(SERIALIZED_READ_FROM_CACHE)
keycache_pthread_mutex_lock(&keycache->cache_lock); keycache_pthread_mutex_lock(&keycache->cache_lock);
/* Here we are alone again. */
#endif #endif
block->status= BLOCK_READ; block->status= BLOCK_READ;
block->length= read_length+offset; block->length= read_length+offset;
KEYCACHE_DBUG_PRINT("key_cache_insert",
("primary request: new page in cache"));
/* Signal that all pending requests for this now can be processed. */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
release_queue(&block->wqueue[COND_FOR_REQUESTED]);
} }
remove_reader(block); remove_reader(block);
...@@ -2074,9 +2130,16 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block) ...@@ -2074,9 +2130,16 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block)
{ {
KEYCACHE_THREAD_TRACE("free block"); KEYCACHE_THREAD_TRACE("free block");
KEYCACHE_DBUG_PRINT("free_block", KEYCACHE_DBUG_PRINT("free_block",
("block %u to be freed",BLOCK_NUMBER(block))); ("block %u to be freed, hash_link %p",
BLOCK_NUMBER(block), block->hash_link));
if (block->hash_link) if (block->hash_link)
{ {
/*
While waiting for readers to finish, new readers might request the
block. But since we set block->status|= BLOCK_REASSIGNED, they
will wait on block->wqueue[COND_FOR_SAVED]. They must be signalled
later.
*/
block->status|= BLOCK_REASSIGNED; block->status|= BLOCK_REASSIGNED;
wait_for_readers(keycache, block); wait_for_readers(keycache, block);
unlink_hash(keycache, block->hash_link); unlink_hash(keycache, block->hash_link);
...@@ -2102,6 +2165,10 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block) ...@@ -2102,6 +2165,10 @@ static void free_block(KEY_CACHE *keycache, BLOCK_LINK *block)
keycache->free_block_list= block; keycache->free_block_list= block;
/* Keep track of the number of currently unused blocks. */ /* Keep track of the number of currently unused blocks. */
keycache->blocks_unused++; keycache->blocks_unused++;
/* All pending requests for this page must be resubmitted. */
if (block->wqueue[COND_FOR_SAVED].last_thread)
release_queue(&block->wqueue[COND_FOR_SAVED]);
} }
...@@ -2334,6 +2401,8 @@ restart: ...@@ -2334,6 +2401,8 @@ restart:
add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
do do
{ {
KEYCACHE_DBUG_PRINT("flush_key_blocks_int: wait",
("suspend thread %ld", thread->id));
keycache_pthread_cond_wait(&thread->suspend, keycache_pthread_cond_wait(&thread->suspend,
&keycache->cache_lock); &keycache->cache_lock);
} }
...@@ -2685,14 +2754,6 @@ static int keycache_pthread_cond_signal(pthread_cond_t *cond) ...@@ -2685,14 +2754,6 @@ static int keycache_pthread_cond_signal(pthread_cond_t *cond)
} }
static int keycache_pthread_cond_broadcast(pthread_cond_t *cond)
{
int rc;
KEYCACHE_THREAD_TRACE("signal");
rc= pthread_cond_broadcast(cond);
return rc;
}
#if defined(KEYCACHE_DEBUG_LOG) #if defined(KEYCACHE_DEBUG_LOG)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment