Commit 9ac63170 authored by tulin@mysql.com's avatar tulin@mysql.com

cleanup and streamlining of thread create/exit in ndb

parent b018755e
......@@ -76,7 +76,7 @@ int NdbThread_WaitFor(struct NdbThread* p_wait_thread, void** status);
*
* * status: exit code
*/
void NdbThread_Exit(int status);
void NdbThread_Exit(void *status);
/**
* Set thread concurrency level
......
......@@ -54,10 +54,7 @@ extern "C" void* thread1func(void* arg)
if (arg1 != 7)
fail("TEST1", "Wrong arg");
NdbThread_Exit(returnvalue);
return NULL;
return returnvalue;
}
// test 2 variables and funcs
......@@ -80,10 +77,7 @@ extern "C" void* test2func(void* arg)
fail("TEST2", "Failed to unlock mutex");
int returnvalue = arg1;
NdbThread_Exit(returnvalue);
return NULL;
return returnvalue;
}
......@@ -129,8 +123,7 @@ extern "C" void* testfunc(void* arg)
}
while(tmpVar<100);
NdbThread_Exit(0);
return NULL;
return 0;
}
extern "C" void* testTryLockfunc(void* arg)
......@@ -169,8 +162,7 @@ extern "C" void* testTryLockfunc(void* arg)
}
while(tmpVar<100);
NdbThread_Exit(0);
return NULL;
return 0;
}
......
......@@ -17,7 +17,7 @@
#include <ndb_global.h>
#include <NdbThread.h>
#include <pthread.h>
#include <my_pthread.h>
#include <NdbMem.h>
#define MAX_THREAD_NAME 16
......@@ -39,21 +39,29 @@ struct NdbThread
static
void*
ndb_thread_wrapper(void* _ss){
void * ret;
struct NdbThread * ss = (struct NdbThread *)_ss;
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER
if (g_ndb_shm_signum)
my_thread_init();
{
sigset_t mask;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
pthread_sigmask(SIG_BLOCK, &mask, 0);
}
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER
if (g_ndb_shm_signum)
{
sigset_t mask;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
pthread_sigmask(SIG_BLOCK, &mask, 0);
}
#endif
ret= (* ss->func)(ss->object);
DBUG_RETURN(ret);
{
void *ret;
struct NdbThread * ss = (struct NdbThread *)_ss;
ret= (* ss->func)(ss->object);
my_thread_end();
NdbThread_Exit(ret);
}
/* will never be reached */
DBUG_RETURN(0);
}
}
......@@ -130,9 +138,9 @@ int NdbThread_WaitFor(struct NdbThread* p_wait_thread, void** status)
}
void NdbThread_Exit(int status)
void NdbThread_Exit(void *status)
{
pthread_exit(&status);
pthread_exit(status);
}
......
......@@ -1104,11 +1104,8 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
static void *
run_start_clients_C(void * me)
{
my_thread_init();
((TransporterRegistry*) me)->start_clients_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
return 0;
}
// Run by kernel thread
......
......@@ -186,11 +186,7 @@ extern "C"
void*
socketServerThread_C(void* _ss){
SocketServer * ss = (SocketServer *)_ss;
my_thread_init();
ss->doRun();
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......@@ -309,11 +305,8 @@ void*
sessionThread_C(void* _sc){
SocketServer::Session * si = (SocketServer::Session *)_sc;
my_thread_init();
if(!transfer(si->m_socket)){
si->m_stopped = true;
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......@@ -325,8 +318,6 @@ sessionThread_C(void* _sc){
}
si->m_stopped = true;
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......
......@@ -82,7 +82,6 @@ static int numAsyncFiles = 0;
extern "C" void * runAsyncFile(void* arg)
{
my_thread_init();
((AsyncFile*)arg)->run();
return (NULL);
}
......@@ -876,8 +875,6 @@ void AsyncFile::endReq()
{
// Thread is ended with return
if (theWriteBuffer) NdbMem_Free(theWriteBuffer);
my_thread_end();
NdbThread_Exit(0);
}
......
......@@ -40,7 +40,6 @@ extern "C" void* runProducer(void*arg)
NdbSleep_MilliSleep(i);
i++;
}
NdbThread_Exit(0);
return NULL;
}
......@@ -58,7 +57,6 @@ extern "C" void* runConsumer(void* arg)
delete p;
}
NdbThread_Exit(0);
return NULL;
}
......@@ -92,7 +90,6 @@ extern "C" void* runProducer2(void*arg)
NdbSleep_MilliSleep(i);
i++;
}
NdbThread_Exit(0);
return NULL;
}
......@@ -111,7 +108,6 @@ extern "C" void* runConsumer2(void* arg)
delete p;
}
ndbout << "Consumer2: " << count << " received" << endl;
NdbThread_Exit(0);
return NULL;
}
......
......@@ -27,10 +27,7 @@
extern "C"
void*
runWatchDog(void* w){
my_thread_init();
((WatchDog*)w)->run();
my_thread_end();
NdbThread_Exit(0);
return NULL;
}
......
......@@ -457,8 +457,6 @@ event_thread_run(void* m)
{
NdbMgmHandle handle= *(NdbMgmHandle*)m;
my_thread_init();
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
int fd = ndb_mgm_listen_event(handle, filter);
if (fd > 0)
......@@ -478,9 +476,7 @@ event_thread_run(void* m)
do_event_thread= -1;
}
my_thread_end();
NdbThread_Exit(0);
return 0;
return NULL;
}
bool
......
......@@ -70,12 +70,7 @@ void *
MgmtSrvr::logLevelThread_C(void* m)
{
MgmtSrvr *mgm = (MgmtSrvr*)m;
my_thread_init();
mgm->logLevelThreadRun();
my_thread_end();
NdbThread_Exit(0);
/* NOTREACHED */
return 0;
}
......@@ -83,12 +78,7 @@ void *
MgmtSrvr::signalRecvThread_C(void *m)
{
MgmtSrvr *mgm = (MgmtSrvr*)m;
my_thread_init();
mgm->signalRecvThreadRun();
my_thread_end();
NdbThread_Exit(0);
/* NOTREACHED */
return 0;
}
......
......@@ -54,7 +54,6 @@ runClusterMgr_C(void * me)
#ifdef NDB_OSE
NdbSleep_MilliSleep(50);
#endif
NdbThread_Exit(0);
return NULL;
}
......@@ -560,10 +559,7 @@ extern "C"
void*
runArbitMgr_C(void* me)
{
my_thread_init();
((ArbitMgr*) me)->threadMain();
my_thread_end();
NdbThread_Exit(0);
return NULL;
}
......
......@@ -405,11 +405,8 @@ extern "C"
void*
runSendRequest_C(void * me)
{
my_thread_init();
((TransporterFacade*) me)->threadMainSend();
my_thread_end();
NdbThread_Exit(0);
return me;
return 0;
}
void TransporterFacade::threadMainSend(void)
......@@ -443,11 +440,8 @@ extern "C"
void*
runReceiveResponse_C(void * me)
{
my_thread_init();
((TransporterFacade*) me)->threadMainReceive();
my_thread_end();
NdbThread_Exit(0);
return me;
return 0;
}
void TransporterFacade::threadMainReceive(void)
......
......@@ -87,11 +87,8 @@ const char *Ndb_cluster_connection::get_connectstring(char *buf,
extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
{
my_thread_init();
g_run_connect_thread= 1;
((Ndb_cluster_connection_impl*) me)->connect_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
}
......
......@@ -984,7 +984,6 @@ void* ThreadExec(void* ThreadData){
delete pMyNdb;
pMyNdb = NULL ;
ThreadReady[thread_no] = 1;
NdbThread_Exit(0) ;
return 0 ;
}//if
......@@ -1197,7 +1196,6 @@ void* ThreadExec(void* ThreadData){
} // for(;;)
delete pMyNdb ;
NdbThread_Exit(0) ;
return 0 ; // Compiler is happy now
return 0 ;
}
......@@ -494,8 +494,7 @@ threadLoop(void* ThreadData)
delete localNdb;
ThreadReady[threadNo] = 1;
NdbThread_Exit(0);
return NULL; // Just to keep compiler happy
return NULL;
}//threadLoop()
static
......
......@@ -617,7 +617,7 @@ static void* flexBenchThread(void* pArg)
free(attrRefValue) ;
free(pOps) ;
delete pNdb ;
NdbThread_Exit(0) ;
return 0; // thread exits
}
pNdb->init();
......@@ -934,8 +934,7 @@ static void* flexBenchThread(void* pArg)
free(longKeyAttrValue);
} // if
NdbThread_Exit(0);
return NULL; // Just to keep compiler happy
return NULL; // Thread exits
}
......
......@@ -612,10 +612,7 @@ flexHammerThread(void* pArg)
flexHammerErrorData->resetErrorCounters();
// And exit using NDBT
NdbThread_Exit(0);
return NULL;
return NULL; // thread exits
} // flexHammerThread
......
......@@ -701,8 +701,7 @@ flexScanThread(void* ThreadData)
free(pkValue);
} // if
NdbThread_Exit(0);
return NULL;
return NULL; // thread exits
} // flexScanThread
......
......@@ -389,8 +389,7 @@ threadLoop(void* ThreadData)
delete localNdb;
ThreadReady[loc_threadNo] = 1;
NdbThread_Exit(0);
return NULL; // Just to keep compiler happy
return NULL; // Thread exits
}//threadLoop()
static
......
......@@ -406,9 +406,8 @@ threadLoop(void* ThreadData)
delete localNdb;
ThreadReady[threadNo] = 1;
NdbThread_Exit(0);
return NULL;
return NULL; // thread exits
}
void executeThread(StartType aType, Ndb* aNdbObject, ThreadNdb* threadInfo)
......
......@@ -710,7 +710,7 @@ static void* flexBenchThread(void* pArg)
the_socket_name,
0) == NULL ) {
ndbout << "failed" << endl;
NdbThread_Exit(0) ;
return 0;
}
ndbout << "ok" << endl;
......@@ -722,7 +722,7 @@ static void* flexBenchThread(void* pArg)
if (r) {
ndbout << "autocommit on/off failed" << endl;
NdbThread_Exit(0) ;
return 0;
}
}
#endif
......@@ -741,7 +741,7 @@ static void* flexBenchThread(void* pArg)
ndbout << threadNo << endl ;
ndbout << "Thread #" << threadNo << " will now exit" << endl ;
tResult = 13 ;
NdbThread_Exit(0) ;
return 0;
}
if (use_ndb) {
......@@ -750,7 +750,7 @@ static void* flexBenchThread(void* pArg)
ndbout << "Failed to get an NDB object" << endl;
ndbout << "Thread #" << threadNo << " will now exit" << endl ;
tResult = 13;
NdbThread_Exit(0) ;
return 0;
}
pNdb->waitUntilReady();
return_ndb_object(pNdb, ndb_id);
......@@ -900,11 +900,11 @@ static void* flexBenchThread(void* pArg)
prep_insert[i] = mysql_prepare(&mysql, buf, pos);
if (prep_insert[i] == 0) {
ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
if (mysql_bind_param(prep_insert[i], bind_insert)) {
ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
}
......@@ -926,11 +926,11 @@ static void* flexBenchThread(void* pArg)
prep_update[i] = mysql_prepare(&mysql, buf, pos);
if (prep_update[i] == 0) {
ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
if (mysql_bind_param(prep_update[i], bind_update)) {
ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
}
......@@ -953,15 +953,15 @@ static void* flexBenchThread(void* pArg)
prep_read[i] = mysql_prepare(&mysql, buf, pos);
if (prep_read[i] == 0) {
ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
if (mysql_bind_param(prep_read[i], bind_read)) {
ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
if (mysql_bind_result(prep_read[i], &bind_read[1])) {
ndbout << "mysql_bind_result: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
}
......@@ -978,11 +978,11 @@ static void* flexBenchThread(void* pArg)
prep_delete[i] = mysql_prepare(&mysql, buf, pos);
if (prep_delete[i] == 0) {
ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
if (mysql_bind_param(prep_delete[i], bind_delete)) {
ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
NdbThread_Exit(0) ;
return 0;
}
}
}
......@@ -1431,8 +1431,7 @@ static void* flexBenchThread(void* pArg)
ndbout << "I got here " << endl;
return_ndb_object(pNdb, ndb_id);
}
NdbThread_Exit(0);
return NULL; // Just to keep compiler happy
return NULL;
}
......
......@@ -274,8 +274,6 @@ threadRoutine(void *arg)
asyncDbDisconnect(pNDB);
NdbThread_Exit(0);
return NULL;
}
......
......@@ -475,7 +475,6 @@ void *
runStep_C(void * s)
{
runStep(s);
NdbThread_Exit(0);
return NULL;
}
......
......@@ -291,7 +291,6 @@ extern "C" void*
copyrun_C(void* copy)
{
((Copy*) copy)->run();
NdbThread_Exit(0);
return 0;
}
......@@ -322,7 +321,6 @@ extern "C" void*
connrun_C(void* conn)
{
((Conn*) conn)->run();
NdbThread_Exit(0);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment