Commit 41239f1f authored by kostja@vajra.(none)'s avatar kostja@vajra.(none)

No semantical change. Move checks of compatibility

of requested lock type and requested table operation from 
mysql_insert into a separate function.
parent 4144dc72
......@@ -401,6 +401,79 @@ void mark_fields_used_by_triggers_for_insert_stmt(THD *thd, TABLE *table,
}
/**
Upgrade table-level lock of INSERT statement to TL_WRITE if
a more concurrent lock is infeasible for some reason. This is
necessary for engines without internal locking support (MyISAM).
An engine with internal locking implementation might later
downgrade the lock in handler::store_lock() method.
*/
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
enum_duplicates duplic,
bool is_multi_insert)
{
if (duplic == DUP_UPDATE ||
duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT)
{
*lock_type= TL_WRITE;
return;
}
if (*lock_type == TL_WRITE_DELAYED)
{
#ifdef EMBEDDED_LIBRARY
/* No auxiliary threads in the embedded server. */
*lock_type= TL_WRITE;
return;
#else
/*
We do not use delayed threads if:
- we're running in the safe mode or skip-new - the feature
is disabled in these modes
- we're running this query in statement level replication,
on a replication slave - because we must ensure serial
execution of queries on the slave
- it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
insert cannot be concurrent
*/
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
thd->slave_thread ||
thd->variables.max_insert_delayed_threads == 0)
{
*lock_type= TL_WRITE;
return;
}
#endif
bool log_on= (thd->options & OPTION_BIN_LOG ||
! (thd->security_ctx->master_access & SUPER_ACL));
if (log_on && mysql_bin_log.is_open() && is_multi_insert)
{
/*
Statement-based binary logging does not work in this case, because:
a) two concurrent statements may have their rows intermixed in the
queue, leading to autoincrement replication problems on slave (because
the values generated used for one statement don't depend only on the
value generated for the first row of this statement, so are not
replicable)
b) if first row of the statement has an error the full statement is
not binlogged, while next rows of the statement may be inserted.
c) if first row succeeds, statement is binlogged immediately with a
zero error code (i.e. "no error"), if then second row fails, query
will fail on slave too and slave will stop (wrongly believing that the
master got no error).
So we fall back to non-delayed INSERT.
*/
*lock_type= TL_WRITE;
}
}
}
/**
INSERT statement implementation
*/
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
List<Item> &fields,
List<List_item> &values_list,
......@@ -436,58 +509,31 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
DBUG_ENTER("mysql_insert");
/*
in safe mode or with skip-new change delayed insert to be regular
if we are told to replace duplicates, the insert cannot be concurrent
delayed insert changed to regular in slave thread
Upgrade lock type if the requested lock is incompatible with
the current connection mode or table operation.
*/
#ifdef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
lock_type=TL_WRITE;
#else
if ((lock_type == TL_WRITE_DELAYED &&
((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
(lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
(duplic == DUP_UPDATE))
lock_type=TL_WRITE;
#endif
if ((lock_type == TL_WRITE_DELAYED) &&
log_on && mysql_bin_log.is_open() &&
(values_list.elements > 1))
{
upgrade_lock_type(thd, &table_list->lock_type, duplic,
values_list.elements > 1);
lock_type= table_list->lock_type;
/*
Statement-based binary logging does not work in this case, because:
a) two concurrent statements may have their rows intermixed in the
queue, leading to autoincrement replication problems on slave (because
the values generated used for one statement don't depend only on the
value generated for the first row of this statement, so are not
replicable)
b) if first row of the statement has an error the full statement is
not binlogged, while next rows of the statement may be inserted.
c) if first row succeeds, statement is binlogged immediately with a
zero error code (i.e. "no error"), if then second row fails, query
will fail on slave too and slave will stop (wrongly believing that the
master got no error).
So we fallback to non-delayed INSERT.
We can't write-delayed into a table locked with LOCK TABLES:
this will lead to a deadlock, since the delayed thread will
never be able to get a lock on the table. QQQ: why not
upgrade the lock here instead?
*/
lock_type= TL_WRITE;
if (lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
find_locked_table(thd, table_list->db, table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
DBUG_RETURN(TRUE);
}
table_list->lock_type= lock_type;
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
res= 1;
if (thd->locked_tables)
{
DBUG_ASSERT(table_list->db); /* Must be set in the parser */
if (find_locked_table(thd, table_list->db, table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
DBUG_RETURN(TRUE);
}
}
if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
{
/*
......@@ -1460,6 +1506,12 @@ public:
}
};
/**
delayed_insert - context of a thread responsible for delayed insert
into one table. When processing delayed inserts, we create an own
thread for every distinct table. Later on all delayed inserts directed
into that table are handled by a dedicated thread.
*/
class delayed_insert :public ilink {
uint locks_in_memory;
......@@ -1551,6 +1603,12 @@ public:
I_List<delayed_insert> delayed_threads;
/**
Return an instance of delayed insert thread that can handle
inserts into a given table, if it exists. Otherwise return NULL.
*/
static
delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
thd->proc_info="waiting for delay_list";
......@@ -1571,6 +1629,18 @@ delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
}
/**
Attempt to find or create a delayed insert thread to handle inserts
into this table.
@return Return an instance of the table in the delayed thread
@retval NULL too many delayed threads OR
this thread ran out of resources OR
a newly created delayed insert thread ran out of resources OR
the delayed insert thread failed to open the table.
In the last three cases an error is set in THD.
*/
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
int error;
......@@ -1679,11 +1749,17 @@ static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
}
/*
As we can't let many threads modify the same TABLE structure, we create
an own structure for each tread. This includes a row buffer to save the
column values and new fields that points to the new row buffer.
The memory is allocated in the client thread and is freed automaticly.
/**
As we can't let many client threads modify the same TABLE
structure of the dedicated delayed insert thread, we create an
own structure for each client thread. This includes a row
buffer to save the column values and new fields that point to
the new row buffer. The memory is allocated in the client
thread and is freed automatically.
@pre This function is called from the client thread. Delayed
insert thread mutex must be acquired before invoking this
function.
*/
TABLE *delayed_insert::get_local_table(THD* client_thd)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment