slave.cc 80.3 KB
Newer Older
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#include "mysql_priv.h"
#include <mysql.h>
20
#include <myisam.h>
bk@work.mysql.com's avatar
bk@work.mysql.com committed
21
#include "mini_client.h"
22
#include "slave.h"
23
#include "sql_repl.h"
24
#include "repl_failsafe.h"
bk@work.mysql.com's avatar
bk@work.mysql.com committed
25
#include <thr_alarm.h>
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
26
#include <my_dir.h>
27
#include <assert.h>
bk@work.mysql.com's avatar
bk@work.mysql.com committed
28

29 30 31
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;

32 33
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);

34
volatile bool slave_sql_running = 0, slave_io_running = 0;
35
char* slave_load_tmpdir = 0;
36 37 38
MASTER_INFO main_mi;
MASTER_INFO* active_mi;
volatile int active_mi_in_use = 0;
39
HASH replicate_do_table, replicate_ignore_table;
40
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
41
bool do_table_inited = 0, ignore_table_inited = 0;
42
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
43
bool table_rules_on = 0;
44
ulonglong relay_log_space_limit = 0;
45 46 47 48 49 50 51

/*
  When slave thread exits, we need to remember the temporary tables so we
  can re-use them on slave start.

  TODO: move the vars below under MASTER_INFO
*/
52

53
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
54
int events_till_abort = -1;
55
static int events_till_disconnect = -1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
56

57
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
58

59
void skip_load_data_infile(NET* net);
60
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
61
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
62
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
63 64
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
65
static int count_relay_log_space(RELAY_LOG_INFO* rli);
66
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
67
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
68 69
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
70
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
71
			     bool reconnect, bool suppress_warnings);
72 73
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg);
74
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
75 76
static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name);
77
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
78
char* rewrite_db(char* db);
79

80 81 82 83 84 85

/*
  Get a bit mask for which threads are running so that we later can
  restart these threads
*/

86 87 88 89 90 91 92 93
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
{
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
  register int tmp_mask=0;
  if (set_io)
    tmp_mask |= SLAVE_IO;
  if (set_sql)
    tmp_mask |= SLAVE_SQL;
94 95
  if (inverse)
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
96 97 98
  *mask = tmp_mask;
}

99

100 101 102 103 104 105 106 107 108 109 110 111 112 113
/*
  Lock both slave run mutexes of 'mi'.

  mi->run_lock is taken first, then mi->rli.run_lock;
  unlock_slave_threads() releases them in the opposite order.
*/
void lock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_lock(&mi->run_lock);
  pthread_mutex_lock(&mi->rli.run_lock);
}

/*
  Release both slave run mutexes of 'mi'.

  mi->rli.run_lock is released first, then mi->run_lock, i.e. the reverse
  of the order in which lock_slave_threads() acquires them.
*/
void unlock_slave_threads(MASTER_INFO* mi)
{
  //TODO: see if we can do this without dual mutex
  pthread_mutex_unlock(&mi->rli.run_lock);
  pthread_mutex_unlock(&mi->run_lock);
}

114

115 116
int init_slave()
{
117
  DBUG_ENTER("init_slave");
118 119 120 121 122

  /*
    TODO: re-write this to interate through the list of files
    for multi-master
  */
123 124 125
  active_mi = &main_mi;

  /*
126 127 128
    If master_host is not specified, try to read it from the master_info file.
    If master_host is specified, create the master_info file if it doesn't
    exists.
129
  */
130 131
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
		       !master_host))
132
  {
133 134
    sql_print_error("Warning: failed to initialized master info");
    DBUG_RETURN(0);
135
  }
136 137 138 139 140 141 142 143 144

  /*
    make sure slave thread gets started if server_id is set,
    valid master.info is present, and master_host has not been specified
  */
  if (server_id && !master_host && active_mi->host[0])
    master_host= active_mi->host;

  if (master_host && !opt_skip_slave_start)
145
  {
146 147 148 149 150 151 152
    if (start_slave_threads(1 /* need mutex */,
			    0 /* no wait for start*/,
			    active_mi,
			    master_info_file,
			    relay_log_info_file,
			    SLAVE_IO | SLAVE_SQL))
      sql_print_error("Warning: Can't create threads to handle slave");
153
  }
154
  DBUG_RETURN(0);
155 156
}

157

158 159
/*
  Release one table rule entry with my_free().
  Passed to hash_init() by init_table_rule_hash().
*/
static void free_table_ent(TABLE_RULE_ENT* e)
{
  gptr entry_mem= (gptr) e;
  my_free(entry_mem, MYF(0));
}

/*
  Key accessor handed to hash_init() by init_table_rule_hash().

  e         Table rule entry
  len       Out: set to e->key_len
  not_used  Ignored

  Returns e->db cast to byte*; the key is the key_len bytes starting there.
*/
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
			   my_bool not_used __attribute__((unused)))
{
  *len = e->key_len;
  return (byte*)e->db;
}

170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198

/*
  Open the given relay log

  SYNOPSIS
    init_relay_log_pos()
    rli			Relay information (will be initialized)
    log			Name of relay log file to read from. NULL = First log
    pos			Position in relay log file 
    need_data_lock	Set to 1 if this function should lock rli->data_lock
    errmsg		Store pointer to error message here

  DESCRIPTION
  - Close old open relay log files.
  - If we are using the same relay log as the running IO-thread, then set
    rli->cur_log to point to the same IO_CACHE entry.
  - If not, open the 'log' binary file.

  TODO
    - check proper initialization of master_log_name/master_log_pos
    - We may always want to delete all logs before 'log'.
      Currently if we are not calling this with 'log' as NULL or the first
      log we will never delete relay logs.
      If we want this we should not set skip_log_purge to 1.

  RETURN VALUES
    0	ok
    1	error.  errmsg is set to point to the error message
*/
199

200 201 202 203
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
                       ulonglong pos, bool need_data_lock,
                       const char** errmsg)
{
  DBUG_ENTER("init_relay_log_pos");

  *errmsg= 0;
  /* The position is already marked current: nothing to do (TODO: when?) */
  if (rli->log_pos_current)
    DBUG_RETURN(0);

  /* The relay log lock is always taken; data_lock only on request */
  pthread_mutex_t *relay_lock= rli->relay_log.get_log_lock();
  pthread_mutex_lock(relay_lock);
  if (need_data_lock)
    pthread_mutex_lock(&rli->data_lock);

  /* Close the log file and free its buffers if it is already open */
  if (rli->cur_log_fd >= 0)
  {
    end_io_cache(&rli->cache_buf);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd= -1;
  }

  rli->relay_log_pos= pos;

  /*
    Look up the first log. Its name is compared with 'log' below to decide
    whether purging must be skipped.
  */
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
  {
    *errmsg="Could not find first log during relay log initialization";
    goto err;
  }

  if (log)
  {
    /* A name different from the first log: don't purge */
    if (strcmp(log, rli->linfo.log_file_name) != 0)
      rli->skip_log_purge= 1;
    if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
    {
      *errmsg="Could not find target log during relay log initialization";
      goto err;
    }
  }
  strmake(rli->relay_log_name, rli->linfo.log_file_name,
          sizeof(rli->relay_log_name)-1);

  if (!rli->relay_log.is_active(rli->linfo.log_file_name))
  {
    /* Open the relay log and set rli->cur_log to point at this one */
    rli->cur_log_fd= open_binlog(&rli->cache_buf, rli->linfo.log_file_name,
                                 errmsg);
    if (rli->cur_log_fd < 0)
      goto err;
    rli->cur_log= &rli->cache_buf;
  }
  else
  {
    /*
      The IO thread is using this log file.
      In this case, we will use the same IO_CACHE pointer to
      read data as the IO thread is using to write data.
    */
    rli->cur_log= rli->relay_log.get_log_file();
    if (my_b_tell(rli->cur_log) == 0 &&
        check_binlog_magic(rli->cur_log, errmsg))
      goto err;
    rli->cur_log_old_open_count= rli->relay_log.get_open_count();
  }

  if (pos > BIN_LOG_HEADER_SIZE)
    my_b_seek(rli->cur_log, (off_t) pos);
  rli->log_pos_current= 1;

err:
  /* Shared exit: wake waiters on data_cond and drop the locks taken above */
  pthread_cond_broadcast(&rli->data_cond);
  if (need_data_lock)
    pthread_mutex_unlock(&rli->data_lock);
  pthread_mutex_unlock(relay_lock);
  DBUG_RETURN ((*errmsg) ? 1 : 0);
}

280

281
/* called from get_options() in mysqld.cc on start-up */
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
282 283

void init_slave_skip_errors(const char* arg)
284
{
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
285
  const char *p;
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
  if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
  {
    fprintf(stderr, "Badly out of memory, please check your system status\n");
    exit(1);
  }
  use_slave_mask = 1;
  for (;isspace(*arg);++arg)
    /* empty */;
  if (!my_casecmp(arg,"all",3))
  {
    bitmap_set_all(&slave_error_mask);
    return;
  }
  for (p= arg ; *p; )
  {
    long err_code;
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
      break;
    if (err_code < MAX_SLAVE_ERROR)
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
    while (!isdigit(*p) && *p)
      p++;
  }
}

311

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
312
/*
  We assume we have a run lock on rli and that both slave threads
  are not running
*/

317 318
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
                     const char** errmsg)
{
  int error= 0;
#ifndef DBUG_OFF
  char buf[22];
#endif
  DBUG_ENTER("purge_relay_logs");

  /* Relay log info not initialized: successfully do nothing */
  if (!rli->inited)
    DBUG_RETURN(0);

  DBUG_ASSERT(rli->slave_running == 0);
  DBUG_ASSERT(rli->mi->slave_running == 0);

  rli->slave_skip_counter= 0;
  pthread_mutex_lock(&rli->data_lock);
  rli->pending= 0;
  rli->master_log_name[0]= 0;
  rli->master_log_pos= 0;			// 0 means uninitialized

  if (rli->relay_log.reset_logs(thd))
  {
    *errmsg= "Failed during log reset";
    error= 1;
  }
  else
  {
    /* Save name of used relay log file */
    strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
            sizeof(rli->relay_log_name)-1);
    /* Just first log with magic number and nothing else */
    rli->log_space_total= BIN_LOG_HEADER_SIZE;
    rli->relay_log_pos=   BIN_LOG_HEADER_SIZE;
    rli->relay_log.reset_bytes_written();
    rli->log_pos_current= 0;
    /* Unless only a reset was asked for, reopen at the start of the log */
    if (!just_reset)
      error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
                                0 /* do not need data lock */, errmsg);
  }

  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);
}

360

361 362 363 364 365 366 367
/*
  Stop the slave threads selected by thread_mask.

  mi           Master info; if it is not inited nothing is done
  thread_mask  SLAVE_IO and/or SLAVE_SQL; SLAVE_FORCE_ALL selects both
               threads and makes an error from one of them non-fatal
  skip_lock    If set, no run lock is passed for locking to
               terminate_slave_thread(); the run locks are still passed
               as the mutexes to wait on

  RETURN VALUES
    0      ok (also when errors were ignored because of SLAVE_FORCE_ALL)
    other  error code from terminate_slave_thread()
*/
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
{
  if (!mi->inited)
    return 0; /* successfully do nothing */

  const int force_all= (thread_mask & SLAVE_FORCE_ALL);
  pthread_mutex_t *io_cond_lock= &mi->run_lock;
  pthread_mutex_t *sql_cond_lock= &mi->rli.run_lock;
  pthread_mutex_t *io_term_lock= skip_lock ? (pthread_mutex_t*) 0 :
                                             io_cond_lock;
  pthread_mutex_t *sql_term_lock= skip_lock ? (pthread_mutex_t*) 0 :
                                              sql_cond_lock;
  int error;
  DBUG_ENTER("terminate_slave_threads");

  /* IO thread */
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
  {
    mi->abort_slave= 1;
    error= terminate_slave_thread(mi->io_thd, io_term_lock, io_cond_lock,
                                  &mi->stop_cond, &mi->slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  /* SQL thread */
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
  {
    DBUG_ASSERT(mi->rli.sql_thd != 0) ;
    mi->rli.abort_slave= 1;
    error= terminate_slave_thread(mi->rli.sql_thd, sql_term_lock,
                                  sql_cond_lock, &mi->rli.stop_cond,
                                  &mi->rli.slave_running);
    if (error && !force_all)
      DBUG_RETURN(error);
  }
  DBUG_RETURN(0);
}

401

402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
/*
  Kick one slave thread and wait until its running flag is cleared.

  thd            The slave thread to stop
  term_lock      If not NULL, locked on entry and unlocked before return
  cond_lock      Mutex passed to pthread_cond_timedwait()
  term_cond      Condition waited on between kicks
  slave_running  Flag polled until it becomes false

  RETURN VALUES
    0                     the flag was cleared
    ER_SLAVE_NOT_RUNNING  term_lock was given and the flag was already
                          clear after locking it
*/
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
                           pthread_mutex_t *cond_lock,
                           pthread_cond_t* term_cond,
                           volatile bool* slave_running)
{
  if (term_lock)
  {
    pthread_mutex_lock(term_lock);
    if (!*slave_running)
    {
      pthread_mutex_unlock(term_lock);
      return ER_SLAVE_NOT_RUNNING;
    }
  }
  DBUG_ASSERT(thd != 0);
  /*
    It is critical to test that the slave is running. Otherwise we might
    reference freed memory while trying to kick it.
  */
  THD_CHECK_SENTRY(thd);
  if (*slave_running)
  {
    KICK_SLAVE(thd);
  }
  for (;;)
  {
    if (!*slave_running)
      break;
    /*
      There is a small chance that the slave thread misses the first
      alarm. To protect against that, wait with a two second timeout and
      resend the signal until the thread reacts.
    */
    struct timespec wake_time;
    set_timespec(wake_time,2);
    pthread_cond_timedwait(term_cond, cond_lock, &wake_time);
    if (*slave_running)
    {
      KICK_SLAVE(thd);
    }
  }
  if (term_lock)
    pthread_mutex_unlock(term_lock);
  return 0;
}

445

446
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
447
		       pthread_mutex_t *cond_lock,
448 449 450
		       pthread_cond_t *start_cond,
		       volatile bool *slave_running,
		       volatile ulong *slave_run_id,
451 452 453
		       MASTER_INFO* mi)
{
  pthread_t th;
454
  ulong start_id;
455
  DBUG_ASSERT(mi->inited);
456 457
  DBUG_ENTER("start_slave_thread");

458 459 460 461 462 463 464 465 466
  if (start_lock)
    pthread_mutex_lock(start_lock);
  if (!server_id)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    sql_print_error("Server id not set, will not start slave");
467
    DBUG_RETURN(ER_BAD_SLAVE);
468 469 470
  }
  
  if (*slave_running)
471 472 473 474 475
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
476
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
477
  }
478 479
  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
480 481 482 483
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
484
    DBUG_RETURN(ER_SLAVE_THREAD);
485 486 487 488
  }
  if (start_cond && cond_lock)
  {
    THD* thd = current_thd;
489
    while (start_id == *slave_run_id)
490
    {
491
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
492
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
493
					    "Waiting for slave thread to start");
494 495 496 497 498
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
      if (thd->killed)
      {
	pthread_mutex_unlock(cond_lock);
499
	DBUG_RETURN(ER_SERVER_SHUTDOWN);
500 501 502 503 504
      }
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
505
  DBUG_RETURN(0);
506
}
507 508 509 510 511 512


/*
  SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
  sense to do that for starting a slave - we always care if it actually
  started the threads that were not previously running
*/
514

515 516 517 518 519 520 521
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
			MASTER_INFO* mi, const char* master_info_fname,
			const char* slave_info_fname, int thread_mask)
{
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
  pthread_cond_t* cond_io=0,*cond_sql=0;
  int error=0;
522
  DBUG_ENTER("start_slave_threads");
523 524 525 526 527 528 529 530 531 532 533 534 535
  
  if (need_slave_mutex)
  {
    lock_io = &mi->run_lock;
    lock_sql = &mi->rli.run_lock;
  }
  if (wait_for_start)
  {
    cond_io = &mi->start_cond;
    cond_sql = &mi->rli.start_cond;
    lock_cond_io = &mi->run_lock;
    lock_cond_sql = &mi->rli.run_lock;
  }
536 537 538

  if (thread_mask & SLAVE_IO)
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
539 540
			     cond_io,
			     &mi->slave_running, &mi->slave_run_id,
541 542
			     mi);
  if (!error && (thread_mask & SLAVE_SQL))
543
  {
544 545
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
			     cond_sql,
546 547
			     &mi->rli.slave_running, &mi->rli.slave_run_id,
			     mi);
548 549 550
    if (error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
  }
551
  DBUG_RETURN(error);
552
}
553

554

555 556 557 558 559 560 561
/*
  Initialize a table rule hash with hash_init() and mark it as initialized.

  h         Hash to initialize; get_table_key() is passed as the key
            accessor and free_table_ent() as the last function pointer
            (presumably the element free callback - confirm against
            hash_init())
  h_inited  Out: set to 1
*/
void init_table_rule_hash(HASH* h, bool* h_inited)
{
  hash_init(h, TABLE_RULE_HASH_SIZE,0,0,
	    (hash_get_key) get_table_key,
	    (void (*)(void*)) free_table_ent, 0);
  *h_inited = 1;
}
bk@work.mysql.com's avatar
bk@work.mysql.com committed
562

563 564
/*
  Initialize a dynamic array of TABLE_RULE_ENT pointers and mark it as
  initialized.

  a         Array to initialize
  a_inited  Out: set to 1
*/
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
  const uint element_size= sizeof(TABLE_RULE_ENT*);
  my_init_dynamic_array(a, element_size, TABLE_RULE_ARR_SIZE,
                        TABLE_RULE_ARR_SIZE);
  *a_inited= 1;
}

/*
  Find the first rule in 'a' for which wild_case_compare() of the key
  against the rule's stored text returns 0.

  a    Array of TABLE_RULE_ENT pointers
  key  Key to test, 'len' bytes long
  len  Length of key

  Returns the matching entry, or 0 if there is none.
*/
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
  const char *key_end= key + len;

  for (uint idx= 0; idx < a->elements; idx++)
  {
    TABLE_RULE_ENT *rule;
    get_dynamic(a, (gptr) &rule, idx);

    const char *pattern= (const char*) rule->db;
    const char *pattern_end= pattern + rule->key_len;
    if (wild_case_compare(key, key_end, pattern, pattern_end, '\\') == 0)
      return rule;
  }
  return 0;
}

587 588
/*
  Decide from the table rules whether a statement touching 'tables'
  should be executed.

  Only entries with 'updating' set are considered. For each of them the
  key "db.table" is checked against the do hash, the ignore hash, the
  wild do list and the wild ignore list, in that order; the first hit
  decides (do -> 1, ignore -> 0).

  RETURN VALUES
    1  ok to execute
    0  do not execute
*/
int tables_ok(THD* thd, TABLE_LIST* tables)
{
  for (TABLE_LIST *tbl= tables; tbl; tbl= tbl->next)
  {
    if (!tbl->updating)
      continue;

    /* Build "db.table"; the thread's db is used when the table has none */
    char hash_key[2*NAME_LEN+2];
    char *end= strmov(hash_key, tbl->db ? tbl->db : thd->db);
    *end++= '.';
    end= strmov(end, tbl->real_name);
    uint len= end - hash_key;

    if (do_table_inited &&			// if there are any do's
        hash_search(&replicate_do_table, (byte*) hash_key, len))
      return 1;
    if (ignore_table_inited &&			// if there are any ignores
        hash_search(&replicate_ignore_table, (byte*) hash_key, len))
      return 0;
    if (wild_do_table_inited &&
        find_wild(&replicate_wild_do_table, hash_key, len))
      return 1;
    if (wild_ignore_table_inited &&
        find_wild(&replicate_wild_ignore_table, hash_key, len))
      return 0;
  }

  /*
    If no explicit rule found and there was a do list, do not replicate.
    If there was no do list, go ahead
  */
  if (do_table_inited || wild_do_table_inited)
    return 0;
  return 1;
}


/*
  Add a "db.table" rule to a table rule hash.

  h           Hash initialized with init_table_rule_hash()
  table_spec  Rule text; must contain a '.'

  The entry and a copy of table_spec are allocated in one chunk:
  e->db points at the copy, e->tbl_name at the character after the first
  '.', and e->key_len is the length of the whole spec.

  RETURN VALUES
    0  ok
    1  no '.' in table_spec, out of memory, or the hash insert failed
*/
int add_table_rule(HASH* h, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  // len is always > 0 because we know that there exists a '.'
  uint len = (uint)strlen(table_spec);
  /* One extra byte so the stored copy keeps its terminating NUL */
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len + 1, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len + 1);
  /*
    Do not ignore a failed insert: the entry is not owned by the hash
    then, so free it here and report the error to the caller.
  */
  if (hash_insert(h, (byte*)e))
  {
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

641 642
/*
  Add a wildcard "db.table" rule to a table rule array.

  a           Array initialized with init_table_rule_array()
  table_spec  Rule text; must contain a '.'

  The entry and a copy of table_spec are allocated in one chunk:
  e->db points at the copy, e->tbl_name at the character after the first
  '.', and e->key_len is the length of the whole spec. The array stores
  the pointer to the entry.

  RETURN VALUES
    0  ok
    1  no '.' in table_spec, out of memory, or the array insert failed
*/
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
  const char* dot = strchr(table_spec, '.');
  if (!dot) return 1;
  uint len = (uint)strlen(table_spec);
  /* One extra byte so the stored copy keeps its terminating NUL */
  TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
						 + len + 1, MYF(MY_WME));
  if (!e) return 1;
  e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  e->tbl_name = e->db + (dot - table_spec) + 1;
  e->key_len = len;
  memcpy(e->db, table_spec, len + 1);
  /*
    Do not ignore a failed insert: the pointer was not stored in the
    array then, so free the entry here and report the error.
  */
  if (insert_dynamic(a, (gptr)&e))
  {
    my_free((gptr) e, MYF(0));
    return 1;
  }
  return 0;
}

657 658 659
/*
  Free every pointer stored in a dynamic array with my_free(), then
  delete the array itself.
*/
static void free_string_array(DYNAMIC_ARRAY *a)
{
  for (uint idx= 0; idx < a->elements; idx++)
  {
    char *element;
    get_dynamic(a, (gptr) &element, idx);
    my_free(element, MYF(MY_WME));
  }
  delete_dynamic(a);
}

669 670
#ifdef NOT_USED_YET

671 672 673 674 675
/*
  Walk action that calls end_master_info() on one master info.
  Only compiled when NOT_USED_YET is defined; the TODO in end_slave()
  names it as the future list_walk() action. The second argument is
  ignored. Always returns 0.
*/
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
  end_master_info(mi);
  return 0;
}
676
#endif
677

678 679
void end_slave()
{
680 681 682 683 684
  /*
    TODO: replace the line below with
    list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
    once multi-master code is ready.
  */
685 686 687
  terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
  end_master_info(active_mi);
  if (do_table_inited)
688
    hash_free(&replicate_do_table);
689
  if (ignore_table_inited)
690
    hash_free(&replicate_ignore_table);
691
  if (wild_do_table_inited)
692
    free_string_array(&replicate_wild_do_table);
693
  if (wild_ignore_table_inited)
694 695
    free_string_array(&replicate_wild_ignore_table);
}
696

697

698
/*
  Tell whether the IO slave thread should stop: true if mi->abort_slave,
  the global abort_loop or thd->killed is set.
*/
static bool io_slave_killed(THD* thd, MASTER_INFO* mi)
{
  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
  if (mi->abort_slave)
    return 1;
  if (abort_loop)
    return 1;
  return thd->killed;
}

705

706
/*
  Tell whether the SQL slave thread should stop: true if rli->abort_slave,
  the global abort_loop or thd->killed is set.
*/
static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
  if (rli->abort_slave)
    return 1;
  if (abort_loop)
    return 1;
  return thd->killed;
}

713

714
/*
  Record and log a slave error.

  rli       Relay log info; last_slave_error receives the formatted
            message and last_slave_errno the error code
  err_code  Error code to store and to print
  msg       printf-style format for the variable arguments that follow

  The formatted text is truncated to the size of rli->last_slave_error
  and also written to the error log through sql_print_error().
*/
void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
  va_list args;
  va_start(args,msg);
  my_vsnprintf(rli->last_slave_error,
	       sizeof(rli->last_slave_error), msg, args);
  va_end(args);			// every va_start() needs a matching va_end()
  sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
		  err_code);
  rli->last_slave_errno = err_code;
}

725

726
/*
  Answer a pending file request on 'net' with the 10 byte packet
  "\xfb/dev/null", flush, read and discard the reply, and send OK.
  The results of the network calls are deliberately ignored.
*/
void skip_load_data_infile(NET* net)
{
  static const char null_file_request[]= "\xfb/dev/null";

  (void) my_net_write(net, null_file_request, sizeof(null_file_request)-1);
  (void) net_flush(net);
  (void) my_net_read(net);			// discard response
  send_ok(net);					// the master expects it
}

734

735
char* rewrite_db(char* db)
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
736
{
737 738
  if (replicate_rewrite_db.is_empty() || !db)
    return db;
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
739 740 741
  I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  i_string_pair* tmp;

742 743 744 745 746
  while ((tmp=it++))
  {
    if (!strcmp(tmp->key, db))
      return tmp->val;
  }
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
747 748
  return db;
}
749

750

bk@work.mysql.com's avatar
bk@work.mysql.com committed
751 752 753
int db_ok(const char* db, I_List<i_string> &do_list,
	  I_List<i_string> &ignore_list )
{
754
  if (do_list.is_empty() && ignore_list.is_empty())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
755 756
    return 1; // ok to replicate if the user puts no constraints

757 758 759 760 761
  /*
    If the user has specified restrictions on which databases to replicate
    and db was not selected, do not replicate.
  */
  if (!db)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
762
    return 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
763

764 765 766 767
  if (!do_list.is_empty()) // if the do's are not empty
  {
    I_List_iterator<i_string> it(do_list);
    i_string* tmp;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
768

769 770 771 772
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 1; // match
bk@work.mysql.com's avatar
bk@work.mysql.com committed
773
    }
774 775
    return 0;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
776
  else // there are some elements in the don't, otherwise we cannot get here
777 778 779
  {
    I_List_iterator<i_string> it(ignore_list);
    i_string* tmp;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
780

781 782 783 784
    while ((tmp=it++))
    {
      if (!strcmp(tmp->ptr, db))
	return 0; // match
bk@work.mysql.com's avatar
bk@work.mysql.com committed
785
    }
786 787
    return 1;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
788 789
}

790

791 792
static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
				 const char *default_val)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
793
{
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
794 795 796 797 798 799 800
  uint length;
  if ((length=my_b_gets(f,var, max_size)))
  {
    char* last_p = var + length -1;
    if (*last_p == '\n')
      *last_p = 0; // if we stopped on newline, kill it
    else
bk@work.mysql.com's avatar
bk@work.mysql.com committed
801
    {
802 803 804 805
      /*
	If we truncated a line or stopped on last char, remove all chars
	up to and including newline.
      */
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
806
      int c;
807
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
808
    }
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
809 810 811 812
    return 0;
  }
  else if (default_val)
  {
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
813
    strmake(var,  default_val, max_size-1);
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
814 815
    return 0;
  }
816
  return 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
817 818
}

819

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
820
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
821 822 823
{
  char buf[32];
  
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
824 825 826 827 828
  if (my_b_gets(f, buf, sizeof(buf))) 
  {
    *var = atoi(buf);
    return 0;
  }
829
  else if (default_val)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
830 831 832 833
  {
    *var = default_val;
    return 0;
  }
834
  return 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
835 836
}

837

838 839
/*
  Set mi->old_format from the first character of the master's version
  string: '3' gives 1, '4' and '5' give 0.

  RETURN VALUES
    0  version recognized
    1  any other first character; an error is logged and mi->old_format
       is left unchanged
*/
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
  const char major= *mysql->server_version;

  if (major == '3')
  {
    mi->old_format= 1;
    return 0;
  }
  if (major == '4' || major == '5')
  {
    mi->old_format= 0;
    return 0;
  }

  sql_print_error("Master reported unrecognized MySQL version");
  return 1;
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
863 864 865 866

static int create_table_from_dump(THD* thd, NET* net, const char* db,
				  const char* table_name)
{
867
  ulong packet_len = my_net_read(net); // read create table statement
868 869
  Vio* save_vio;
  HA_CHECK_OPT check_opt;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
870
  TABLE_LIST tables;
871 872
  int error= 1;
  handler *file;
873
  ulong save_options;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
874
  
875 876 877 878 879 880 881 882 883 884 885
  if (packet_len == packet_error)
  {
    send_error(&thd->net, ER_MASTER_NET_READ);
    return 1;
  }
  if (net->read_pos[0] == 255) // error from master
  {
    net->read_pos[packet_len] = 0;
    net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
    return 1;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
886 887
  thd->command = COM_TABLE_DUMP;
  thd->query = sql_alloc(packet_len + 1);
888 889 890 891 892 893
  if (!thd->query)
  {
    sql_print_error("create_table_from_dump: out of memory");
    net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
    return 1;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
894 895 896 897 898
  memcpy(thd->query, net->read_pos, packet_len);
  thd->query[packet_len] = 0;
  thd->current_tablenr = 0;
  thd->query_error = 0;
  thd->net.no_send_ok = 1;
899 900 901
  
  /* we do not want to log create table statement */
  save_options = thd->options;
902
  thd->options &= ~(ulong) (OPTION_BIN_LOG);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
903
  thd->proc_info = "Creating table from master dump";
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
904
  // save old db in case we are creating in a different database
bk@work.mysql.com's avatar
bk@work.mysql.com committed
905
  char* save_db = thd->db;
906
  thd->db = (char*)db;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
907
  mysql_parse(thd, thd->query, packet_len); // run create table
908
  thd->db = save_db;		// leave things the way the were before
909
  thd->options = save_options;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
910
  
911 912
  if (thd->query_error)
    goto err;			// mysql_parse took care of the error send
bk@work.mysql.com's avatar
bk@work.mysql.com committed
913 914 915

  bzero((char*) &tables,sizeof(tables));
  tables.db = (char*)db;
916
  tables.alias= tables.real_name= (char*)table_name;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
917 918
  tables.lock_type = TL_WRITE;
  thd->proc_info = "Opening master dump table";
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
919 920
  if (!open_ltable(thd, &tables, TL_WRITE))
  {
921
    send_error(&thd->net,0,0);			// Send error from open_ltable
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
922
    sql_print_error("create_table_from_dump: could not open created table");
923
    goto err;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
924
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
925
  
926
  file = tables.table->file;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
927
  thd->proc_info = "Reading master dump table data";
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
928 929 930 931
  if (file->net_read_dump(net))
  {
    net_printf(&thd->net, ER_MASTER_NET_READ);
    sql_print_error("create_table_from_dump::failed in\
bk@work.mysql.com's avatar
bk@work.mysql.com committed
932
 handler::net_read_dump()");
933
    goto err;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
934
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
935 936

  check_opt.init();
serg@serg.mysql.com's avatar
serg@serg.mysql.com committed
937
  check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
jcole@tetra.spaceapes.com's avatar
jcole@tetra.spaceapes.com committed
938
  thd->proc_info = "Rebuilding the index on master dump table";
939 940 941 942 943
  /*
    We do not want repair() to spam us with messages
    just send them to the error log, and report the failure in case of
    problems.
  */
944
  save_vio = thd->net.vio;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
945
  thd->net.vio = 0;
946
  error=file->repair(thd,&check_opt) != 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
947
  thd->net.vio = save_vio;
948 949 950 951
  if (error)
    net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name);

err:
bk@work.mysql.com's avatar
bk@work.mysql.com committed
952 953 954 955 956
  close_thread_tables(thd);
  thd->net.no_send_ok = 0;
  return error; 
}

957 958
/*
  Request a dump of one table from the master and build the table locally

  SYNOPSIS
    fetch_master_table()
    thd			Thread handler; errors are reported on thd->net
    db_name		Database of the table to fetch
    table_name		Name of the table to fetch
    mi			Master connection information; only used when this
			function has to connect to the master itself
    mysql		Connection to the master to use, or NULL to let this
			function open (and close) its own connection

  RETURN VALUES
    0			ok
    1			error
*/

int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
		       MASTER_INFO *mi, MYSQL *mysql)
{
  int error= 1;
  const char *errmsg= 0;
  bool own_connection= (mysql == NULL);
  DBUG_ENTER("fetch_master_table");
  DBUG_PRINT("enter", ("db_name: '%s'  table_name: '%s'",
		       db_name,table_name));

  if (own_connection)
  {
    mysql= mc_mysql_init(NULL);
    if (!mysql)
    {
      send_error(&thd->net);			// EOM
      DBUG_RETURN(1);
    }
    if (connect_to_master(thd, mysql, mi))
    {
      net_printf(&thd->net, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql));
      mc_mysql_close(mysql);
      DBUG_RETURN(1);
    }
    if (thd->killed)
      goto err;
  }

  if (request_table_dump(mysql, db_name, table_name))
  {
    error= ER_UNKNOWN_ERROR;
    errmsg= "Failed on table dump request";
  }
  else if (!create_table_from_dump(thd, &mysql->net, db_name, table_name))
    error= 0;
  /* else: create_table_from_dump will have sent the error already */

 err:
  thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
  if (own_connection)
    mc_mysql_close(mysql);			// We opened it, so we close it
  if (errmsg && thd->net.vio)
    send_error(&thd->net, error, errmsg);
  DBUG_RETURN(test(error));			// Return 1 on error
}

1004

1005 1006
/*
  Release what init_master_info() set up

  SYNOPSIS
    end_master_info()
    mi			Master connection information

  NOTES
    Does nothing if mi->inited is not set.  Otherwise ends the relay log
    info, closes the master info file (if open) and clears mi->inited.
*/

void end_master_info(MASTER_INFO* mi)
{
  DBUG_ENTER("end_master_info");

  if (mi->inited)
  {
    end_relay_log_info(&mi->rli);
    if (mi->fd >= 0)
    {
      end_io_cache(&mi->file);
      (void)my_close(mi->fd, MYF(MY_WME));
      mi->fd = -1;
    }
    mi->inited = 0;
  }

  DBUG_VOID_RETURN;
}

1023

1024 1025 1026 1027 1028 1029
/*
  Set up the relay log info structure and open the relay log

  SYNOPSIS
    init_relay_log_info()
    rli			Relay log information to initialize
    info_fname		Name of the relay log info file; formatted with
			mysql_data_home as directory

  NOTES
    Does nothing if rli->inited is already set.
    rli->data_lock is held while the structure is being set up and is
    released on every return path.
    If the info file does not exist it is created and the relay log is
    positioned at the first entry of the relay index file; otherwise the
    relay log name/position and master log name/position are read from it.

  RETURN VALUES
    0			ok
    != 0		error
*/

int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
  char fname[FN_REFLEN+128];
  int info_fd;
  const char* msg = 0;
  int error = 0;
  DBUG_ENTER("init_relay_log_info");

  if (rli->inited)				// Set if this function called
    DBUG_RETURN(0);
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
  pthread_mutex_lock(&rli->data_lock);
  info_fd = rli->info_fd;
  rli->pending = 0;
  rli->cur_log_fd = -1;
  rli->slave_skip_counter=0;
  rli->log_pos_current=0;
  rli->abort_pos_wait=0;
  rli->skip_log_purge=0;
  rli->log_space_limit = relay_log_space_limit;
  rli->log_space_total = 0;

  // TODO: make this work with multi-master
  if (!opt_relay_logname)
  {
    char tmp[FN_REFLEN];
    /*
      TODO: The following should be using fn_format();  We just need to
      first change fn_format() to cut the file name if it's too long.
    */
    strmake(tmp,glob_hostname,FN_REFLEN-5);
    strmov(strcend(tmp,'.'),"-relay-bin");
    opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
  }
  if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
	       "-relay-bin", opt_relaylog_index_name,
	       LOG_BIN, 1 /* read_append cache */,
	       1 /* no auto events */))
  {
    /* We must not leave with data_lock held */
    pthread_mutex_unlock(&rli->data_lock);
    DBUG_RETURN(1);
  }

  /* if file does not exist */
  if (access(fname,F_OK))
  {
    /*
      If someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
    if (info_fd >= 0)
      my_close(info_fd, MYF(MY_WME));
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME)))
    {
      msg= current_thd->net.last_error;
      goto err;
    }

    /* Init relay log with first entry in the relay index file */
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
			   &msg))
      goto err;
    rli->master_log_pos = 0;			// uninitialized
    rli->info_fd = info_fd;
  }
  else // file exists
  {
    if (info_fd >= 0)
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
    else if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	     init_io_cache(&rli->info_file, info_fd,
			   IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
    {
      if (info_fd >= 0)
	my_close(info_fd, MYF(0));
      rli->info_fd= -1;
      rli->relay_log.close(1);
      pthread_mutex_unlock(&rli->data_lock);
      DBUG_RETURN(1);
    }

    rli->info_fd = info_fd;
    int relay_log_pos, master_log_pos;
    if (init_strvar_from_file(rli->relay_log_name,
			      sizeof(rli->relay_log_name), &rli->info_file,
			      "") ||
       init_intvar_from_file(&relay_log_pos,
			     &rli->info_file, BIN_LOG_HEADER_SIZE) ||
       init_strvar_from_file(rli->master_log_name,
			     sizeof(rli->master_log_name), &rli->info_file,
			     "") ||
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
    {
      msg="Error reading slave log configuration";
      goto err;
    }
    rli->relay_log_pos=  relay_log_pos;
    rli->master_log_pos= master_log_pos;

    if (init_relay_log_pos(rli,
			   rli->relay_log_name,
			   rli->relay_log_pos,
			   0 /* no data lock*/,
			   &msg))
      goto err;
  }
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  /*
    Now change the cache from READ to WRITE - must do this
    before flush_relay_log_info
  */
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
  error= flush_relay_log_info(rli);
  if (count_relay_log_space(rli))
  {
    msg="Error counting relay log space";
    goto err;
  }
  rli->inited= 1;
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(error);

err:
  /*
    msg is data, not a format: it may contain '%' (file names, text from
    the net error buffer), so never pass it as the format string.
  */
  sql_print_error("%s", msg ? msg : "Failed to initialize relay log info");
  end_io_cache(&rli->info_file);
  if (info_fd >= 0)
    my_close(info_fd, MYF(0));
  rli->info_fd= -1;
  rli->relay_log.close(1);
  pthread_mutex_unlock(&rli->data_lock);
  DBUG_RETURN(1);
}

1157

1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171
/*
  Add the size of one relay log file to rli->log_space_total

  SYNOPSIS
    add_relay_log()
    rli			Relay log information holding the running total
    linfo		Log whose file (linfo->log_file_name) is measured

  RETURN VALUES
    0			ok
    1			the file could not be stat'ed (error is logged)
*/

static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
{
  MY_STAT stat_info;
  DBUG_ENTER("add_relay_log");
  if (my_stat(linfo->log_file_name, &stat_info, MYF(0)))
  {
    rli->log_space_total += stat_info.st_size;
#ifndef DBUG_OFF
    char buf[22];
    DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
#endif
    DBUG_RETURN(0);
  }
  sql_print_error("log %s listed in the index, but failed to stat",
		  linfo->log_file_name);
  DBUG_RETURN(1);
}

1176

1177 1178
/*
  Block until the relay logs fit within the space limit or the I/O
  thread is killed

  SYNOPSIS
    wait_for_relay_log_space()
    rli			Relay log information (limit, total, lock and
			condition are taken from here)

  NOTES
    thd->proc_info of the I/O thread (rli->mi->io_thd) is changed for the
    duration of the wait and restored afterwards.

  RETURN VALUES
    0			space is available
    1			io_slave_killed() reported that we should stop
*/

static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
{
  bool slave_killed=0;
  MASTER_INFO* mi = rli->mi;
  THD* thd = mi->io_thd;
  const char* old_proc_info;

  DBUG_ENTER("wait_for_relay_log_space");
  pthread_mutex_lock(&rli->log_space_lock);
  old_proc_info = thd->proc_info;
  thd->proc_info = "Waiting for relay log space to free";
  for (;;)
  {
    if (rli->log_space_limit >= rli->log_space_total)
      break;					// Enough room
    if ((slave_killed=io_slave_killed(thd,mi)))
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }
  thd->proc_info = old_proc_info;
  pthread_mutex_unlock(&rli->log_space_lock);
  DBUG_RETURN(slave_killed);
}

1198

1199 1200 1201 1202 1203
/*
  Recompute rli->log_space_total from the relay logs in the index

  SYNOPSIS
    count_relay_log_space()
    rli			Relay log information

  NOTES
    The total is reset to 0 and then the size of every log returned by
    find_log_pos() / find_next_log() is added with add_relay_log().

  RETURN VALUES
    0			ok
    1			first log not found, or a log could not be measured
*/

static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
  LOG_INFO linfo;
  DBUG_ENTER("count_relay_log_space");
  rli->log_space_total = 0;
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
  {
    sql_print_error("Could not find first log while counting relay log space");
    DBUG_RETURN(1);
  }
  for (;;)
  {
    if (add_relay_log(rli,&linfo))
      DBUG_RETURN(1);
    if (rli->relay_log.find_next_log(&linfo, 1))
      break;					// No more logs
  }
  DBUG_RETURN(0);
}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1216

1217

1218
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
1219 1220
		     const char* slave_info_fname,
		     bool abort_if_no_master_info_file)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1221
{
1222 1223 1224 1225
  int fd,error;
  char fname[FN_REFLEN+128];
  DBUG_ENTER("init_master_info");

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1226
  if (mi->inited)
1227
    DBUG_RETURN(0);
1228 1229
  mi->mysql=0;
  mi->file_id=1;
1230
  mi->ignore_stop_event=0;
1231
  fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1232

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1233 1234 1235 1236
  /*
    We need a mutex while we are changing master info parameters to
    keep other threads from reading bogus info
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1237

1238
  pthread_mutex_lock(&mi->data_lock);
1239
  fd = mi->fd;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1240
  
1241
  if (access(fname,F_OK))
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1242
  {
1243 1244 1245 1246 1247
    if (abort_if_no_master_info_file)
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(0);
    }
1248 1249 1250 1251
    /*
      if someone removed the file from underneath our feet, just close
      the old descriptor and re-create the old file
    */
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1252 1253
    if (fd >= 0)
      my_close(fd, MYF(MY_WME));
1254 1255 1256 1257 1258
    if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
		      MYF(MY_WME)))
      goto err;

1259
    mi->master_log_name[0] = 0;
1260
    mi->master_log_pos = BIN_LOG_HEADER_SIZE;		// skip magic number
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1261 1262 1263 1264 1265 1266 1267
    mi->fd = fd;
      
    if (master_host)
      strmake(mi->host, master_host, sizeof(mi->host) - 1);
    if (master_user)
      strmake(mi->user, master_user, sizeof(mi->user) - 1);
    if (master_password)
1268
      strmake(mi->password, master_password, HASH_PASSWORD_LENGTH);
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1269 1270 1271
    mi->port = master_port;
    mi->connect_retry = master_connect_retry;
  }
1272
  else // file exists
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1273
  {
1274
    if (fd >= 0)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1275
      reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
1276 1277 1278 1279
    else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
	     init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
			   0, MYF(MY_WME)))
      goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1280

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1281
    mi->fd = fd;
1282 1283
    int port, connect_retry, master_log_pos;

1284
    if (init_strvar_from_file(mi->master_log_name,
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1285
			      sizeof(mi->master_log_name), &mi->file,
1286
			      "") ||
1287
	init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1288 1289 1290 1291 1292 1293
	init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
			      master_host) ||
	init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
			      master_user) || 
	init_strvar_from_file(mi->password, HASH_PASSWORD_LENGTH+1, &mi->file,
			      master_password) ||
1294 1295
	init_intvar_from_file(&port, &mi->file, master_port) ||
	init_intvar_from_file(&connect_retry, &mi->file,
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1296
			      master_connect_retry))
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1297
    {
1298
      sql_print_error("Error reading master configuration");
1299
      goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1300
    }
1301 1302 1303 1304 1305 1306 1307
    /*
      This has to be handled here as init_intvar_from_file can't handle
      my_off_t types
    */
    mi->master_log_pos= (my_off_t) master_log_pos;
    mi->port= (uint) port;
    mi->connect_retry= (uint) connect_retry;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1308
  }
1309 1310 1311
  DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
			    mi->master_log_name,
			    (ulong) mi->master_log_pos));
1312 1313 1314 1315 1316

  if (init_relay_log_info(&mi->rli, slave_info_fname))
    goto err;
  mi->rli.mi = mi;

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1317
  mi->inited = 1;
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1318
  // now change cache READ -> WRITE - must do this before flush_master_info
1319
  reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1320
  error=test(flush_master_info(mi));
1321
  pthread_mutex_unlock(&mi->data_lock);
1322
  DBUG_RETURN(error);
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1323

1324
err:
1325 1326 1327 1328 1329 1330
  if (fd >= 0)
  {
    my_close(fd, MYF(0));
    end_io_cache(&mi->file);
  }
  mi->fd= -1;
1331
  pthread_mutex_unlock(&mi->data_lock);
1332
  DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1333 1334
}

1335

1336 1337 1338 1339 1340
/*
  Announce this slave to the master with COM_REGISTER_SLAVE

  SYNOPSIS
    register_slave_on_master()
    mysql		Connection to the master

  NOTES
    Nothing is sent if report_host is not set.
    The packet holds, in order: server_id (4 bytes), report_host,
    report_user (or a 0 byte), report_password (or a 0 byte),
    report_port (2 bytes), rpl_recovery_rank (4 bytes) and a 4 byte 0
    that the master fills in as master_id.

  RETURN VALUES
    0			ok (or nothing to report)
    1			the command failed; the error is logged
*/

int register_slave_on_master(MYSQL* mysql)
{
  String packet;
  char buf[4];

  if (!report_host)
    return 0;

  int4store(buf, server_id);
  packet.append(buf, 4);

  net_store_data(&packet, report_host);
  if (report_user)
    net_store_data(&packet, report_user);
  else
    packet.append((char)0);

  /* The password field must carry the password, not the user name again */
  if (report_password)
    net_store_data(&packet, report_password);
  else
    packet.append((char)0);

  int2store(buf, (uint16)report_port);
  packet.append(buf, 2);
  int4store(buf, rpl_recovery_rank);
  packet.append(buf, 4);
  int4store(buf, 0); /* tell the master will fill in master_id */
  packet.append(buf, 4);

  if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
		       packet.length(), 0))
  {
    sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
		    mc_mysql_errno(mysql),
		    mc_mysql_error(mysql));
    return 1;
  }

  return 0;
}

1377
/*
  Send the slave status as a result set to the client

  SYNOPSIS
    show_master_info()
    thd			Thread handler of the client connection
    mi			Master connection information to show

  NOTES
    The column headers are always sent.  One data row follows only if
    mi->host is not empty; the row is built while holding mi->data_lock
    and mi->rli.data_lock.

  RETURN VALUES
    0			ok
    -1			error while sending to the client
*/

int show_master_info(THD* thd, MASTER_INFO* mi)
{
  // TODO: fix this for multi-master
  DBUG_ENTER("show_master_info");

  /* Result set columns, in the order in which the values are stored below */
  const struct
  {
    const char *name;
    uint length;
  } columns[]=
  {
    { "Master_Host",		(uint) sizeof(mi->host) },
    { "Master_User",		(uint) sizeof(mi->user) },
    { "Master_Port",		6 },
    { "Connect_retry",		6 },
    { "Master_Log_File",	FN_REFLEN },
    { "Read_Master_Log_Pos",	12 },
    { "Relay_Log_File",		FN_REFLEN },
    { "Relay_Log_Pos",		12 },
    { "Relay_Master_Log_File",	FN_REFLEN },
    { "Slave_IO_Running",	3 },
    { "Slave_SQL_Running",	3 },
    { "Replicate_do_db",	20 },
    { "Replicate_ignore_db",	20 },
    { "Last_errno",		4 },
    { "Last_error",		20 },
    { "Skip_counter",		12 },
    { "Exec_master_log_pos",	12 },
    { "Relay_log_space",	12 }
  };

  List<Item> field_list;
  for (uint i= 0; i < sizeof(columns)/sizeof(columns[0]); i++)
    field_list.push_back(new Item_empty_string(columns[i].name,
					       columns[i].length));
  if (send_fields(thd, field_list, 1))
    DBUG_RETURN(-1);

  if (mi->host[0])
  {
    String *packet= &thd->packet;
    RELAY_LOG_INFO *rli= &mi->rli;
    packet->length(0);

    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&rli->data_lock);
    net_store_data(packet, mi->host);
    net_store_data(packet, mi->user);
    net_store_data(packet, (uint32) mi->port);
    net_store_data(packet, (uint32) mi->connect_retry);
    net_store_data(packet, mi->master_log_name);
    net_store_data(packet, (longlong) mi->master_log_pos);
    net_store_data(packet, rli->relay_log_name +
		   dirname_length(rli->relay_log_name));
    net_store_data(packet, (longlong) rli->relay_log_pos);
    net_store_data(packet, rli->master_log_name);
    net_store_data(packet, mi->slave_running ? "Yes":"No");
    net_store_data(packet, rli->slave_running ? "Yes":"No");
    net_store_data(packet, &replicate_do_db);
    net_store_data(packet, &replicate_ignore_db);
    net_store_data(packet, (uint32)rli->last_slave_errno);
    net_store_data(packet, rli->last_slave_error);
    net_store_data(packet, rli->slave_skip_counter);
    net_store_data(packet, (longlong) rli->master_log_pos);
    net_store_data(packet, (longlong) rli->log_space_total);
    pthread_mutex_unlock(&rli->data_lock);
    pthread_mutex_unlock(&mi->data_lock);

    if (my_net_write(&thd->net, (char*) packet->ptr(), packet->length()))
      DBUG_RETURN(-1);
  }
  send_eof(&thd->net);
  DBUG_RETURN(0);
}

1444 1445

/*
  Write the master info to the master info file

  SYNOPSIS
    flush_master_info()
    mi			Master connection information

  NOTES
    The file is rewritten from the start with one value per line, in the
    order in which init_master_info() reads them back:
    master_log_name, master_log_pos, host, user, password, port,
    connect_retry.

  RETURN VALUES
    0			always
*/

bool flush_master_info(MASTER_INFO* mi)
{
  IO_CACHE* file = &mi->file;
  char lbuf[22];
  DBUG_ENTER("flush_master_info");
  DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));

  my_b_seek(file, 0L);
  /* Seven conversions for seven arguments */
  my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
	      mi->master_log_name, llstr(mi->master_log_pos, lbuf),
	      mi->host, mi->user,
	      mi->password, mi->port, mi->connect_retry
	      );
  flush_io_cache(file);
  DBUG_RETURN(0);
}

1462

1463 1464
/*
  Wait until the recorded master log position reaches the given position

  SYNOPSIS
    wait_for_pos()
    thd			Thread that is waiting; the wait ends if it is killed
    log_name		Master log name to wait for
    log_pos		Position in log_name to wait for

  NOTES
    The wait is also aborted if abort_pos_wait changes while we wait or if
    mi->slave_running becomes false.

  RETURN VALUES
    -1			not initialized, killed, aborted or slave not running
    >= 0		number of times we had to wait on data_cond
*/

int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
				    ulonglong log_pos)
{
  if (!inited)
    return -1;
  int event_count = 0;
  ulong init_abort_pos_wait;
  DBUG_ENTER("wait_for_pos");
  DBUG_PRINT("enter",("master_log_name: '%s'  pos: %ld",
		      master_log_name, (ulong) master_log_pos));

  pthread_mutex_lock(&data_lock);
  // abort only if master info changes during wait
  init_abort_pos_wait= abort_pos_wait;

  for (;;)
  {
    if (thd->killed ||
	init_abort_pos_wait != abort_pos_wait ||
	!mi->slave_running)
      break;

    int cmp_result= 0;
    DBUG_ASSERT(*master_log_name || master_log_pos == 0);
    if (*master_log_name)
    {
      /*
	TODO:
	Replace strncmp() with a comparison function that
	can handle comparison of the following files:
	mysqlbin.999
	mysqlbin.1000
      */
      char *basename= master_log_name + dirname_length(master_log_name);
      cmp_result =  strncmp(basename, log_name->ptr(),
			    log_name->length());
    }
    /* Done if we are in a later log, or in the same log at/after log_pos */
    if (cmp_result > 0 ||
	(!cmp_result && master_log_pos >= log_pos) ||
	thd->killed)
      break;

    DBUG_PRINT("info",("Waiting for master update"));
    const char* msg = thd->enter_cond(&data_cond, &data_lock,
				      "Waiting for master update");
    pthread_cond_wait(&data_cond, &data_lock);
    thd->exit_cond(msg);
    event_count++;
  }
  pthread_mutex_unlock(&data_lock);
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d",
		     (int) thd->killed,
		     (int) (init_abort_pos_wait != abort_pos_wait),
		     (int) mi->slave_running));
  DBUG_RETURN((thd->killed || init_abort_pos_wait != abort_pos_wait ||
	      !mi->slave_running) ?
	      -1 : event_count);
}

1520

1521
/*
  Prepare a THD for use by one of the slave threads

  SYNOPSIS
    init_slave_thread()
    thd			Thread handler to set up
    thd_type		SLAVE_THD_SQL for the SQL thread; any other value
			gets the "Waiting for master update" state

  RETURN VALUES
    0			ok
    -1			init_thr_lock() or thd->store_globals() failed;
			end_thread() has been called
*/

static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
  DBUG_ENTER("init_slave_thread");
  thd->bootstrap = 1;
  thd->system_thread = 1;
  thd->client_capabilities = 0;
  my_net_init(&thd->net, 0);
  thd->net.read_timeout = slave_net_timeout;
  thd->master_access= ~0;
  thd->priv_user = 0;
  thd->slave_thread = 1;
  thd->options = OPTION_AUTO_IS_NULL;
  if (opt_log_slave_updates)
    thd->options|= OPTION_BIN_LOG;
  thd->system_thread = 1;
  thd->client_capabilities = CLIENT_LOCAL_FILES;
  thd->real_id=pthread_self();

  /* The global thread id counter is protected by LOCK_thread_count */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id = thread_id++;
  pthread_mutex_unlock(&LOCK_thread_count);

  if (init_thr_lock() || thd->store_globals())
  {
    end_thread(thd,0);
    DBUG_RETURN(-1);
  }

#if !defined(__WIN__) && !defined(OS2)
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  if ((ulong) thd->variables.max_join_size == (ulong) HA_POS_ERROR)
    thd->options |= OPTION_BIG_SELECTS;

  thd->proc_info= (thd_type == SLAVE_THD_SQL) ?
    "Waiting for the next event in slave queue" :
    "Waiting for master update";
  thd->version=refresh_version;
  thd->set_time();
  DBUG_RETURN(0);
}

1563

1564 1565
/*
  Sleep for a number of seconds, but stop early if the thread is killed

  SYNOPSIS
    safe_sleep()
    thd			Thread handler, passed on to thread_killed
    sec			Number of seconds to sleep
    thread_killed	Function that tells if we should stop sleeping
    thread_killed_arg	Second argument for thread_killed

  RETURN VALUES
    0			slept the whole time
    1			thread_killed returned true
*/

static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
		      void* thread_killed_arg)
{
  thr_alarm_t alarmed;
  thr_alarm_init(&alarmed);
  time_t now= time((time_t*) 0);
  time_t deadline= now+sec;

  for (;;)
  {
    int nap_time= (int) (deadline - now);
    if (nap_time <= 0)
      break;

    ALARM alarm_buff;
    /*
      The only reason we are asking for alarm is so that
      we will be woken up in case of murder, so if we do not get killed,
      set the alarm so it goes off after we wake up naturally
    */
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
    sleep(nap_time);
    thr_end_alarm(&alarmed);

    if ((*thread_killed)(thd,thread_killed_arg))
      return 1;
    now= time((time_t*) 0);
  }
  return 0;
}

1592

1593 1594
/*
  Ask the master to start sending the binary log (COM_BINLOG_DUMP)

  SYNOPSIS
    request_dump()
    mysql		Connection to the master
    mi			Master connection information; gives the log name
			and position to start from
    suppress_warnings	Set to 1 if the command failed with
			ER_NET_READ_INTERRUPTED; not touched otherwise

  NOTES
    The request holds the position (4 bytes), flags (2 bytes), server_id
    (4 bytes) and then the log name.

  RETURN VALUES
    0			ok
    1			error
*/

static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
			bool *suppress_warnings)
{
  char request[FN_REFLEN + 10];
  int binlog_flags = 0; // for now
  const char* logname = mi->master_log_name;
  int name_len;
  DBUG_ENTER("request_dump");

  // TODO if big log files: Change next to int8store()
  int4store(request, (longlong) mi->master_log_pos);
  int2store(request + 4, binlog_flags);
  int4store(request + 6, server_id);
  name_len = (uint) strlen(logname);
  memcpy(request + 10, logname, name_len);

  if (!mc_simple_command(mysql, COM_BINLOG_DUMP, request, name_len + 10, 1))
    DBUG_RETURN(0);

  /*
    Something went wrong, so we will just reconnect and retry later
    in the future, we should do a better error analysis, but for
    now we just fill up the error log :-)
  */
  if (mc_mysql_errno(mysql) != ER_NET_READ_INTERRUPTED)
    sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
		    mc_mysql_errno(mysql), mc_mysql_error(mysql),
		    master_connect_retry);
  else
    *suppress_warnings= 1;			// Suppress reconnect warning
  DBUG_RETURN(1);
}

1627

1628
/*
  Ask the master to send a dump of one table (COM_TABLE_DUMP)

  SYNOPSIS
    request_table_dump()
    mysql		Connection to the master
    db			Database name
    table		Table name

  NOTES
    The request is: 1 byte length of db, db, 1 byte length of table, table.
    As each length is sent in a single byte, names longer than 255 bytes
    are refused instead of being sent with a truncated length.

  RETURN VALUES
    0			ok
    1			error (logged)
*/

static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
  char buf[1024];
  char * p = buf;
  uint table_len = (uint) strlen(table);
  uint db_len = (uint) strlen(db);
  if (table_len + db_len > sizeof(buf) - 2)
  {
    sql_print_error("request_table_dump: Buffer overrun");
    return 1;
  }
  if (db_len > 255 || table_len > 255)
  {
    /* Would not fit in the one byte length prefix */
    sql_print_error("request_table_dump: Database or table name too long");
    return 1;
  }

  *p++ = (char) db_len;
  memcpy(p, db, db_len);
  p += db_len;
  *p++ = (char) table_len;
  memcpy(p, table, table_len);

  if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  {
    sql_print_error("request_table_dump: Error sending the table dump \
command");
    return 1;
  }

  return 0;
}

1656

1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
/*
  Read one event from the master

  SYNOPSIS
    read_event()
    mysql		MySQL connection
    mi			Master connection information
    suppress_warnings	TRUE when a normal net read timeout has caused us to
			try a reconnect.  We do not want to print anything to
			the error log in this case because this is a normal
			event in an idle server.

    RETURN VALUES
    'packet_error'	Error
    number		Length of packet

*/

static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
{
  ulong len;

  *suppress_warnings= 0;
  /*
    my_real_read() will time us out
    We check if we were told to die, and if not, try reading again

    TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  */
#ifndef DBUG_OFF
  if (disconnect_slave_event_count && !(events_till_disconnect--))
    return packet_error;
#endif

  len = mc_net_safe_read(mysql);
  if (len == packet_error || (long) len < 1)
  {
    if (mc_mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
    {
      /*
	We are trying a normal reconnect after a read timeout;
	we suppress prints to .err file as long as the reconnect
	happens without problems
      */
      *suppress_warnings= TRUE;
    }
    else
      sql_print_error("Error reading packet from server: %s (\
server_errno=%d)",
		      mc_mysql_error(mysql), mc_mysql_errno(mysql));
    return packet_error;
  }

  if (len == 1)
  {
     sql_print_error("Slave: received 0 length packet from server, apparent\
 master shutdown: %s",
		     mc_mysql_error(mysql));
     return packet_error;
  }

  /* len is an ulong, so it has to be printed with %lu */
  DBUG_PRINT("info",( "len=%lu, net->read_pos[4] = %d\n",
		      len, mysql->net.read_pos[4]));
  return len - 1;
}

1723

1724
/*
  Decide whether the error code recorded for a query means the query was
  aborted part-way through on the master

  SYNOPSIS
    check_expected_error()
    thd			Thread descriptor; thd->query is quoted in the message
    rli			Relay log info; receives last_slave_error and
			last_slave_errno when the code is recognised
    expected_error	Error code to test

  RETURN VALUES
    0	expected_error is not one of the network/shutdown codes tested below;
	nothing is written
    1	expected_error is one of them; the explanation has been stored in rli
	and sent to the error log
*/
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
  const bool aborted_on_master=
    (expected_error == ER_NET_READ_ERROR ||
     expected_error == ER_NET_ERROR_ON_WRITE ||
     expected_error == ER_SERVER_SHUTDOWN ||
     expected_error == ER_NEW_ABORTING_CONNECTION);

  if (!aborted_on_master)
    return 0;

  /* Keep the text in rli so it is available after it has been logged */
  my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
              "Slave: query '%s' partially completed on the master "
              "and was aborted. There is a chance that your master is "
              "inconsistent at this point. If you are sure that your master "
              "is ok, run this query manually on the slave and then restart "
              "the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
              "SLAVE START;",
              thd->query);
  rli->last_slave_errno= expected_error;
  sql_print_error("%s", rli->last_slave_error);
  return 1;
}
1744

1745

1746
/*
  Fetch the next event from the relay log and either skip or execute it

  SYNOPSIS
    exec_relay_log_event()
    thd		SQL slave thread descriptor (asserted to be rli->sql_thd)
    rli		Relay log info the event is read from

  DESCRIPTION
    An event is skipped (only the relay log position is advanced and
    flushed) when it carries this server's own server_id, or when
    rli->slave_skip_counter is non-zero and the event is not a ROTATE_EVENT.
    Any other event is executed through ev->exec_event().
    The event object is always deleted before returning.

  RETURN VALUES
    0		Event was skipped
    1		The thread was killed, or next_event() returned no event
    other	Whatever ev->exec_event(rli) returned
*/
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
  DBUG_ASSERT(rli->sql_thd==thd);
  Log_event * ev = next_event(rli);
  DBUG_ASSERT(rli->sql_thd==thd);
  if (sql_slave_killed(thd,rli))
  {
    /*
      ev may be non-NULL here if the kill was noticed only after the event
      had been read; delete it so it is not leaked (deleting NULL is a no-op)
    */
    delete ev;
    return 1;
  }
  if (ev)
  {
    int type_code = ev->get_type_code();
    int exec_res;
    pthread_mutex_lock(&rli->data_lock);

    /*
      Skip queries originating from this server or number of
      queries specified by the user in slave_skip_counter
      We can't however skip events that have something to do with the
      log files themselves.
    */

    if (ev->server_id == (uint32) ::server_id ||
        (rli->slave_skip_counter && type_code != ROTATE_EVENT))
    {
      /* TODO: I/O thread should not even log events with the same server id */
      rli->inc_pos(ev->get_event_len(),
                   type_code != STOP_EVENT ? ev->log_pos : LL(0),
                   1/* skip lock*/);
      flush_relay_log_info(rli);

      /*
        Protect against common user error of setting the counter to 1
        instead of 2 while recovering from a failed auto-increment insert:
        an INTVAR or STOP event does not consume the last skip.
      */
      if (rli->slave_skip_counter &&
          !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
            rli->slave_skip_counter == 1))
        --rli->slave_skip_counter;
      pthread_mutex_unlock(&rli->data_lock);
      delete ev;
      return 0;					// avoid infinite update loops
    }
    pthread_mutex_unlock(&rli->data_lock);

    thd->server_id = ev->server_id; // use the original server id for logging
    thd->set_time();				// time the query
    if (!ev->when)
      ev->when = time(NULL);
    ev->thd = thd;
    thd->log_pos = ev->log_pos;
    exec_res = ev->exec_event(rli);
    DBUG_ASSERT(rli->sql_thd==thd);
    delete ev;
    return exec_res;
  }
  else
  {
    sql_print_error("Could not parse log event entry, check the master for "
                    "binlog corruption\n"
                    "This may also be a network problem, or just a bug in "
                    "the master or slave code.");
    return 1;
  }
}

1810

1811 1812
/* slave I/O thread */
pthread_handler_decl(handle_slave_io,arg)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1813
{
1814 1815 1816 1817 1818 1819 1820 1821 1822
  THD *thd; // needs to be first for thread_stack
  MYSQL *mysql;
  MASTER_INFO *mi = (MASTER_INFO*)arg; 
  char llbuff[22];
  uint retry_count;
  
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  my_thread_init();

1823
#ifndef DBUG_OFF
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1824
slave_begin:  
1825
#endif  
1826
  DBUG_ASSERT(mi->inited);
1827 1828 1829
  mysql= NULL ;
  retry_count= 0;

1830
  pthread_mutex_lock(&mi->run_lock);
1831 1832 1833
  /* Inform waiting threads that slave has started */
  mi->slave_run_id++;

1834
#ifndef DBUG_OFF  
1835
  mi->events_till_abort = abort_slave_event_count;
1836
#endif  
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1837
  
1838
  thd= new THD; // note that contructor of THD uses DBUG_ !
1839
  DBUG_ENTER("handle_slave_io");
1840
  THD_CHECK_SENTRY(thd);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1841 1842

  pthread_detach_this_thread();
1843
  if (init_slave_thread(thd, SLAVE_THD_IO))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1844 1845 1846 1847 1848 1849
  {
    pthread_cond_broadcast(&mi->start_cond);
    pthread_mutex_unlock(&mi->run_lock);
    sql_print_error("Failed during slave I/O thread initialization");
    goto err;
  }
1850
  mi->io_thd = thd;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1851
  thd->thread_stack = (char*)&thd; // remember where our stack is
1852
  pthread_mutex_lock(&LOCK_thread_count);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1853
  threads.append(thd);
1854
  pthread_mutex_unlock(&LOCK_thread_count);
1855 1856 1857
  mi->slave_running = 1;
  mi->abort_slave = 0;
  pthread_mutex_unlock(&mi->run_lock);
1858
  pthread_cond_broadcast(&mi->start_cond);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1859
  
1860 1861 1862
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
			    mi->master_log_name,
			    llstr(mi->master_log_pos,llbuff)));
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1863
  
1864
  if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1865
  {
1866
    sql_print_error("Slave I/O thread: error in mc_mysql_init()");
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1867 1868
    goto err;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1869
  
1870

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1871
  thd->proc_info = "connecting to master";
1872
  // we can get killed during safe_connect
1873
  if (!safe_connect(thd, mysql, mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1874
    sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
1875
  replication started in log '%s' at position %s", mi->user,
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1876 1877 1878
		    mi->host, mi->port,
		    IO_RPL_LOG_NAME,
		    llstr(mi->master_log_pos,llbuff));
1879
  else
1880
  {
1881
    sql_print_error("Slave I/O thread killed while connecting to master");
1882 1883
    goto err;
  }
1884

1885
connected:
1886

1887
  thd->slave_net = &mysql->net;
1888
  thd->proc_info = "Checking master version";
1889
  if (check_master_version(mysql, mi))
1890
    goto err;
1891
  if (!mi->old_format)
1892
  {
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1893 1894 1895 1896 1897
    /*
      Register ourselves with the master.
      If fails, this is not fatal - we just print the error message and go
      on with life.
    */
1898 1899 1900 1901
    thd->proc_info = "Registering slave on master";
    if (register_slave_on_master(mysql) ||  update_slave_list(mysql))
      goto err;
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1902
  
1903
  DBUG_PRINT("info",("Starting reading binary log from master"));
1904
  while (!io_slave_killed(thd,mi))
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
1905
  {
1906
    bool suppress_warnings= 0;    
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1907
    thd->proc_info = "Requesting binlog dump";
1908
    if (request_dump(mysql, mi, &suppress_warnings))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1909 1910
    {
      sql_print_error("Failed on request_dump()");
1911
      if (io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1912 1913
      {
	sql_print_error("Slave I/O thread killed while requesting master \
1914
dump");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1915 1916
	goto err;
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1917
	  
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1918 1919 1920 1921 1922 1923 1924
      thd->proc_info = "Waiiting to reconnect after a failed dump request";
      mc_end_server(mysql);
      /*
	First time retry immediately, assuming that we can recover
	right away - if first time fails, sleep between re-tries
	hopefuly the admin can fix the problem sometime
      */
1925 1926 1927 1928
      if (retry_count++)
      {
	if (retry_count > master_retry_count)
	  goto err;				// Don't retry forever
1929 1930
	safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
		   (void*)mi);
1931
      }
1932
      if (io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1933 1934
      {
	sql_print_error("Slave I/O thread killed while retrying master \
1935
dump");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1936 1937
	goto err;
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1938

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1939
      thd->proc_info = "Reconnecting after a failed dump request";
1940 1941
      if (!suppress_warnings)
	sql_print_error("Slave I/O thread: failed dump request, \
1942
reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
1943 1944 1945
			llstr(mi->master_log_pos,llbuff));
      if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	  io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1946 1947
      {
	sql_print_error("Slave I/O thread killed during or \
1948
after reconnect");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1949 1950
	goto err;
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1951

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1952 1953
      goto connected;
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1954

1955
    while (!io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1956
    {
1957
      bool suppress_warnings= 0;    
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1958
      thd->proc_info = "Reading master update";
1959
      ulong event_len = read_event(mysql, mi, &suppress_warnings);
1960
      if (io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1961
      {
1962 1963
	if (global_system_variables.log_warnings)
	  sql_print_error("Slave I/O thread killed while reading event");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1964 1965
	goto err;
      }
1966
	  	  
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1967 1968
      if (event_len == packet_error)
      {
1969 1970
	uint mysql_error_number= mc_mysql_errno(mysql);
	if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1971
	{
1972 1973 1974 1975
	  sql_print_error("\
Log entry on master is longer than max_allowed_packet (%ld) on \
slave. If the entry is correct, restart the server with a higher value of \
max_allowed_packet",
1976
			  thd->variables.max_allowed_packet);
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1977 1978
	  goto err;
	}
1979 1980 1981 1982 1983 1984
	if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
	{
	  sql_print_error(ER(mysql_error_number), mysql_error_number,
			  mc_mysql_error(mysql));
	  goto err;
	}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1985 1986
	thd->proc_info = "Waiting to reconnect after a failed read";
	mc_end_server(mysql);
1987 1988 1989 1990
	if (retry_count++)
	{
	  if (retry_count > master_retry_count)
	    goto err;				// Don't retry forever
1991
	  safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
1992 1993
		     (void*) mi);
	}	    
1994
	if (io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1995
	{
1996 1997
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed while waiting to \
1998
reconnect after a failed read");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1999 2000 2001
	  goto err;
	}
	thd->proc_info = "Reconnecting after a failed read";
2002 2003
	if (!suppress_warnings)
	  sql_print_error("Slave I/O thread: Failed reading log event, \
2004
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
2005 2006 2007
			  llstr(mi->master_log_pos, llbuff));
	if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
	    io_slave_killed(thd,mi))
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2008
	{
2009 2010
	  if (global_system_variables.log_warnings)
	    sql_print_error("Slave I/O thread killed during or after a \
2011
reconnect done to recover from failed read");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2012 2013 2014
	  goto err;
	}
	goto connected;
2015
      } // if (event_len == packet_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2016
	  
2017
      retry_count=0;			// ok event, reset retry counter
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2018 2019 2020 2021
      thd->proc_info = "Queueing event from master";
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
		      event_len))
      {
2022
	sql_print_error("Slave I/O thread could not queue event from master");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2023 2024
	goto err;
      }
2025
      flush_master_info(mi);
2026 2027 2028 2029 2030 2031 2032 2033
      if (mi->rli.log_space_limit && mi->rli.log_space_limit <
	  mi->rli.log_space_total)
	if (wait_for_relay_log_space(&mi->rli))
	{
	  sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
	  goto err;
	}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2034
      // TODO: check debugging abort code
2035
#ifndef DBUG_OFF
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2036 2037 2038 2039 2040
      if (abort_slave_event_count && !--events_till_abort)
      {
	sql_print_error("Slave I/O thread: debugging abort");
	goto err;
      }
2041
#endif
2042
    } 
2043
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2044

monty@donna.mysql.fi's avatar
monty@donna.mysql.fi committed
2045
  // error = 0;
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2046
err:
2047 2048 2049
  // print the current replication position
  sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
		  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2050
  thd->query = thd->db = 0; // extra safety
2051 2052
  if (mysql)
  {
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2053
    mc_mysql_close(mysql);
2054 2055
    mi->mysql=0;
  }
jcole@tetra.spaceapes.com's avatar
jcole@tetra.spaceapes.com committed
2056
  thd->proc_info = "Waiting for slave mutex on exit";
2057 2058 2059 2060
  pthread_mutex_lock(&mi->run_lock);
  mi->slave_running = 0;
  mi->io_thd = 0;
  // TODO: make rpl_status part of MASTER_INFO
2061
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2062 2063
  mi->abort_slave = 0; // TODO: check if this is needed
  DBUG_ASSERT(thd->net.buff != 0);
2064
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2065
  pthread_mutex_lock(&LOCK_thread_count);
2066
  THD_CHECK_SENTRY(thd);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2067
  delete thd;
2068
  pthread_mutex_unlock(&LOCK_thread_count);
2069 2070
  my_thread_end();				// clean-up before broadcast
  pthread_cond_broadcast(&mi->stop_cond);	// tell the world we are done
2071
  pthread_mutex_unlock(&mi->run_lock);
2072
#ifndef DBUG_OFF
2073
  if (abort_slave_event_count && !events_till_abort)
2074 2075
    goto slave_begin;
#endif  
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2076 2077 2078 2079
  pthread_exit(0);
  DBUG_RETURN(0);				// Can't return anything here
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2080

2081
/* slave SQL logic thread */
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2082

2083 2084
/*
  Slave SQL thread entry point

  SYNOPSIS
    handle_slave_sql()
    arg		MASTER_INFO* whose embedded relay log info (rli) is executed

  DESCRIPTION
    Creates and registers a THD, restores the temporary tables saved in
    rli, positions the relay log reader and then calls
    exec_relay_log_event() until the thread is killed or an event fails.
    On exit the temporary tables are handed back to rli, the THD is
    destroyed and rli->stop_cond is signalled.

    In debug builds the function restarts from the top when
    rli->events_till_abort has reached zero.

  RETURN VALUES
    Does not return normally; the thread ends in pthread_exit(0).
*/
pthread_handler_decl(handle_slave_sql,arg)
{
  THD *thd;	/* must stay the first local: &thd is stored as thread_stack */
  char master_pos_buf[22], relay_pos_buf[22];
  RELAY_LOG_INFO *rli= &((MASTER_INFO*) arg)->rli;
  const char *init_errmsg;

  /* Without my_thread_init() the DBUG_ code below would dump core */
  my_thread_init();

#ifndef DBUG_OFF
restart_thread:
#endif

  DBUG_ASSERT(rli->inited);
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(!rli->slave_running);
  init_errmsg= 0;
#ifndef DBUG_OFF
  rli->events_till_abort= abort_slave_event_count;
#endif
  DBUG_ENTER("handle_slave_sql");

  thd= new THD;			/* the THD constructor itself uses DBUG_ */
  THD_CHECK_SENTRY(thd);
  /* Let threads waiting for the start see a new run id */
  rli->slave_run_id++;

  pthread_detach_this_thread();
  if (init_slave_thread(thd, SLAVE_THD_SQL))
  {
    /*
      TODO: this is currently broken - slave start and change master
      will be stuck if we fail here
    */
    pthread_cond_broadcast(&rli->start_cond);
    pthread_mutex_unlock(&rli->run_lock);
    sql_print_error("Failed during slave thread initialization");
    goto thread_exit;
  }
  rli->sql_thd= thd;
  /* Take over the temporary tables kept from the previous run */
  thd->temporary_tables= rli->save_temporary_tables;
  thd->thread_stack= (char*) &thd;
  pthread_mutex_lock(&LOCK_thread_count);
  threads.append(thd);
  pthread_mutex_unlock(&LOCK_thread_count);
  rli->slave_running= 1;
  rli->abort_slave= 0;
  pthread_mutex_unlock(&rli->run_lock);
  pthread_cond_broadcast(&rli->start_cond);
  /* 'pending' must always be 0 when the slave thread starts */
  rli->pending= 0;
  if (init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
                         1 /*need data lock*/, &init_errmsg))
  {
    sql_print_error("Error initializing relay log position: %s",
                    init_errmsg);
    goto thread_exit;
  }
  THD_CHECK_SENTRY(thd);
  DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
  DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
  DBUG_ASSERT(rli->sql_thd == thd);

  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
                            rli->master_log_name,
                            llstr(rli->master_log_pos,master_pos_buf)));
  if (global_system_variables.log_warnings)
    sql_print_error("Slave SQL thread initialized, starting replication in "
                    "log '%s' at position %s, relay log '%s' position: %s",
                    RPL_LOG_NAME,
                    llstr(rli->master_log_pos,master_pos_buf),
                    rli->relay_log_name,
                    llstr(rli->relay_log_pos,relay_pos_buf));

  /* Execute events queued by the I/O thread until this thread is killed */
  for (;;)
  {
    if (sql_slave_killed(thd,rli))
    {
      /* Normal stop: report where replication stopped */
      sql_print_error("Slave SQL thread exiting, replication stopped in log "
                      " '%s' at position %s",
                      RPL_LOG_NAME,
                      llstr(rli->master_log_pos,master_pos_buf));
      break;
    }
    thd->proc_info= "Processing master log event";
    DBUG_ASSERT(rli->sql_thd == thd);
    THD_CHECK_SENTRY(thd);
    if (!exec_relay_log_event(thd,rli))
      continue;

    /* Do not scare the user if the thread was simply killed or stopped */
    if (!sql_slave_killed(thd,rli))
      sql_print_error("Error running query, slave SQL thread aborted. "
                      "Fix the problem, and restart "
                      "the slave SQL thread with \"SLAVE START\". "
                      "We stopped at log "
                      "'%s' position %s",
                      RPL_LOG_NAME,
                      llstr(rli->master_log_pos, master_pos_buf));
    break;
  }

thread_exit:
  /* extra safety */
  thd->db= 0;
  thd->query= 0;
  thd->proc_info= "Waiting for slave mutex on exit";
  pthread_mutex_lock(&rli->run_lock);
  DBUG_ASSERT(rli->slave_running == 1);	/* tracking buffer overrun */
  rli->slave_running= 0;
  rli->save_temporary_tables= thd->temporary_tables;

  /*
    TODO: see if we can do this conditionally in next_event() instead
    to avoid unneeded position re-init
  */
  rli->log_pos_current= 0;
  /* Detach the tables so the THD destructor is not tempted to close them */
  thd->temporary_tables= 0;
  DBUG_ASSERT(thd->net.buff != 0);
  net_end(&thd->net);		/* the THD destructor will not free it */
  DBUG_ASSERT(rli->sql_thd == thd);
  THD_CHECK_SENTRY(thd);
  rli->sql_thd= 0;
  pthread_mutex_lock(&LOCK_thread_count);
  THD_CHECK_SENTRY(thd);
  delete thd;
  pthread_mutex_unlock(&LOCK_thread_count);
  my_thread_end();		/* clean up before announcing termination */
  pthread_cond_broadcast(&rli->stop_cond);	/* tell the world we are done */
  pthread_mutex_unlock(&rli->run_lock);
#ifndef DBUG_OFF /* TODO: reconsider the code below */
  if (abort_slave_event_count && !rli->events_till_abort)
    goto restart_thread;
#endif
  pthread_exit(0);
  DBUG_RETURN(0);				/* Can't return anything here */
}
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2217

2218 2219 2220 2221 2222 2223 2224
/*
  Download the file belonging to a Create_file event from the master and
  write it to the relay log

  SYNOPSIS
    process_io_create_file()
    mi		Master info; supplies the connection, the I/O thread and
		the relay log
    cev		Create_file event read from the master

  DESCRIPTION
    Requests cev->fname from the master and reads it packet by packet.
    The first data packet is appended to the relay log as the Create_file
    event itself, every later one as an Append_block event, and the empty
    packet that ends the transfer as an Execute_load event.
    Events for a database rejected by db_ok() are not written; the file
    transfer is skipped instead.

  RETURN VALUES
    0	ok (including the skipped case)
    1	invalid event, network error, or relay log write error
*/
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
  int res= 1;
  ulong packet_len;
  bool create_file_pending;
  THD *io_thd;
  NET *net= &mi->mysql->net;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("process_io_create_file");

  if (unlikely(!cev->is_valid()))
    DBUG_RETURN(1);
  /*
    TODO: fix to honor table rules, not only db rules
  */
  if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  {
    skip_load_data_infile(net);
    DBUG_RETURN(0);
  }
  DBUG_ASSERT(cev->inited_from_old);
  io_thd= mi->io_thd;
  /* The event and the I/O thread share the next free file id */
  cev->file_id= mi->file_id++;
  io_thd->file_id= cev->file_id;
  io_thd->server_id= cev->server_id;
  create_file_pending= 1;

  if (unlikely(net_request_file(net,cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
                    cev->fname);
    goto err;
  }

  /*
    The extra block exists so that one Append_block_log_event can be
    constructed once and re-pointed at each packet, instead of creating a
    new one on every iteration.
  */
  {
    Append_block_log_event append_ev(io_thd,0,0);

    for (;;)
    {
      packet_len= my_net_read(net);
      if (unlikely(packet_len == packet_error))
      {
        sql_print_error("Network read error downloading '%s' from master",
                        cev->fname);
        goto err;
      }

      if (unlikely(!packet_len))
      {
        /* Empty packet: end of file */
        send_ok(net);			/* 3.23 master wants it */
        Execute_load_log_event exec_ev(io_thd);
        exec_ev.log_pos= mi->master_log_pos;
        if (unlikely(rli->relay_log.append(&exec_ev)))
        {
          sql_print_error("Slave I/O: error writing Exec_load event to "
                          "relay log");
          goto err;
        }
        rli->relay_log.harvest_bytes_written(&rli->log_space_total);
        break;
      }

      if (unlikely(create_file_pending))
      {
        /* First data block travels inside the Create_file event */
        cev->block= (char*) net->read_pos;
        cev->block_len= packet_len;
        cev->log_pos= mi->master_log_pos;
        if (unlikely(rli->relay_log.append(cev)))
        {
          sql_print_error("Slave I/O: error writing Create_file event to "
                          "relay log");
          goto err;
        }
        create_file_pending= 0;
      }
      else
      {
        append_ev.block= (char*) net->read_pos;
        append_ev.block_len= packet_len;
        append_ev.log_pos= mi->master_log_pos;
        if (unlikely(rli->relay_log.append(&append_ev)))
        {
          sql_print_error("Slave I/O: error writing Append_block event to "
                          "relay log");
          goto err;
        }
      }
      /* Either kind of data block was written: account for its bytes */
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
  }
  res= 0;

err:
  DBUG_RETURN(res);
}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2312

2313
/*
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330
  Start using a new binary log on the master

  SYNOPSIS
    process_io_rotate()
    mi			master_info for the slave
    rev			The rotate log event read from the binary log

  DESCRIPTION
    Updates the master info and relay data with the place in the next binary
    log where we should start reading.

  NOTES
    We assume we already locked mi->data_lock

  RETURN VALUES
    0		ok
    1	        Log event is illegal
2331 2332
*/

monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2333
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
2334
{
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2335
  int return_val= 1;
2336
  DBUG_ENTER("process_io_rotate");
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2337
  safe_mutex_assert_owner(&mi->data_lock);
2338

2339
  if (unlikely(!rev->is_valid()))
2340
    DBUG_RETURN(1);
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351

  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;

  pthread_mutex_lock(&mi->rli.data_lock);
  memcpy(mi->rli.master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->rli.master_log_pos= rev->pos;
  pthread_mutex_unlock(&mi->rli.data_lock);

  DBUG_PRINT("info", ("master_log_pos: '%s' %d",
		      mi->master_log_name, (ulong) mi->master_log_pos));
2352
#ifndef DBUG_OFF
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2353 2354 2355 2356 2357 2358
  /*
    If we do not do this, we will be getting the first
    rotate event forever, so we need to not disconnect after one.
  */
  if (disconnect_slave_event_count)
    events_till_disconnect++;
2359
#endif
2360
  DBUG_RETURN(0);
2361 2362
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2363
/*
monty@mashka.mysql.fi's avatar
monty@mashka.mysql.fi committed
2364 2365 2366
  TODO: 
    Test this code before release - it has to be tested on a separate
    setup with 3.23 master 
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2367 2368 2369 2370
*/

/*
  Queue one event received from an old-format master

  SYNOPSIS
    queue_old_event()
    mi		Master info
    buf		Raw event as received from the master
    event_len	Length of the event in buf

  DESCRIPTION
    Parses the event with Log_event::read_log_event() in old-format mode,
    stamps it with the current master log position and then:
      STOP_EVENT	  appended unless mi->ignore_stop_event is set
      ROTATE_EVENT	  handled by process_io_rotate(), then appended;
			  the master position is not advanced by event_len
      CREATE_FILE_EVENT	  handed to process_io_create_file(), which writes
			  the relay log entries itself
      anything else	  appended
    A Load event is first copied into a private, zero-terminated buffer.
    That buffer is released on every exit path.

  RETURN VALUES
    0	ok
    1	out of memory, unparsable event, illegal rotate event or relay log
	write error; for CREATE_FILE_EVENT the result of
	process_io_create_file()
*/
static int queue_old_event(MASTER_INFO *mi, const char *buf,
                           ulong event_len)
{
  const char *errmsg = 0;
  ulong inc_pos;
  bool ignore_event= 0;
  char *tmp_buf = 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_old_event");

  /*
    If we get Load event, we need to pass a non-reusable buffer
    to read_log_event, so we do a trick
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
    {
      sql_print_error("Slave I/O: out of memory for Load event");
      DBUG_RETURN(1);
    }
    memcpy(tmp_buf,buf,event_len);
    tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
    buf = (const char*)tmp_buf;
  }
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
                                            1 /*old format*/ );
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',"
                    " master could be corrupt but a more likely cause of "
                    "this is a bug",
                    errmsg);
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }
  pthread_mutex_lock(&mi->data_lock);
  ev->log_pos = mi->master_log_pos;
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    mi->ignore_stop_event=1;
    /* process_io_rotate() already set the new position */
    inc_pos= 0;
    break;
  case CREATE_FILE_EVENT:
  {
    /* We come here when and only when tmp_buf != 0 */
    DBUG_ASSERT(tmp_buf);
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
    delete ev;
    mi->master_log_pos += event_len;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    pthread_mutex_unlock(&mi->data_lock);
    my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(error);
  }
  default:
    mi->ignore_stop_event=0;
    inc_pos= event_len;
    break;
  }
  if (likely(!ignore_event))
  {
    if (unlikely(rli->relay_log.append(ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_RETURN(1);
    }
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  delete ev;
  mi->master_log_pos+= inc_pos;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  /*
    tmp_buf is expected to be 0 here (see the CREATE_FILE_EVENT case), but
    that is only checked by a debug assertion, so release it unconditionally
  */
  my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_RETURN(0);
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2456 2457 2458 2459 2460 2461
/*
  Queue one event received from the master into the relay log.

  SYNOPSIS
    queue_event()
    mi			Master connection information
    buf			Raw event as received from the master
    event_len		Length of the event stored in buf

  NOTES
    - If mi->old_format is set, the work is delegated to queue_old_event().
    - A Rotate event is first handed to process_io_rotate(); it is still
      written to the relay log but adds nothing to mi->master_log_pos
      (inc_pos is 0 for it).
    - A Stop event that directly follows a Rotate event is not written to
      the relay log (mi->ignore_stop_event).
    - mi->master_log_pos is only advanced when the event was appended to
      the relay log.
      NOTE(review): queue_old_event() advances the position also for an
      ignored Stop event; confirm which of the two behaviours is intended.

  TODO
    - Verify the issue with stop events, see if we need them at all in
      the relay log.
    - Figure out if other events in addition to Rotate require special
      processing.

  RETURN VALUES
    0	ok
    #	error; 1 if process_io_rotate() failed, otherwise the value
	returned by relay_log.appendv()
*/

int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
  int error= 0;
  ulong inc_pos;
  bool ignore_event= 0;
  RELAY_LOG_INFO *rli= &mi->rli;
  DBUG_ENTER("queue_event");

  if (mi->old_format)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    /* Skip the Stop event only if the previous event was a Rotate */
    ignore_event= mi->ignore_stop_event;
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf,event_len,0);
    if (unlikely(process_io_rotate(mi,&rev)))
    {
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    mi->ignore_stop_event= 1;
    /* The Rotate event must not move the master position */
    inc_pos= 0;
    break;
  }
  default:
    mi->ignore_stop_event= 0;
    inc_pos= event_len;
    break;
  }

  if (likely(!ignore_event &&
             !(error= rli->relay_log.appendv(buf,event_len,0))))
  {
    mi->master_log_pos+= inc_pos;
    /* The argument is cast to ulong, so it has to be printed with %lu */
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(error);
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2513

2514 2515
/*
  Release the resources held by a relay log info structure.

  SYNOPSIS
    end_relay_log_info()
    rli			Relay log information

  NOTES
    Does nothing if rli->inited is not set.  Otherwise the info file and
    the currently open relay log file are closed (if they are open), the
    structure is marked as not initialized and the relay log is closed.
*/

void end_relay_log_info(RELAY_LOG_INFO* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* Info file */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }
    /* Relay log file opened by the reader */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }
    rli->inited= 0;
    rli->log_pos_current= 0;
    rli->relay_log.close(1);
  }
  DBUG_VOID_RETURN;
}

/* try to connect until successful or slave killed */
2539
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2540
{
2541
  return connect_to_master(thd, mysql, mi, 0, 0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2542 2543
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2544

2545 2546 2547 2548
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2549

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2550
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
2551
			     bool reconnect, bool suppress_warnings)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2552
{
2553
  int slave_was_killed;
2554 2555
  int last_errno= -2;				// impossible error
  ulong err_count=0;
2556
  char llbuff[22];
2557
  DBUG_ENTER("connect_to_master");
2558

2559 2560 2561
#ifndef DBUG_OFF
  events_till_disconnect = disconnect_slave_event_count;
#endif
2562 2563 2564 2565
  uint client_flag=0;
  if (opt_slave_compressed_protocol)
    client_flag=CLIENT_COMPRESS;		/* We will use compression */

2566
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
2567
	 (reconnect ? mc_mysql_reconnect(mysql) != 0:
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2568
	  !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
2569
			    mi->port, 0, client_flag,
2570
			    thd->variables.net_read_timeout)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2571
  {
2572 2573 2574
    /* Don't repeat last error */
    if (mc_mysql_errno(mysql) != last_errno)
    {
2575
      last_errno=mc_mysql_errno(mysql);
2576
      suppress_warnings= 0;
2577
      sql_print_error("Slave I/O thread: error %s to master \
2578
'%s@%s:%d': \
2579
Error: '%s'  errno: %d  retry-time: %d  retries: %d",
2580
		      (reconnect ? "reconnecting" : "connecting"),
2581
		      mi->user,mi->host,mi->port,
2582
		      mc_mysql_error(mysql), last_errno,
2583 2584
		      mi->connect_retry,
		      master_retry_count);
2585
    }
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2586 2587 2588
    /*
      By default we try forever. The reason is that failure will trigger
      master election, so if the user did not set master_retry_count we
2589
      do not want to have election triggered on the first failure to
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2590
      connect
2591
    */
2592
    if (++err_count == master_retry_count)
2593 2594
    {
      slave_was_killed=1;
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2595 2596
      if (reconnect)
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
2597 2598
      break;
    }
2599 2600
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
	       (void*)mi);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2601
  }
2602

2603 2604
  if (!slave_was_killed)
  {
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2605
    if (reconnect)
2606
    { 
2607
      if (!suppress_warnings && global_system_variables.log_warnings)
2608
	sql_print_error("Slave: connected to master '%s@%s:%d',\
2609
replication resumed in log '%s' at position %s", mi->user,
2610 2611 2612 2613
			mi->host, mi->port,
			IO_RPL_LOG_NAME,
			llstr(mi->master_log_pos,llbuff));
    }
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2614 2615 2616 2617
    else
    {
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
      mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2618
		      mi->user, mi->host, mi->port);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2619
    }
2620
#ifdef SIGNAL_WITH_VIO_CLOSE
2621
    thd->set_active_vio(mysql->net.vio);
2622
#endif      
2623
  }
2624 2625
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  DBUG_RETURN(slave_was_killed);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2626 2627
}

monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2628

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2629 2630 2631 2632 2633
/*
  Try to connect until successful or slave killed or we have retried
  master_retry_count times
*/

2634 2635
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
			  bool suppress_warnings)
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2636
{
2637
  return connect_to_master(thd, mysql, mi, 1, suppress_warnings);
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2638 2639
}

2640

2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670
/*
  Save the relay log and master log coordinates of the slave SQL thread
  in the relay log info file.

  SYNOPSIS
    flush_relay_log_info()
    rli			Relay log information

  NOTES
    - The info file is rewritten from its start with four lines:
      relay log name, relay log position, master log name and
      master log position.
    - Only the slave thread calls this, so no lock is taken here.
    - Nothing is written while a transaction is open (OPTION_BEGIN is set
      for the SQL thread).  This way the statements are executed again if
      we die in the middle of a transaction that was rolled back.
    - A transaction never spans binary logs, so a relay log rotation in
      the middle of a transaction does not have to be handled.  If it could
      happen, we would have to make sure that the relay log file in which
      the transaction started is not deleted when switching to a new
      relay log file.

  TODO
    - Store the information in a binary format so that longlong2str is
      not needed.

  RETURN VALUES
    0	ok
    1	write error
*/

bool flush_relay_log_info(RELAY_LOG_INFO* rli)
{
  IO_CACHE *info_cache= &rli->info_file;
  /* Room for two file names, two numbers and four newlines */
  char line_buf[FN_REFLEN*2+22*2+4], *end;

  /* sql_thd is not set when calling from init_slave() */
  THD *thd= rli->sql_thd;
  if (thd && (thd->options & OPTION_BEGIN))
    return 0;					// Wait for COMMIT

  my_b_seek(info_cache, 0L);

  end= strmov(line_buf, rli->relay_log_name);
  *end++= '\n';
  end= longlong2str(rli->relay_log_pos, end, 10);
  *end++= '\n';
  end= strmov(end, rli->master_log_name);
  *end++= '\n';
  end= longlong2str(rli->master_log_pos, end, 10);
  *end= '\n';					// Last byte to write

  /* Every step is done even if an earlier one has failed */
  bool write_failed= my_b_write(info_cache, (byte*) line_buf,
                                (ulong) (end-line_buf)+1) != 0;
  bool info_flush_failed= flush_io_cache(info_cache) != 0;
  bool log_flush_failed= flush_io_cache(rli->cur_log) != 0; // QQ Why this call ?

  return write_failed || info_flush_failed || log_flush_failed;
}

2698 2699 2700 2701 2702 2703 2704

/*
  Reopen the relay log file we are reading, using our own file descriptor.

  SYNOPSIS
    reopen_relay_log()
    rli			Relay log information
    errmsg		Store pointer to an error message here on failure

  NOTES
    This function is called when we notice that the current "hot" log
    got rotated under our feet.  rli->cur_log is switched to
    rli->cache_buf, which is opened on the file rli->relay_log_name.

    We want to continue exactly where we were before the switch:
      relay_log_pos	Position stored for the current log
      pending		Bytes already read after relay_log_pos
    next_event() expects the read position to be relay_log_pos + pending.
    Seeking to relay_log_pos only would make it read the pending bytes
    a second time.

  RETURN VALUES
    0	error; rli->cur_log_fd is < 0
    #	pointer to the reopened log cache
*/

static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
  DBUG_ENTER("reopen_relay_log");
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  DBUG_ASSERT(rli->cur_log_fd == -1);

  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
                                   errmsg)) <0)
    DBUG_RETURN(0);
  my_b_seek(cur_log,rli->relay_log_pos + rli->pending);
  DBUG_RETURN(cur_log);
}

2718

2719 2720 2721 2722 2723 2724 2725
/*
  Read the next event from the relay log for the slave SQL thread.

  SYNOPSIS
    next_event()
    rli			Relay log information

  NOTES
    We can have two kinds of log reading:
    hot log
      rli->cur_log points at the IO_CACHE of relay_log, which is actively
      being updated by the I/O thread.  We have to read it under the log
      lock and check that it has not been rotated in the meantime.  If it
      has been, we reopen the file through rli->cache_buf.
    cold log
      rli->cur_log is rli->cache_buf, a read only log that nobody else
      will be updating.

    rli->data_lock protects the rli members and is held for most of the
    loop.  It is released while we wait for the I/O thread to add more
    events, and before we return.

  RETURN VALUES
    0	the thread was killed or an error occurred; a message is written
	to the error log if there is one
    #	the event that was read
*/

Log_event* next_event(RELAY_LOG_INFO* rli)
{
  Log_event *event;
  IO_CACHE *cache= rli->cur_log;
  pthread_mutex_t *log_mutex= rli->relay_log.get_log_lock();
  const char *reason= 0;
  THD *sql_thd= rli->sql_thd;
  DBUG_ENTER("next_event");
  DBUG_ASSERT(sql_thd != 0);

  pthread_mutex_lock(&rli->data_lock);

  for (;;)
  {
    if (sql_slave_killed(sql_thd,rli))
      break;

    bool on_hot_log= (cache != &rli->cache_buf);
    if (on_hot_log)
    {
      DBUG_ASSERT(rli->cur_log_fd == -1);	// foreign descriptor
      pthread_mutex_lock(log_mutex);

      /*
        Reading the open count is safe because the log will only
        be rotated when we hold relay_log.LOCK_log
      */
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
      {
        /* The log has been switched to a new file; Reopen the old file */
        cache= reopen_relay_log(rli, &reason);
        pthread_mutex_unlock(log_mutex);
        if (!cache)				// No more log files
          goto err;
        on_hot_log= 0;				// Using old binary log
      }
    }
    DBUG_ASSERT(my_b_tell(cache) >= BIN_LOG_HEADER_SIZE);
    DBUG_ASSERT(my_b_tell(cache) == rli->relay_log_pos + rli->pending);

    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us
    */
    event= Log_event::read_log_event(cache,0,(bool)0 /* new format */);
    if (event)
    {
      DBUG_ASSERT(sql_thd==rli->sql_thd);
      if (on_hot_log)
        pthread_mutex_unlock(log_mutex);
      pthread_mutex_unlock(&rli->data_lock);
      DBUG_RETURN(event);
    }
    DBUG_ASSERT(sql_thd==rli->sql_thd);
    if (opt_reckless_slave)			// For mysql-test
      cache->error= 0;

    if (cache->error < 0)
    {
      reason= "slave SQL thread aborted because of I/O error";
      if (on_hot_log)
        pthread_mutex_unlock(log_mutex);
      goto err;
    }

    if (cache->error)
    {
      /*
        Read failed with a non-EOF error.
        TODO: come up with something better to handle this error
      */
      if (on_hot_log)
        pthread_mutex_unlock(log_mutex);
      sql_print_error("Slave SQL thread: I/O error reading "
                      "event(errno: %d  cur_log->error: %d)",
                      my_errno,cache->error);
      /* Set read position to the beginning of the event */
      my_b_seek(cache,rli->relay_log_pos+rli->pending);
      /* Otherwise, we have had a partial read */
      reason= "Aborting slave SQL thread because of partial event read";
      break;					// To end of function
    }

    /* From here on we are at EOF */
    if (on_hot_log)
    {
      /*
        On a hot log, EOF means that there are no more updates to
        process and we must block until I/O thread adds some and
        signals us to continue
      */
      DBUG_ASSERT(rli->relay_log.get_open_count() ==
                  rli->cur_log_old_open_count);
      /*
        We can, and should release data_lock while we are waiting for
        update. If we do not, show slave status will block
      */
      pthread_mutex_unlock(&rli->data_lock);
      /* Note that wait_for_update unlocks lock_log ! */
      rli->relay_log.wait_for_update(rli->sql_thd);

      /* Take the data lock again, as it is needed by the loop */
      pthread_mutex_lock(&rli->data_lock);
      continue;
    }

    /*
      The log was not hot, so we have to move to the next log in
      sequence. The next log could be hot or cold; both cases are
      handled below after some common clean up
    */
    end_io_cache(cache);
    DBUG_ASSERT(rli->cur_log_fd >= 0);
    my_close(rli->cur_log_fd, MYF(MY_WME));
    rli->cur_log_fd= -1;

    /*
      TODO: make skip_log_purge a start-up option. At this point this
      is not critical priority
    */
    if (rli->skip_log_purge)
    {
      /*
        If on_hot_log is set, then we already have a lock on
        LOCK_log.  If not, we have to get the lock.

        According to Sasha, the only time this code will ever be executed
        is if we are recovering from a bug.
      */
      if (rli->relay_log.find_next_log(&rli->linfo, !on_hot_log))
      {
        reason= "error switching to the next log";
        goto err;
      }
      rli->relay_log_pos= BIN_LOG_HEADER_SIZE;
      rli->pending= 0;
      strmake(rli->relay_log_name,rli->linfo.log_file_name,
              sizeof(rli->relay_log_name)-1);
      flush_relay_log_info(rli);
    }
    else if (rli->relay_log.purge_first_log(rli))
    {
      /* purge_first_log would have set up the relay log coordinates in rli */
      reason= "Error purging processed log";
      goto err;
    }

    if (rli->relay_log.is_active(rli->linfo.log_file_name))
    {
      /* Next log is hot */
#ifdef EXTRA_DEBUG
      sql_print_error("next log '%s' is currently active",
                      rli->linfo.log_file_name);
#endif
      rli->cur_log= cache= rli->relay_log.get_log_file();
      rli->cur_log_old_open_count= rli->relay_log.get_open_count();
      DBUG_ASSERT(rli->cur_log_fd == -1);

      /*
        Read pointer has to be at the start since we are the only
        reader
      */
      if (check_binlog_magic(cache,&reason))
        goto err;
      continue;
    }

    /*
      If we get here, the next log is not hot, so we have to
      open it ourselves
    */
#ifdef EXTRA_DEBUG
    sql_print_error("next log '%s' is not active",
                    rli->linfo.log_file_name);
#endif
    /* open_binlog() will check the magic header */
    rli->cur_log_fd= open_binlog(cache,rli->linfo.log_file_name,&reason);
    if (rli->cur_log_fd < 0)
      goto err;
  }

  if (!reason && global_system_variables.log_warnings)
    reason= "slave SQL thread was killed";

err:
  pthread_mutex_unlock(&rli->data_lock);
  if (reason)
    sql_print_error("Error reading relay log event: %s", reason);
  DBUG_RETURN(0);
}

sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2921

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2922 2923
#ifdef __GNUC__
template class I_List_iterator<i_string>;
sasha@mysql.sashanet.com's avatar
sasha@mysql.sashanet.com committed
2924
template class I_List_iterator<i_string_pair>;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2925
#endif