Commit 2c035a59 authored by sasha@mysql.sashanet.com's avatar sasha@mysql.sashanet.com

Merge work:/home/bk/mysql-4.0

into mysql.sashanet.com:/reiser-data/mysql-4.0
parents 6806b51a be388422
......@@ -21,7 +21,8 @@
* all copies and derivative works. Thank you. *
* *
* The author makes no warranty of any kind with respect to this *
* product and explicitly disclaims any implied warranties of mer- *
* product and explicitly disclaims any implied warranties of mer- *ct_lex.table_list.first=0;
thd->lex.selec
* chantability or fitness for any particular purpose. *
* *
******************************************************************************
......@@ -58,7 +59,7 @@
* seismo!bpa!sjuvax!bbanerje
*
* Michael Widenius:
* DBUG_DUMP - To dump a pice of memory.
* DBUG_DUMP - To dump a block of memory.
* PUSH_FLAG "O" - To be used insted of "o" if we don't
* want flushing (for slow systems)
* PUSH_FLAG "A" - as 'O', but we will append to the out file instead
......@@ -705,9 +706,15 @@ char ***_sframep_ __attribute__((unused)))
if (!_no_db_)
{
int save_errno=errno;
/* Sasha: the test below is so we could call functions with DBUG_ENTER
before my_thread_init(). I needed this because I suspected corruption
of a block allocated by my_thread_init() itself, so I wanted to use
my_malloc()/my_free() in my_thread_init()/my_thread_end()
*/
if (!(state=code_state()))
return;
if (!init_done)
_db_push_ (_DBUG_START_CONDITION_);
state=code_state();
*_sfunc_ = state->func;
*_sfile_ = state->file;
......@@ -787,10 +794,10 @@ uint *_slevel_)
if (!_no_db_)
{
int save_errno=errno;
if (!(state=code_state()))
return;
if (!init_done)
_db_push_ ("");
if (!(state=code_state()))
return; /* Only happens at end of program */
if (stack->flags & (TRACE_ON | DEBUG_ON | PROFILE_ON))
{
if (!state->locked)
......@@ -855,6 +862,9 @@ uint _line_,
const char *keyword)
{
CODE_STATE *state=code_state();
/* Sasha: pre-my_thread_init() safety */
if (!state)
return;
state->u_line = _line_;
state->u_keyword = (char*) keyword;
}
......@@ -890,7 +900,9 @@ void _db_doprnt_ (const char *format,...)
{
va_list args;
CODE_STATE *state;
state=code_state();
/* Sasha: pre-my_thread_init() safety */
if (!(state=code_state()))
return;
va_start(args,format);
......@@ -942,7 +954,9 @@ uint length)
int pos;
char dbuff[90];
CODE_STATE *state;
state=code_state();
/* Sasha: pre-my_thread_init() safety */
if (!(state=code_state()))
return;
if (_db_keyword_ ((char*) keyword))
{
......@@ -1224,7 +1238,9 @@ const char *keyword)
if (!init_done)
_db_push_ ("");
state=code_state();
/* Sasha: pre-my_thread_init() safety */
if (!(state=code_state()))
return FALSE;
result = FALSE;
if (DEBUGGING &&
state->level <= stack -> maxdepth &&
......
......@@ -137,6 +137,10 @@ extern int NEAR my_errno; /* Last error in mysys */
#define NORMAL_SAFEMALLOC sf_malloc_quick=0
extern uint sf_malloc_prehunc,sf_malloc_endhunc,sf_malloc_quick;
extern ulonglong safemalloc_mem_limit;
/* keep track of shutdown,signal, and main threads so that my_end() will not
report errors with them
*/
extern pthread_t shutdown_th, main_th,signal_th;
#define CALLER_INFO_PROTO , const char *sFile, uint uLine
#define CALLER_INFO , __FILE__, __LINE__
#define ORIG_CALLER_INFO , sFile, uLine
......
......@@ -2,7 +2,34 @@ drop table if exists t1,t2,t3;
create table t1(id1 int not null auto_increment primary key, t char(12));
create table t2(id2 int not null, t char(12));
create table t3(id3 int not null, t char(12), index(id3));
select count(*) from t1 where id1 > 95;
count(*)
5
select count(*) from t2 where id2 > 95;
count(*)
25
select count(*) from t3 where id3 > 95;
count(*)
250
update t1,t2,t3 set t1.t="aaa", t2.t="bbb", t3.t="cc" where t1.id1 = t2.id2 and t2.id2 = t3.id3 and t1.id1 > 90;
select count(*) from t1 where t = "aaa";
count(*)
10
select count(*) from t1 where id1 > 90;
count(*)
10
select count(*) from t2 where t = "bbb";
count(*)
10
select count(*) from t2 where id2 > 90;
count(*)
50
select count(*) from t3 where t = "cc";
count(*)
500
select count(*) from t3 where id3 > 90;
count(*)
500
delete t1.*, t2.*, t3.* from t1,t2,t3 where t1.id1 = t2.id2 and t2.id2 = t3.id3 and t1.id1 > 95;
check table t1, t2, t3;
Table Op Msg_type Msg_text
......
......@@ -9,16 +9,16 @@ Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Lo
change master to master_host='127.0.0.1';
show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
127.0.0.1 test MASTER_PORT 60 4 slave-relay-bin.001 4 No No 0 0 0
127.0.0.1 test MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0
change master to master_host='127.0.0.1',master_user='root',
master_password='',master_port=MASTER_PORT;
show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
127.0.0.1 root MASTER_PORT 60 4 slave-relay-bin.001 4 No No 0 0 0
127.0.0.1 root MASTER_PORT 7 4 slave-relay-bin.001 4 No No 0 0 0
slave start;
show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
127.0.0.1 root MASTER_PORT 60 master-bin.001 79 slave-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79
127.0.0.1 root MASTER_PORT 7 master-bin.001 79 slave-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79
drop table if exists t1;
create table t1 (n int);
insert into t1 values (10),(45),(90);
......
......@@ -29,7 +29,18 @@ while ($1)
dec $1;
}
enable_query_log;
select count(*) from t1 where id1 > 95;
select count(*) from t2 where id2 > 95;
select count(*) from t3 where id3 > 95;
update t1,t2,t3 set t1.t="aaa", t2.t="bbb", t3.t="cc" where t1.id1 = t2.id2 and t2.id2 = t3.id3 and t1.id1 > 90;
select count(*) from t1 where t = "aaa";
select count(*) from t1 where id1 > 90;
select count(*) from t2 where t = "bbb";
select count(*) from t2 where id2 > 90;
select count(*) from t3 where t = "cc";
select count(*) from t3 where id3 > 90;
delete t1.*, t2.*, t3.* from t1,t2,t3 where t1.id1 = t2.id2 and t2.id2 = t3.id3 and t1.id1 > 95;
check table t1, t2, t3;
......
......@@ -8,6 +8,7 @@ sync_with_master;
show slave status;
change master to master_log_pos=73;
slave stop;
change master to master_log_pos=73;
--replace_result 3306 MASTER_PORT 9306 MASTER_PORT 3334 MASTER_PORT 3336 MASTER_PORT
show slave status;
......
--server-id=2
--server-id=22 --master-connect-retry=7
......@@ -8,6 +8,7 @@ connection slave;
reset slave;
--replace_result $MASTER_MYPORT MASTER_PORT
show slave status;
change master to master_host='127.0.0.1';
# The following needs to be cleaned up when change master is fixed
--replace_result $MASTER_MYPORT MASTER_PORT 3306 MASTER_PORT 3334 MASTER_PORT
......
......@@ -38,6 +38,7 @@ struct irem {
my_string _sFileName; /* File in which memory was new'ed */
uint _uLineNum; /* Line number in above file */
uint _uDataSize; /* Size requested */
pthread_t thread_id;
long _lSpecialValue; /* Underrun marker value */
};
......@@ -56,6 +57,11 @@ extern const char *soundex_map;
extern USED_MEM* my_once_root_block;
extern uint my_once_extra;
/* these threads are exept from safemalloc leak scrutiny unless
PEDANTIC_SAFEMALLOC is defined
*/
extern pthread_t signal_thread,kill_thread;
#ifndef HAVE_TEMPNAM
extern int _my_tempnam_used;
#endif
......
......@@ -105,19 +105,33 @@ static long thread_id=0;
my_bool my_thread_init(void)
{
struct st_my_thread_var *tmp;
#ifdef EXTRA_DEBUG
fprintf(stderr,"my_thread_init(): thread_id=%ld\n",pthread_self());
#endif
#if !defined(__WIN__) || defined(USE_TLS) || ! defined(SAFE_MUTEX)
pthread_mutex_lock(&THR_LOCK_lock);
#endif
#if !defined(__WIN__) || defined(USE_TLS)
if (my_pthread_getspecific(struct st_my_thread_var *,THR_KEY_mysys))
{
#ifdef EXTRA_DEBUG
fprintf(stderr,"my_thread_init() called more than once in thread %ld\n",
pthread_self());
#endif
pthread_mutex_unlock(&THR_LOCK_lock);
return 0; /* Safequard */
}
/* We must have many calloc() here because these are freed on
pthread_exit */
/*
Sasha: the above comment does not make sense. I have changed calloc() to
equivalent my_malloc() but it was calloc() before. It seems like the
comment is out of date - we always call my_thread_end() before
pthread_exit() to clean up. Note that I have also fixed up DBUG
code to be able to call it from my_thread_init()
*/
if (!(tmp=(struct st_my_thread_var *)
calloc(1,sizeof(struct st_my_thread_var))))
my_malloc(sizeof(struct st_my_thread_var),MYF(MY_WME|MY_ZEROFILL))))
{
pthread_mutex_unlock(&THR_LOCK_lock);
return 1;
......@@ -125,6 +139,9 @@ my_bool my_thread_init(void)
pthread_setspecific(THR_KEY_mysys,tmp);
#else
/* Sasha: TODO - explain what exactly we are doing on Windows
At first glance, I have a hard time following the code
*/
if (THR_KEY_mysys.id) /* Already initialized */
{
#if !defined(__WIN__) || defined(USE_TLS) || ! defined(SAFE_MUTEX)
......@@ -146,9 +163,18 @@ my_bool my_thread_init(void)
void my_thread_end(void)
{
struct st_my_thread_var *tmp=my_thread_var;
#ifdef EXTRA_DEBUG
fprintf(stderr,"my_thread_end(): tmp=%p,thread_id=%ld\n",
tmp,pthread_self());
#endif
if (tmp)
{
#if !defined(DBUG_OFF)
/* Sasha: tmp->dbug is allocated inside DBUG library
so for now we will not mess with trying to use my_malloc()/
my_free(), but in the future it would be nice to figure out a
way to do it
*/
if (tmp->dbug)
{
free(tmp->dbug);
......@@ -160,12 +186,15 @@ void my_thread_end(void)
#endif
pthread_mutex_destroy(&tmp->mutex);
#if (!defined(__WIN__) && !defined(OS2)) || defined(USE_TLS)
free(tmp);
#endif
}
#if (!defined(__WIN__) && !defined(OS2)) || defined(USE_TLS)
/* we need to setspecific to 0 BEFORE we call my_free, as my_free
uses some DBUG_ macros that will use the follow the specific
pointer after the block it is pointing to has been freed if
specific does not get reset first
*/
pthread_setspecific(THR_KEY_mysys,0);
my_free((gptr)tmp,MYF(MY_WME));
#endif
}
}
struct st_my_thread_var *_my_thread_var(void)
......
......@@ -73,14 +73,25 @@
#include "mysys_err.h"
ulonglong safemalloc_mem_limit = ~(ulonglong)0;
pthread_t shutdown_th=0,main_th=0,signal_th=0;
#define pNext tInt._pNext
#define pPrev tInt._pPrev
#define sFileName tInt._sFileName
#define uLineNum tInt._uLineNum
#define uDataSize tInt._uDataSize
#define thread_id tInt.thread_id
#define lSpecialValue tInt._lSpecialValue
#ifndef PEDANTIC_SAFEMALLOC
static int sf_malloc_tampered = 0; /* set to 1 after TERMINATE() if we had
to fiddle with cNewCount and the linked
list of blocks so that _sanity() will
not fuss when it is not supposed to
*/
#endif
/* Static functions prototypes */
static int check_ptr(const char *where, byte *ptr, const char *sFile,
......@@ -174,6 +185,7 @@ gptr _mymalloc (uint uSize, const char *sFile, uint uLine, myf MyFlags)
pTmp -> sFileName = (my_string) sFile;
pTmp -> uLineNum = uLine;
pTmp -> uDataSize = uSize;
pTmp->thread_id = pthread_self();
pTmp -> pPrev = NULL;
/* Add this remember structure to the linked list */
......@@ -359,6 +371,12 @@ static int check_ptr(const char *where, byte *ptr, const char *sFile,
return 0;
}
static int legal_leak(struct remember* pPtr)
{
return pthread_self() == pPtr->thread_id || main_th == pPtr->thread_id
|| shutdown_th == pPtr->thread_id
|| signal_th == pPtr->thread_id;
}
/*
* TERMINATE(FILE *file)
......@@ -376,6 +394,47 @@ void TERMINATE (FILE *file)
/* NEW and the number of calls to FREE. >0 means more */
/* NEWs than FREEs. <0, etc. */
#ifndef PEDANTIC_SAFEMALLOC
/* Avoid false alarms for blocks that we cannot free before my_end()
This does miss some positives, but that is ok. This will only miss
failures to free things allocated in the main thread which
performs only one-time allocations. If you really need to
debug memory allocations in the main thread,
#define PEDANTIC_SAFEMALLOC
*/
if ((pPtr=pRememberRoot))
{
while (pPtr)
{
if (legal_leak(pPtr))
{
sf_malloc_tampered=1;
cNewCount--;
lCurMemory -= pPtr->uDataSize;
if (pPtr->pPrev)
{
struct remember* tmp;
tmp = pPtr->pPrev->pNext = pPtr->pNext;
if (tmp)
tmp->pPrev = pPtr->pPrev;
pPtr->pNext = pPtr->pPrev = 0;
pPtr = tmp;
}
else
{
pRememberRoot = pPtr->pNext;
pPtr->pNext = pPtr->pPrev = 0;
pPtr = pRememberRoot;
if (pPtr)
pPtr->pPrev=0;
}
}
else
pPtr = pPtr->pNext;
}
}
#endif
if (cNewCount)
{
if (file)
......@@ -402,10 +461,14 @@ void TERMINATE (FILE *file)
if (file)
{
fprintf (file,
"\t%6u bytes at 0x%09lx, allocated at line %4u in '%s'\n",
"\t%6u bytes at 0x%09lx, allocated at line %4u in '%s'",
pPtr -> uDataSize,
(ulong) &(pPtr -> aData[sf_malloc_prehunc]),
pPtr -> uLineNum, pPtr -> sFileName);
#ifdef THREAD
fprintf(file, " in thread %ld", pPtr->thread_id);
#endif
fprintf(file, "\n");
(void) fflush(file);
}
DBUG_PRINT("safe",
......@@ -484,6 +547,10 @@ int _sanity (const char *sFile, uint uLine)
uint count=0;
pthread_mutex_lock(&THR_LOCK_malloc);
#ifndef PEDANTIC_SAFEMALLOC
if (sf_malloc_tampered && cNewCount < 0)
cNewCount=0;
#endif
count=cNewCount;
for (pTmp = pRememberRoot; pTmp != NULL && count-- ; pTmp = pTmp -> pNext)
flag+=_checkchunk (pTmp, sFile, uLine);
......@@ -492,6 +559,7 @@ int _sanity (const char *sFile, uint uLine)
{
const char *format="Safemalloc link list destroyed, discovered at '%s:%d'";
fprintf (stderr, format, sFile, uLine); fputc('\n',stderr);
fprintf (stderr, "root=%p,count=%d,pTmp=%p\n", pRememberRoot,count,pTmp);
(void) fflush(stderr);
DBUG_PRINT("safe",(format, sFile, uLine));
flag=1;
......
......@@ -280,6 +280,8 @@ static SYMBOL symbols[] = {
{ "READ", SYM(READ_SYM),0,0},
{ "REAL", SYM(REAL),0,0},
{ "REFERENCES", SYM(REFERENCES),0,0},
{ "RELAY_LOG_FILE", SYM(RELAY_LOG_FILE_SYM),0,0},
{ "RELAY_LOG_POS", SYM(RELAY_LOG_POS_SYM),0,0},
{ "RELOAD", SYM(RELOAD),0,0},
{ "REGEXP", SYM(REGEXP),0,0},
{ "RENAME", SYM(RENAME),0,0},
......
......@@ -479,6 +479,7 @@ err:
rli->relay_log_pos = 4;
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name));
flush_relay_log_info(rli);
}
/*
No need to free io_buf because we allocated both fname and io_buf in
......
......@@ -1607,7 +1607,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
(actual_error = thd->net.last_errno) && expected_error)
(actual_error = thd->net.last_errno) && expected_error &&
!ignored_error_code(actual_error))
{
const char* errmsg = "Slave: did not get the expected error\
running query from master - expected: '%s' (%d), got '%s' (%d)";
......
......@@ -38,7 +38,17 @@
#define ONE_THREAD
#endif
/* do stack traces are only supported on linux intel */
#ifdef SAFEMALLOC
#define SHUTDOWN_THD shutdown_th=pthread_self();
#define MAIN_THD main_th=pthread_self();
#define SIGNAL_THD signal_th=pthread_self();
#else
#define SHUTDOWN_THD
#define MAIN_THD
#define SIGNAL_THD
#endif
/* stack traces are only supported on linux intel */
#if defined(__linux__) && defined(__i386__) && defined(USE_PSTACK)
#define HAVE_STACK_TRACE_ON_SEGV
#include "../pstack/pstack.h"
......@@ -701,6 +711,7 @@ static void __cdecl kill_server(int sig_ptr)
sql_print_error(ER(ER_GOT_SIGNAL),my_progname,sig); /* purecov: inspected */
#if defined(USE_ONE_SIGNAL_HAND) && !defined(__WIN__) && !defined(OS2)
SHUTDOWN_THD;
my_thread_init(); // If this is a new thread
#endif
close_connections();
......@@ -716,6 +727,7 @@ static void __cdecl kill_server(int sig_ptr)
#ifdef USE_ONE_SIGNAL_HAND
static pthread_handler_decl(kill_server_thread,arg __attribute__((unused)))
{
SHUTDOWN_THD;
my_thread_init(); // Initialize new thread
kill_server(0);
my_thread_end(); // Normally never reached
......@@ -1262,6 +1274,7 @@ static void init_signals(void)
signal(SIGALRM, SIG_IGN);
signal(SIGBREAK,SIG_IGN);
signal_thread = pthread_self();
SIGNAL_THD;
}
static void start_signal_handler(void)
......@@ -1387,6 +1400,7 @@ static void init_signals(void)
sigaction(SIGBUS, &sa, NULL);
#endif
sigaction(SIGILL, &sa, NULL);
sigaction(SIGFPE, &sa, NULL);
}
(void) sigemptyset(&set);
#ifdef THREAD_SPECIFIC_SIGPIPE
......@@ -1454,7 +1468,7 @@ static void *signal_hand(void *arg __attribute__((unused)))
int sig;
my_thread_init(); // Init new thread
DBUG_ENTER("signal_hand");
SIGNAL_THD;
/* Setup alarm handler */
init_thr_alarm(max_connections+max_insert_delayed_threads);
#if SIGINT != THR_KILL_SIGNAL
......@@ -1509,7 +1523,10 @@ static void *signal_hand(void *arg __attribute__((unused)))
else
while ((error=my_sigwait(&set,&sig)) == EINTR) ;
if (cleanup_done)
{
my_thread_end();
pthread_exit(0); // Safety
}
switch (sig) {
case SIGTERM:
case SIGQUIT:
......@@ -1603,6 +1620,7 @@ int uname(struct utsname *a)
pthread_handler_decl(handle_shutdown,arg)
{
MSG msg;
SHUTDOWN_THD;
my_thread_init();
/* this call should create the message queue for this thread */
......@@ -1629,6 +1647,7 @@ int __stdcall handle_kill(ulong ctrl_type)
#ifdef OS2
pthread_handler_decl(handle_shutdown,arg)
{
SHUTDOWN_THD;
my_thread_init();
// wait semaphore
......@@ -1700,6 +1719,7 @@ int main(int argc, char **argv)
my_umask=0660; // Default umask for new files
my_umask_dir=0700; // Default umask for new directories
MAIN_THD;
MY_INIT(argv[0]); // init my_sys library & pthreads
tzset(); // Set tzname
......@@ -1891,45 +1911,6 @@ int main(int argc, char **argv)
using_update_log=1;
}
init_slave();
if (opt_bin_log && !server_id)
{
server_id= !master_host ? 1 : 2;
switch (server_id) {
#ifdef EXTRA_DEBUG
case 1:
sql_print_error("\
Warning: You have enabled the binary log, but you haven't set server-id:\n\
Updates will be logged to the binary log, but connections to slaves will\n\
not be accepted.");
break;
#endif
case 2:
sql_print_error("\
Warning: You should set server-id to a non-0 value if master_host is set.\n\
The server will not act as a slave.");
break;
}
}
if (opt_bin_log)
{
if (!opt_bin_logname)
{
char tmp[FN_REFLEN];
/* TODO: The following should be using fn_format(); We just need to
first change fn_format() to cut the file name if it's too long.
*/
strmake(tmp,glob_hostname,FN_REFLEN-5);
strmov(strcend(tmp,'.'),"-bin");
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
}
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
LOG_BIN);
using_update_log=1;
}
if (opt_slow_log)
open_log(&mysql_slow_log, glob_hostname, opt_slow_logname, "-slow.log",
LOG_NORMAL);
......@@ -2000,6 +1981,46 @@ The server will not act as a slave.");
if (!opt_noacl)
udf_init();
#endif
/* init_slave() must be called after the thread keys are created */
init_slave();
if (opt_bin_log && !server_id)
{
server_id= !master_host ? 1 : 2;
switch (server_id) {
#ifdef EXTRA_DEBUG
case 1:
sql_print_error("\
Warning: You have enabled the binary log, but you haven't set server-id:\n\
Updates will be logged to the binary log, but connections to slaves will\n\
not be accepted.");
break;
#endif
case 2:
sql_print_error("\
Warning: You should set server-id to a non-0 value if master_host is set.\n\
The server will not act as a slave.");
break;
}
}
if (opt_bin_log)
{
if (!opt_bin_logname)
{
char tmp[FN_REFLEN];
/* TODO: The following should be using fn_format(); We just need to
first change fn_format() to cut the file name if it's too long.
*/
strmake(tmp,glob_hostname,FN_REFLEN-5);
strmov(strcend(tmp,'.'),"-bin");
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
}
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
LOG_BIN);
using_update_log=1;
}
if (opt_bootstrap)
{
......
......@@ -108,7 +108,11 @@ net_printf(NET *net, uint errcode, ...)
thd->query_error = 1; // if we are here, something is wrong :-)
query_cache_abort(net); // Safety
va_start(args,errcode);
format=ER(errcode);
// Sasha: this is needed to make net_printf() work with 0 argument for
// errorcode and use the argument after that as the format string. This
// is usefull for rare errors that are not worth the hassle to put in
// errmsg.sys, but at the same time, the message is not fixed text
format=errcode ? ER(errcode) : va_arg(args,char*);
offset= net->return_errno ? 2 : 0;
text_pos=(char*) net->buff+head_length+offset+1;
(void) vsprintf(my_const_cast(char*) (text_pos),format,args);
......
......@@ -166,6 +166,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
const char** errmsg)
{
*errmsg=0;
if (rli->log_pos_current)
return 0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
......@@ -345,7 +346,14 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
}
}
DBUG_ASSERT(thd != 0);
/* is is criticate to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it
*/
THD_CHECK_SENTRY(thd);
if (*slave_running)
{
KICK_SLAVE(thd);
}
while (*slave_running)
{
/* there is a small chance that slave thread might miss the first
......@@ -367,8 +375,10 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
DBUG_ASSERT_LOCK(cond_lock);
pthread_cond_timedwait(term_cond, cond_lock, &abstime);
if (*slave_running)
{
KICK_SLAVE(thd);
}
}
if (term_lock)
pthread_mutex_unlock(term_lock);
return 0;
......@@ -958,6 +968,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->cur_log_fd = -1;
rli->slave_skip_counter=0;
rli->log_pos_current=0;
rli->abort_pos_wait=0;
rli->skip_log_purge=0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
{
......@@ -1288,9 +1300,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
bool pos_reached = 0;
int event_count = 0;
pthread_mutex_lock(&data_lock);
while (!thd->killed)
abort_pos_wait=0; // abort only if master info changes during wait
while (!thd->killed || !abort_pos_wait)
{
int cmp_result;
if (abort_pos_wait)
{
abort_pos_wait=0;
pthread_mutex_unlock(&data_lock);
return -1;
}
DBUG_ASSERT(*master_log_name || master_log_pos == 0);
if (*master_log_name)
{
......@@ -1342,10 +1361,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() ||
my_pthread_setspecific_ptr(THR_THD, thd) ||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
if (init_thr_lock() || thd->store_globals())
{
end_thread(thd,0);
DBUG_RETURN(-1);
......@@ -1359,7 +1375,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0; // Probably not needed
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
......@@ -1373,7 +1388,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
}
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
......@@ -1603,6 +1617,7 @@ slave_begin:
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_io");
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_IO))
......@@ -1800,11 +1815,12 @@ err:
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end(); // clean-up before broadcast
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
pthread_mutex_unlock(&mi->run_lock);
my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
goto slave_begin;
......@@ -1840,6 +1856,7 @@ slave_begin:
my_thread_init();
thd = new THD; // note that contructor of THD uses DBUG_ !
DBUG_ENTER("handle_slave_sql");
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
if (init_slave_thread(thd, SLAVE_THD_SQL))
......@@ -1853,6 +1870,7 @@ slave_begin:
sql_print_error("Failed during slave thread initialization");
goto err;
}
THD_CHECK_SENTRY(thd);
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
threads.append(thd);
......@@ -1883,6 +1901,7 @@ log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
{
thd->proc_info = "Processing master log event";
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
// do not scare the user if SQL thread was simply killed or stopped
......@@ -1918,14 +1937,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
rli->sql_thd = 0;
pthread_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end(); // clean-up before broadcasting termination
pthread_cond_broadcast(&rli->stop_cond);
// tell the world we are done
pthread_mutex_unlock(&rli->run_lock);
my_thread_end();
#ifndef DBUG_OFF // TODO: reconsider the code below
if (abort_slave_event_count && !rli->events_till_abort)
goto slave_begin;
......@@ -2423,12 +2444,34 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
my_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
// TODO: make skip_log_purge a start-up option. At this point this
// is not critical priority
if (!rli->skip_log_purge)
{
// purge_first_log will properly set up relay log coordinates in rli
if (rli->relay_log.purge_first_log(rli))
{
errmsg = "Error purging processed log";
goto err;
}
}
else
{
// TODO: verify that no lock is ok here. At this point, if we
// get this wrong, this is actually no big deal - the only time
// this code will ever be executed is if we are recovering from
// a bug when a full reload of the slave is not feasible or
// desirable.
if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
{
errmsg = "error switching to the next log";
goto err;
}
rli->relay_log_pos = 4;
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name));
flush_relay_log_info(rli);
}
// next log is hot
if (rli->relay_log.is_active(rli->linfo.log_file_name))
......
......@@ -151,10 +151,13 @@ typedef struct st_relay_log_info
char last_slave_error[MAX_SLAVE_ERRMSG];
THD* sql_thd;
bool log_pos_current;
bool abort_pos_wait;
bool skip_log_purge;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
cur_log_init_count(0),
log_pos_current(0)
log_pos_current(0),abort_pos_wait(0),
skip_log_purge(0)
{
relay_log_name[0] = master_log_name[0] = 0;
bzero(&info_file,sizeof(info_file));
......
......@@ -104,6 +104,9 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
cond_count=0;
convert_set=0;
mysys_var=0;
#ifndef DBUG_OFF
dbug_sentry=THD_SENTRY_MAGIC;
#endif
net.vio=0;
ull=0;
system_thread=cleanup_done=0;
......@@ -191,6 +194,7 @@ void THD::cleanup(void)
THD::~THD()
{
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
/* Close connection */
if (net.vio)
......@@ -223,12 +227,16 @@ THD::~THD()
mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
pthread_mutex_destroy(&active_vio_lock);
#endif
#ifndef DBUG_OFF
dbug_sentry = THD_SENTRY_GONE;
#endif
DBUG_VOID_RETURN;
}
void THD::awake(bool prepare_to_die)
{
THD_CHECK_SENTRY(this);
if (prepare_to_die)
killed = 1;
thr_alarm_kill(real_id);
......
......@@ -251,6 +251,11 @@ public:
class delayed_insert;
#define THD_SENTRY_MAGIC 0xfeedd1ff
#define THD_SENTRY_GONE 0xdeadbeef
#define THD_CHECK_SENTRY(thd) DBUG_ASSERT(thd->dbug_sentry == THD_SENTRY_MAGIC)
/* For each client connection we create a separate thread with THD serving as
a thread/connection descriptor */
......@@ -312,6 +317,9 @@ public:
// TODO: document the variables below
MYSQL_LOCK *lock,*locked_tables;
ULL *ull;
#ifndef DBUG_OFF
uint dbug_sentry; // watch out for memory corruption
#endif
struct st_my_thread_var *mysys_var;
enum enum_server_command command;
uint32 server_id;
......
......@@ -151,6 +151,7 @@ LEX *lex_start(THD *thd, uchar *buf,uint length)
lex->yacc_yyss=lex->yacc_yyvs=0;
lex->ignore_space=test(thd->sql_mode & MODE_IGNORE_SPACE);
lex->slave_thd_opt=0;
bzero(&lex->mi,sizeof(lex->mi));
return lex;
}
......
......@@ -98,6 +98,8 @@ typedef struct st_lex_master_info
uint port, connect_retry;
ulonglong pos;
ulong server_id;
char* relay_log_name;
ulong relay_log_pos;
} LEX_MASTER_INFO;
......
......@@ -690,6 +690,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
{
int error=0,restart_thread_mask;
const char* errmsg=0;
bool need_relay_log_purge=1;
// kill slave thread
lock_slave_threads(mi);
......@@ -713,7 +714,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
return 1;
}
pthread_mutex_lock(&mi->data_lock);
/* data lock not needed since we have already stopped the running threads,
and we have the hold on the run locks which will keep all threads that
could possibly modify the data structures from running
*/
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
// if we change host or port, we must reset the postion
......@@ -742,23 +746,56 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
if (lex_mi->relay_log_name)
{
need_relay_log_purge = 0;
mi->rli.skip_log_purge=1;
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name));
}
if (lex_mi->relay_log_pos)
{
need_relay_log_purge=0;
mi->rli.relay_log_pos=lex_mi->relay_log_pos;
}
flush_master_info(mi);
pthread_mutex_unlock(&mi->data_lock);
if (need_relay_log_purge)
{
mi->rli.skip_log_purge=0;
thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
&errmsg))
{
send_error(&thd->net, 0, "Failed purging old relay logs");
net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
return 1;
}
}
else
{
const char* msg;
if (init_relay_log_pos(&mi->rli,0/*log already inited*/,
0 /*pos already inited*/,
0 /*no data lock*/,
&msg))
{
//Sasha: note that I had to change net_printf() to make this work
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
return 1;
}
pthread_mutex_lock(&mi->rli.data_lock);
}
mi->rli.master_log_pos = mi->master_log_pos;
strnmov(mi->rli.master_log_name,mi->master_log_name,
sizeof(mi->rli.master_log_name));
if (!mi->rli.master_log_name[0]) // uninitialized case
mi->rli.master_log_pos=0;
pthread_cond_broadcast(&mi->rli.data_cond);
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait = 1;
pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
thd->proc_info = "starting slave";
......
......@@ -241,6 +241,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token MASTER_PORT_SYM
%token MASTER_CONNECT_RETRY_SYM
%token MASTER_SERVER_ID_SYM
%token RELAY_LOG_FILE_SYM
%token RELAY_LOG_POS_SYM
%token MATCH
%token MAX_ROWS
%token MAX_QUERIES_PER_HOUR
......@@ -701,6 +703,16 @@ master_def:
{
Lex->mi.connect_retry = $3;
}
|
RELAY_LOG_FILE_SYM EQ TEXT_STRING
{
Lex->mi.relay_log_name = $3.str;
}
|
RELAY_LOG_POS_SYM EQ ULONG_NUM
{
Lex->mi.relay_log_pos = $3;
}
/* create a table */
......@@ -3013,6 +3025,7 @@ keyword:
| ISSUER_SYM {}
| INNOBASE_SYM {}
| INSERT_METHOD {}
| IO_THREAD {}
| LAST_SYM {}
| LEVEL_SYM {}
| LOCAL_SYM {}
......@@ -3055,6 +3068,8 @@ keyword:
| RAID_CHUNKSIZE {}
| RAID_STRIPED_SYM {}
| RAID_TYPE {}
| RELAY_LOG_FILE_SYM {}
| RELAY_LOG_POS_SYM {}
| RELOAD {}
| REPAIR {}
| REPEATABLE_SYM {}
......@@ -3074,6 +3089,7 @@ keyword:
| SQL_CACHE_SYM {}
| SQL_NO_CACHE_SYM {}
| SQL_QUERY_CACHE_TYPE_SYM {}
| SQL_THREAD {}
| START_SYM {}
| STATUS_SYM {}
| STOP_SYM {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment