Changes for the max queries per hour

parent 7d5b08b1
...@@ -55,10 +55,9 @@ extern "C" pthread_mutex_t THR_LOCK_keycache; ...@@ -55,10 +55,9 @@ extern "C" pthread_mutex_t THR_LOCK_keycache;
extern "C" int gethostname(char *name, int namelen); extern "C" int gethostname(char *name, int namelen);
#endif #endif
static int check_for_max_user_connections(const char *user, const char *host, uint max_questions); static int check_for_max_user_connections(UC *uc);
static bool check_mqh(THD *thd, const char *user, const char *host, static bool check_mqh(THD *thd);
uint max_questions); static void decrease_user_connections(UC *uc);
static void decrease_user_connections(const char *user, const char *host);
static bool check_db_used(THD *thd,TABLE_LIST *tables); static bool check_db_used(THD *thd,TABLE_LIST *tables);
static bool check_merge_table_access(THD *thd, char *db, TABLE_LIST *tables); static bool check_merge_table_access(THD *thd, char *db, TABLE_LIST *tables);
static bool check_dup(const char *db, const char *name, TABLE_LIST *tables); static bool check_dup(const char *db, const char *name, TABLE_LIST *tables);
...@@ -121,11 +120,54 @@ inline bool end_active_trans(THD *thd) ...@@ -121,11 +120,54 @@ inline bool end_active_trans(THD *thd)
static HASH hash_user_connections; static HASH hash_user_connections;
extern pthread_mutex_t LOCK_user_conn; extern pthread_mutex_t LOCK_user_conn;
struct user_conn { static int get_or_create_user_conn(THD *thd, const char *user, const char *host,
char *user; uint max_questions)
uint len, connections, questions, max_questions; {
time_t intime; int return_val=0;
}; uint temp_len;
char temp_user[USERNAME_LENGTH+HOSTNAME_LENGTH+2];
struct user_conn *uc;
DBUG_ASSERT(user != 0);
DBUG_ASSERT(host != 0);
temp_len= (uint) (strxnmov(temp_user, sizeof(temp_user)-1, user, "@", host,
NullS) - temp_user);
(void) pthread_mutex_lock(&LOCK_user_conn);
uc = (struct user_conn *) hash_search(&hash_user_connections,
(byte*) temp_user, temp_len);
if (!uc)
{
uc= ((struct user_conn*)
my_malloc(sizeof(struct user_conn) + temp_len+1,
MYF(MY_WME)));
if (!uc)
{
send_error(&current_thd->net, 0, NullS); // Out of memory
return_val=1;
goto end;
}
uc->user=(char*) (uc+1);
memcpy(uc->user,temp_user,temp_len+1);
uc->len = temp_len;
uc->connections = 1;
uc->questions=0;
uc->max_questions=max_questions;
uc->intime=thd->thr_create_time;
if (hash_insert(&hash_user_connections, (byte*) uc))
{
my_free((char*) uc,0);
send_error(&current_thd->net, 0, NullS); // Out of memory
return_val=1;
goto end;
}
}
thd->user_connect=uc;
end:
(void) pthread_mutex_unlock(&LOCK_user_conn);
return return_val;
}
/* /*
...@@ -191,16 +233,17 @@ static bool check_user(THD *thd,enum_server_command command, const char *user, ...@@ -191,16 +233,17 @@ static bool check_user(THD *thd,enum_server_command command, const char *user,
db ? db : (char*) ""); db ? db : (char*) "");
thd->db_access=0; thd->db_access=0;
/* Don't allow user to connect if he has done too many queries */ /* Don't allow user to connect if he has done too many queries */
if (max_questions && check_mqh(thd,user,thd->host_or_ip,max_questions)) if ((max_questions || max_user_connections) &&
get_or_create_user_conn(thd,user,thd->host_or_ip,max_questions))
return -1; return -1;
if (max_user_connections && if (max_user_connections && thd->user_connect &&
check_for_max_user_connections(user, thd->host, max_questions)) check_for_max_user_connections(thd->user_connect))
return -1; return -1;
if (db && db[0]) if (db && db[0])
{ {
bool error=test(mysql_change_db(thd,db)); bool error=test(mysql_change_db(thd,db));
if (error) if (error && thd->user_connect)
decrease_user_connections(thd->user,thd->host); decrease_user_connections(thd->user_connect);
return error; return error;
} }
else else
...@@ -233,95 +276,43 @@ void init_max_user_conn(void) ...@@ -233,95 +276,43 @@ void init_max_user_conn(void)
} }
static int check_for_max_user_connections(const char *user, const char *host, static int check_for_max_user_connections(UC *uc)
uint max_questions)
{ {
int error=1; int error=0;
uint temp_len;
char temp_user[USERNAME_LENGTH+HOSTNAME_LENGTH+2];
struct user_conn *uc;
DBUG_ENTER("check_for_max_user_connections"); DBUG_ENTER("check_for_max_user_connections");
DBUG_ASSERT(user != 0); // DBUG_PRINT("enter",("user: '%s' host: '%s'", user, host));
DBUG_ASSERT(host != 0);
DBUG_PRINT("enter",("user: '%s' host: '%s'", user, host));
temp_len= (uint) (strxnmov(temp_user, sizeof(temp_user), user, "@", host, DBUG_ASSERT(uc != 0);
NullS) - temp_user);
(void) pthread_mutex_lock(&LOCK_user_conn); if (max_user_connections <= (uint) uc->connections)
uc = (struct user_conn *) hash_search(&hash_user_connections,
(byte*) temp_user, temp_len);
if (uc) /* user found ; check for no. of connections */
{
if (max_user_connections == (uint) uc->connections)
{
net_printf(&(current_thd->net),ER_TOO_MANY_USER_CONNECTIONS, temp_user);
goto end;
}
uc->connections++;
}
else
{ {
/* the user is not found in the cache; Insert it */ net_printf(&(current_thd->net),ER_TOO_MANY_USER_CONNECTIONS, uc->user);
struct user_conn *uc= ((struct user_conn*) error=1;
my_malloc(sizeof(struct user_conn) + temp_len+1, goto end;
MYF(MY_WME)));
if (!uc)
{
send_error(&current_thd->net, 0, NullS); // Out of memory
goto end;
}
uc->user=(char*) (uc+1);
memcpy(uc->user,temp_user,temp_len+1);
uc->len = temp_len;
uc->connections = 1;
uc->questions=0;
uc->max_questions=max_questions;
uc->intime=current_thd->thr_create_time;
if (hash_insert(&hash_user_connections, (byte*) uc))
{
my_free((char*) uc,0);
send_error(&current_thd->net, 0, NullS); // Out of memory
goto end;
}
} }
error=0; uc->connections++;
end: end:
(void) pthread_mutex_unlock(&LOCK_user_conn);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
static void decrease_user_connections(const char *user, const char *host) static void decrease_user_connections(UC *uc)
{ {
char temp_user[USERNAME_LENGTH+HOSTNAME_LENGTH+2];
int temp_len;
struct user_conn *uc;
if (!max_user_connections) if (!max_user_connections)
return; return;
if (!user)
user="";
if (!host)
host="";
DBUG_ENTER("decrease_user_connections");
DBUG_PRINT("enter",("user: '%s' host: '%s'", user, host));
temp_len= (uint) (strxnmov(temp_user, sizeof(temp_user), user, "@", host, DBUG_ENTER("decrease_user_connections");
NullS) - temp_user); DBUG_ASSERT(uc != 0);
(void) pthread_mutex_lock(&LOCK_user_conn); // DBUG_PRINT("enter",("user: '%s' host: '%s'", user, host));
uc = (struct user_conn *) hash_search(&hash_user_connections, if (!--uc->connections && !mqh_used)
(byte*) temp_user, temp_len);
DBUG_ASSERT(uc != 0); // We should always find the user
if (!uc)
goto end; // Safety; Something went wrong
if (! --uc->connections && !mqh_used)
{ {
/* Last connection for user; Delete it */ /* Last connection for user; Delete it */
(void) pthread_mutex_lock(&LOCK_user_conn);
(void) hash_delete(&hash_user_connections,(byte*) uc); (void) hash_delete(&hash_user_connections,(byte*) uc);
(void) pthread_mutex_unlock(&LOCK_user_conn);
} }
end:
(void) pthread_mutex_unlock(&LOCK_user_conn);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -337,79 +328,83 @@ void free_max_user_conn(void) ...@@ -337,79 +328,83 @@ void free_max_user_conn(void)
returns 0 if OK. returns 0 if OK.
*/ */
static bool check_mqh(THD *thd, const char *user, const char *host, static bool check_mqh(THD *thd)
uint max_questions)
{ {
uint temp_len;
char temp_user[USERNAME_LENGTH+HOSTNAME_LENGTH+2];
struct user_conn *uc;
bool error=0; bool error=0;
DBUG_ENTER("check_mqh");
DBUG_ASSERT(user != 0); UC *uc=thd->user_connect;
DBUG_ASSERT(host != 0); DBUG_ASSERT(uc != 0);
/* TODO: Add username + host to THD for faster execution */ /* TODO: Add username + host to THD for faster execution */
temp_len= (uint) (strxnmov(temp_user, sizeof(temp_user)-1, user, "@", host, bool my_start = thd->start_time != 0;
time_t check_time = (my_start) ? thd->start_time : time(NULL);
if (check_time - uc->intime >= 3600)
{
// (void) pthread_mutex_lock(&LOCK_user_conn);
uc->questions=(uint) my_start;
uc->intime=check_time;
// (void) pthread_mutex_unlock(&LOCK_user_conn);
}
else if (uc->max_questions && ++(uc->questions) > uc->max_questions)
{
net_printf(&thd->net, ER_USER_LIMIT_REACHED, uc->user, "max_questions",
(long) uc->max_questions);
error=1;
goto end;
}
end:
DBUG_RETURN(error);
}
static void reset_mqh(THD *thd,LEX_USER *lu, uint mq)
{
char user[USERNAME_LENGTH+1];
char host[USERNAME_LENGTH+1];
char *where;
if (lu) // for GRANT
{
UC *uc;
uint temp_len;
char temp_user[USERNAME_LENGTH+HOSTNAME_LENGTH+2];
memcpy(user,lu->user.str,lu->user.length);
user[lu->user.length]='\0';
memcpy(host,lu->host.str,lu->host.length);
host[lu->host.length]='\0';
temp_len= (uint) (strxnmov(temp_user, sizeof(temp_user)-1, user, "@", host,
NullS) - temp_user); NullS) - temp_user);
(void) pthread_mutex_lock(&LOCK_user_conn); uc = (struct user_conn *) hash_search(&hash_user_connections,
uc = (struct user_conn *) hash_search(&hash_user_connections,
(byte*) temp_user, temp_len); (byte*) temp_user, temp_len);
if (uc) if (uc)
{
/* user found ; check for no. of queries */
bool my_start = thd->start_time != 0;
time_t check_time = (my_start) ? thd->start_time : time(NULL);
if (check_time - uc->intime >= 3600)
{ {
uc->questions=(uint) my_start; uc->questions=0;
uc->intime=check_time; uc->max_questions=mq;
}
else if (uc->max_questions && ++(uc->questions) > uc->max_questions)
{
net_printf(&thd->net, ER_USER_LIMIT_REACHED, temp_user, "max_questions",
(long) uc->questions);
error=1;
goto end;
} }
} }
else else // for FLUSH PRIVILEGES
{ {
struct user_conn *uc= ((struct user_conn*) (void) pthread_mutex_lock(&LOCK_user_conn);
my_malloc(sizeof(struct user_conn) + temp_len+1, for (uint idx=0;idx<hash_user_connections.records;idx++)
MYF(MY_WME)));
if (!uc)
{
send_error(&current_thd->net, 0, NullS); // Out of memory
error=1;
goto end;
}
uc->user=(char*) (uc+1);
memcpy(uc->user,temp_user,temp_len+1);
uc->len = temp_len;
uc->connections = 1;
uc->questions=0;
uc->max_questions=max_questions;
uc->intime=current_thd->thr_create_time;
if (hash_insert(&hash_user_connections, (byte*) uc))
{ {
my_free((char*) uc,0); HASH_LINK *data=dynamic_element(&hash_user_connections.array,idx,HASH_LINK*);
send_error(&current_thd->net, 0, NullS); // Out of memory UC *uc=(struct user_conn *)data->data;
error=1; where=strchr(uc->user,'@');
memcpy(user,uc->user,where - uc->user);
user[where-uc->user]='\0'; where++;
strcpy(host,where);
uc->max_questions=get_mqh(user,host);
} }
(void) pthread_mutex_unlock(&LOCK_user_conn);
} }
end:
(void) pthread_mutex_unlock(&LOCK_user_conn);
return error;
} }
/* /*
Check connnetion and get priviliges Check connnetion and get priviliges
Returns 0 on ok, -1 < if error is given > 0 on error. Returns 0 on ok, -1 < if error is given > 0 on error.
*/ */
static int static int
check_connections(THD *thd) check_connections(THD *thd)
{ {
...@@ -635,6 +630,8 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -635,6 +630,8 @@ pthread_handler_decl(handle_one_connection,arg)
if (do_command(thd)) if (do_command(thd))
break; break;
} }
if (thd->user_connect)
decrease_user_connections(thd->user_connect);
free_root(&thd->mem_root,MYF(0)); free_root(&thd->mem_root,MYF(0));
if (net->error && net->vio != 0) if (net->error && net->vio != 0)
{ {
...@@ -648,8 +645,7 @@ pthread_handler_decl(handle_one_connection,arg) ...@@ -648,8 +645,7 @@ pthread_handler_decl(handle_one_connection,arg)
send_error(net,net->last_errno,NullS); send_error(net,net->last_errno,NullS);
thread_safe_increment(aborted_threads,&LOCK_thread_count); thread_safe_increment(aborted_threads,&LOCK_thread_count);
} }
decrease_user_connections(thd->user,thd->host);
end_thread: end_thread:
close_connection(net); close_connection(net);
end_thread(thd,1); end_thread(thd,1);
...@@ -713,6 +709,13 @@ pthread_handler_decl(handle_bootstrap,arg) ...@@ -713,6 +709,13 @@ pthread_handler_decl(handle_bootstrap,arg)
thd->query= thd->memdup_w_gap(buff, length+1, thd->db_length+1); thd->query= thd->memdup_w_gap(buff, length+1, thd->db_length+1);
thd->query[length] = '\0'; thd->query[length] = '\0';
thd->query_id=query_id++; thd->query_id=query_id++;
if (thd->user_connect && check_mqh(thd))
{
thd->net.error = 0;
close_thread_tables(thd); // Free tables
free_root(&thd->mem_root,MYF(MY_KEEP_PREALLOC));
break;
}
mysql_parse(thd,thd->query,length); mysql_parse(thd,thd->query,length);
close_thread_tables(thd); // Free tables close_thread_tables(thd); // Free tables
if (thd->fatal_error) if (thd->fatal_error)
...@@ -890,6 +893,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -890,6 +893,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
char *save_user= thd->user; char *save_user= thd->user;
char *save_priv_user= thd->priv_user; char *save_priv_user= thd->priv_user;
char *save_db= thd->db; char *save_db= thd->db;
UC *save_uc= thd->user_connect;
if ((uint) ((uchar*) db - net->read_pos) > packet_length) if ((uint) ((uchar*) db - net->read_pos) > packet_length)
{ // Check if protocol is ok { // Check if protocol is ok
...@@ -908,7 +912,8 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -908,7 +912,8 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->priv_user=save_priv_user; thd->priv_user=save_priv_user;
break; break;
} }
decrease_user_connections (save_user, thd->host); if (max_connections && save_uc)
decrease_user_connections (save_uc);
x_free((gptr) save_db); x_free((gptr) save_db);
x_free((gptr) save_user); x_free((gptr) save_user);
thd->password=test(passwd[0]); thd->password=test(passwd[0]);
...@@ -941,7 +946,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -941,7 +946,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
my_pthread_setprio(pthread_self(),QUERY_PRIOR); my_pthread_setprio(pthread_self(),QUERY_PRIOR);
mysql_log.write(thd,command,"%s",thd->query); mysql_log.write(thd,command,"%s",thd->query);
DBUG_PRINT("query",("%s",thd->query)); DBUG_PRINT("query",("%s",thd->query));
if (mqh_used && check_mqh(thd,thd->user,thd->host,0)) if (thd->user_connect && check_mqh(thd))
{ {
error = TRUE; error = TRUE;
net->error = 0; net->error = 0;
...@@ -1066,8 +1071,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1066,8 +1071,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
send_error(net,0); send_error(net,0);
else else
send_eof(net); send_eof(net);
if (mqh_used && hash_user_connections.array.buffer == 0) if (mqh_used)
init_max_user_conn(); {
if (hash_user_connections.array.buffer == 0)
init_max_user_conn();
reset_mqh(thd,(LEX_USER *)NULL,0);
}
break; break;
} }
case COM_SHUTDOWN: case COM_SHUTDOWN:
...@@ -2297,10 +2306,20 @@ mysql_execute_command(void) ...@@ -2297,10 +2306,20 @@ mysql_execute_command(void)
Query_log_event qinfo(thd, thd->query); Query_log_event qinfo(thd, thd->query);
mysql_bin_log.write(&qinfo); mysql_bin_log.write(&qinfo);
} }
if (mqh_used)
{
if (hash_user_connections.array.buffer == 0)
init_max_user_conn();
if (lex->mqh)
{
List_iterator <LEX_USER> str_list (lex->users_list);
LEX_USER *Str;
str_list.rewind();
reset_mqh(thd,str_list++,lex->mqh);
}
}
} }
} }
if (mqh_used && hash_user_connections.array.buffer == 0)
init_max_user_conn();
break; break;
} }
case SQLCOM_FLUSH: case SQLCOM_FLUSH:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment