Commit 712f4c9a authored by Claes Sjofors's avatar Claes Sjofors

Merge branch 'master'

parents 59fe4ea9 ccef4c7b
......@@ -574,6 +574,13 @@ int sev_server::mainloop()
garbage_collector(0);
time_Aadd(&next_garco, &next_garco, &garco_interval);
}
if (sts == QCOM__ALLOCQUOTA) {
struct timespec r, t;
t.tv_sec = 0;
t.tv_nsec = tmo * 1000000;
nanosleep(&t, &r);
continue;
}
if (sts == QCOM__TMO || !mp)
continue;
......@@ -965,6 +972,7 @@ int sev_server::send_histdata(
qcom_sQid tgt, sev_sMsgHistDataGetRequest* rmsg, unsigned int size)
{
pthread_t thread;
pthread_attr_t attr;
int sts;
sev_sHistDataThread* arg = (sev_sHistDataThread*)malloc(sizeof(*arg));
......@@ -974,8 +982,11 @@ int sev_server::send_histdata(
arg->size = size;
if (m_read_threads) {
printf("New read thread\n");
sts = pthread_create(&thread, NULL, send_histdata_thread, arg);
static int pcnt = 0;
printf("New read thread %d\n", pcnt++);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
sts = pthread_create(&thread, &attr, send_histdata_thread, arg);
if (sts != 0)
printf("pthread_create error %d\n", sts);
} else {
......@@ -1062,11 +1073,13 @@ void* sev_server::send_histdata_thread(void* arg)
if (!qcom_Put(&sts, &tgt, &put)) {
qcom_Free(&sts, put.data);
}
free(tbuf);
free(vbuf);
if (sev->m_read_threads)
sev->m_db->delete_thread(thread);
// pthread_exit( (void *) 1);
pthread_exit( (void *) 1);
return (void*)1;
}
......
......@@ -639,6 +639,7 @@ MYSQL* sev_dbms_env::open_thread(unsigned int* sts)
void sev_dbms_env::close_thread(MYSQL* con)
{
mysql_close(con);
mysql_thread_end();
}
bool sev_dbms_env::exists()
......@@ -1956,6 +1957,10 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
char where_part[200];
int rows = 0;
MYSQL* con;
char startval[8];
char endval[8];
char *startvalue = 0;
char *endvalue = 0;
if (thread)
con = (MYSQL*)thread;
......@@ -2009,16 +2014,39 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
unsigned int endid;
if (starttime) {
if (type == pwr_eType_Boolean) {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, starttime, 1, &startid);
if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, &startid, 0);
get_id_value(thread, item.tablename, startid, type, size,
startval);
startvalue = startval;
} else {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, starttime, 1, &startid);
if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, &startid, 0);
}
} else {
get_id_range(sts, thread, &item, item.options, &startid, 0);
// startid = 0;
}
if (endtime) {
if (type == pwr_eType_Boolean) {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, endtime, 1, &endid);
if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, 0, &endid);
if (endid == 0)
endid = strtoul(row[4], 0, 10);
get_id_value(thread, item.tablename, endid, type, size, endval);
endvalue = endval;
} else {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, endtime, 0, &endid);
......@@ -2026,6 +2054,7 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
get_id_range(sts, thread, &item, item.options, 0, &endid);
if (endid == 0)
endid = strtoul(row[4], 0, 10);
}
} else
endid = strtoul(row[4], 0, 10);
......@@ -2312,13 +2341,20 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
} else
break;
}
int bufrows = rows;
int bufrows = rows + (startvalue != 0) + (endvalue != 0);
if (options & pwr_mSevOptionsMask_ReadOptimized) {
*tbuf = (pwr_tTime*)calloc(bufrows, sizeof(pwr_tTime));
*vbuf = calloc(bufrows, size);
int bcnt = 0;
if (startvalue) {
(*tbuf)[bcnt] = *starttime;
memcpy((*vbuf), startvalue, size);
bcnt++;
}
for (int i = 0; i < rows; i++) {
int j = 0;
......@@ -2384,6 +2420,13 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
// else
// printf( "%5d %5d %s %s\n", i, bcnt, row[0], row[1]);
}
if (endvalue) {
(*tbuf)[bcnt] = *endtime;
memcpy(((char*)*vbuf) + bcnt * size, endvalue, size);
bcnt++;
}
printf("bcnt %d bufrows %d\n", bcnt, bufrows);
*bsize = bcnt;
mysql_free_result(result);
......@@ -3999,6 +4042,48 @@ int sev_dbms::get_closest_time(void* thread, char* tablename,
return 1;
}
int sev_dbms::get_id_value(void* thread, char* tablename,
unsigned int id, pwr_eType type, int size,
void *value)
{
char query[200];
MYSQL* con;
if (thread)
con = (MYSQL*)thread;
else
con = m_env->con();
sprintf(query, "select value from %s where id = %d",
tablename, id);
int rc = mysql_query(con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf("%s Query Error\n", __FUNCTION__);
return SEV__DBERROR;
}
MYSQL_ROW row;
MYSQL_RES* result = mysql_store_result(con);
if (!result) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf("%s Status Result Error\n", __FUNCTION__);
return SEV__DBERROR;
}
row = mysql_fetch_row(result);
if (!row) {
mysql_free_result(result);
return SEV__NOROWS;
} else
cdh_StringToAttrValue(type, row[0], value);
mysql_free_result(result);
return 1;
}
int sev_dbms::get_objectvalues(pwr_tStatus* sts, void* thread, sev_item* item,
unsigned int size, pwr_tTime* starttime, pwr_tTime* endtime, int maxsize,
pwr_tTime** tbuf, void** vbuf, unsigned int* bsize)
......
......@@ -216,6 +216,9 @@ public:
void delete_thread(void* thread);
int get_closest_time(void* thread, char* tablename, unsigned int options,
pwr_tTime* time, int before, unsigned int* id);
int get_id_value(void* thread, char* tablename,
unsigned int id, pwr_eType type, int size,
void *value);
void string_to_mysqlstring(char* in, char* out, int size);
void mysqlstring_to_string(char* in, char* out, int size);
int get_id_range(pwr_tStatus* sts, void* thread, sev_item* item,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment