Commit 3dcf8673 authored by unknown's avatar unknown

NDBAPI cleanup

parent 524914b2
...@@ -48,6 +48,16 @@ ...@@ -48,6 +48,16 @@
extern my_bool opt_ndb_optimized_node_selection; extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring; extern const char *opt_ndbcluster_connectstring;
// ndb interface initialization/cleanup
#ifdef __cplusplus
extern "C" {
#endif
extern void ndb_init_internal();
extern void ndb_end_internal();
#ifdef __cplusplus
}
#endif
const char *ndb_distribution_names[]= {"KEYHASH", "LINHASH", NullS}; const char *ndb_distribution_names[]= {"KEYHASH", "LINHASH", NullS};
TYPELIB ndb_distribution_typelib= { array_elements(ndb_distribution_names)-1, TYPELIB ndb_distribution_typelib= { array_elements(ndb_distribution_names)-1,
"", ndb_distribution_names, NULL }; "", ndb_distribution_names, NULL };
...@@ -6392,6 +6402,9 @@ static int ndbcluster_init() ...@@ -6392,6 +6402,9 @@ static int ndbcluster_init()
if (have_ndbcluster != SHOW_OPTION_YES) if (have_ndbcluster != SHOW_OPTION_YES)
DBUG_RETURN(0); // nothing else to do DBUG_RETURN(0); // nothing else to do
// Initialize ndb interface
ndb_init_internal();
// Set connectstring if specified // Set connectstring if specified
if (opt_ndbcluster_connectstring != 0) if (opt_ndbcluster_connectstring != 0)
DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring)); DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
...@@ -6540,6 +6553,9 @@ static int ndbcluster_end(ha_panic_function type) ...@@ -6540,6 +6553,9 @@ static int ndbcluster_end(ha_panic_function type)
delete g_ndb_cluster_connection; delete g_ndb_cluster_connection;
g_ndb_cluster_connection= NULL; g_ndb_cluster_connection= NULL;
// cleanup ndb interface
ndb_end_internal();
pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&ndbcluster_mutex);
pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "slave.h" #include "slave.h"
#include "ha_ndbcluster_binlog.h" #include "ha_ndbcluster_binlog.h"
#include "NdbDictionary.hpp" #include "NdbDictionary.hpp"
#include "ndb_cluster_connection.hpp"
#include <util/NdbAutoPtr.hpp> #include <util/NdbAutoPtr.hpp>
#ifdef ndb_dynamite #ifdef ndb_dynamite
...@@ -111,8 +112,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, ...@@ -111,8 +112,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
bool have_lock); bool have_lock);
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ static Uint64 *p_latest_trans_gci= 0;
extern Uint64 g_latest_trans_gci;
/* /*
Global variables for holding the binlog_index table reference Global variables for holding the binlog_index table reference
...@@ -439,7 +439,7 @@ static void ndbcluster_binlog_wait(THD *thd) ...@@ -439,7 +439,7 @@ static void ndbcluster_binlog_wait(THD *thd)
{ {
DBUG_ENTER("ndbcluster_binlog_wait"); DBUG_ENTER("ndbcluster_binlog_wait");
const char *save_info= thd ? thd->proc_info : 0; const char *save_info= thd ? thd->proc_info : 0;
ulonglong wait_epoch= g_latest_trans_gci; ulonglong wait_epoch= *p_latest_trans_gci;
int count= 30; int count= 30;
if (thd) if (thd)
thd->proc_info= "Waiting for ndbcluster binlog update to " thd->proc_info= "Waiting for ndbcluster binlog update to "
...@@ -3284,6 +3284,7 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, ...@@ -3284,6 +3284,7 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_handler_t ndb_binlog_thread_func(void *arg)
{ {
THD *thd; /* needs to be first for thread_stack */ THD *thd; /* needs to be first for thread_stack */
...@@ -3292,6 +3293,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3292,6 +3293,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Thd_ndb *thd_ndb=0; Thd_ndb *thd_ndb=0;
int ndb_update_binlog_index= 1; int ndb_update_binlog_index= 1;
injector *inj= injector::instance(); injector *inj= injector::instance();
#ifdef RUN_NDB_BINLOG_TIMER #ifdef RUN_NDB_BINLOG_TIMER
Timer main_timer; Timer main_timer;
#endif #endif
...@@ -3380,6 +3382,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3380,6 +3382,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
*/ */
injector_thd= thd; injector_thd= thd;
injector_ndb= i_ndb; injector_ndb= i_ndb;
p_latest_trans_gci=
injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
schema_ndb= s_ndb; schema_ndb= s_ndb;
ndb_binlog_thread_running= 1; ndb_binlog_thread_running= 1;
if (opt_bin_log) if (opt_bin_log)
...@@ -3476,7 +3480,7 @@ restart: ...@@ -3476,7 +3480,7 @@ restart:
"ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. " "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
"RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.", "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
(unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci); (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
g_latest_trans_gci= 0; *p_latest_trans_gci= 0;
ndb_latest_handled_binlog_epoch= 0; ndb_latest_handled_binlog_epoch= 0;
ndb_latest_applied_binlog_epoch= 0; ndb_latest_applied_binlog_epoch= 0;
ndb_latest_received_binlog_epoch= 0; ndb_latest_received_binlog_epoch= 0;
...@@ -3503,7 +3507,7 @@ restart: ...@@ -3503,7 +3507,7 @@ restart:
} }
do_ndbcluster_binlog_close_connection= BCCC_running; do_ndbcluster_binlog_close_connection= BCCC_running;
for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) && for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) && ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
do_ndbcluster_binlog_close_connection != BCCC_restart; ) do_ndbcluster_binlog_close_connection != BCCC_restart; )
{ {
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -3511,8 +3515,8 @@ restart: ...@@ -3511,8 +3515,8 @@ restart:
{ {
DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, " DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
"ndb_latest_handled_binlog_epoch: %llu, " "ndb_latest_handled_binlog_epoch: %llu, "
"g_latest_trans_gci: %llu", do_ndbcluster_binlog_close_connection, "*p_latest_trans_gci: %llu", do_ndbcluster_binlog_close_connection,
ndb_latest_handled_binlog_epoch, g_latest_trans_gci)); ndb_latest_handled_binlog_epoch, *p_latest_trans_gci));
} }
#endif #endif
#ifdef RUN_NDB_BINLOG_TIMER #ifdef RUN_NDB_BINLOG_TIMER
...@@ -3548,7 +3552,7 @@ restart: ...@@ -3548,7 +3552,7 @@ restart:
} }
if ((abort_loop || do_ndbcluster_binlog_close_connection) && if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
(ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
!ndb_binlog_running)) !ndb_binlog_running))
break; /* Shutting down server */ break; /* Shutting down server */
...@@ -3598,11 +3602,11 @@ restart: ...@@ -3598,11 +3602,11 @@ restart:
{ {
DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart")); DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
do_ndbcluster_binlog_close_connection= BCCC_restart; do_ndbcluster_binlog_close_connection= BCCC_restart;
if (ndb_latest_received_binlog_epoch < g_latest_trans_gci && ndb_binlog_running) if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
{ {
sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog " sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog "
"as latest received epoch is %lld", "as latest received epoch is %lld",
g_latest_trans_gci, ndb_latest_received_binlog_epoch); *p_latest_trans_gci, ndb_latest_received_binlog_epoch);
} }
} }
} }
...@@ -3784,11 +3788,11 @@ restart: ...@@ -3784,11 +3788,11 @@ restart:
{ {
DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart")); DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
do_ndbcluster_binlog_close_connection= BCCC_restart; do_ndbcluster_binlog_close_connection= BCCC_restart;
if (ndb_latest_received_binlog_epoch < g_latest_trans_gci && ndb_binlog_running) if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
{ {
sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog " sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog "
"as latest received epoch is %lld", "as latest received epoch is %lld",
g_latest_trans_gci, ndb_latest_received_binlog_epoch); *p_latest_trans_gci, ndb_latest_received_binlog_epoch);
} }
} }
} }
...@@ -3861,6 +3865,7 @@ err: ...@@ -3861,6 +3865,7 @@ err:
/* don't mess with the injector_ndb anymore from other threads */ /* don't mess with the injector_ndb anymore from other threads */
injector_thd= 0; injector_thd= 0;
injector_ndb= 0; injector_ndb= 0;
p_latest_trans_gci= 0;
schema_ndb= 0; schema_ndb= 0;
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory thd->db= 0; // as not to try to free memory
...@@ -3960,7 +3965,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, ...@@ -3960,7 +3965,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
"latest_handled_binlog_epoch=%s, " "latest_handled_binlog_epoch=%s, "
"latest_applied_binlog_epoch=%s", "latest_applied_binlog_epoch=%s",
llstr(ndb_latest_epoch, buff1), llstr(ndb_latest_epoch, buff1),
llstr(g_latest_trans_gci, buff2), llstr(*p_latest_trans_gci, buff2),
llstr(ndb_latest_received_binlog_epoch, buff3), llstr(ndb_latest_received_binlog_epoch, buff3),
llstr(ndb_latest_handled_binlog_epoch, buff4), llstr(ndb_latest_handled_binlog_epoch, buff4),
llstr(ndb_latest_applied_binlog_epoch, buff5)); llstr(ndb_latest_applied_binlog_epoch, buff5));
......
...@@ -1093,6 +1093,15 @@ public: ...@@ -1093,6 +1093,15 @@ public:
~Ndb(); ~Ndb();
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* The current ndb_cluster_connection get_ndb_cluster_connection.
*
* @return the current connection
*/
Ndb_cluster_connection& get_ndb_cluster_connection();
#endif
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** /**
* The current catalog name can be fetched by getCatalogName. * The current catalog name can be fetched by getCatalogName.
......
...@@ -735,6 +735,7 @@ private: ...@@ -735,6 +735,7 @@ private:
Uint32 theTCConPtr; // Transaction Co-ordinator connection pointer. Uint32 theTCConPtr; // Transaction Co-ordinator connection pointer.
Uint64 theTransactionId; // theTransactionId of the transaction Uint64 theTransactionId; // theTransactionId of the transaction
Uint32 theGlobalCheckpointId; // The gloabl checkpoint identity of the transaction Uint32 theGlobalCheckpointId; // The gloabl checkpoint identity of the transaction
Uint64 *p_latest_trans_gci; // Reference to latest gci for connection
ConStatusType theStatus; // The status of the connection ConStatusType theStatus; // The status of the connection
enum CompletionStatus { enum CompletionStatus {
NotCompleted, NotCompleted,
...@@ -753,7 +754,7 @@ private: ...@@ -753,7 +754,7 @@ private:
bool theTransactionIsStarted; bool theTransactionIsStarted;
bool theInUseState; bool theInUseState;
bool theSimpleState; bool theSimpleState;
Uint8 m_abortOption; // Type of commit Uint8 m_abortOption; // Type of commi
enum ListState { enum ListState {
NotInList, NotInList,
......
...@@ -114,6 +114,8 @@ public: ...@@ -114,6 +114,8 @@ public:
void init_get_next_node(Ndb_cluster_connection_node_iter &iter); void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter); unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter);
Uint64 *get_latest_trans_gci();
#endif #endif
private: private:
......
...@@ -361,6 +361,7 @@ private: ...@@ -361,6 +361,7 @@ private:
Uint32 poll_SHM(Uint32 timeOutMillis); Uint32 poll_SHM(Uint32 timeOutMillis);
int m_shm_own_pid; int m_shm_own_pid;
int m_transp_count;
}; };
#endif // Define of TransporterRegistry_H #endif // Define of TransporterRegistry_H
...@@ -213,8 +213,8 @@ TransporterRegistry::unpack(Uint32 * readPtr, ...@@ -213,8 +213,8 @@ TransporterRegistry::unpack(Uint32 * readPtr,
Uint32 * eodPtr, Uint32 * eodPtr,
NodeId remoteNodeId, NodeId remoteNodeId,
IOState state) { IOState state) {
static SignalHeader signalHeader; SignalHeader signalHeader;
static LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
Uint32 loop_count = 0; Uint32 loop_count = 0;
if(state == NoHalt || state == HaltOutput){ if(state == NoHalt || state == HaltOutput){
while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
......
...@@ -80,14 +80,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) ...@@ -80,14 +80,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
TransporterRegistry::TransporterRegistry(void * callback, TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters, unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) unsigned sizeOfLongSignalMemory) :
m_mgm_handle(0),
m_transp_count(0)
{ {
DBUG_ENTER("TransporterRegistry::TransporterRegistry"); DBUG_ENTER("TransporterRegistry::TransporterRegistry");
nodeIdSpecified = false; nodeIdSpecified = false;
maxTransporters = _maxTransporters; maxTransporters = _maxTransporters;
sendCounter = 1; sendCounter = 1;
m_mgm_handle= 0;
callbackObj=callback; callbackObj=callback;
...@@ -1002,7 +1003,6 @@ TransporterRegistry::performReceive() ...@@ -1002,7 +1003,6 @@ TransporterRegistry::performReceive()
#endif #endif
} }
static int x = 0;
void void
TransporterRegistry::performSend() TransporterRegistry::performSend()
{ {
...@@ -1070,7 +1070,7 @@ TransporterRegistry::performSend() ...@@ -1070,7 +1070,7 @@ TransporterRegistry::performSend()
} }
#endif #endif
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
for (i = x; i < nTCPTransporters; i++) for (i = m_transp_count; i < nTCPTransporters; i++)
{ {
TCP_Transporter *t = theTCPTransporters[i]; TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() && if (t && t->hasDataToSend() && t->isConnected() &&
...@@ -1079,7 +1079,7 @@ TransporterRegistry::performSend() ...@@ -1079,7 +1079,7 @@ TransporterRegistry::performSend()
t->doSend(); t->doSend();
} }
} }
for (i = 0; i < x && i < nTCPTransporters; i++) for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
{ {
TCP_Transporter *t = theTCPTransporters[i]; TCP_Transporter *t = theTCPTransporters[i];
if (t && t->hasDataToSend() && t->isConnected() && if (t && t->hasDataToSend() && t->isConnected() &&
...@@ -1088,8 +1088,8 @@ TransporterRegistry::performSend() ...@@ -1088,8 +1088,8 @@ TransporterRegistry::performSend()
t->doSend(); t->doSend();
} }
} }
x++; m_transp_count++;
if (x == nTCPTransporters) x = 0; if (m_transp_count == nTCPTransporters) m_transp_count = 0;
#endif #endif
#endif #endif
#ifdef NDB_SCI_TRANSPORTER #ifdef NDB_SCI_TRANSPORTER
......
...@@ -16,6 +16,16 @@ ...@@ -16,6 +16,16 @@
#include <ndb_global.h> #include <ndb_global.h>
#include <my_sys.h> #include <my_sys.h>
#include <NdbMutex.h>
NdbMutex *g_ndb_connection_mutex = NULL;
void
ndb_init_internal()
{
if (!g_ndb_connection_mutex)
g_ndb_connection_mutex = NdbMutex_Create();
}
int int
ndb_init() ndb_init()
...@@ -25,11 +35,20 @@ ndb_init() ...@@ -25,11 +35,20 @@ ndb_init()
write(2, err, strlen(err)); write(2, err, strlen(err));
exit(1); exit(1);
} }
ndb_init_internal();
return 0; return 0;
} }
void
ndb_end_internal()
{
if (g_ndb_connection_mutex)
NdbMutex_Destroy(g_ndb_connection_mutex);
}
void void
ndb_end(int flags) ndb_end(int flags)
{ {
my_end(flags); my_end(flags);
ndb_end_internal();
} }
...@@ -1194,7 +1194,7 @@ const unsigned int * ...@@ -1194,7 +1194,7 @@ const unsigned int *
ndb_mgm_get_clusterlog_severity_filter(NdbMgmHandle handle) ndb_mgm_get_clusterlog_severity_filter(NdbMgmHandle handle)
{ {
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_get_clusterlog_severity_filter"); SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_get_clusterlog_severity_filter");
static unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]= unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]=
{0,0,0,0,0,0,0}; {0,0,0,0,0,0,0};
const ParserRow<ParserDummy> getinfo_reply[] = { const ParserRow<ParserDummy> getinfo_reply[] = {
MGM_CMD("clusterlog", NULL, ""), MGM_CMD("clusterlog", NULL, ""),
......
...@@ -1172,6 +1172,12 @@ convertEndian(Uint32 Data) ...@@ -1172,6 +1172,12 @@ convertEndian(Uint32 Data)
} }
// <internal> // <internal>
Ndb_cluster_connection &
Ndb::get_ndb_cluster_connection()
{
return theImpl->m_ndb_cluster_connection;
}
const char * Ndb::getCatalogName() const const char * Ndb::getCatalogName() const
{ {
return theImpl->m_dbname.c_str(); return theImpl->m_dbname.c_str();
......
...@@ -1303,8 +1303,6 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb, ...@@ -1303,8 +1303,6 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb,
m_local_table_data_size= 0; m_local_table_data_size= 0;
} }
static int f_dictionary_count = 0;
NdbDictionaryImpl::~NdbDictionaryImpl() NdbDictionaryImpl::~NdbDictionaryImpl()
{ {
NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0); NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0);
...@@ -1317,33 +1315,6 @@ NdbDictionaryImpl::~NdbDictionaryImpl() ...@@ -1317,33 +1315,6 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
curr = m_localHash.m_tableHash.getNext(curr); curr = m_localHash.m_tableHash.getNext(curr);
} }
m_globalHash->lock();
if(--f_dictionary_count == 0){
delete NdbDictionary::Column::FRAGMENT;
delete NdbDictionary::Column::FRAGMENT_FIXED_MEMORY;
delete NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY;
delete NdbDictionary::Column::ROW_COUNT;
delete NdbDictionary::Column::COMMIT_COUNT;
delete NdbDictionary::Column::ROW_SIZE;
delete NdbDictionary::Column::RANGE_NO;
delete NdbDictionary::Column::DISK_REF;
delete NdbDictionary::Column::RECORDS_IN_RANGE;
delete NdbDictionary::Column::ROWID;
delete NdbDictionary::Column::ROW_GCI;
NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0;
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0;
NdbDictionary::Column::ROW_COUNT= 0;
NdbDictionary::Column::COMMIT_COUNT= 0;
NdbDictionary::Column::ROW_SIZE= 0;
NdbDictionary::Column::RANGE_NO= 0;
NdbDictionary::Column::DISK_REF= 0;
NdbDictionary::Column::RECORDS_IN_RANGE= 0;
NdbDictionary::Column::ROWID= 0;
NdbDictionary::Column::ROW_GCI= 0;
}
m_globalHash->unlock();
} else { } else {
assert(curr == 0); assert(curr == 0);
} }
...@@ -1486,32 +1457,6 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb, ...@@ -1486,32 +1457,6 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb,
{ {
m_globalHash = &tf->m_globalDictCache; m_globalHash = &tf->m_globalDictCache;
if(m_receiver.setTransporter(ndb, tf)){ if(m_receiver.setTransporter(ndb, tf)){
m_globalHash->lock();
if(f_dictionary_count++ == 0){
NdbDictionary::Column::FRAGMENT=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT");
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT_FIXED_MEMORY");
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT_VARSIZED_MEMORY");
NdbDictionary::Column::ROW_COUNT=
NdbColumnImpl::create_pseudo("NDB$ROW_COUNT");
NdbDictionary::Column::COMMIT_COUNT=
NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT");
NdbDictionary::Column::ROW_SIZE=
NdbColumnImpl::create_pseudo("NDB$ROW_SIZE");
NdbDictionary::Column::RANGE_NO=
NdbColumnImpl::create_pseudo("NDB$RANGE_NO");
NdbDictionary::Column::DISK_REF=
NdbColumnImpl::create_pseudo("NDB$DISK_REF");
NdbDictionary::Column::RECORDS_IN_RANGE=
NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE");
NdbDictionary::Column::ROWID=
NdbColumnImpl::create_pseudo("NDB$ROWID");
NdbDictionary::Column::ROW_GCI=
NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
}
m_globalHash->unlock();
return true; return true;
} }
return false; return false;
......
...@@ -925,8 +925,6 @@ NdbEventOperationImpl::printAll() ...@@ -925,8 +925,6 @@ NdbEventOperationImpl::printAll()
* Each Ndb object has a Object. * Each Ndb object has a Object.
*/ */
// ToDo ref count this so it get's destroyed
NdbMutex *NdbEventBuffer::p_add_drop_mutex= 0;
NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
m_system_nodes(ndb->theImpl->theNoOfDBnodes), m_system_nodes(ndb->theImpl->theNoOfDBnodes),
...@@ -938,7 +936,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : ...@@ -938,7 +936,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
m_max_free_thresh(100), m_max_free_thresh(100),
m_gci_slip_thresh(3), m_gci_slip_thresh(3),
m_dropped_ev_op(0), m_dropped_ev_op(0),
m_active_op_count(0) m_active_op_count(0),
m_add_drop_mutex(0)
{ {
#ifdef VM_TRACE #ifdef VM_TRACE
m_latest_command= "NdbEventBuffer::NdbEventBuffer"; m_latest_command= "NdbEventBuffer::NdbEventBuffer";
...@@ -950,16 +949,6 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : ...@@ -950,16 +949,6 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
exit(-1); exit(-1);
} }
m_mutex= ndb->theImpl->theWaiter.m_mutex; m_mutex= ndb->theImpl->theWaiter.m_mutex;
lock();
if (p_add_drop_mutex == 0)
{
if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) {
ndbout_c("NdbEventBuffer: NdbMutex_Create() failed");
exit(-1);
}
}
unlock();
// ToDo set event buffer size // ToDo set event buffer size
// pre allocate event data array // pre allocate event data array
m_sz= 0; m_sz= 0;
...@@ -969,6 +958,10 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : ...@@ -969,6 +958,10 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
m_free_data= 0; m_free_data= 0;
m_free_data_sz= 0; m_free_data_sz= 0;
// get reference to mutex managed by current connection
m_add_drop_mutex=
m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex;
// initialize lists // initialize lists
bzero(&g_empty_gci_container, sizeof(Gci_container)); bzero(&g_empty_gci_container, sizeof(Gci_container));
init_gci_containers(); init_gci_containers();
...@@ -1006,14 +999,6 @@ NdbEventBuffer::~NdbEventBuffer() ...@@ -1006,14 +999,6 @@ NdbEventBuffer::~NdbEventBuffer()
} }
NdbCondition_Destroy(p_cond); NdbCondition_Destroy(p_cond);
lock();
if (p_add_drop_mutex)
{
NdbMutex_Destroy(p_add_drop_mutex);
p_add_drop_mutex = 0;
}
unlock();
} }
void void
......
...@@ -406,8 +406,8 @@ public: ...@@ -406,8 +406,8 @@ public:
void dropEventOperation(NdbEventOperation *); void dropEventOperation(NdbEventOperation *);
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); } void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); }
void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); } void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); }
void lock() { NdbMutex_Lock(m_mutex); } void lock() { NdbMutex_Lock(m_mutex); }
void unlock() { NdbMutex_Unlock(m_mutex); } void unlock() { NdbMutex_Unlock(m_mutex); }
...@@ -509,6 +509,7 @@ private: ...@@ -509,6 +509,7 @@ private:
NdbEventOperationImpl *m_dropped_ev_op; NdbEventOperationImpl *m_dropped_ev_op;
Uint32 m_active_op_count; Uint32 m_active_op_count;
NdbMutex *m_add_drop_mutex;
}; };
inline inline
......
...@@ -32,8 +32,6 @@ ...@@ -32,8 +32,6 @@
#include <signaldata/TcKeyFailConf.hpp> #include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/TcHbRep.hpp> #include <signaldata/TcHbRep.hpp>
Uint64 g_latest_trans_gci = 0;
/***************************************************************************** /*****************************************************************************
NdbTransaction( Ndb* aNdb ); NdbTransaction( Ndb* aNdb );
...@@ -64,6 +62,7 @@ NdbTransaction::NdbTransaction( Ndb* aNdb ) : ...@@ -64,6 +62,7 @@ NdbTransaction::NdbTransaction( Ndb* aNdb ) :
theTCConPtr(0), theTCConPtr(0),
theTransactionId(0), theTransactionId(0),
theGlobalCheckpointId(0), theGlobalCheckpointId(0),
p_latest_trans_gci(0),
theStatus(NotConnected), theStatus(NotConnected),
theCompletionStatus(NotCompleted), theCompletionStatus(NotCompleted),
theCommitStatus(NotStarted), theCommitStatus(NotStarted),
...@@ -129,6 +128,8 @@ NdbTransaction::init() ...@@ -129,6 +128,8 @@ NdbTransaction::init()
theCompletedLastOp = NULL; theCompletedLastOp = NULL;
theGlobalCheckpointId = 0; theGlobalCheckpointId = 0;
p_latest_trans_gci =
theNdb->theImpl->m_ndb_cluster_connection.get_latest_trans_gci();
theCommitStatus = Started; theCommitStatus = Started;
theCompletionStatus = NotCompleted; theCompletionStatus = NotCompleted;
m_abortOption = AbortOnError; m_abortOption = AbortOnError;
...@@ -1572,7 +1573,7 @@ NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf) ...@@ -1572,7 +1573,7 @@ NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf)
theGlobalCheckpointId = commitConf->gci; theGlobalCheckpointId = commitConf->gci;
// theGlobalCheckpointId == 0 if NoOp transaction // theGlobalCheckpointId == 0 if NoOp transaction
if (theGlobalCheckpointId) if (theGlobalCheckpointId)
g_latest_trans_gci = theGlobalCheckpointId; *p_latest_trans_gci = theGlobalCheckpointId;
return 0; return 0;
} else { } else {
#ifdef NDB_NO_DROPPED_SIGNAL #ifdef NDB_NO_DROPPED_SIGNAL
...@@ -1752,7 +1753,7 @@ from other transactions. ...@@ -1752,7 +1753,7 @@ from other transactions.
theCommitStatus = Committed; theCommitStatus = Committed;
theGlobalCheckpointId = tGCI; theGlobalCheckpointId = tGCI;
assert(tGCI); assert(tGCI);
g_latest_trans_gci = tGCI; *p_latest_trans_gci = tGCI;
} else if ((tNoComp >= tNoSent) && } else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){ (theLastExecOpInList->theCommitIndicator == 1)){
...@@ -1930,7 +1931,7 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf, ...@@ -1930,7 +1931,7 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf,
theCommitStatus = Committed; theCommitStatus = Committed;
theGlobalCheckpointId = tGCI; theGlobalCheckpointId = tGCI;
assert(tGCI); assert(tGCI);
g_latest_trans_gci = tGCI; *p_latest_trans_gci = tGCI;
} else if ((tNoComp >= tNoSent) && } else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){ (theLastExecOpInList->theCommitIndicator == 1)){
/**********************************************************************/ /**********************************************************************/
......
...@@ -35,8 +35,6 @@ ...@@ -35,8 +35,6 @@
#include <EventLogger.hpp> #include <EventLogger.hpp>
EventLogger g_eventLogger; EventLogger g_eventLogger;
static int g_run_connect_thread= 0;
#include <NdbMutex.h> #include <NdbMutex.h>
#ifdef VM_TRACE #ifdef VM_TRACE
NdbMutex *ndb_print_state_mutex= NULL; NdbMutex *ndb_print_state_mutex= NULL;
...@@ -87,8 +85,9 @@ const char *Ndb_cluster_connection::get_connectstring(char *buf, ...@@ -87,8 +85,9 @@ const char *Ndb_cluster_connection::get_connectstring(char *buf,
pthread_handler_t run_ndb_cluster_connection_connect_thread(void *me) pthread_handler_t run_ndb_cluster_connection_connect_thread(void *me)
{ {
g_run_connect_thread= 1; Ndb_cluster_connection_impl* connection= (Ndb_cluster_connection_impl*) me;
((Ndb_cluster_connection_impl*) me)->connect_thread(); connection->m_run_connect_thread= 1;
connection->connect_thread();
return me; return me;
} }
...@@ -258,9 +257,6 @@ unsigned Ndb_cluster_connection::get_connect_count() const ...@@ -258,9 +257,6 @@ unsigned Ndb_cluster_connection::get_connect_count() const
return m_impl.get_connect_count(); return m_impl.get_connect_count();
} }
/* /*
* Ndb_cluster_connection_impl * Ndb_cluster_connection_impl
*/ */
...@@ -269,11 +265,17 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * ...@@ -269,11 +265,17 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
connect_string) connect_string)
: Ndb_cluster_connection(*this), : Ndb_cluster_connection(*this),
m_optimized_node_selection(1), m_optimized_node_selection(1),
m_name(0) m_name(0),
m_run_connect_thread(0),
m_event_add_drop_mutex(0),
m_latest_trans_gci(0)
{ {
DBUG_ENTER("Ndb_cluster_connection"); DBUG_ENTER("Ndb_cluster_connection");
DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this)); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
if (!m_event_add_drop_mutex)
m_event_add_drop_mutex= NdbMutex_Create();
g_eventLogger.createConsoleHandler(); g_eventLogger.createConsoleHandler();
g_eventLogger.setCategory("NdbApi"); g_eventLogger.setCategory("NdbApi");
g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR); g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR);
...@@ -301,6 +303,33 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * ...@@ -301,6 +303,33 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
} }
m_transporter_facade= new TransporterFacade(); m_transporter_facade= new TransporterFacade();
NdbMutex_Lock(g_ndb_connection_mutex);
if(g_ndb_connection_count++ == 0){
NdbDictionary::Column::FRAGMENT=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT");
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT_FIXED_MEMORY");
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY=
NdbColumnImpl::create_pseudo("NDB$FRAGMENT_VARSIZED_MEMORY");
NdbDictionary::Column::ROW_COUNT=
NdbColumnImpl::create_pseudo("NDB$ROW_COUNT");
NdbDictionary::Column::COMMIT_COUNT=
NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT");
NdbDictionary::Column::ROW_SIZE=
NdbColumnImpl::create_pseudo("NDB$ROW_SIZE");
NdbDictionary::Column::RANGE_NO=
NdbColumnImpl::create_pseudo("NDB$RANGE_NO");
NdbDictionary::Column::DISK_REF=
NdbColumnImpl::create_pseudo("NDB$DISK_REF");
NdbDictionary::Column::RECORDS_IN_RANGE=
NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE");
NdbDictionary::Column::ROWID=
NdbColumnImpl::create_pseudo("NDB$ROWID");
NdbDictionary::Column::ROW_GCI=
NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
}
NdbMutex_Unlock(g_ndb_connection_mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -314,7 +343,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() ...@@ -314,7 +343,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
if (m_connect_thread) if (m_connect_thread)
{ {
void *status; void *status;
g_run_connect_thread= 0; m_run_connect_thread= 0;
NdbThread_WaitFor(m_connect_thread, &status); NdbThread_WaitFor(m_connect_thread, &status);
NdbThread_Destroy(&m_connect_thread); NdbThread_Destroy(&m_connect_thread);
m_connect_thread= 0; m_connect_thread= 0;
...@@ -339,6 +368,36 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() ...@@ -339,6 +368,36 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
if (m_name) if (m_name)
free(m_name); free(m_name);
NdbMutex_Lock(g_ndb_connection_mutex);
if(--g_ndb_connection_count == 0){
delete NdbDictionary::Column::FRAGMENT;
delete NdbDictionary::Column::FRAGMENT_FIXED_MEMORY;
delete NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY;
delete NdbDictionary::Column::ROW_COUNT;
delete NdbDictionary::Column::COMMIT_COUNT;
delete NdbDictionary::Column::ROW_SIZE;
delete NdbDictionary::Column::RANGE_NO;
delete NdbDictionary::Column::DISK_REF;
delete NdbDictionary::Column::RECORDS_IN_RANGE;
delete NdbDictionary::Column::ROWID;
delete NdbDictionary::Column::ROW_GCI;
NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0;
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0;
NdbDictionary::Column::ROW_COUNT= 0;
NdbDictionary::Column::COMMIT_COUNT= 0;
NdbDictionary::Column::ROW_SIZE= 0;
NdbDictionary::Column::RANGE_NO= 0;
NdbDictionary::Column::DISK_REF= 0;
NdbDictionary::Column::RECORDS_IN_RANGE= 0;
NdbDictionary::Column::ROWID= 0;
NdbDictionary::Column::ROW_GCI= 0;
}
NdbMutex_Unlock(g_ndb_connection_mutex);
if (m_event_add_drop_mutex)
NdbMutex_Destroy(m_event_add_drop_mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -576,17 +635,23 @@ void Ndb_cluster_connection_impl::connect_thread() ...@@ -576,17 +635,23 @@ void Ndb_cluster_connection_impl::connect_thread()
if (r == -1) { if (r == -1) {
printf("Ndb_cluster_connection::connect_thread error\n"); printf("Ndb_cluster_connection::connect_thread error\n");
DBUG_ASSERT(false); DBUG_ASSERT(false);
g_run_connect_thread= 0; m_run_connect_thread= 0;
} else { } else {
// Wait before making a new connect attempt // Wait before making a new connect attempt
NdbSleep_SecSleep(1); NdbSleep_SecSleep(1);
} }
} while (g_run_connect_thread); } while (m_run_connect_thread);
if (m_connect_callback) if (m_connect_callback)
(*m_connect_callback)(); (*m_connect_callback)();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
Uint64 *
Ndb_cluster_connection::get_latest_trans_gci()
{
m_impl.get_latest_trans_gci();
}
void void
Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter) Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
{ {
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#include <ndb_cluster_connection.hpp> #include <ndb_cluster_connection.hpp>
#include <Vector.hpp> #include <Vector.hpp>
#include <NdbMutex.h>
extern NdbMutex *g_ndb_connection_mutex;
static int g_ndb_connection_count = 0;
class TransporterFacade; class TransporterFacade;
class ConfigRetriever; class ConfigRetriever;
...@@ -41,6 +45,9 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection ...@@ -41,6 +45,9 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection
Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter); Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
inline unsigned get_connect_count() const; inline unsigned get_connect_count() const;
public:
inline Uint64 *get_latest_trans_gci() { return &m_latest_trans_gci; }
private: private:
friend class Ndb; friend class Ndb;
friend class NdbImpl; friend class NdbImpl;
...@@ -72,6 +79,9 @@ private: ...@@ -72,6 +79,9 @@ private:
int m_optimized_node_selection; int m_optimized_node_selection;
char *m_name; char *m_name;
int m_run_connect_thread;
NdbMutex *m_event_add_drop_mutex;
Uint64 m_latest_trans_gci;
}; };
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment