Commit 2b2bdf74 authored by mskold/marty@linux.site's avatar mskold/marty@linux.site

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.1-ndb

into  mysql.com:/windows/Linux_space/MySQL/mysql-5.1-new-ndb
parents 7b99f8a0 c9798697
......@@ -2702,6 +2702,9 @@ private:
Uint32 cnoOfAllocatedPages;
Uint32 m_max_allocate_pages;
/* read ahead in pages during disk order scan */
Uint32 m_max_page_read_ahead;
Tablerec *tablerec;
Uint32 cnoOfTablerec;
......
......@@ -344,6 +344,18 @@ void Dbtup::execREAD_CONFIG_REQ(Signal* signal)
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch));
c_scanLockPool.setSize(nScanOp * nScanBatch);
/* read ahead for disk scan can not be more that disk page buffer */
{
Uint64 tmp = 64*1024*1024;
ndb_mgm_get_int64_parameter(p, CFG_DB_DISK_PAGE_BUFFER_MEMORY, &tmp);
m_max_page_read_ahead = (tmp + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE; // in pages
// never read ahead more than 32 pages
if (m_max_page_read_ahead > 32)
m_max_page_read_ahead = 32;
}
ScanOpPtr lcp;
ndbrequire(c_scanOpPool.seize(lcp));
new (lcp.p) ScanOp();
......
......@@ -687,13 +687,74 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
// move to next extent
jam();
pos.m_extent_info_ptr_i = ext_ptr.i;
Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
key.m_file_no = ext->m_key.m_file_no;
key.m_page_no = ext->m_first_page_no;
}
}
key.m_page_idx = 0;
pos.m_get = ScanPos::Get_page_dd;
/*
read ahead for scan in disk order
do read ahead every 8:th page
*/
if ((bits & ScanOp::SCAN_DD) &&
(((key.m_page_no - ext->m_first_page_no) & 7) == 0))
{
jam();
// initialize PGMAN request
Page_cache_client::Request preq;
preq.m_page = pos.m_key;
preq.m_callback = TheNULLCallback;
// set maximum read ahead
Uint32 read_ahead = m_max_page_read_ahead;
while (true)
{
// prepare page read ahead in current extent
Uint32 page_no = preq.m_page.m_page_no;
Uint32 page_no_limit = page_no + read_ahead;
Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
if (page_no_limit > limit)
{
jam();
// read ahead crosses extent, set limit for this extent
read_ahead = page_no_limit - limit;
page_no_limit = limit;
// and make sure we only read one extra extent next time around
if (read_ahead > alloc.m_extent_size)
read_ahead = alloc.m_extent_size;
}
else
{
jam();
read_ahead = 0; // no more to read ahead after this
}
// do read ahead pages for this extent
while (page_no < page_no_limit)
{
// page request to PGMAN
jam();
preq.m_page.m_page_no = page_no;
int flags = 0;
// ignore result
m_pgman.get_page(signal, preq, flags);
jamEntry();
page_no++;
}
if (!read_ahead || !list.next(ext_ptr))
{
// no more extents after this or read ahead done
jam();
break;
}
// move to next extent and initialize PGMAN request accordingly
Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
preq.m_page.m_file_no = ext->m_key.m_file_no;
preq.m_page.m_page_no = ext->m_first_page_no;
}
} // if ScanOp::SCAN_DD read ahead
}
/*FALLTHRU*/
case ScanPos::Get_page_dd:
......@@ -726,6 +787,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
safe_cast(&Dbtup::disk_page_tup_scan_callback);
int flags = 0;
int res = m_pgman.get_page(signal, preq, flags);
jamEntry();
if (res == 0) {
jam();
// request queued
......
......@@ -122,7 +122,7 @@ Pgman::execREAD_CONFIG_REQ(Signal* signal)
if (page_buffer > 0)
{
page_buffer /= GLOBAL_PAGE_SIZE; // in pages
page_buffer = (page_buffer + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE; // in pages
m_param.m_max_pages = page_buffer;
m_page_entry_pool.setSize(m_param.m_lirs_stack_mult * page_buffer);
m_param.m_max_hot_pages = (page_buffer * 9) / 10;
......@@ -144,7 +144,7 @@ Pgman::Param::Param() :
m_lirs_stack_mult(10),
m_max_hot_pages(56),
m_max_loop_count(256),
m_max_io_waits(64),
m_max_io_waits(256),
m_stats_loop_delay(1000),
m_cleanup_loop_delay(200),
m_lcp_loop_delay(0)
......
......@@ -1658,6 +1658,11 @@ SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
}
SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
void
SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
{ abort(); /* should never be called */ }
SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
{ &SimulatedBlock::TheNULLCallbackFunction, 0 };
void
SimulatedBlock::sendFragmentedSignal(BlockReference ref,
......
......@@ -131,6 +131,8 @@ public:
virtual const char* get_filename(Uint32 fd) const { return "";}
protected:
static Callback TheEmptyCallback;
void TheNULLCallbackFunction(class Signal*, Uint32, Uint32);
static Callback TheNULLCallback;
void execute(Signal* signal, Callback & c, Uint32 returnCode);
......@@ -599,6 +601,8 @@ inline
void
SimulatedBlock::execute(Signal* signal, Callback & c, Uint32 returnCode){
CallbackFunction fun = c.m_callbackFunction;
if (fun == TheNULLCallback.m_callbackFunction)
return;
ndbrequire(fun != 0);
c.m_callbackFunction = NULL;
(this->*fun)(signal, c.m_callbackData, returnCode);
......
......@@ -30,9 +30,11 @@ int main(int argc, const char** argv){
const char* _tabname = NULL;
int _help = 0;
int _batch = 512;
const char* db = "TEST_DB";
struct getargs args[] = {
{ "batch", 'b', arg_integer, &_batch, "Number of operations in each transaction", "batch" },
{ "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
......@@ -55,7 +57,7 @@ int main(int argc, const char** argv){
{
return NDBT_ProgramExit(NDBT_FAILED);
}
Ndb MyNdb(&con, "TEST_DB" );
Ndb MyNdb(&con, db);
if(MyNdb.init() != 0){
ERR(MyNdb.getNdbError());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment