Commit add02ff8 authored by unknown's avatar unknown

fixed --skip-slave-thread bug

added PURGE MASTER LOGS TO and SHOW MASTER LOGS
fixed the output of SHOW MASTER STATUS
updated docs


Docs/manual.texi:
  Update for PURGE MASTER LOGS TO, SHOW MASTER LOGS
sql/lex.h:
  added PURGE
sql/log.cc:
  update for PURGE
BitKeeper/etc/ignore:
  Added include/.my_sys.h.swp PENDING/2000-10-25.01 PENDING/2000-10-25.02 support-files/mysql-3.23.27-beta.spec to the ignore list
sql/mysqld.cc:
  fixed bug in --skip-slave-start
sql/sql_class.cc:
  added linfo to THD
sql/sql_class.h:
  updates for PURGE
sql/sql_lex.h:
  updates for PURGE
sql/sql_parse.cc:
  updates for PURGE
sql/sql_repl.cc:
  updates for PURGE
sql/sql_repl.h:
  updates for PURGE
sql/sql_yacc.yy:
  updates for PURGE
parent 293d7c83
......@@ -159,3 +159,7 @@ PENDING/2000-10-11.06
BitKeeper/etc/csets-out
BitKeeper/etc/csets-in
support-files/mysql-3.23.26-beta.spec
include/.my_sys.h.swp
PENDING/2000-10-25.01
PENDING/2000-10-25.02
support-files/mysql-3.23.27-beta.spec
......@@ -24857,6 +24857,11 @@ propogation. @code{LOAD LOCAL DATA INFILE} will be skipped.
@item
Update queries that use user variables are not replication-safe (yet)
@item
Temporary tables will not work if the table with the same name
is used in more than one thread - we plan on fixing this soon. For
now the only thing you can do is turn off logging of the trouble
queries with @code{SET SQL_LOG_BIN=0}
@item
Starting in 3.23.26, it is safe to connect servers in a circular
master-slave relationship with @code{log-slave-updates} enabled.
Note, however, that many queries will not work right in this kind of
......@@ -24918,6 +24923,9 @@ databases should not be logged to the binary log with @code{binlog-ignore-db}
Starting in 3.23.26, you can use @code{replicate-rewrite-db} to tell
the slave to apply updates from one database on the master to the one
with a different name on the slave
@item
Starting in 3.23.28, you can use @code{PURGE MASTER LOGS TO 'log-name'}
to get rid of old logs while the slave is running
@end itemize
@node Replication Options, Replication SQL, Replication Features, Replication
......@@ -25125,6 +25133,34 @@ command line. (Slave)
@item @code{SHOW SLAVE STATUS}
@tab Provides status info on essential parameters of the slave thread. (Slave)
@item @code{SHOW MASTER LOGS}
@tab Only available starting in 3.23.28. Lists the binary logs on the master. You should use this command
prior to @code{PURGE MASTER LOGS TO} to find out how far you should go.
@item @code{PURGE MASTER LOGS TO 'logname'}
@tab Available starting in 3.23.28. Deletes all the
replication logs that are listed in the log
index as being prior to the specified log, and removed them from the
log index, so that the given log now becomes first. Example:
@example
PURGE MASTER LOGS TO 'mysql-bin.010'
@end example
This command will do nothing and fail with an error
if you have an active slave that
is currently reading one of the logs you are trying to delete. However,
if you have a dormant slave, and happen to purge one of the logs it
wants to read, the slave will be unable to replicate once it comes up.
The command is safe to run while slaves are replicating - you do not
need to stop them.
You must first check all the slaves with @code{SHOW SLAVE STATUS} to
see which log they are on, then do a listing of the logs on the
master with @code{SHOW MASTER LOGS}, find the earliest log among all
the slaves ( if all the slaves are up to date, this will be the
last log on the list), backup all the logs you are about to delete
(optional) and purge up to the target log.
@end multitable
......@@ -25164,7 +25200,20 @@ In 3.23.26 we added @code{server-id} to each replication server, and
now all the old zombie threads are killed on the master when a new replication thread
connects from the same slave
@strong{Q}: How do I rotate replication logs?
@strong{A}: In 3.23.28 you should use @code{PURGE MASTER LOGS TO}
command after determining which logs can be deleted, and optionally
backing them up first. In earlier versions the process is much more
painful, and cannot be safely done without stopping all the slaves in
the case that you plan to re-use log names .
You will need to stop the slave threads, edit the binary log index
file, delete all the old logs, restart the master, start slave threads,
and then remove the old log files.
@strong{Q}: How do I upgrade on a hot replication setup?
@strong{A}: If you are upgrading pre-3.23.26 versions, you should just
lock the master tables, let the slave catch up, then run @code{FLUSH
MASTER} on the master, and @code{FLUSH SLAVE} on the slave to reset the
......@@ -38194,6 +38243,17 @@ though, so 3.23 is not released as a stable version yet.
Fixed bug in a BDB key compare function when comparing part keys.
@item
Added variable @code{bdb_lock_max} to @code{mysqld}.
@item
@code{SLAVE START} did not work if you started with
@code{--skip-slave-start} and had not explicitly run @code{CHANGE
MASTER TO}
@item
Fixed the output of @code{SHOW MASTER STATUS} to be consistent with
@code{SHOW SLAVE STATUS} ( no directory in the log name)
@item
Added @code{PURGE MASTER LOGS TO}
@item
Added @code{SHOW MASTER LOGS}
@end itemize
@node News-3.23.27, News-3.23.26, News-3.23.28, News-3.23.x
......@@ -233,6 +233,7 @@ static SYMBOL symbols[] = {
{ "PACK_KEYS", SYM(PACK_KEYS_SYM),0,0},
{ "PARTIAL", SYM(PARTIAL),0,0},
{ "PASSWORD", SYM(PASSWORD),0,0},
{ "PURGE", SYM(PURGE),0,0},
{ "PRECISION", SYM(PRECISION),0,0},
{ "PRIMARY", SYM(PRIMARY_SYM),0,0},
{ "PROCEDURE", SYM(PROCEDURE),0,0},
......
......@@ -19,6 +19,7 @@
#include "mysql_priv.h"
#include "sql_acl.h"
#include "sql_repl.h"
#include <my_dir.h>
#include <stdarg.h>
......@@ -256,7 +257,8 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name)
}
// if the log entry matches, empty string matching anything
if(!log_name_len || (fname[log_name_len] == '\n' && !memcmp(fname, log_name, log_name_len)))
if(!log_name_len || (fname[log_name_len] == '\n' &&
!memcmp(fname, log_name, log_name_len)))
{
if(log_name_len)
fname[log_name_len] = 0; // to kill \n
......@@ -275,6 +277,137 @@ err:
return error;
}
int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
{
if(!index_file) return LOG_INFO_INVALID;
if(no_rotate) return LOG_INFO_PURGE_NO_ROTATE;
int error;
char fname[FN_REFLEN];
char* fname_end, *p;
uint fname_len, i;
bool logs_to_purge_inited = 0, logs_to_keep_inited = 0, found_log = 0;
DYNAMIC_ARRAY logs_to_purge, logs_to_keep;
my_off_t purge_offset ;
pthread_mutex_lock(&LOCK_index);
if(my_fseek(index_file, 0, MY_SEEK_SET,
MYF(MY_WME) ) == MY_FILEPOS_ERROR)
{
error = LOG_INFO_SEEK;
goto err;
}
if(init_dynamic_array(&logs_to_purge, sizeof(char*), 1024, 1024))
{
error = LOG_INFO_MEM;
goto err;
}
logs_to_purge_inited = 1;
if(init_dynamic_array(&logs_to_keep, sizeof(char*), 1024, 1024))
{
error = LOG_INFO_MEM;
goto err;
}
logs_to_keep_inited = 1;
for(;;)
{
if(!fgets(fname, FN_REFLEN, index_file))
{
if(feof(index_file))
break;
else
error = LOG_INFO_IO;
goto err;
}
*(fname_end = (strend(fname) - 1)) = 0; // kill \n
fname_len = (uint)(fname_end - fname);
if(!memcmp(fname, to_log, fname_len + 1 ))
{
found_log = 1;
purge_offset = my_ftell(index_file, MYF(MY_WME)) - fname_len - 1;
}
if(!found_log && log_in_use(fname))
// if one of the logs before the target is in use
{
error = LOG_INFO_IN_USE;
goto err;
}
p = sql_memdup(fname, (uint)(fname_end - fname) + 1);
if((found_log) ?
insert_dynamic(&logs_to_keep, (gptr) &p) :
insert_dynamic(&logs_to_purge, (gptr) &p)
)
{
error = LOG_INFO_MEM;
goto err;
}
}
if(!found_log)
{
error = LOG_INFO_EOF;
goto err;
}
for(i = 0; i < logs_to_purge.elements; i++)
{
char* l;
get_dynamic(&logs_to_purge, (gptr)&l, i);
if(my_delete(l, MYF(MY_WME)))
sql_print_error("Error deleting %s during purge", l);
}
// if we get killed -9 here, the sysadmin would have to do a small
// vi job on the log index file after restart - otherwise, this should
// be safe
my_fclose(index_file, MYF(MY_WME));
if(!(index_file = my_fopen(index_file_name, O_BINARY|O_WRONLY,
MYF(MY_WME))))
{
sql_print_error("Ouch! Could not re-open the binlog index file \
during log purge for write");
error = LOG_INFO_FATAL;
goto err;
}
for(i = 0; i < logs_to_keep.elements; i++)
{
char* l;
get_dynamic(&logs_to_keep, (gptr)&l, i);
fprintf(index_file, "%s\n", l);
}
my_fclose(index_file, MYF(MY_WME));
if(!(index_file = my_fopen(index_file_name, O_BINARY|O_RDWR|O_APPEND,
MYF(MY_WME))))
{
sql_print_error("Ouch! Could not re-open the binlog index file \
during log purge for append");
error = LOG_INFO_FATAL;
goto err;
}
// now update offsets
adjust_linfo_offsets(purge_offset);
error = 0;
err:
pthread_mutex_unlock(&LOCK_index);
if(logs_to_purge_inited)
delete_dynamic(&logs_to_purge);
if(logs_to_keep_inited)
delete_dynamic(&logs_to_keep);
return error;
}
int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
{
// mutex needed because we need to make sure the file pointer does not move
......@@ -285,6 +418,12 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
char* end ;
pthread_mutex_lock(&LOCK_index);
if(linfo->fatal)
{
error = LOG_INFO_FATAL;
goto err;
}
if(my_fseek(index_file, linfo->index_file_offset, MY_SEEK_SET, MYF(MY_WME) ) == MY_FILEPOS_ERROR)
{
error = LOG_INFO_SEEK;
......
......@@ -158,6 +158,8 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl,
opt_disable_networking=0, opt_bootstrap=0,opt_skip_show_db=0,
opt_ansi_mode=0,opt_myisam_log=0;
bool opt_sql_bin_update = 0, opt_log_slave_updates = 0;
extern MASTER_INFO glob_mi;
extern int init_master_info(MASTER_INFO* mi);
// if sql_bin_update is true, SQL_LOG_UPDATE and SQL_LOG_BIN are kept in sync, and are
// treated as aliases for each other
......@@ -1618,6 +1620,8 @@ int main(int argc, char **argv)
if(!opt_skip_slave_start &&
pthread_create(&hThread, &connection_attrib, handle_slave, 0))
sql_print_error("Warning: Can't create thread to handle slave");
else if(opt_skip_slave_start)
init_master_info(&glob_mi);
}
else
sql_print_error("Server id is not set, slave thread will not be started");
......
......@@ -94,6 +94,7 @@ THD::THD()
options=thd_startup_options;
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
start_time=(time_t) 0;
current_linfo = 0;
last_nx_table = last_nx_db = 0;
inactive_timeout=net_wait_timeout;
open_options=ha_open_options;
......
......@@ -34,12 +34,20 @@ enum enum_log_type { LOG_CLOSED, LOG_NORMAL, LOG_NEW, LOG_BIN };
#define LOG_INFO_IO -2
#define LOG_INFO_INVALID -3
#define LOG_INFO_SEEK -4
#define LOG_INFO_PURGE_NO_ROTATE -5
#define LOG_INFO_MEM -6
#define LOG_INFO_FATAL -7
#define LOG_INFO_IN_USE -8
typedef struct st_log_info
{
char log_file_name[FN_REFLEN];
my_off_t index_file_offset;
my_off_t pos;
my_off_t pos;
bool fatal; // if the purge happens to give us a negative offset
pthread_mutex_t lock;
st_log_info():fatal(0) { pthread_mutex_init(&lock, NULL);}
~st_log_info() { pthread_mutex_destroy(&lock);}
} LOG_INFO;
typedef struct st_master_info
......@@ -114,6 +122,7 @@ public:
int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name);
int purge_logs(THD* thd, const char* to_log);
void flush(void);
void close(bool exiting = 0); // if we are exiting, we also want to close the
// index file
......@@ -126,6 +135,9 @@ public:
inline bool is_open() { return log_type != LOG_CLOSED; }
char* get_index_fname() { return index_file_name;}
char* get_log_fname() { return log_file_name; }
void lock_index() { pthread_mutex_lock(&LOCK_index);}
void unlock_index() { pthread_mutex_unlock(&LOCK_index);}
FILE* get_index_file() { return index_file;}
};
/* character conversion tables */
......@@ -295,6 +307,10 @@ public:
bool volatile killed,bootstrap;
bool system_thread,in_lock_tables,global_read_lock;
bool query_error;
LOG_INFO* current_linfo;
// if we do a purge of binary logs, log index info of the threads
// that are currently reading it needs to be adjusted. To do that
// each thread that is using LOG_INFO needs to adjust the pointer to it
THD();
~THD();
bool store_globals();
......
......@@ -48,7 +48,7 @@ enum enum_sql_command {
SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_SHOW_CREATE,
SQLCOM_SHOW_MASTER_STAT, SQLCOM_SHOW_SLAVE_STAT, SQLCOM_CHANGE_MASTER,
SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE,
SQLCOM_RESET
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS
};
enum lex_states { STATE_START, STATE_CHAR, STATE_IDENT,
......@@ -97,6 +97,7 @@ typedef struct st_lex {
char *length,*dec,*change,*name;
char *db,*db1,*table1,*db2,*table2; /* For outer join using .. */
char *backup_dir; /* For RESTORE/BACKUP */
char* to_log; /* For PURGE MASTER LOGS TO */
String *wild;
sql_exchange *exchange;
ha_rows select_limit,offset_limit;
......
......@@ -927,6 +927,13 @@ mysql_execute_command(void)
#endif
break;
}
case SQLCOM_PURGE:
{
if(check_access(thd, PROCESS_ACL, any_db))
goto error;
res = purge_master_logs(thd, lex->to_log);
break;
}
case SQLCOM_BACKUP_TABLE:
{
if (check_db_used(thd,tables) ||
......@@ -1183,6 +1190,18 @@ mysql_execute_command(void)
res= -1;
break;
}
case SQLCOM_SHOW_BINLOGS:
#ifdef DONT_ALLOW_SHOW_COMMANDS
send_error(&thd->net,ER_NOT_ALLOWED_COMMAND); /* purecov: inspected */
DBUG_VOID_RETURN;
#else
{
if(check_access(thd, PROCESS_ACL, any_db))
goto error;
res = show_binlogs(thd);
break;
}
#endif
case SQLCOM_SHOW_CREATE:
#ifdef DONT_ALLOW_SHOW_COMMANDS
send_error(&thd->net,ER_NOT_ALLOWED_COMMAND); /* purecov: inspected */
......
......@@ -93,6 +93,87 @@ static int send_file(THD *thd)
DBUG_RETURN(error);
}
void adjust_linfo_offsets(my_off_t purge_offset)
{
THD *tmp;
pthread_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
while((tmp=it++))
{
LOG_INFO* linfo;
if((linfo = tmp->current_linfo))
{
pthread_mutex_lock(&linfo->lock);
// no big deal if we just started reading the log
// nothing to adjust
if(linfo->index_file_offset < purge_offset)
linfo->fatal = (linfo->index_file_offset != 0);
else
linfo->index_file_offset -= purge_offset;
pthread_mutex_unlock(&linfo->lock);
}
}
pthread_mutex_unlock(&LOCK_thread_count);
}
bool log_in_use(const char* log_name)
{
int log_name_len = strlen(log_name) + 1;
THD *tmp;
bool result = 0;
pthread_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
while((tmp=it++))
{
LOG_INFO* linfo;
if((linfo = tmp->current_linfo))
{
pthread_mutex_lock(&linfo->lock);
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
pthread_mutex_unlock(&linfo->lock);
if(result) break;
}
}
pthread_mutex_unlock(&LOCK_thread_count);
return result;
}
int purge_master_logs(THD* thd, const char* to_log)
{
char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log);
int res = mysql_bin_log.purge_logs(thd, search_file_name);
char* errmsg = 0;
switch(res)
{
case 0: break;
case LOG_INFO_EOF: errmsg = "Target log not found in binlog index"; break;
case LOG_INFO_IO: errmsg = "I/O error reading log index file"; break;
case LOG_INFO_INVALID: errmsg = "Server configuration does not permit \
binlog purge"; break;
case LOG_INFO_SEEK: errmsg = "Failed on fseek()"; break;
case LOG_INFO_PURGE_NO_ROTATE: errmsg = "Cannot purge unrotatable log";
break;
case LOG_INFO_MEM: errmsg = "Out of memory"; break;
case LOG_INFO_FATAL: errmsg = "Fatal error during purge"; break;
case LOG_INFO_IN_USE: errmsg = "A purgable log is in use, will not purge";
break;
default:
errmsg = "Unknown error during purge"; break;
}
if(errmsg)
send_error(&thd->net, 0, errmsg);
else
send_ok(&thd->net);
}
void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
{
LOG_INFO linfo;
......@@ -117,6 +198,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
mysql_bin_log.make_log_name(search_file_name, log_ident);
else
search_file_name[0] = 0;
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
if(mysql_bin_log.find_first_log(&linfo, search_file_name))
{
......@@ -366,9 +450,20 @@ sweepstakes if you report the bug";
send_eof(&thd->net);
thd->proc_info = "waiting to finalize termination";
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
err:
thd->proc_info = "waiting to finalize termination";
pthread_mutex_lock(&LOCK_thread_count);
// exclude iteration through thread list
// this is needed for purge_logs() - it will iterate through
// thread list and update thd->current_linfo->index_file_offset
// this mutex will make sure that it never tried to update our linfo
// after we return from this stack frame
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
if(log)
(void) my_fclose(log, MYF(MY_WME));
send_error(&thd->net, 0, errmsg);
......@@ -594,7 +689,8 @@ int show_binlog_info(THD* thd)
{
LOG_INFO li;
mysql_bin_log.get_current_log(&li);
net_store_data(packet, li.log_file_name);
int dir_len = dirname_length(li.log_file_name);
net_store_data(packet, li.log_file_name + dir_len);
net_store_data(packet, (longlong)li.pos);
net_store_data(packet, &binlog_do_db);
net_store_data(packet, &binlog_ignore_db);
......@@ -613,3 +709,71 @@ int show_binlog_info(THD* thd)
send_eof(&thd->net);
DBUG_RETURN(0);
}
int show_binlogs(THD* thd)
{
const char* errmsg = 0;
FILE* index_file;
char fname[FN_REFLEN];
NET* net = &thd->net;
List<Item> field_list;
String* packet = &thd->packet;
if(!mysql_bin_log.is_open())
{
errmsg = "binlog is not open";
goto err;
}
field_list.push_back(new Item_empty_string("Log_name", 128));
if(send_fields(thd, field_list, 1))
{
sql_print_error("Failed in send_fields");
return 1;
}
mysql_bin_log.lock_index();
index_file = mysql_bin_log.get_index_file();
if(!index_file)
{
errmsg = "Uninitialized index file pointer";
mysql_bin_log.unlock_index();
goto err;
}
if(my_fseek(index_file, 0, MY_SEEK_SET, MYF(MY_WME)))
{
errmsg = "Failed on fseek()";
mysql_bin_log.unlock_index();
goto err;
}
while(fgets(fname, sizeof(fname), index_file))
{
char* fname_end;
*(fname_end = (strend(fname) - 1)) = 0;
int dir_len = dirname_length(fname);
packet->length(0);
net_store_data(packet, fname + dir_len, (fname_end - fname)-dir_len);
if(my_net_write(net, (char*) packet->ptr(), packet->length()))
{
sql_print_error("Failed in my_net_write");
mysql_bin_log.unlock_index();
return 1;
}
}
mysql_bin_log.unlock_index();
send_eof(net);
err:
if(errmsg)
{
send_error(net, 0, errmsg);
return 1;
}
send_ok(net);
return 0;
}
......@@ -14,7 +14,10 @@ int stop_slave(THD* thd = 0, bool net_report = 1);
int change_master(THD* thd);
void reset_slave();
void reset_master();
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);
int show_binlogs(THD* thd);
extern int init_master_info(MASTER_INFO* mi);
void kill_zombie_dump_threads(uint32 slave_server_id);
......
......@@ -107,6 +107,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token MASTER_SYM
%token REPAIR
%token RESET_SYM
%token PURGE
%token SLAVE
%token START_SYM
%token STOP_SYM
......@@ -493,7 +494,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%type <NONE>
query verb_clause create change select drop insert replace insert2
insert_values update delete show describe load alter optimize flush
reset begin commit rollback slave master_def master_defs
reset purge begin commit rollback slave master_def master_defs
repair restore backup analyze check rename
field_list field_list_item field_spec kill
select_item_list select_item values_list no_braces
......@@ -549,6 +550,7 @@ verb_clause:
| lock
| kill
| optimize
| purge
| rename
| repair
| replace
......@@ -2147,6 +2149,10 @@ show_param:
if (!add_table_to_list($3,NULL))
YYABORT;
}
| MASTER_SYM LOGS_SYM
{
Lex->sql_command = SQLCOM_SHOW_BINLOGS;
}
| keys_or_index FROM table_ident opt_db
{
Lex->sql_command= SQLCOM_SHOW_KEYS;
......@@ -2246,6 +2252,13 @@ reset_option:
SLAVE { Lex->type|= REFRESH_SLAVE; }
| MASTER_SYM { Lex->type|= REFRESH_MASTER; }
purge:
PURGE { Lex->sql_command = SQLCOM_PURGE; Lex->type=0;}
MASTER_SYM LOGS_SYM TO_SYM TEXT_STRING
{
Lex->to_log = $6.str;
}
/* kill threads */
kill:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment