Commit 5e72901c authored by kostja@bodhi.(none)'s avatar kostja@bodhi.(none)

Merge bodhi.(none):/opt/local/work/mysql-5.0-runtime

into  bodhi.(none):/opt/local/work/mysql-5.1-runtime
parents 7ca9124d c4bcce64
...@@ -1761,18 +1761,18 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) ...@@ -1761,18 +1761,18 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
thd->proc_info="waiting for delay_list"; thd->proc_info="waiting for delay_list";
pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list
I_List_iterator<Delayed_insert> it(delayed_threads); I_List_iterator<Delayed_insert> it(delayed_threads);
Delayed_insert *tmp; Delayed_insert *di;
while ((tmp=it++)) while ((di= it++))
{ {
if (!strcmp(tmp->thd.db, table_list->db) && if (!strcmp(table_list->db, di->table_list.db) &&
!strcmp(table_list->table_name, tmp->table->s->table_name.str)) !strcmp(table_list->table_name, di->table_list.table_name))
{ {
tmp->lock(); di->lock();
break; break;
} }
} }
pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
return tmp; return di;
} }
...@@ -1798,21 +1798,41 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) ...@@ -1798,21 +1798,41 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
Two latter cases indicate a request for lock upgrade. Two latter cases indicate a request for lock upgrade.
XXX: why do we regard INSERT DELAYED into a view as an error and XXX: why do we regard INSERT DELAYED into a view as an error and
do not simply a lock upgrade? do not simply perform a lock upgrade?
TODO: The approach with using two mutexes to work with the
delayed thread list -- LOCK_delayed_insert and
LOCK_delayed_create -- is redundant, and we only need one of
them to protect the list. The reason we have two locks is that
we do not want to block look-ups in the list while we're waiting
for the newly created thread to open the delayed table. However,
this wait itself is redundant -- we always call get_local_table
later on, and there wait again until the created thread acquires
a table lock.
As is redundant the concept of locks_in_memory, since we already
have another counter with similar semantics - tables_in_use,
both of them are devoted to counting the number of producers for
a given consumer (delayed insert thread), only at different
stages of producer-consumer relationship.
'dead' and 'status' variables in Delayed_insert are redundant
too, since there is already 'di->thd.killed' and
di->stacked_inserts.
*/ */
static static
bool delayed_get_table(THD *thd, TABLE_LIST *table_list) bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
{ {
int error; int error;
Delayed_insert *tmp; Delayed_insert *di;
DBUG_ENTER("delayed_get_table"); DBUG_ENTER("delayed_get_table");
/* Must be set in the parser */ /* Must be set in the parser */
DBUG_ASSERT(table_list->db); DBUG_ASSERT(table_list->db);
/* Find the thread which handles this table. */ /* Find the thread which handles this table. */
if (!(tmp=find_handler(thd,table_list))) if (!(di= find_handler(thd, table_list)))
{ {
/* /*
No match. Create a new thread to handle the table, but No match. Create a new thread to handle the table, but
...@@ -1826,9 +1846,9 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) ...@@ -1826,9 +1846,9 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
The first search above was done without LOCK_delayed_create. The first search above was done without LOCK_delayed_create.
Another thread might have created the handler in between. Search again. Another thread might have created the handler in between. Search again.
*/ */
if (! (tmp= find_handler(thd, table_list))) if (! (di= find_handler(thd, table_list)))
{ {
if (!(tmp=new Delayed_insert())) if (!(di= new Delayed_insert()))
{ {
my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert)); my_error(ER_OUTOFMEMORY,MYF(0),sizeof(Delayed_insert));
thd->fatal_error(); thd->fatal_error();
...@@ -1837,28 +1857,30 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) ...@@ -1837,28 +1857,30 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thread_count++; thread_count++;
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
tmp->thd.set_db(table_list->db, strlen(table_list->db)); di->thd.set_db(table_list->db, strlen(table_list->db));
tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME)); di->thd.query= my_strdup(table_list->table_name, MYF(MY_WME));
if (tmp->thd.db == NULL || tmp->thd.query == NULL) if (di->thd.db == NULL || di->thd.query == NULL)
{ {
/* The error is reported */ /* The error is reported */
delete tmp; delete di;
thd->fatal_error(); thd->fatal_error();
goto end_create; goto end_create;
} }
tmp->table_list= *table_list; // Needed to open table di->table_list= *table_list; // Needed to open table
tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query; /* Replace volatile strings with local copies */
tmp->lock(); di->table_list.alias= di->table_list.table_name= di->thd.query;
pthread_mutex_lock(&tmp->mutex); di->table_list.db= di->thd.db;
if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib, di->lock();
handle_delayed_insert,(void*) tmp))) pthread_mutex_lock(&di->mutex);
if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
handle_delayed_insert, (void*) di)))
{ {
DBUG_PRINT("error", DBUG_PRINT("error",
("Can't create thread to handle delayed insert (error %d)", ("Can't create thread to handle delayed insert (error %d)",
error)); error));
pthread_mutex_unlock(&tmp->mutex); pthread_mutex_unlock(&di->mutex);
tmp->unlock(); di->unlock();
delete tmp; delete di;
my_error(ER_CANT_CREATE_THREAD, MYF(0), error); my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
thd->fatal_error(); thd->fatal_error();
goto end_create; goto end_create;
...@@ -1866,15 +1888,15 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) ...@@ -1866,15 +1888,15 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
/* Wait until table is open */ /* Wait until table is open */
thd->proc_info="waiting for handler open"; thd->proc_info="waiting for handler open";
while (!tmp->thd.killed && !tmp->table && !thd->killed) while (!di->thd.killed && !di->table && !thd->killed)
{ {
pthread_cond_wait(&tmp->cond_client,&tmp->mutex); pthread_cond_wait(&di->cond_client, &di->mutex);
} }
pthread_mutex_unlock(&tmp->mutex); pthread_mutex_unlock(&di->mutex);
thd->proc_info="got old table"; thd->proc_info="got old table";
if (tmp->thd.killed) if (di->thd.killed)
{ {
if (tmp->thd.net.report_error) if (di->thd.net.report_error)
{ {
/* /*
Copy the error message. Note that we don't treat fatal Copy the error message. Note that we don't treat fatal
...@@ -1882,31 +1904,34 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) ...@@ -1882,31 +1904,34 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
main thread. Use of my_message will enable stored main thread. Use of my_message will enable stored
procedures continue handlers. procedures continue handlers.
*/ */
my_message(tmp->thd.net.last_errno, tmp->thd.net.last_error, my_message(di->thd.net.last_errno, di->thd.net.last_error,
MYF(0)); MYF(0));
} }
tmp->unlock(); di->unlock();
goto end_create; goto end_create;
} }
if (thd->killed) if (thd->killed)
{ {
tmp->unlock(); di->unlock();
goto end_create; goto end_create;
} }
pthread_mutex_lock(&LOCK_delayed_insert);
delayed_threads.append(di);
pthread_mutex_unlock(&LOCK_delayed_insert);
} }
pthread_mutex_unlock(&LOCK_delayed_create); pthread_mutex_unlock(&LOCK_delayed_create);
} }
pthread_mutex_lock(&tmp->mutex); pthread_mutex_lock(&di->mutex);
table_list->table= tmp->get_local_table(thd); table_list->table= di->get_local_table(thd);
pthread_mutex_unlock(&tmp->mutex); pthread_mutex_unlock(&di->mutex);
if (table_list->table) if (table_list->table)
{ {
DBUG_ASSERT(thd->net.report_error == 0); DBUG_ASSERT(thd->net.report_error == 0);
thd->di=tmp; thd->di= di;
} }
/* Unlock the delayed insert object after its last access. */ /* Unlock the delayed insert object after its last access. */
tmp->unlock(); di->unlock();
DBUG_RETURN((table_list->table == NULL)); DBUG_RETURN((table_list->table == NULL));
end_create: end_create:
...@@ -2155,26 +2180,26 @@ void kill_delayed_threads(void) ...@@ -2155,26 +2180,26 @@ void kill_delayed_threads(void)
VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list
I_List_iterator<Delayed_insert> it(delayed_threads); I_List_iterator<Delayed_insert> it(delayed_threads);
Delayed_insert *tmp; Delayed_insert *di;
while ((tmp=it++)) while ((di= it++))
{ {
tmp->thd.killed= THD::KILL_CONNECTION; di->thd.killed= THD::KILL_CONNECTION;
if (tmp->thd.mysys_var) if (di->thd.mysys_var)
{ {
pthread_mutex_lock(&tmp->thd.mysys_var->mutex); pthread_mutex_lock(&di->thd.mysys_var->mutex);
if (tmp->thd.mysys_var->current_cond) if (di->thd.mysys_var->current_cond)
{ {
/* /*
We need the following test because the main mutex may be locked We need the following test because the main mutex may be locked
in handle_delayed_insert() in handle_delayed_insert()
*/ */
if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) if (&di->mutex != di->thd.mysys_var->current_mutex)
pthread_mutex_lock(tmp->thd.mysys_var->current_mutex); pthread_mutex_lock(di->thd.mysys_var->current_mutex);
pthread_cond_broadcast(tmp->thd.mysys_var->current_cond); pthread_cond_broadcast(di->thd.mysys_var->current_cond);
if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) if (&di->mutex != di->thd.mysys_var->current_mutex)
pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex); pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
} }
pthread_mutex_unlock(&tmp->thd.mysys_var->mutex); pthread_mutex_unlock(&di->thd.mysys_var->mutex);
} }
} }
VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
...@@ -2248,11 +2273,6 @@ pthread_handler_t handle_delayed_insert(void *arg) ...@@ -2248,11 +2273,6 @@ pthread_handler_t handle_delayed_insert(void *arg)
} }
di->table->copy_blobs=1; di->table->copy_blobs=1;
/* One can now use this */
pthread_mutex_lock(&LOCK_delayed_insert);
delayed_threads.append(di);
pthread_mutex_unlock(&LOCK_delayed_insert);
/* Tell client that the thread is initialized */ /* Tell client that the thread is initialized */
pthread_cond_signal(&di->cond_client); pthread_cond_signal(&di->cond_client);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment