ndb:

freeing non freed objects on server shutdown
corrected timeout handling on schema events
parent 7c911106
...@@ -1228,9 +1228,9 @@ end: ...@@ -1228,9 +1228,9 @@ end:
struct timespec abstime; struct timespec abstime;
int i; int i;
set_timespec(abstime, 1); set_timespec(abstime, 1);
(void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond,
&ndb_schema_object->mutex, &ndb_schema_object->mutex,
&abstime); &abstime);
(void) pthread_mutex_lock(&schema_share->mutex); (void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++) for (i= 0; i < ndb_number_of_storage_nodes; i++)
...@@ -1252,16 +1252,19 @@ end: ...@@ -1252,16 +1252,19 @@ end:
if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
break; break;
max_timeout--; if (ret)
if (max_timeout == 0)
{ {
sql_print_error("NDB %s: distibuting %s timed out. Ignoring...", max_timeout--;
type_str, ndb_schema_object->key); if (max_timeout == 0)
break; {
sql_print_error("NDB %s: distibuting %s timed out. Ignoring...",
type_str, ndb_schema_object->key);
break;
}
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
"distributing", ndb_schema_object->key);
} }
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
"distributing", ndb_schema_object->key);
} }
(void) pthread_mutex_unlock(&ndb_schema_object->mutex); (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
} }
...@@ -2445,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2445,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
} }
if (!op) if (!op)
{ {
pthread_mutex_unlock(&injector_mutex);
sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
" %s",event_name); " %s",event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
...@@ -2453,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -2453,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
ndb->getNdbError().code, ndb->getNdbError().code,
ndb->getNdbError().message, ndb->getNdbError().message,
"NDB"); "NDB");
pthread_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -2635,20 +2638,23 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, ...@@ -2635,20 +2638,23 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
{ {
struct timespec abstime; struct timespec abstime;
set_timespec(abstime, 1); set_timespec(abstime, 1);
(void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond,
&share->mutex, &share->mutex,
&abstime); &abstime);
max_timeout--;
if (share->op == 0) if (share->op == 0)
break; break;
if (max_timeout == 0) if (ret)
{ {
sql_print_error("NDB delete table: timed out. Ignoring..."); max_timeout--;
break; if (max_timeout == 0)
{
sql_print_error("NDB %s: timed out. Ignoring...", type_str);
break;
}
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
type_str, share->key);
} }
if (ndb_extra_logging)
ndb_report_waiting(type_str, max_timeout,
type_str, share->key);
} }
(void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&share->mutex);
#else #else
...@@ -2711,7 +2717,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2711,7 +2717,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
} }
static int static int
ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
NdbEventOperation *pOp,
Binlog_index_row &row) Binlog_index_row &row)
{ {
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
...@@ -2720,7 +2727,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2720,7 +2727,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
/* make sure to flush any pending events as they can be dependent /* make sure to flush any pending events as they can be dependent
on one of the tables being changed below on one of the tables being changed below
*/ */
injector_thd->binlog_flush_pending_rows_event(true); thd->binlog_flush_pending_rows_event(true);
switch (type) switch (type)
{ {
...@@ -2777,7 +2784,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2777,7 +2784,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
return 0; return 0;
} }
ndb_handle_schema_change(injector_thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, share);
return 0; return 0;
} }
...@@ -3057,7 +3064,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, ...@@ -3057,7 +3064,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_handler_t ndb_binlog_thread_func(void *arg)
{ {
THD *thd; /* needs to be first for thread_stack */ THD *thd; /* needs to be first for thread_stack */
Ndb *ndb= 0; Ndb *i_ndb= 0;
Ndb *s_ndb= 0;
Thd_ndb *thd_ndb=0; Thd_ndb *thd_ndb=0;
int ndb_update_binlog_index= 1; int ndb_update_binlog_index= 1;
injector *inj= injector::instance(); injector *inj= injector::instance();
...@@ -3109,16 +3117,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3109,16 +3117,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
thd->lex->start_transaction_opt= 0; thd->lex->start_transaction_opt= 0;
if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) || if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
schema_ndb->init()) s_ndb->init())
{ {
sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
goto err; goto err;
} }
// empty database // empty database
if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) || if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
ndb->init()) i_ndb->init())
{ {
sql_print_error("NDB Binlog: Getting Ndb object failed"); sql_print_error("NDB Binlog: Getting Ndb object failed");
ndb_binlog_thread_running= -1; ndb_binlog_thread_running= -1;
...@@ -3139,7 +3147,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3139,7 +3147,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
*/ */
injector_thd= thd; injector_thd= thd;
injector_ndb= ndb; injector_ndb= i_ndb;
schema_ndb= s_ndb;
ndb_binlog_thread_running= 1; ndb_binlog_thread_running= 1;
if (opt_bin_log) if (opt_bin_log)
{ {
...@@ -3221,14 +3230,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3221,14 +3230,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
int res= 0, tot_poll_wait= 1000; int res= 0, tot_poll_wait= 1000;
if (ndb_binlog_running) if (ndb_binlog_running)
{ {
res= ndb->pollEvents(tot_poll_wait, &gci); res= i_ndb->pollEvents(tot_poll_wait, &gci);
tot_poll_wait= 0; tot_poll_wait= 0;
} }
int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
ndb_latest_received_binlog_epoch= gci; ndb_latest_received_binlog_epoch= gci;
while (gci > schema_gci && schema_res >= 0) while (gci > schema_gci && schema_res >= 0)
schema_res= schema_ndb->pollEvents(10, &schema_gci); schema_res= s_ndb->pollEvents(10, &schema_gci);
if ((abort_loop || do_ndbcluster_binlog_close_connection) && if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
(ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci ||
...@@ -3256,11 +3265,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3256,11 +3265,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
if (unlikely(schema_res > 0)) if (unlikely(schema_res > 0))
{ {
thd->proc_info= "Processing events from schema table"; thd->proc_info= "Processing events from schema table";
schema_ndb-> s_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
schema_ndb-> s_ndb->
setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
NdbEventOperation *pOp= schema_ndb->nextEvent(); NdbEventOperation *pOp= s_ndb->nextEvent();
while (pOp != NULL) while (pOp != NULL)
{ {
if (!pOp->hasError()) if (!pOp->hasError())
...@@ -3273,7 +3282,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3273,7 +3282,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
"binlog schema event", "binlog schema event",
(ulong) pOp->getNdbError().code, (ulong) pOp->getNdbError().code,
pOp->getNdbError().message); pOp->getNdbError().message);
pOp= schema_ndb->nextEvent(); pOp= s_ndb->nextEvent();
} }
} }
...@@ -3285,7 +3294,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3285,7 +3294,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
int event_count= 0; int event_count= 0;
#endif #endif
thd->proc_info= "Processing events"; thd->proc_info= "Processing events";
NdbEventOperation *pOp= ndb->nextEvent(); NdbEventOperation *pOp= i_ndb->nextEvent();
Binlog_index_row row; Binlog_index_row row;
while (pOp != NULL) while (pOp != NULL)
{ {
...@@ -3296,9 +3305,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3296,9 +3305,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName())); ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch); DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
ndb-> i_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
bzero((char*) &row, sizeof(row)); bzero((char*) &row, sizeof(row));
injector::transaction trans; injector::transaction trans;
...@@ -3307,7 +3316,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3307,7 +3316,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Uint32 iter= 0; Uint32 iter= 0;
const NdbEventOperation *gci_op; const NdbEventOperation *gci_op;
Uint32 event_types; Uint32 event_types;
while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
!= NULL) != NULL)
{ {
NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData(); NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
...@@ -3393,7 +3402,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3393,7 +3402,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
event_count++; event_count++;
#endif #endif
if (pOp->hasError() && if (pOp->hasError() &&
ndb_binlog_thread_handle_error(ndb, pOp, row) < 0) ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
goto err; goto err;
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -3413,7 +3422,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3413,7 +3422,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Uint32 iter= 0; Uint32 iter= 0;
const NdbEventOperation *gci_op; const NdbEventOperation *gci_op;
Uint32 event_types; Uint32 event_types;
while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
!= NULL) != NULL)
{ {
if (gci_op == pOp) if (gci_op == pOp)
...@@ -3425,19 +3434,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3425,19 +3434,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
#endif #endif
if ((unsigned) pOp->getEventType() < if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans); ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
else else
{ {
// set injector_ndb database/schema from table internal name // set injector_ndb database/schema from table internal name
int ret= int ret=
ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
DBUG_ASSERT(ret == 0); DBUG_ASSERT(ret == 0);
ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
// reset to catch errors // reset to catch errors
ndb->setDatabaseName(""); i_ndb->setDatabaseName("");
} }
pOp= ndb->nextEvent(); pOp= i_ndb->nextEvent();
} while (pOp && pOp->getGCI() == gci); } while (pOp && pOp->getGCI() == gci);
/* /*
...@@ -3495,7 +3504,9 @@ err: ...@@ -3495,7 +3504,9 @@ err:
close_thread_tables(thd); close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */ /* don't mess with the injector_ndb anymore from other threads */
injector_thd= 0;
injector_ndb= 0; injector_ndb= 0;
schema_ndb= 0;
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory thd->db= 0; // as not to try to free memory
sql_print_information("Stopping Cluster Binlog"); sql_print_information("Stopping Cluster Binlog");
...@@ -3512,21 +3523,43 @@ err: ...@@ -3512,21 +3523,43 @@ err:
} }
/* remove all event operations */ /* remove all event operations */
if (ndb) if (s_ndb)
{ {
NdbEventOperation *op; NdbEventOperation *op;
DBUG_PRINT("info",("removing all event operations")); DBUG_PRINT("info",("removing all event operations"));
while ((op= ndb->getEventOperation())) while ((op= s_ndb->getEventOperation()))
{ {
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s", DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName())); op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
DBUG_ASSERT(share->op == op ||
share->op_old == op);
share->op= share->op_old= 0;
free_share(&share); free_share(&share);
ndb->dropEventOperation(op); s_ndb->dropEventOperation(op);
}
delete s_ndb;
s_ndb= 0;
}
if (i_ndb)
{
NdbEventOperation *op;
DBUG_PRINT("info",("removing all event operations"));
while ((op= i_ndb->getEventOperation()))
{
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
DBUG_ASSERT(share->op == op ||
share->op_old == op);
share->op= share->op_old= 0;
free_share(&share);
i_ndb->dropEventOperation(op);
} }
delete ndb; delete i_ndb;
ndb= 0; i_ndb= 0;
} }
hash_free(&ndb_schema_objects); hash_free(&ndb_schema_objects);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment