Commit 987f0a91 authored by Kentoku SHIBA's avatar Kentoku SHIBA

copy tables with internal xa

parent b7938cce
......@@ -849,6 +849,7 @@ long long spider_copy_tables_body(
MEM_ROOT mem_root;
longlong bulk_insert_rows;
Reprepare_observer *reprepare_observer_backup;
uint tmp_conn_link_idx = 0;
DBUG_ENTER("spider_copy_tables_body");
if (
thd->open_tables != 0 ||
......@@ -989,6 +990,7 @@ long long spider_copy_tables_body(
table_list->table_name_length));
reprepare_observer_backup = thd->m_reprepare_observer;
thd->m_reprepare_observer = NULL;
copy_tables->trx->trx_start = TRUE;
#if MYSQL_VERSION_ID < 50500
if (open_and_lock_tables(thd, table_list))
#else
......@@ -1003,12 +1005,14 @@ long long spider_copy_tables_body(
#endif
{
thd->m_reprepare_observer = reprepare_observer_backup;
copy_tables->trx->trx_start = FALSE;
my_printf_error(ER_SPIDER_UDF_CANT_OPEN_TABLE_NUM,
ER_SPIDER_UDF_CANT_OPEN_TABLE_STR, MYF(0), table_list->db,
table_list->table_name);
goto error;
}
thd->m_reprepare_observer = reprepare_observer_backup;
copy_tables->trx->trx_start = FALSE;
table = table_list->table;
table_share = table->s;
......@@ -1123,6 +1127,7 @@ long long spider_copy_tables_body(
tmp_spider->result_list.sqls = &tmp_sql[roop_count];
tmp_spider->need_mons = &table_conn->need_mon;
tmp_spider->lock_type = TL_READ;
tmp_spider->conn_link_idx = &tmp_conn_link_idx;
uint dbton_id = tmp_spider->share->use_dbton_ids[0];
if (!(tmp_spider->dbton_handler[dbton_id] =
spider_dbton[dbton_id].create_db_handler(tmp_spider,
......@@ -1166,6 +1171,7 @@ long long spider_copy_tables_body(
tmp_spider->result_list.sqls = &tmp_sql[roop_count];
tmp_spider->need_mons = &table_conn->need_mon;
tmp_spider->lock_type = TL_WRITE;
tmp_spider->conn_link_idx = &tmp_conn_link_idx;
uint dbton_id = tmp_spider->share->use_dbton_ids[0];
if (!(tmp_spider->dbton_handler[dbton_id] =
spider_dbton[dbton_id].create_db_handler(tmp_spider,
......@@ -1293,7 +1299,9 @@ error:
delete [] tmp_sql;
}
if (copy_tables)
{
spider_udf_free_copy_tables_alloc(copy_tables);
}
*error = 1;
DBUG_RETURN(0);
}
......@@ -1338,6 +1346,6 @@ void spider_copy_tables_deinit_body(
!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN) &&
(trx = spider_get_trx(thd, TRUE, &error_num))
)
spider_free_trx_conn(trx, FALSE);
spider_copy_table_free_trx_conn(trx);
DBUG_VOID_RETURN;
}
......@@ -9312,6 +9312,8 @@ int spider_db_udf_copy_tables(
{
tmp_spider = &spider[roop_count];
tmp_conn = tmp_spider->conns[0];
/* disable transaction */
spider_conn_clear_queue_at_commit(tmp_conn);
if (!tmp_conn->trx_start)
{
if (spider_db_ping(tmp_spider, tmp_conn, 0))
......@@ -9334,6 +9336,8 @@ int spider_db_udf_copy_tables(
{
tmp_spider = &spider[roop_count];
tmp_conn = tmp_spider->conns[0];
/* disable transaction */
spider_conn_clear_queue_at_commit(tmp_conn);
spider_db_handler *tmp_dbton_hdl =
tmp_spider->dbton_handler[tmp_conn->dbton_id];
if ((error_num = tmp_dbton_hdl->insert_lock_tables_list(tmp_conn, 0)))
......
......@@ -473,6 +473,7 @@ typedef struct st_spider_transaction
bool trx_start;
bool trx_xa;
bool trx_consistent_snapshot;
bool trx_xa_prepared;
bool use_consistent_snapshot;
bool internal_xa;
......
......@@ -1692,7 +1692,8 @@ int spider_internal_start_trx(
if (
!trx->trx_xa &&
trx->internal_xa &&
(!trx->trx_consistent_snapshot || trx->internal_xa_snapshot == 3)
(!trx->trx_consistent_snapshot || trx->internal_xa_snapshot == 3) &&
spider->sql_command != SQLCOM_LOCK_TABLES
) {
trx->trx_xa = TRUE;
trx->xid.formatID = 1;
......@@ -1714,18 +1715,11 @@ int spider_internal_start_trx(
trx->internal_xid_state.xa_state = XA_ACTIVE;
trx->internal_xid_state.xid.set(&trx->xid);
trx->internal_xid_state.in_thd = 1;
while ((error_num = spider_xa_lock(&trx->internal_xid_state)))
if ((error_num = spider_xa_lock(&trx->internal_xid_state)))
{
if (error_num != ER_SPIDER_XA_LOCKED_NUM)
goto error;
else if (trx->xid.formatID == 0)
{
if (error_num == ER_SPIDER_XA_LOCKED_NUM)
my_message(error_num, ER_SPIDER_XA_LOCKED_STR, MYF(0));
goto error;
}
/* retry */
trx->xid.formatID++;
trx->internal_xid_state.xid.set(&trx->xid);
goto error;
}
xa_lock = TRUE;
} else
......@@ -1740,6 +1734,7 @@ int spider_internal_start_trx(
trans_register_ha(trx->thd, TRUE, spider_hton_ptr);
}
trx->trx_start = TRUE;
trx->trx_xa_prepared = FALSE;
}
DBUG_PRINT("info",("spider sync_autocommit = %d", sync_autocommit));
......@@ -2005,12 +2000,11 @@ int spider_internal_xa_rollback(
Open_tables_backup open_tables_backup;
#endif
bool server_lost = FALSE;
bool prepared = (thd->transaction.xid_state.xa_state == XA_PREPARED);
bool table_xa_opened = FALSE;
bool table_xa_member_opened = FALSE;
DBUG_ENTER("spider_internal_xa_rollback");
if (prepared)
if (trx->trx_xa_prepared)
{
/*
select
......@@ -2088,7 +2082,7 @@ int spider_internal_xa_rollback(
{
if (conn->disable_xa)
{
if (conn->table_lock != 3 && !prepared)
if (conn->table_lock != 3 && !trx->trx_xa_prepared)
{
if (
!conn->server_lost &&
......@@ -2103,7 +2097,7 @@ int spider_internal_xa_rollback(
if (!conn->server_lost)
{
if (
!prepared &&
!trx->trx_xa_prepared &&
(tmp_error_num = spider_db_xa_end(conn, &trx->xid))
) {
if (
......@@ -2157,7 +2151,7 @@ int spider_internal_xa_rollback(
goto error_in_rollback;
if (
prepared &&
trx->trx_xa_prepared &&
!server_lost
) {
/*
......@@ -3144,21 +3138,24 @@ int spider_commit(
{
if (trx->trx_xa)
{
if (
trx->internal_xa &&
(error_num = spider_internal_xa_prepare(
thd, trx, table_xa, table_xa_member, TRUE))
) {
if (trx->internal_xa && !trx->trx_xa_prepared)
{
if (
(error_num = spider_internal_xa_prepare(
thd, trx, table_xa, table_xa_member, TRUE))
) {
/*
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
{
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
{
*/
/* rollback for semi_trx */
spider_rollback(hton, thd, all);
/* rollback for semi_trx */
spider_rollback(hton, thd, all);
/*
}
}
*/
DBUG_RETURN(error_num);
DBUG_RETURN(error_num);
}
trx->trx_xa_prepared = TRUE;
}
int tmp_error_num;
if (
......@@ -3303,6 +3300,7 @@ int spider_xa_prepare(
if ((error_num = spider_internal_xa_prepare(
thd, trx, table_xa, table_xa_member, FALSE)))
goto error;
trx->trx_xa_prepared = TRUE;
}
}
......@@ -3372,6 +3370,26 @@ error_get_trx:
DBUG_RETURN(error_num);
}
void spider_copy_table_free_trx_conn(
SPIDER_TRX *trx
) {
SPIDER_CONN *conn;
DBUG_ENTER("spider_copy_table_free_trx_conn");
if ((conn = spider_tree_first(trx->join_trx_top)))
{
do {
spider_end_trx(trx, conn);
conn->join_trx = 0;
} while ((conn = spider_tree_next(conn)));
trx->join_trx_top = NULL;
}
spider_reuse_trx_ha(trx);
spider_free_trx_conn(trx, FALSE);
trx->trx_consistent_snapshot = FALSE;
spider_merge_mem_calc(trx, FALSE);
DBUG_VOID_RETURN;
}
int spider_end_trx(
SPIDER_TRX *trx,
SPIDER_CONN *conn
......
......@@ -208,6 +208,10 @@ int spider_xa_rollback_by_xid(
XID* xid
);
void spider_copy_table_free_trx_conn(
SPIDER_TRX *trx
);
int spider_end_trx(
SPIDER_TRX *trx,
SPIDER_CONN *conn
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment