Commit df0e057a authored by unknown's avatar unknown

Added support for X/Open XA prepare, recover, commit and rollback.


innobase/include/trx0roll.h:
  Changed prototype of the function trx_rollback_or_clean_all_without_sess
  because this function is executed in a background thread.
innobase/include/trx0trx.h:
  Added support for X/Open XA prepare, recover and search by X/Open XA XID.
innobase/include/trx0undo.h:
  Added support for X/Open XA prepare and recover. We need to store X/Open XA XID
  to the undo log header for recovery.
innobase/log/log0recv.c:
  Create a thread to run trx_rollback_or_clean_all_without_sess function
  to rollback the uncommitted transactions which have no user session.
innobase/row/row0ins.c:
  Remove unnecessary variables.
innobase/trx/trx0roll.c:
  Changed so that trx_rollback_or_clean_all_without_sess is executed
  in a background thread. We should also leave all prepared transactions
  active to wait for commit or abort from MySQL.
innobase/trx/trx0sys.c:
  Only those rows which belong to the active transaction in crash recovery
  are undone.
innobase/trx/trx0trx.c:
  Added support for X/Open XA prepare and recover. We need to store X/Open XA
  XID to trx structure and left prepared transactions to wait for a
  commit or abort from MySQL. This requires also that we add TRX_PREPARED
  state to the transaction and TRX_UNDO_PREPARED state for undo logs.
innobase/trx/trx0undo.c:
  Added support for X/Open XA prepare and recover. We need to store X/Open XA
  XID to undo log header for recovery of distributed transactions.
sql/ha_innodb.h:
  Added prototypes for X/Open XA prepare, recover, commit and rollback.
sql/handler.h:
  Added definition for X/Open XA XID structure.
parent f4110834
...@@ -104,11 +104,12 @@ trx_rollback( ...@@ -104,11 +104,12 @@ trx_rollback(
/*********************************************************************** /***********************************************************************
Rollback or clean up transactions which have no user session. If the Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert transaction already was committed, then we clean up a possible insert
undo log. If the transaction was not yet committed, then we roll it back. */ undo log. If the transaction was not yet committed, then we roll it back.
Note: this is done in a background thread */
void void *
trx_rollback_or_clean_all_without_sess(void); trx_rollback_or_clean_all_without_sess(void *);
/*========================================*/ /*============================================*/
/******************************************************************** /********************************************************************
Finishes a transaction rollback. */ Finishes a transaction rollback. */
......
...@@ -16,6 +16,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -16,6 +16,7 @@ Created 3/26/1996 Heikki Tuuri
#include "que0types.h" #include "que0types.h"
#include "mem0mem.h" #include "mem0mem.h"
#include "read0types.h" #include "read0types.h"
#include "xa.h"
extern ulint trx_n_mysql_transactions; extern ulint trx_n_mysql_transactions;
...@@ -156,6 +157,36 @@ trx_commit_for_mysql( ...@@ -156,6 +157,36 @@ trx_commit_for_mysql(
/*=================*/ /*=================*/
/* out: 0 or error number */ /* out: 0 or error number */
trx_t* trx); /* in: trx handle */ trx_t* trx); /* in: trx handle */
/**************************************************************************
Does the transaction prepare for MySQL. */
ulint
trx_prepare_for_mysql(
/*=================*/
/* out: 0 or error number */
trx_t* trx); /* in: trx handle */
/**************************************************************************
This function is used to find number of prepared transactions and
their transaction objects for a recovery. */
int
trx_recover_for_mysql(
/*=================*/
/* out: number of prepared transactions */
XID* xid_list, /* in/out: prepared transactions */
uint len); /* in: number of slots in xid_list */
/***********************************************************************
This function is used to commit one X/Open XA distributed transaction
which is in the prepared state */
trx_t *
trx_get_trx_by_xid(
/*===============*/
/* out: trx or NULL */
XID* xid); /* in: X/Open XA Transaction Idenfication */
/************************************************************************** /**************************************************************************
If required, flushes the log to disk if we called trx_commit_for_mysql() If required, flushes the log to disk if we called trx_commit_for_mysql()
with trx->flush_log_later == TRUE. */ with trx->flush_log_later == TRUE. */
...@@ -339,6 +370,9 @@ struct trx_struct{ ...@@ -339,6 +370,9 @@ struct trx_struct{
if we can use the insert buffer for if we can use the insert buffer for
them, we set this FALSE */ them, we set this FALSE */
dulint id; /* transaction id */ dulint id; /* transaction id */
XID xid; /* X/Open XA transaction
identification to identify a
transaction branch */
dulint no; /* transaction serialization number == dulint no; /* transaction serialization number ==
max trx id when the transaction is max trx id when the transaction is
moved to COMMITTED_IN_MEMORY state */ moved to COMMITTED_IN_MEMORY state */
...@@ -353,8 +387,10 @@ struct trx_struct{ ...@@ -353,8 +387,10 @@ struct trx_struct{
dulint table_id; /* table id if the preceding field is dulint table_id; /* table id if the preceding field is
TRUE */ TRUE */
/*------------------------------*/ /*------------------------------*/
void* mysql_thd; /* MySQL thread handle corresponding int active_trans; /* whether a transaction in MySQL
to this trx, or NULL */ is active */
void* mysql_thd; /* MySQL thread handle corresponding
to this trx, or NULL */
char** mysql_query_str;/* pointer to the field in mysqld_thd char** mysql_query_str;/* pointer to the field in mysqld_thd
which contains the pointer to the which contains the pointer to the
current SQL query string */ current SQL query string */
...@@ -543,6 +579,7 @@ struct trx_struct{ ...@@ -543,6 +579,7 @@ struct trx_struct{
#define TRX_NOT_STARTED 1 #define TRX_NOT_STARTED 1
#define TRX_ACTIVE 2 #define TRX_ACTIVE 2
#define TRX_COMMITTED_IN_MEMORY 3 #define TRX_COMMITTED_IN_MEMORY 3
#define TRX_PREPARED 4 /* Support for 2PC/XA */
/* Transaction execution states when trx state is TRX_ACTIVE */ /* Transaction execution states when trx state is TRX_ACTIVE */
#define TRX_QUE_RUNNING 1 /* transaction is running */ #define TRX_QUE_RUNNING 1 /* transaction is running */
......
...@@ -14,6 +14,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -14,6 +14,7 @@ Created 3/26/1996 Heikki Tuuri
#include "mtr0mtr.h" #include "mtr0mtr.h"
#include "trx0sys.h" #include "trx0sys.h"
#include "page0types.h" #include "page0types.h"
#include "xa.h"
/*************************************************************************** /***************************************************************************
Builds a roll pointer dulint. */ Builds a roll pointer dulint. */
...@@ -36,7 +37,7 @@ trx_undo_decode_roll_ptr( ...@@ -36,7 +37,7 @@ trx_undo_decode_roll_ptr(
ibool* is_insert, /* out: TRUE if insert undo log */ ibool* is_insert, /* out: TRUE if insert undo log */
ulint* rseg_id, /* out: rollback segment id */ ulint* rseg_id, /* out: rollback segment id */
ulint* page_no, /* out: page number */ ulint* page_no, /* out: page number */
ulint* offset); /* out: offset of the undo entry within page */ ulint* offset); /* out: offset of the undo entry within page */
/*************************************************************************** /***************************************************************************
Returns TRUE if the roll pointer is of the insert type. */ Returns TRUE if the roll pointer is of the insert type. */
UNIV_INLINE UNIV_INLINE
...@@ -239,6 +240,18 @@ trx_undo_set_state_at_finish( ...@@ -239,6 +240,18 @@ trx_undo_set_state_at_finish(
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
trx_undo_t* undo, /* in: undo log memory copy */ trx_undo_t* undo, /* in: undo log memory copy */
mtr_t* mtr); /* in: mtr */ mtr_t* mtr); /* in: mtr */
/**********************************************************************
Sets the state of the undo log segment at a transaction prepare. */
page_t*
trx_undo_set_state_at_prepare(
/*==========================*/
/* out: undo log segment header page,
x-latched */
trx_t* trx, /* in: transaction */
trx_undo_t* undo, /* in: undo log memory copy */
mtr_t* mtr); /* in: mtr */
/************************************************************************** /**************************************************************************
Adds the update undo log header as the first in the history list, and Adds the update undo log header as the first in the history list, and
frees the memory object, or puts it to the list of cached update undo log frees the memory object, or puts it to the list of cached update undo log
...@@ -294,7 +307,23 @@ trx_undo_parse_discard_latest( ...@@ -294,7 +307,23 @@ trx_undo_parse_discard_latest(
byte* end_ptr,/* in: buffer end */ byte* end_ptr,/* in: buffer end */
page_t* page, /* in: page or NULL */ page_t* page, /* in: page or NULL */
mtr_t* mtr); /* in: mtr or NULL */ mtr_t* mtr); /* in: mtr or NULL */
/************************************************************************
Write X/Open XA Transaction Identification (XID) to undo log header */
void
trx_undo_write_xid(
/*===============*/
trx_ulogf_t* log_hdr,/* in: undo log header */
XID* xid); /* in: X/Open XA Transaction Identification */
/************************************************************************
Read X/Open XA Transaction Identification (XID) from undo log header */
void
trx_undo_read_xid(
/*==============*/
trx_ulogf_t* log_hdr,/* in: undo log header */
XID* xid); /* out: X/Open XA Transaction Identification */
/* Types of an undo log segment */ /* Types of an undo log segment */
#define TRX_UNDO_INSERT 1 /* contains undo entries for inserts */ #define TRX_UNDO_INSERT 1 /* contains undo entries for inserts */
...@@ -310,6 +339,8 @@ trx_undo_parse_discard_latest( ...@@ -310,6 +339,8 @@ trx_undo_parse_discard_latest(
#define TRX_UNDO_TO_PURGE 4 /* update undo segment will not be #define TRX_UNDO_TO_PURGE 4 /* update undo segment will not be
reused: it can be freed in purge when reused: it can be freed in purge when
all undo data in it is removed */ all undo data in it is removed */
#define TRX_UNDO_PREPARED 5 /* contains an undo log of an
prepared transaction */
/* Transaction undo log memory object; this is protected by the undo_mutex /* Transaction undo log memory object; this is protected by the undo_mutex
in the corresponding transaction object */ in the corresponding transaction object */
...@@ -332,6 +363,8 @@ struct trx_undo_struct{ ...@@ -332,6 +363,8 @@ struct trx_undo_struct{
field */ field */
dulint trx_id; /* id of the trx assigned to the undo dulint trx_id; /* id of the trx assigned to the undo
log */ log */
XID xid; /* X/Open XA transaction
identification */
ibool dict_operation; /* TRUE if a dict operation trx */ ibool dict_operation; /* TRUE if a dict operation trx */
dulint table_id; /* if a dict operation, then the table dulint table_id; /* if a dict operation, then the table
id */ id */
...@@ -452,7 +485,18 @@ page of an update undo log segment. */ ...@@ -452,7 +485,18 @@ page of an update undo log segment. */
#define TRX_UNDO_HISTORY_NODE 34 /* If the log is put to the history #define TRX_UNDO_HISTORY_NODE 34 /* If the log is put to the history
list, the file list node is here */ list, the file list node is here */
/*-------------------------------------------------------------*/ /*-------------------------------------------------------------*/
#define TRX_UNDO_LOG_HDR_SIZE (34 + FLST_NODE_SIZE) /* X/Open XA Transaction Identification (XID) */
#define TRX_UNDO_XA_FORMAT (34 + FLST_NODE_SIZE)
#define TRX_UNDO_XA_TRID_LEN (TRX_UNDO_XA_FORMAT + 4)
#define TRX_UNDO_XA_BQUAL_LEN (TRX_UNDO_XA_TRID_LEN + 4)
#define TRX_UNDO_XA_XID (TRX_UNDO_XA_BQUAL_LEN + 4)
#define TRX_UNDO_XA_LEN (TRX_UNDO_XA_XID + XIDDATASIZE)
#define TRX_UNDO_XA_EXISTS 256
/*-------------------------------------------------------------*/
#define TRX_UNDO_LOG_HDR_SIZE (TRX_UNDO_XA_LEN)
/*-------------------------------------------------------------*/
#ifndef UNIV_NONINL #ifndef UNIV_NONINL
#include "trx0undo.ic" #include "trx0undo.ic"
......
/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 1998-2002
* Sleepycat Software. All rights reserved.
*
* $Id: xa.h,v 11.5 2002/01/11 15:52:30 bostic Exp $
*/
/*
* Start of xa.h header
*
* Define a symbol to prevent multiple inclusions of this header file
*/
#ifndef XA_H
#define XA_H
/*
* Transaction branch identification: XID and NULLXID:
*/
#ifndef XIDDATASIZE
#define XIDDATASIZE 128 /* size in bytes */
#define MAXGTRIDSIZE 64 /* maximum size in bytes of gtrid */
#define MAXBQUALSIZE 64 /* maximum size in bytes of bqual */
struct xid_t {
long formatID; /* format identifier */
long gtrid_length; /* value from 1 through 64 */
long bqual_length; /* value from 1 through 64 */
char data[XIDDATASIZE];
};
typedef struct xid_t XID;
#endif
/*
* A value of -1 in formatID means that the XID is null.
*/
/*
* Declarations of routines by which RMs call TMs:
*/
extern int ax_reg __P((int, XID *, long));
extern int ax_unreg __P((int, long));
/*
* XA Switch Data Structure
*/
#define RMNAMESZ 32 /* length of resource manager name, */
/* including the null terminator */
#define MAXINFOSIZE 256 /* maximum size in bytes of xa_info */
/* strings, including the null
terminator */
struct xa_switch_t {
char name[RMNAMESZ]; /* name of resource manager */
long flags; /* resource manager specific options */
long version; /* must be 0 */
int (*xa_open_entry) /* xa_open function pointer */
__P((char *, int, long));
int (*xa_close_entry) /* xa_close function pointer */
__P((char *, int, long));
int (*xa_start_entry) /* xa_start function pointer */
__P((XID *, int, long));
int (*xa_end_entry) /* xa_end function pointer */
__P((XID *, int, long));
int (*xa_rollback_entry) /* xa_rollback function pointer */
__P((XID *, int, long));
int (*xa_prepare_entry) /* xa_prepare function pointer */
__P((XID *, int, long));
int (*xa_commit_entry) /* xa_commit function pointer */
__P((XID *, int, long));
int (*xa_recover_entry) /* xa_recover function pointer */
__P((XID *, long, int, long));
int (*xa_forget_entry) /* xa_forget function pointer */
__P((XID *, int, long));
int (*xa_complete_entry) /* xa_complete function pointer */
__P((int *, int *, int, long));
};
/*
* Flag definitions for the RM switch
*/
#define TMNOFLAGS 0x00000000L /* no resource manager features
selected */
#define TMREGISTER 0x00000001L /* resource manager dynamically
registers */
#define TMNOMIGRATE 0x00000002L /* resource manager does not support
association migration */
#define TMUSEASYNC 0x00000004L /* resource manager supports
asynchronous operations */
/*
* Flag definitions for xa_ and ax_ routines
*/
/* use TMNOFLAGGS, defined above, when not specifying other flags */
#define TMASYNC 0x80000000L /* perform routine asynchronously */
#define TMONEPHASE 0x40000000L /* caller is using one-phase commit
optimisation */
#define TMFAIL 0x20000000L /* dissociates caller and marks
transaction branch rollback-only */
#define TMNOWAIT 0x10000000L /* return if blocking condition
exists */
#define TMRESUME 0x08000000L /* caller is resuming association with
suspended transaction branch */
#define TMSUCCESS 0x04000000L /* dissociate caller from transaction
branch */
#define TMSUSPEND 0x02000000L /* caller is suspending, not ending,
association */
#define TMSTARTRSCAN 0x01000000L /* start a recovery scan */
#define TMENDRSCAN 0x00800000L /* end a recovery scan */
#define TMMULTIPLE 0x00400000L /* wait for any asynchronous
operation */
#define TMJOIN 0x00200000L /* caller is joining existing
transaction branch */
#define TMMIGRATE 0x00100000L /* caller intends to perform
migration */
/*
* ax_() return codes (transaction manager reports to resource manager)
*/
#define TM_JOIN 2 /* caller is joining existing
transaction branch */
#define TM_RESUME 1 /* caller is resuming association with
suspended transaction branch */
#define TM_OK 0 /* normal execution */
#define TMER_TMERR -1 /* an error occurred in the transaction
manager */
#define TMER_INVAL -2 /* invalid arguments were given */
#define TMER_PROTO -3 /* routine invoked in an improper
context */
/*
* xa_() return codes (resource manager reports to transaction manager)
*/
#define XA_RBBASE 100 /* The inclusive lower bound of the
rollback codes */
#define XA_RBROLLBACK XA_RBBASE /* The rollback was caused by an
unspecified reason */
#define XA_RBCOMMFAIL XA_RBBASE+1 /* The rollback was caused by a
communication failure */
#define XA_RBDEADLOCK XA_RBBASE+2 /* A deadlock was detected */
#define XA_RBINTEGRITY XA_RBBASE+3 /* A condition that violates the
integrity of the resources was
detected */
#define XA_RBOTHER XA_RBBASE+4 /* The resource manager rolled back the
transaction branch for a reason not
on this list */
#define XA_RBPROTO XA_RBBASE+5 /* A protocol error occurred in the
resource manager */
#define XA_RBTIMEOUT XA_RBBASE+6 /* A transaction branch took too long */
#define XA_RBTRANSIENT XA_RBBASE+7 /* May retry the transaction branch */
#define XA_RBEND XA_RBTRANSIENT /* The inclusive upper bound of the
rollback codes */
#define XA_NOMIGRATE 9 /* resumption must occur where
suspension occurred */
#define XA_HEURHAZ 8 /* the transaction branch may have
been heuristically completed */
#define XA_HEURCOM 7 /* the transaction branch has been
heuristically committed */
#define XA_HEURRB 6 /* the transaction branch has been
heuristically rolled back */
#define XA_HEURMIX 5 /* the transaction branch has been
heuristically committed and rolled
back */
#define XA_RETRY 4 /* routine returned with no effect and
may be re-issued */
#define XA_RDONLY 3 /* the transaction branch was read-only
and has been committed */
#define XA_OK 0 /* normal execution */
#define XAER_ASYNC -2 /* asynchronous operation already
outstanding */
#define XAER_RMERR -3 /* a resource manager error occurred in
the transaction branch */
#define XAER_NOTA -4 /* the XID is not valid */
#define XAER_INVAL -5 /* invalid arguments were given */
#define XAER_PROTO -6 /* routine invoked in an improper
context */
#define XAER_RMFAIL -7 /* resource manager unavailable */
#define XAER_DUPID -8 /* the XID already exists */
#define XAER_OUTSIDE -9 /* resource manager doing work outside
transaction */
#endif /* ifndef XA_H */
/*
* End of xa.h header
*/
...@@ -2851,11 +2851,13 @@ void ...@@ -2851,11 +2851,13 @@ void
recv_recovery_from_checkpoint_finish(void) recv_recovery_from_checkpoint_finish(void)
/*======================================*/ /*======================================*/
{ {
int i;
os_thread_id_t recovery_thread_id;
/* Rollback the uncommitted transactions which have no user session */ /* Rollback the uncommitted transactions which have no user session */
if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO) { fprintf(stderr,
trx_rollback_or_clean_all_without_sess(); "InnoDB: Starting to apply log records to the database...\n");
}
/* Apply the hashed log records to the respective file pages */ /* Apply the hashed log records to the respective file pages */
...@@ -2888,9 +2890,15 @@ recv_recovery_from_checkpoint_finish(void) ...@@ -2888,9 +2890,15 @@ recv_recovery_from_checkpoint_finish(void)
/* Free the resources of the recovery system */ /* Free the resources of the recovery system */
recv_recovery_on = FALSE; recv_recovery_on = FALSE;
#ifndef UNIV_LOG_DEBUG #ifndef UNIV_LOG_DEBUG
recv_sys_free(); recv_sys_free();
#endif #endif
if (srv_force_recovery < SRV_FORCE_NO_TRX_UNDO) {
os_thread_create(trx_rollback_or_clean_all_without_sess,
(void *)&i, &recovery_thread_id);
}
} }
/********************************************************** /**********************************************************
......
...@@ -1509,7 +1509,6 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1509,7 +1509,6 @@ row_ins_scan_sec_index_for_duplicate(
ibool moved; ibool moved;
mtr_t mtr; mtr_t mtr;
trx_t* trx; trx_t* trx;
const char* ptr;
n_unique = dict_index_get_n_unique(index); n_unique = dict_index_get_n_unique(index);
...@@ -1630,7 +1629,6 @@ row_ins_duplicate_error_in_clust( ...@@ -1630,7 +1629,6 @@ row_ins_duplicate_error_in_clust(
page_t* page; page_t* page;
ulint n_unique; ulint n_unique;
trx_t* trx = thr_get_trx(thr); trx_t* trx = thr_get_trx(thr);
const char* ptr;
UT_NOT_USED(mtr); UT_NOT_USED(mtr);
......
...@@ -331,10 +331,11 @@ trx_savept_take( ...@@ -331,10 +331,11 @@ trx_savept_take(
/*********************************************************************** /***********************************************************************
Rollback or clean up transactions which have no user session. If the Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert transaction already was committed, then we clean up a possible insert
undo log. If the transaction was not yet committed, then we roll it back. */ undo log. If the transaction was not yet committed, then we roll it back.
Note: this is done in a background thread */
void void *
trx_rollback_or_clean_all_without_sess(void) trx_rollback_or_clean_all_without_sess(void *i)
/*========================================*/ /*========================================*/
{ {
mem_heap_t* heap; mem_heap_t* heap;
...@@ -362,7 +363,7 @@ trx_rollback_or_clean_all_without_sess(void) ...@@ -362,7 +363,7 @@ trx_rollback_or_clean_all_without_sess(void)
fprintf(stderr, fprintf(stderr,
"InnoDB: Starting rollback of uncommitted transactions\n"); "InnoDB: Starting rollback of uncommitted transactions\n");
} else { } else {
return; os_thread_exit(i);
} }
loop: loop:
heap = mem_heap_create(512); heap = mem_heap_create(512);
...@@ -371,9 +372,15 @@ loop: ...@@ -371,9 +372,15 @@ loop:
trx = UT_LIST_GET_FIRST(trx_sys->trx_list); trx = UT_LIST_GET_FIRST(trx_sys->trx_list);
while (trx && (trx->sess || (trx->conc_state == TRX_NOT_STARTED))) { while (trx) {
trx = UT_LIST_GET_NEXT(trx_list, trx); if ((trx->sess || (trx->conc_state == TRX_NOT_STARTED))) {
trx = UT_LIST_GET_NEXT(trx_list, trx);
} else if (trx->conc_state == TRX_PREPARED) {
trx->sess = trx_dummy_sess;
} else {
break;
}
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
...@@ -384,10 +391,11 @@ loop: ...@@ -384,10 +391,11 @@ loop:
mem_heap_free(heap); mem_heap_free(heap);
return; os_thread_exit(i);
} }
trx->sess = trx_dummy_sess; trx->sess = trx_dummy_sess;
if (trx->conc_state == TRX_COMMITTED_IN_MEMORY) { if (trx->conc_state == TRX_COMMITTED_IN_MEMORY) {
fprintf(stderr, "InnoDB: Cleaning up trx with id %lu %lu\n", fprintf(stderr, "InnoDB: Cleaning up trx with id %lu %lu\n",
...@@ -486,6 +494,8 @@ loop: ...@@ -486,6 +494,8 @@ loop:
mem_heap_free(heap); mem_heap_free(heap);
goto loop; goto loop;
os_thread_exit(i); /* not reached */
} }
/*********************************************************************** /***********************************************************************
......
...@@ -887,8 +887,12 @@ trx_sys_init_at_db_start(void) ...@@ -887,8 +887,12 @@ trx_sys_init_at_db_start(void)
trx = UT_LIST_GET_FIRST(trx_sys->trx_list); trx = UT_LIST_GET_FIRST(trx_sys->trx_list);
for (;;) { for (;;) {
rows_to_undo +=
if ( trx->conc_state != TRX_PREPARED) {
rows_to_undo +=
ut_conv_dulint_to_longlong(trx->undo_no); ut_conv_dulint_to_longlong(trx->undo_no);
}
trx = UT_LIST_GET_NEXT(trx_list, trx); trx = UT_LIST_GET_NEXT(trx_list, trx);
if (!trx) { if (!trx) {
......
...@@ -24,6 +24,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -24,6 +24,7 @@ Created 3/26/1996 Heikki Tuuri
#include "thr0loc.h" #include "thr0loc.h"
#include "btr0sea.h" #include "btr0sea.h"
#include "os0proc.h" #include "os0proc.h"
#include "xa.h"
/* Copy of the prototype for innobase_mysql_print_thd: this /* Copy of the prototype for innobase_mysql_print_thd: this
copy MUST be equal to the one in mysql/sql/ha_innodb.cc ! */ copy MUST be equal to the one in mysql/sql/ha_innodb.cc ! */
...@@ -156,6 +157,10 @@ trx_create( ...@@ -156,6 +157,10 @@ trx_create(
trx->read_view_heap = mem_heap_create(256); trx->read_view_heap = mem_heap_create(256);
trx->read_view = NULL; trx->read_view = NULL;
/* Set X/Open XA transaction identification to NULL */
memset(&trx->xid,0,sizeof(trx->xid));
trx->xid.formatID = -1;
return(trx); return(trx);
} }
...@@ -408,13 +413,22 @@ trx_lists_init_at_db_start(void) ...@@ -408,13 +413,22 @@ trx_lists_init_at_db_start(void)
trx = trx_create(NULL); trx = trx_create(NULL);
trx->id = undo->trx_id; trx->id = undo->trx_id;
trx->xid = undo->xid;
trx->insert_undo = undo; trx->insert_undo = undo;
trx->rseg = rseg; trx->rseg = rseg;
if (undo->state != TRX_UNDO_ACTIVE) { if (undo->state != TRX_UNDO_ACTIVE) {
trx->conc_state = TRX_COMMITTED_IN_MEMORY; /* Prepared transactions are left in
the prepared state waiting for a
commit or abort decision from MySQL */
if (undo->state == TRX_UNDO_PREPARED) {
trx->conc_state = TRX_PREPARED;
} else {
trx->conc_state =
TRX_COMMITTED_IN_MEMORY;
}
/* We give a dummy value for the trx no; /* We give a dummy value for the trx no;
this should have no relevance since purge this should have no relevance since purge
...@@ -457,10 +471,22 @@ trx_lists_init_at_db_start(void) ...@@ -457,10 +471,22 @@ trx_lists_init_at_db_start(void)
trx = trx_create(NULL); trx = trx_create(NULL);
trx->id = undo->trx_id; trx->id = undo->trx_id;
trx->xid = undo->xid;
if (undo->state != TRX_UNDO_ACTIVE) { if (undo->state != TRX_UNDO_ACTIVE) {
trx->conc_state =
TRX_COMMITTED_IN_MEMORY; /* Prepared transactions are left in
the prepared state waiting for a
commit or abort decision from MySQL */
if (undo->state == TRX_UNDO_PREPARED) {
trx->conc_state =
TRX_PREPARED;
} else {
trx->conc_state =
TRX_COMMITTED_IN_MEMORY;
}
/* We give a dummy value for the trx /* We give a dummy value for the trx
number */ number */
...@@ -726,7 +752,8 @@ trx_commit_off_kernel( ...@@ -726,7 +752,8 @@ trx_commit_off_kernel(
mutex_enter(&kernel_mutex); mutex_enter(&kernel_mutex);
} }
ut_ad(trx->conc_state == TRX_ACTIVE); ut_ad(trx->conc_state == TRX_ACTIVE || trx->conc_state == TRX_PREPARED);
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
...@@ -1667,3 +1694,238 @@ trx_print( ...@@ -1667,3 +1694,238 @@ trx_print(
innobase_mysql_print_thd(f, trx->mysql_thd); innobase_mysql_print_thd(f, trx->mysql_thd);
} }
} }
/********************************************************************
Prepares a transaction. */
void
trx_prepare_off_kernel(
/*==================*/
trx_t* trx) /* in: transaction */
{
page_t* update_hdr_page;
dulint lsn;
trx_rseg_t* rseg;
trx_undo_t* undo;
ibool must_flush_log = FALSE;
mtr_t mtr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
rseg = trx->rseg;
if (trx->insert_undo != NULL || trx->update_undo != NULL) {
mutex_exit(&kernel_mutex);
mtr_start(&mtr);
must_flush_log = TRUE;
/* Change the undo log segment states from TRX_UNDO_ACTIVE
to some other state: these modifications to the file data
structure define the transaction as prepared in the file
based world, at the serialization point of the log sequence
number lsn obtained below. */
mutex_enter(&(rseg->mutex));
if (trx->insert_undo != NULL) {
trx_undo_set_state_at_prepare(trx, trx->insert_undo,
&mtr);
}
undo = trx->update_undo;
if (undo) {
mutex_enter(&kernel_mutex);
trx->no = trx_sys_get_new_trx_no();
mutex_exit(&kernel_mutex);
/* It is not necessary to obtain trx->undo_mutex here
because only a single OS thread is allowed to do the
transaction prepare for this transaction. */
update_hdr_page = trx_undo_set_state_at_prepare(trx, undo, &mtr);
}
mutex_exit(&(rseg->mutex));
/*--------------*/
mtr_commit(&mtr);
/*--------------*/
lsn = mtr.end_lsn;
mutex_enter(&kernel_mutex);
}
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
/*--------------------------------------*/
trx->conc_state = TRX_PREPARED;
/*--------------------------------------*/
if (trx->read_view) {
read_view_close(trx->read_view);
mem_heap_empty(trx->read_view_heap);
trx->read_view = NULL;
}
if (must_flush_log) {
mutex_exit(&kernel_mutex);
if (trx->insert_undo != NULL) {
trx_undo_insert_cleanup(trx);
}
/* Write the log to the log files AND flush them to disk */
/*-------------------------------------*/
log_write_up_to(lsn, LOG_WAIT_ONE_GROUP, TRUE);
/*-------------------------------------*/
mutex_enter(&kernel_mutex);
}
}
/**************************************************************************
Does the transaction prepare for MySQL. */
ulint
trx_prepare_for_mysql(
/*=================*/
/* out: 0 or error number */
trx_t* trx) /* in: trx handle */
{
/* Because we do not do the prepare by sending an Innobase
sig to the transaction, we must here make sure that trx has been
started. */
ut_a(trx);
trx->op_info = "preparing";
trx_start_if_not_started(trx);
mutex_enter(&kernel_mutex);
trx_prepare_off_kernel(trx);
mutex_exit(&kernel_mutex);
trx->op_info = "";
return(0);
}
/**************************************************************************
This function is used to find number of prepared transactions and
their transaction objects for a recovery. */
int
trx_recover_for_mysql(
/*==================*/
/* out: number of prepared transactions
stored in xid_list */
XID* xid_list, /* in/out: prepared transactions */
uint len) /* in: number of slots in xid_list */
{
trx_t* trx;
int num_of_transactions = 0;
ut_ad(xid_list);
ut_ad(len);
fprintf(stderr,
"InnoDB: Starting recovery for XA transactions...\n");
/* We should set those transactions which are in
the prepared state to the xid_list */
mutex_enter(&kernel_mutex);
trx = UT_LIST_GET_FIRST(trx_sys->trx_list);
while (trx) {
if (trx->conc_state == TRX_PREPARED) {
xid_list[num_of_transactions] = trx->xid;
num_of_transactions++;
if ( (uint)num_of_transactions == len ) {
break;
}
}
trx = UT_LIST_GET_NEXT(trx_list, trx);
}
mutex_exit(&kernel_mutex);
fprintf(stderr,
"InnoDB: %d transactions in prepare state after recovery\n",
num_of_transactions);
return (num_of_transactions);
}
/***********************************************************************
This function is used to find one X/Open XA distributed transaction
which is in the prepared state */
trx_t *
trx_get_trx_by_xid(
/*===============*/
/* out: trx or NULL */
XID* xid) /* in: X/Open XA Transaction Idenfication */
{
trx_t* trx;
if (xid == NULL) {
return (NULL);
}
mutex_enter(&kernel_mutex);
trx = UT_LIST_GET_FIRST(trx_sys->trx_list);
while (trx) {
/* Compare two X/Open XA transaction id's: their
length should be the same and binary comparison
of gtrid_lenght+bqual_length bytes should be
the same */
if (xid->gtrid_length == trx->xid.gtrid_length &&
xid->bqual_length == trx->xid.bqual_length &&
memcmp(xid, &trx->xid,
xid->gtrid_length +
xid->bqual_length) == 0) {
break;
}
trx = UT_LIST_GET_NEXT(trx_list, trx);
}
mutex_exit(&kernel_mutex);
if (trx) {
if (trx->conc_state != TRX_PREPARED) {
return(NULL);
}
return(trx);
} else {
return(NULL);
}
}
This diff is collapsed.
...@@ -80,6 +80,7 @@ extern "C" { ...@@ -80,6 +80,7 @@ extern "C" {
#include "../innobase/include/fsp0fsp.h" #include "../innobase/include/fsp0fsp.h"
#include "../innobase/include/sync0sync.h" #include "../innobase/include/sync0sync.h"
#include "../innobase/include/fil0fil.h" #include "../innobase/include/fil0fil.h"
#include "../innobase/include/xa.h"
} }
#define HA_INNOBASE_ROWS_IN_TABLE 10000 /* to get optimization right */ #define HA_INNOBASE_ROWS_IN_TABLE 10000 /* to get optimization right */
...@@ -149,6 +150,15 @@ static mysql_byte* innobase_get_key(INNOBASE_SHARE *share,uint *length, ...@@ -149,6 +150,15 @@ static mysql_byte* innobase_get_key(INNOBASE_SHARE *share,uint *length,
static INNOBASE_SHARE *get_share(const char *table_name); static INNOBASE_SHARE *get_share(const char *table_name);
static void free_share(INNOBASE_SHARE *share); static void free_share(INNOBASE_SHARE *share);
/*********************************************************************
Commits a transaction in an InnoDB database. */
void
innobase_commit_low(
/*================*/
trx_t* trx); /* in: transaction handle */
/* General functions */ /* General functions */
/********************************************************************** /**********************************************************************
...@@ -1254,7 +1264,7 @@ innobase_commit( ...@@ -1254,7 +1264,7 @@ innobase_commit(
if (trx_handle != (void*)&innodb_dummy_stmt_trx_handle if (trx_handle != (void*)&innodb_dummy_stmt_trx_handle
|| (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) { || (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) {
/* We were instructed to commit the whole transaction, or /* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */ this is an SQL statement end and autocommit is on */
innobase_commit_low(trx); innobase_commit_low(trx);
...@@ -1395,6 +1405,39 @@ innobase_rollback( ...@@ -1395,6 +1405,39 @@ innobase_rollback(
DBUG_RETURN(convert_error_code_to_mysql(error, NULL)); DBUG_RETURN(convert_error_code_to_mysql(error, NULL));
} }
/*********************************************************************
Rolls back a transaction */
int
innobase_rollback_trx(
/*==================*/
/* out: 0 or error number */
trx_t* trx) /* in: transaction */
{
int error = 0;
DBUG_ENTER("innobase_rollback_trx");
DBUG_PRINT("trans", ("aborting transaction"));
/* Release a possible FIFO ticket and search latch. Since we will
reserve the kernel mutex, we have to release the search system latch
first to obey the latching order. */
innobase_release_stat_resources(trx);
if (trx->auto_inc_lock) {
/* If we had reserved the auto-inc lock for some table (if
we come here to roll back the latest SQL statement) we
release it now before a possibly lengthy rollback */
row_unlock_table_autoinc_for_mysql(trx);
}
error = trx_rollback_for_mysql(trx);
DBUG_RETURN(convert_error_code_to_mysql(error, NULL));
}
/********************************************************************* /*********************************************************************
Rolls back a transaction to a savepoint. */ Rolls back a transaction to a savepoint. */
...@@ -5666,4 +5709,158 @@ innobase_query_is_replace(void) ...@@ -5666,4 +5709,158 @@ innobase_query_is_replace(void)
} }
} }
/***********************************************************************
This function is used to prepare X/Open XA distributed transaction */
int innobase_xa_prepare(
/*====================*/
/* out: 0 or error number */
THD* thd, /* in: handle to the MySQL thread of the user
whose XA transaction should be prepared */
bool all) /* in: TRUE - commit transaction
FALSE - the current SQL statement ended */
{
int error = 0;
trx_t* trx;
trx = check_trx_exists(thd);
/* TODO: Get X/Open XA Transaction Identification from MySQL*/
memset(&trx->xid, 0, sizeof(trx->xid));
trx->xid.formatID = -1;
/* Release a possible FIFO ticket and search latch. Since we will
reserve the kernel mutex, we have to release the search system latch
first to obey the latching order. */
innobase_release_stat_resources(trx);
if (trx->active_trans == 0 && trx->conc_state != TRX_NOT_STARTED) {
fprintf(stderr,
"InnoDB: Error: thd->transaction.all.innodb_active_trans == 0\n"
"InnoDB: but trx->conc_state != TRX_NOT_STARTED\n");
}
if (all || (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))) {
/* We were instructed to prepare the whole transaction, or
this is an SQL statement end and autocommit is on */
error = trx_prepare_for_mysql(trx);
} else {
/* We just mark the SQL statement ended and do not do a
transaction prepare */
if (trx->auto_inc_lock) {
/* If we had reserved the auto-inc lock for some
table in this SQL statement we release it now */
row_unlock_table_autoinc_for_mysql(trx);
}
/* Store the current undo_no of the transaction so that we
know where to roll back if we have to roll back the next
SQL statement */
trx_mark_sql_stat_end(trx);
}
/* Tell the InnoDB server that there might be work for utility
threads: */
srv_active_wake_master_thread();
return error;
}
/***********************************************************************
This function is used to recover X/Open XA distributed transactions */
int innobase_xa_recover(
/* out: number of prepared transactions
stored in xid_list */
XID* xid_list, /* in/out: prepared transactions */
uint len) /* in: number of slots in xid_list */
/*====================*/
{
if (len == 0 || xid_list == NULL) {
return 0;
}
return (trx_recover_for_mysql(xid_list, len));
}
/***********************************************************************
This function is used to commit one X/Open XA distributed transaction
which is in the prepared state */
int innobase_commit_by_xid(
/*=======================*/
/* out: 0 or error number */
XID* xid) /* in: X/Open XA Transaction Identification */
{
trx_t* trx;
trx = trx_get_trx_by_xid(xid);
if (trx) {
innobase_commit_low(trx);
return(XA_OK);
} else {
return(XAER_NOTA);
}
}
/***********************************************************************
This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state */
int innobase_rollback_by_xid(
/* out: 0 or error number */
XID *xid) /* in : X/Open XA Transaction Idenfification */
{
trx_t* trx;
trx = trx_get_trx_by_xid(xid);
if (trx) {
return(innobase_rollback_trx(trx));
} else {
return(XAER_NOTA);
}
}
/***********************************************************************
This function is used to test commit/rollback of XA transactions */
int innobase_xa_end(
/*================*/
THD* thd) /* in: MySQL thread handle of the user for whom
transactions should be recovered */
{
DBUG_ENTER("innobase_xa_end");
XID trx_list[100];
int trx_num, trx_num_max = 100;
int i;
XID xid;
while(trx_num = innobase_xa_recover(trx_list, trx_num_max)) {
for(i=0;i < trx_num; i++) {
xid = trx_list[i];
if ( i % 2) {
innobase_commit_by_xid(&xid);
} else {
innobase_rollback_by_xid(&xid);
}
}
}
free(trx_list);
DBUG_RETURN(0);
}
#endif /* HAVE_INNOBASE_DB */ #endif /* HAVE_INNOBASE_DB */
...@@ -243,3 +243,46 @@ void innobase_release_temporary_latches(void* innobase_tid); ...@@ -243,3 +243,46 @@ void innobase_release_temporary_latches(void* innobase_tid);
void innobase_store_binlog_offset_and_flush_log(char *binlog_name,longlong offset); void innobase_store_binlog_offset_and_flush_log(char *binlog_name,longlong offset);
int innobase_start_trx_and_assign_read_view(THD* thd); int innobase_start_trx_and_assign_read_view(THD* thd);
/***********************************************************************
This function is used to prepare X/Open XA distributed transaction */
int innobase_xa_prepare(
/*====================*/
/* out: 0 or error number */
THD* thd, /* in: handle to the MySQL thread of the user
whose XA transaction should be prepared */
bool all); /* in: TRUE - commit transaction
FALSE - the current SQL statement ended */
/***********************************************************************
This function is used to recover X/Open XA distributed transactions */
int innobase_xa_recover(
/*====================*/
/* out: number of prepared transactions
stored in xid_list */
XID* xid_list, /* in/out: prepared transactions */
uint len); /* in: number of slots in xid_list */
/***********************************************************************
This function is used to commit one X/Open XA distributed transaction
which is in the prepared state */
int innobase_commit_by_xid(
/*=======================*/
/* out: 0 or error number */
XID* xid); /* in : X/Open XA Transaction Identification */
/***********************************************************************
This function is used to rollback one X/Open XA distributed transaction
which is in the prepared state */
int innobase_rollback_by_xid(
/* out: 0 or error number */
XID *xid); /* in : X/Open XA Transaction Idenfification */
int innobase_xa_end(THD *thd);
...@@ -193,6 +193,41 @@ typedef struct st_thd_trans { ...@@ -193,6 +193,41 @@ typedef struct st_thd_trans {
void *ndb_tid; void *ndb_tid;
} THD_TRANS; } THD_TRANS;
#ifndef XIDDATASIZE /* no xa.h included */
/* XXX - may be we should disable xa completely in this case ? */
#define XIDDATASIZE 128
#define MAXGTRIDSIZE 64
#define MAXBQUALSIZE 64
struct xid_t {
long formatID;
long gtrid_length;
long bqual_length;
char data[XIDDATASIZE];
};
typedef struct xid_t XID;
#endif
typedef struct
{
byte slot;
uint savepoint_offset;
int (*close_connection)(THD *thd);
int (*savepoint_set)(THD *thd, void *sv);
int (*savepoint_rollback)(THD *thd, void *sv);
int (*savepoint_release)(THD *thd, void *sv);
int (*commit)(THD *thd, bool all);
int (*rollback)(THD *thd, bool all);
int (*prepare)(THD *thd, bool all);
int (*recover)(XID *xid_list, uint len);
int (*commit_by_xid)(XID *xid);
int (*rollback_by_xid)(XID *xid);
} handlerton;
enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED, enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED,
ISO_REPEATABLE_READ, ISO_SERIALIZABLE}; ISO_REPEATABLE_READ, ISO_SERIALIZABLE};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment