Commit 87ae5a15 authored by unknown's avatar unknown

BUG#12162 - one can start two transactions with the same XID.

Now we keep all active XID's in a hash

parent e10362b4
......@@ -22,6 +22,8 @@ a
xa start 'testa','testb';
insert t1 values (30);
xa end 'testa','testb';
xa start 'testa','testb';
ERROR XAE08: XAER_DUPID: The XID already exists
xa start 0x7465737462, 0x2030405060, 0xb;
insert t1 values (40);
xa end 'testb',' 0@P`',11;
......@@ -35,11 +37,11 @@ formatID gtrid_length bqual_length data
11 5 5 testb 0@P`
1 5 5 testatestb
xa commit 'testb',0x2030405060,11;
ERROR XAE04: XAER_NOTA: Unknown XID
xa rollback 'testa','testb';
xa start 'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz';
ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '' at line 1
select * from t1;
a
20
40
drop table t1;
......@@ -31,6 +31,9 @@ xa end 'testa','testb';
connect (con1,localhost,,,);
connection con1;
--error 1438
xa start 'testa','testb';
# gtrid [ , bqual [ , formatID ] ]
xa start 0x7465737462, 0x2030405060, 0xb;
insert t1 values (40);
......@@ -47,6 +50,7 @@ xa prepare 'testa','testb';
xa recover;
--error 1397
xa commit 'testb',0x2030405060,11;
xa rollback 'testa','testb';
......
......@@ -7037,7 +7037,7 @@ innobase_xa_prepare(
return(0);
}
trx->xid=thd->transaction.xid;
trx->xid=thd->transaction.xid_state.xid;
/* Release a possible FIFO ticket and search latch. Since we will
reserve the kernel mutex, we have to release the search system latch
......
......@@ -547,8 +547,8 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
trans->ht[trans->nht++]=ht_arg;
DBUG_ASSERT(*ht == ht_arg);
trans->no_2pc|=(ht_arg->prepare==0);
if (thd->transaction.xid.is_null())
thd->transaction.xid.set(thd->query_id);
if (thd->transaction.xid_state.xid.is_null())
thd->transaction.xid_state.xid.set(thd->query_id);
DBUG_VOID_RETURN;
}
......@@ -595,7 +595,7 @@ int ha_commit_trans(THD *thd, bool all)
THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt;
bool is_real_trans= all || thd->transaction.all.nht == 0;
handlerton **ht= trans->ht;
my_xid xid= thd->transaction.xid.get_my_xid();
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
DBUG_ENTER("ha_commit_trans");
if (thd->in_sub_stmt)
......@@ -695,7 +695,7 @@ int ha_commit_one_phase(THD *thd, bool all)
trans->nht=0;
trans->no_2pc=0;
if (is_real_trans)
thd->transaction.xid.null();
thd->transaction.xid_state.xid.null();
if (all)
{
#ifdef HAVE_QUERY_CACHE
......@@ -751,7 +751,7 @@ int ha_rollback_trans(THD *thd, bool all)
trans->nht=0;
trans->no_2pc=0;
if (is_real_trans)
thd->transaction.xid.null();
thd->transaction.xid_state.xid.null();
if (all)
{
thd->variables.tx_isolation=thd->session_tx_isolation;
......@@ -945,6 +945,7 @@ int ha_recover(HASH *commit_list)
char buf[XIDDATASIZE*4+6]; // see xid_to_str
sql_print_information("ignore xid %s", xid_to_str(buf, list+i));
#endif
xid_cache_insert(list+i, XA_PREPARED);
found_foreign_xids++;
continue;
}
......@@ -1008,10 +1009,8 @@ bool mysql_xa_recover(THD *thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
handlerton **ht= handlertons, **end_ht=ht+total_ha;
bool error=TRUE;
int len, got;
XID *list=0;
int i=0;
XID_STATE *xs;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new Item_int("formatID",0,11));
......@@ -1021,48 +1020,30 @@ bool mysql_xa_recover(THD *thd)
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
for (len= MAX_XID_LIST_SIZE ; list==0 && len > MIN_XID_LIST_SIZE; len/=2)
{
list=(XID *)my_malloc(len*sizeof(XID), MYF(0));
}
if (!list)
{
my_error(ER_OUTOFMEMORY, MYF(0), len);
DBUG_RETURN(1);
}
for ( ; ht < end_ht ; ht++)
pthread_mutex_lock(&LOCK_xid_cache);
while (xs=(XID_STATE*)hash_element(&xid_cache, i++))
{
if (!(*ht)->recover)
continue;
while ((got=(*(*ht)->recover)(list, len)) > 0 )
if (xs->xa_state==XA_PREPARED)
{
XID *xid, *end;
for (xid=list, end=list+got; xid < end; xid++)
protocol->prepare_for_resend();
protocol->store_longlong((longlong)xs->xid.formatID, FALSE);
protocol->store_longlong((longlong)xs->xid.gtrid_length, FALSE);
protocol->store_longlong((longlong)xs->xid.bqual_length, FALSE);
protocol->store(xs->xid.data, xs->xid.gtrid_length+xs->xid.bqual_length,
&my_charset_bin);
if (protocol->write())
{
if (xid->get_my_xid())
continue; // skip "our" xids
protocol->prepare_for_resend();
protocol->store_longlong((longlong)xid->formatID, FALSE);
protocol->store_longlong((longlong)xid->gtrid_length, FALSE);
protocol->store_longlong((longlong)xid->bqual_length, FALSE);
protocol->store(xid->data, xid->gtrid_length+xid->bqual_length,
&my_charset_bin);
if (protocol->write())
goto err;
pthread_mutex_unlock(&LOCK_xid_cache);
DBUG_RETURN(1);
}
if (got < len)
break;
}
}
error=FALSE;
pthread_mutex_unlock(&LOCK_xid_cache);
send_eof(thd);
err:
my_free((gptr)list, MYF(0));
DBUG_RETURN(error);
DBUG_RETURN(0);
}
/*
......
......@@ -227,11 +227,11 @@ struct xid_t {
char data[XIDDATASIZE]; // not \0-terminated !
bool eq(struct xid_t *xid)
{ return !memcmp(this, xid, sizeof(long)*3+gtrid_length+bqual_length); }
{ return !memcmp(this, xid, length()); }
bool eq(long g, long b, const char *d)
{ return g == gtrid_length && b == bqual_length && !memcmp(d, data, g+b); }
void set(struct xid_t *xid)
{ memcpy(this, xid, sizeof(long)*3+xid->gtrid_length+xid->bqual_length); }
{ memcpy(this, xid, xid->length()); }
void set(long f, const char *g, long gl, const char *b, long bl)
{
formatID= f;
......@@ -270,6 +270,11 @@ struct xid_t {
!memcmp(data, MYSQL_XID_PREFIX, MYSQL_XID_PREFIX_LEN) ?
quick_get_my_xid() : 0;
}
uint length()
{
return sizeof(formatID)+sizeof(gtrid_length)+sizeof(bqual_length)+
gtrid_length+bqual_length;
}
};
typedef struct xid_t XID;
......
......@@ -1050,6 +1050,7 @@ void clean_up(bool print_message)
(void) ha_panic(HA_PANIC_CLOSE); /* close all tables and logs */
if (tc_log)
tc_log->close();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, gptr)) free_key_cache);
multi_keycache_free();
end_thr_alarm(1); /* Free allocated memory */
......@@ -2920,6 +2921,11 @@ server.");
using_update_log=1;
}
if (xid_cache_init())
{
sql_print_error("Out of memory");
unireg_abort(1);
}
if (ha_init())
{
sql_print_error("Can't init databases");
......
......@@ -5388,3 +5388,5 @@ ER_STACK_OVERRUN_NEED_MORE
eng "Thread stack overrun: %ld bytes used of a %ld byte stack, and %ld bytes needed. Use 'mysqld -O thread_stack=#' to specify a bigger stack."
ER_TOO_LONG_BODY 42000 S1009
eng "Routine body for '%-.100s' is too long"
ER_XAER_DUPID XAE08
eng "XAER_DUPID: The XID already exists"
......@@ -502,7 +502,7 @@ void close_thread_tables(THD *thd, bool lock_in_use, bool skip_derived,
*/
bzero(&thd->transaction.stmt, sizeof(thd->transaction.stmt));
if (!thd->active_transaction())
thd->transaction.xid.null();
thd->transaction.xid_state.xid.null();
/* VOID(pthread_sigmask(SIG_SETMASK,&thd->block_signals,NULL)); */
if (!lock_in_use)
......
......@@ -323,7 +323,8 @@ void THD::init_for_queries()
variables.trans_alloc_block_size,
variables.trans_prealloc_size);
#endif
transaction.xid.null();
transaction.xid_state.xid.null();
transaction.xid_state.in_thd=1;
}
......@@ -358,9 +359,15 @@ void THD::cleanup(void)
{
DBUG_ENTER("THD::cleanup");
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
if (transaction.xa_state != XA_PREPARED)
if (transaction.xid_state.xa_state == XA_PREPARED)
{
#error xid_state in the cache should be replaced by the allocated value
}
#endif
{
ha_rollback(this);
xid_cache_delete(&transaction.xid_state);
}
if (locked_tables)
{
lock=locked_tables; locked_tables=0;
......@@ -1841,3 +1848,81 @@ void THD::pop_open_tables_state()
set_open_tables_state(state);
DBUG_VOID_RETURN;
}
pthread_mutex_t LOCK_xid_cache;
HASH xid_cache;
static byte *xid_get_hash_key(const byte *ptr,uint *length,
my_bool not_used __attribute__((unused)))
{
*length=((XID_STATE*)ptr)->xid.length();
return (byte *)&((XID_STATE*)ptr)->xid;
}
static void xid_free_hash (void *ptr)
{
if (!((XID_STATE*)ptr)->in_thd)
my_free((byte *)ptr, MYF(0));
}
bool xid_cache_init()
{
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
xid_get_hash_key, xid_free_hash, 0) != 0;
}
void xid_cache_free()
{
if (hash_inited(&xid_cache))
{
hash_free(&xid_cache);
pthread_mutex_destroy(&LOCK_xid_cache);
}
}
XID_STATE *xid_cache_search(XID *xid)
{
pthread_mutex_lock(&LOCK_xid_cache);
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, (byte *)xid, xid->length());
pthread_mutex_unlock(&LOCK_xid_cache);
return res;
}
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
XID_STATE *xs;
my_bool res;
pthread_mutex_lock(&LOCK_xid_cache);
if (hash_search(&xid_cache, (byte *)xid, xid->length()))
res=0;
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
res=1;
else
{
xs->xa_state=xa_state;
xs->xid.set(xid);
xs->in_thd=0;
res=my_hash_insert(&xid_cache, (byte*)xs);
}
pthread_mutex_unlock(&LOCK_xid_cache);
return res;
}
bool xid_cache_insert(XID_STATE *xid_state)
{
pthread_mutex_lock(&LOCK_xid_cache);
DBUG_ASSERT(hash_search(&xid_cache, (byte *)&xid_state->xid,
xid_state->xid.length())==0);
my_bool res=my_hash_insert(&xid_cache, (byte*)xid_state);
pthread_mutex_unlock(&LOCK_xid_cache);
return res;
}
void xid_cache_delete(XID_STATE *xid_state)
{
pthread_mutex_lock(&LOCK_xid_cache);
hash_delete(&xid_cache, (byte *)xid_state);
pthread_mutex_unlock(&LOCK_xid_cache);
}
......@@ -351,8 +351,6 @@ public:
inline uint32 get_open_count() { return open_count; }
};
/* character conversion tables */
typedef struct st_copy_info {
ha_rows records;
......@@ -564,11 +562,11 @@ struct system_variables
my_bool ndb_use_transactions;
#endif /* HAVE_NDBCLUSTER_DB */
my_bool old_passwords;
/* Only charset part of these variables is sensible */
CHARSET_INFO *character_set_client;
CHARSET_INFO *character_set_client;
CHARSET_INFO *character_set_results;
/* Both charset and collation parts of these variables are important */
CHARSET_INFO *collation_server;
CHARSET_INFO *collation_database;
......@@ -631,7 +629,7 @@ typedef struct system_status_var
ulong filesort_range_count;
ulong filesort_rows;
ulong filesort_scan_count;
/* Ppepared statements and binary protocol */
/* Prepared statements and binary protocol */
ulong com_stmt_prepare;
ulong com_stmt_execute;
ulong com_stmt_send_long_data;
......@@ -656,8 +654,8 @@ void free_tmp_table(THD *thd, TABLE *entry);
/* The following macro is to make init of Query_arena simpler */
#ifndef DBUG_OFF
#define INIT_ARENA_DBUG_INFO is_backup_arena= 0
#else
#define INIT_ARENA_DBUG_INFO
#else
#define INIT_ARENA_DBUG_INFO
#endif
......@@ -925,6 +923,22 @@ struct st_savepoint {
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED};
extern const char *xa_state_names[];
typedef struct st_xid_state {
/* For now, this is only used to catch duplicated external xids */
XID xid; // transaction identifier
enum xa_states xa_state; // used by external XA only
bool in_thd;
} XID_STATE;
extern pthread_mutex_t LOCK_xid_cache;
extern HASH xid_cache;
bool xid_cache_init(void);
void xid_cache_free(void);
XID_STATE *xid_cache_search(XID *xid);
bool xid_cache_insert(XID *xid, enum xa_states xa_state);
bool xid_cache_insert(XID_STATE *xid_state);
void xid_cache_delete(XID_STATE *xid_state);
/*
A registry for item tree transformations performed during
query optimization. We register only those changes which require
......@@ -946,7 +960,7 @@ enum prelocked_mode_type {NON_PRELOCKED= 0, PRELOCKED= 1,
/*
Class that holds information about tables which were open and locked
Class that holds information about tables which were opened and locked
by the thread. It is also used to save/restore this information in
push_open_tables_state()/pop_open_tables_state().
*/
......@@ -1062,7 +1076,7 @@ public:
// the lock_id of a cursor.
pthread_mutex_t LOCK_delete; // Locked before thd is deleted
/* all prepared statements and cursors of this connection */
Statement_map stmt_map;
Statement_map stmt_map;
/*
A pointer to the stack frame of handle_one_connection(),
which is called first in the thread for handling a client
......@@ -1132,10 +1146,10 @@ public:
thr_lock_type update_lock_default;
delayed_insert *di;
my_bool tablespace_op; /* This is TRUE in DISCARD/IMPORT TABLESPACE */
/* TRUE if we are inside of trigger or stored function. */
bool in_sub_stmt;
/* container for handler's private per-connection data */
void *ha_data[MAX_HA];
struct st_transactions {
......@@ -1143,8 +1157,7 @@ public:
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
bool on; // see ha_enable_transaction()
XID xid; // transaction identifier
enum xa_states xa_state; // used by external XA only
XID_STATE xid_state;
/*
Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query
......@@ -1164,7 +1177,7 @@ public:
st_transactions()
{
bzero((char*)this, sizeof(*this));
xid.null();
xid_state.xid.null();
init_sql_alloc(&mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
}
#endif
......
......@@ -2017,7 +2017,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
*/
bzero(&thd->transaction.stmt, sizeof(thd->transaction.stmt));
if (!thd->active_transaction())
thd->transaction.xid.null();
thd->transaction.xid_state.xid.null();
/* report error issued during command execution */
if (thd->killed_errno() && !thd->net.report_error)
......@@ -4502,14 +4502,15 @@ end_with_restore_list:
break;
}
case SQLCOM_XA_START:
if (thd->transaction.xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
if (thd->transaction.xid_state.xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME)
{
if (! thd->transaction.xid.eq(thd->lex->xid))
if (! thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
}
thd->transaction.xa_state=XA_ACTIVE;
thd->transaction.xid_state.xa_state=XA_ACTIVE;
send_ok(thd);
break;
}
......@@ -4518,10 +4519,10 @@ end_with_restore_list:
my_error(ER_XAER_INVAL, MYF(0));
break;
}
if (thd->transaction.xa_state != XA_NOTR)
if (thd->transaction.xid_state.xa_state != XA_NOTR)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xa_state]);
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (thd->active_transaction() || thd->locked_tables)
......@@ -4529,9 +4530,15 @@ end_with_restore_list:
my_error(ER_XAER_OUTSIDE, MYF(0));
break;
}
DBUG_ASSERT(thd->transaction.xid.is_null());
thd->transaction.xa_state=XA_ACTIVE;
thd->transaction.xid.set(thd->lex->xid);
if (xid_cache_search(thd->lex->xid))
{
my_error(ER_XAER_DUPID, MYF(0));
break;
}
DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
thd->transaction.xid_state.xa_state=XA_ACTIVE;
thd->transaction.xid_state.xid.set(thd->lex->xid);
xid_cache_insert(&thd->transaction.xid_state);
thd->options= ((thd->options & (ulong) ~(OPTION_STATUS_NO_TRANS_UPDATE)) |
OPTION_BEGIN);
thd->server_status|= SERVER_STATUS_IN_TRANS;
......@@ -4544,28 +4551,28 @@ end_with_restore_list:
my_error(ER_XAER_INVAL, MYF(0));
break;
}
if (thd->transaction.xa_state != XA_ACTIVE)
if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xa_state]);
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (!thd->transaction.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
}
thd->transaction.xa_state=XA_IDLE;
thd->transaction.xid_state.xa_state=XA_IDLE;
send_ok(thd);
break;
case SQLCOM_XA_PREPARE:
if (thd->transaction.xa_state != XA_IDLE)
if (thd->transaction.xid_state.xa_state != XA_IDLE)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xa_state]);
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (!thd->transaction.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
my_error(ER_XAER_NOTA, MYF(0));
break;
......@@ -4573,22 +4580,28 @@ end_with_restore_list:
if (ha_prepare(thd))
{
my_error(ER_XA_RBROLLBACK, MYF(0));
thd->transaction.xa_state=XA_NOTR;
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state=XA_NOTR;
break;
}
thd->transaction.xa_state=XA_PREPARED;
thd->transaction.xid_state.xa_state=XA_PREPARED;
send_ok(thd);
break;
case SQLCOM_XA_COMMIT:
if (!thd->transaction.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (!(res= !ha_commit_or_rollback_by_xid(thd->lex->xid, 1)))
XID_STATE *xs=xid_cache_search(thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else
{
ha_commit_or_rollback_by_xid(thd->lex->xid, 1);
xid_cache_delete(xs);
send_ok(thd);
}
break;
}
if (thd->transaction.xa_state == XA_IDLE &&
if (thd->transaction.xid_state.xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_ONE_PHASE)
{
int r;
......@@ -4597,7 +4610,7 @@ end_with_restore_list:
else
send_ok(thd);
}
else if (thd->transaction.xa_state == XA_PREPARED &&
else if (thd->transaction.xid_state.xa_state == XA_PREPARED &&
thd->lex->xa_opt == XA_NONE)
{
if (wait_if_global_read_lock(thd, 0, 0))
......@@ -4617,27 +4630,33 @@ end_with_restore_list:
else
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xa_state]);
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
thd->options&= ~(ulong) (OPTION_BEGIN | OPTION_STATUS_NO_TRANS_UPDATE);
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
thd->transaction.xa_state=XA_NOTR;
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state=XA_NOTR;
break;
case SQLCOM_XA_ROLLBACK:
if (!thd->transaction.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
if (!(res= !ha_commit_or_rollback_by_xid(thd->lex->xid, 0)))
XID_STATE *xs=xid_cache_search(thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else
{
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(xs);
send_ok(thd);
}
break;
}
if (thd->transaction.xa_state != XA_IDLE &&
thd->transaction.xa_state != XA_PREPARED)
if (thd->transaction.xid_state.xa_state != XA_IDLE &&
thd->transaction.xid_state.xa_state != XA_PREPARED)
{
my_error(ER_XAER_RMFAIL, MYF(0),
xa_state_names[thd->transaction.xa_state]);
xa_state_names[thd->transaction.xid_state.xa_state]);
break;
}
if (ha_rollback(thd))
......@@ -4646,7 +4665,8 @@ end_with_restore_list:
send_ok(thd);
thd->options&= ~(ulong) (OPTION_BEGIN | OPTION_STATUS_NO_TRANS_UPDATE);
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
thd->transaction.xa_state=XA_NOTR;
xid_cache_delete(&thd->transaction.xid_state);
thd->transaction.xid_state.xa_state=XA_NOTR;
break;
case SQLCOM_XA_RECOVER:
res= mysql_xa_recover(thd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment