Commit 35b2f212 authored by unknown's avatar unknown

Bug #28751 Lots of memory locked in memory causes high kswapd

- add odirect option for lcp+backup+redo log to lower CPU/kswapd usage
- writing odirect removes need for kernel write buffers avoiding kswapd to kick in


mysql-test/ndb/ndb_config_2_node.ini:
  run mysql-test-run using ODirect
storage/ndb/include/mgmapi/mgmapi_config_parameters.h:
  add new config parameter to choose ODirect
storage/ndb/include/ndb_global.h.in:
  specify alignment needed for odirect
storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  read odirect config param
  open LCP and Backup datafiles with odirect if specified
  insert empty padding record if odirect is used
  allocate buffers aligned to be able to use odirect
storage/ndb/src/kernel/blocks/backup/Backup.hpp:
  odirect and padding options
storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp:
  add empty_record in file format
storage/ndb/src/kernel/blocks/backup/BackupInit.cpp:
  read odirect config and allocate aligned
storage/ndb/src/kernel/blocks/backup/FsBuffer.hpp:
  correct debug printouts
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  read odirect config param and align buffers
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp:
  read odirect config param and align buffers
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  read config params and open redo log files with odirect if set
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp:
  aligned writing for odirect
  correct odirect open options with test+fallback if odirect fails
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp:
  align + odirect check
storage/ndb/src/kernel/blocks/restore.cpp:
  restor block to ignore new lcp padding empty_record
storage/ndb/src/kernel/vm/SimulatedBlock.cpp:
  alligend log buffer allocation for odirect
storage/ndb/src/kernel/vm/SimulatedBlock.hpp:
  alligend log buffer allocation for odirect
storage/ndb/src/mgmsrv/ConfigInfo.cpp:
  new config param for odirect, default false
storage/ndb/tools/restore/Restore.cpp:
  ndb_restore to skip empty_record alignment padding in backup file
parent 1182b801
......@@ -12,6 +12,7 @@ MaxNoOfAttributes= CHOOSE_MaxNoOfAttributes
TimeBetweenGlobalCheckpoints= 500
NoOfFragmentLogFiles= 3
DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory
ODirect= 1
# the following parametes just function as a small regression
# test that the parameter exists
InitialNoOfOpenFiles= 27
......
......@@ -115,6 +115,8 @@
#define CFG_DB_MEMREPORT_FREQUENCY 166
#define CFG_DB_O_DIRECT 168
#define CFG_DB_SGA 198 /* super pool mem */
#define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */
......
......@@ -146,4 +146,6 @@ extern "C" {
#define MAX(x,y) (((x)>(y))?(x):(y))
#endif
#define NDB_O_DIRECT_WRITE_ALIGNMENT 512
#endif
......@@ -2761,6 +2761,8 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr)
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
filePtr.p->m_flags |= BackupFile::BF_OPENING;
if (c_defaults.m_o_direct)
req->fileFlags |= FsOpenReq::OM_DIRECT;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
......@@ -3735,12 +3737,31 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo)
}
bool
Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo)
Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record)
{
Uint32 * tmp;
const Uint32 footSz = sizeof(BackupFormat::DataFile::FragmentFooter) >> 2;
Uint32 sz = footSz + 1;
if(dataBuffer.getWritePtr(&tmp, footSz + 1)) {
if (fill_record)
{
Uint32 * new_tmp;
if (!dataBuffer.getWritePtr(&tmp, sz))
return false;
new_tmp = tmp + sz;
if ((UintPtr)new_tmp & (sizeof(Page32)-1))
{
/* padding is needed to get full write */
new_tmp += 2 /* to fit empty header minimum 2 words*/;
new_tmp = (Uint32 *)(((UintPtr)new_tmp + sizeof(Page32)-1) &
~(UintPtr)(sizeof(Page32)-1));
/* new write sz */
sz = new_tmp - tmp;
}
}
if(dataBuffer.getWritePtr(&tmp, sz)) {
jam();
* tmp = 0; // Finish record stream
tmp++;
......@@ -3752,7 +3773,17 @@ Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo)
foot->FragmentNo = htonl(fragNo);
foot->NoOfRecords = htonl(noOfRecords);
foot->Checksum = htonl(0);
dataBuffer.updateWritePtr(footSz + 1);
if (sz != footSz + 1)
{
tmp += footSz;
memset(tmp, 0, (sz - footSz - 1) * 4);
*tmp = htonl(BackupFormat::EMPTY_ENTRY);
tmp++;
*tmp = htonl(sz - footSz - 1);
}
dataBuffer.updateWritePtr(sz);
return true;
}//if
return false;
......@@ -3854,8 +3885,13 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
return;
}//if
BackupRecordPtr ptr LINT_SET_PTR;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
OperationRecord & op = filePtr.p->operation;
if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo)) {
if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo,
c_defaults.m_o_direct))
{
jam();
signal->theData[0] = BackupContinueB::BUFFER_FULL_FRAG_COMPLETE;
signal->theData[1] = filePtr.i;
......@@ -3865,9 +3901,6 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
BackupRecordPtr ptr LINT_SET_PTR;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
if (ptr.p->is_lcp())
{
ptr.p->slaveState.setState(STOPPING);
......@@ -4905,6 +4938,8 @@ Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr)
FsOpenReq::OM_CREATE |
FsOpenReq::OM_APPEND |
FsOpenReq::OM_AUTOSYNC;
if (c_defaults.m_o_direct)
req->fileFlags |= FsOpenReq::OM_DIRECT;
FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
req->auto_sync_size = c_defaults.m_disk_synch_size;
......
......@@ -240,7 +240,7 @@ public:
* Once per fragment
*/
bool newFragment(Uint32 tableId, Uint32 fragNo);
bool fragComplete(Uint32 tableId, Uint32 fragNo);
bool fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record);
/**
* Once per scan frag (next) req/conf
......@@ -534,6 +534,7 @@ public:
Uint32 m_disk_write_speed;
Uint32 m_disk_synch_size;
Uint32 m_diskless;
Uint32 m_o_direct;
};
/**
......
......@@ -32,7 +32,8 @@ struct BackupFormat {
TABLE_LIST = 4,
TABLE_DESCRIPTION = 5,
GCP_ENTRY = 6,
FRAGMENT_INFO = 7
FRAGMENT_INFO = 7,
EMPTY_ENTRY = 8
};
struct FileHeader {
......@@ -93,6 +94,13 @@ struct BackupFormat {
Uint32 NoOfRecords;
Uint32 Checksum;
};
/* optional padding for O_DIRECT */
struct EmptyEntry {
Uint32 SectionType;
Uint32 SectionLength;
/* not used data */
};
};
/**
......
......@@ -148,10 +148,13 @@ Backup::execREAD_CONFIG_REQ(Signal* signal)
c_defaults.m_disk_write_speed = 10 * (1024 * 1024);
c_defaults.m_disk_write_speed_sr = 100 * (1024 * 1024);
c_defaults.m_disk_synch_size = 4 * (1024 * 1024);
c_defaults.m_o_direct = true;
Uint32 noBackups = 0, noTables = 0, noAttribs = 0, noFrags = 0;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS,
&c_defaults.m_diskless));
ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT,
&c_defaults.m_o_direct);
ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED_SR,
&c_defaults.m_disk_write_speed_sr);
ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED,
......@@ -204,7 +207,7 @@ Backup::execREAD_CONFIG_REQ(Signal* signal)
/ sizeof(Page32);
// We need to allocate an additional of 2 pages. 1 page because of a bug in
// ArrayPool and another one for DICTTAINFO.
c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2);
c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2, true);
{ // Init all tables
SLList<Table> tables(c_tablePool);
......
......@@ -270,8 +270,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
* ptr = &Tp[Tr];
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d",
Tr, Tw, Ts, Tm, sz1, * sz));
DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d",
Tr, Tmw, Ts, Tm, sz1, * sz));
return true;
}
......@@ -279,8 +279,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
if(!m_eof){
* _eof = false;
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> false",
Tr, Tw, Ts, Tm, sz1));
DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> false",
Tr, Tmw, Ts, Tm, sz1));
return false;
}
......@@ -289,8 +289,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
* _eof = true;
* ptr = &Tp[Tr];
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d eof",
Tr, Tw, Ts, Tm, sz1, * sz));
DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d eof",
Tr, Tmw, Ts, Tm, sz1, * sz));
return false;
}
......@@ -316,13 +316,13 @@ FsBuffer::getWritePtr(Uint32 ** ptr, Uint32 sz){
if(sz1 > sz){ // Note at least 1 word of slack
* ptr = &Tp[Tw];
DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> true",
sz, Tr, Tw, Ts, sz1));
DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> true",
sz, Tw, sz1));
return true;
}
DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> false",
sz, Tr, Tw, Ts, sz1));
DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> false",
sz, Tw, sz1));
return false;
}
......@@ -339,11 +339,15 @@ FsBuffer::updateWritePtr(Uint32 sz){
m_free -= sz;
if(Tnew < Ts){
m_writeIndex = Tnew;
DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
sz, m_writeIndex));
return;
}
memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2);
m_writeIndex = Tnew - Ts;
DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
sz, m_writeIndex));
}
inline
......
......@@ -115,9 +115,6 @@ class Dbtup;
/* ------------------------------------------------------------------------- */
/* VARIOUS CONSTANTS USED AS FLAGS TO THE FILE MANAGER. */
/* ------------------------------------------------------------------------- */
#define ZOPEN_READ 0
#define ZOPEN_WRITE 1
#define ZOPEN_READ_WRITE 2
#define ZVAR_NO_LOG_PAGE_WORD 1
#define ZLIST_OF_PAIRS 0
#define ZLIST_OF_PAIRS_SYNCH 16
......@@ -2686,6 +2683,7 @@ private:
UintR clfoFileSize;
LogPageRecord *logPageRecord;
void *logPageRecordUnaligned;
LogPageRecordPtr logPagePtr;
UintR cfirstfreeLogPage;
UintR clogPageFileSize;
......@@ -2889,6 +2887,7 @@ private:
UintR ctransidHash[1024];
Uint32 c_diskless;
Uint32 c_o_direct;
Uint32 c_error_insert_table_id;
public:
......
......@@ -49,6 +49,7 @@ void Dblqh::initData()
logFileRecord = 0;
logFileOperationRecord = 0;
logPageRecord = 0;
logPageRecordUnaligned= 0;
pageRefRecord = 0;
tablerec = 0;
tcConnectionrec = 0;
......@@ -105,10 +106,13 @@ void Dblqh::initRecords()
sizeof(LogFileOperationRecord),
clfoFileSize);
logPageRecord = (LogPageRecord*)allocRecord("LogPageRecord",
sizeof(LogPageRecord),
clogPageFileSize,
false);
logPageRecord =
(LogPageRecord*)allocRecordAligned("LogPageRecord",
sizeof(LogPageRecord),
clogPageFileSize,
&logPageRecordUnaligned,
NDB_O_DIRECT_WRITE_ALIGNMENT,
false);
pageRefRecord = (PageRefRecord*)allocRecord("PageRefRecord",
sizeof(PageRefRecord),
......@@ -378,7 +382,7 @@ Dblqh::~Dblqh()
sizeof(LogFileOperationRecord),
clfoFileSize);
deallocRecord((void**)&logPageRecord,
deallocRecord((void**)&logPageRecordUnaligned,
"LogPageRecord",
sizeof(LogPageRecord),
clogPageFileSize);
......
......@@ -1015,6 +1015,8 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_diskless));
c_o_direct = true;
ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT, &c_o_direct);
Uint32 tmp= 0;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &tmp));
......@@ -13243,7 +13245,9 @@ void Dblqh::openFileRw(Signal* signal, LogFileRecordPtr olfLogFilePtr)
signal->theData[3] = olfLogFilePtr.p->fileName[1];
signal->theData[4] = olfLogFilePtr.p->fileName[2];
signal->theData[5] = olfLogFilePtr.p->fileName[3];
signal->theData[6] = ZOPEN_READ_WRITE | FsOpenReq::OM_AUTOSYNC;
signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//Dblqh::openFileRw()
......@@ -13263,7 +13267,9 @@ void Dblqh::openLogfileInit(Signal* signal)
signal->theData[3] = logFilePtr.p->fileName[1];
signal->theData[4] = logFilePtr.p->fileName[2];
signal->theData[5] = logFilePtr.p->fileName[3];
signal->theData[6] = 0x302 | FsOpenReq::OM_AUTOSYNC;
signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//Dblqh::openLogfileInit()
......@@ -13299,7 +13305,9 @@ void Dblqh::openNextLogfile(Signal* signal)
signal->theData[3] = onlLogFilePtr.p->fileName[1];
signal->theData[4] = onlLogFilePtr.p->fileName[2];
signal->theData[5] = onlLogFilePtr.p->fileName[3];
signal->theData[6] = 2 | FsOpenReq::OM_AUTOSYNC;
signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//if
......
......@@ -163,7 +163,12 @@ AsyncFile::run()
theStartFlag = true;
// Create write buffer for bigger writes
theWriteBufferSize = WRITEBUFFERSIZE;
theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize);
theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
NDB_O_DIRECT_WRITE_ALIGNMENT-1);
theWriteBuffer = (char *)
(((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
NdbMutex_Unlock(theStartMutexPtr);
NdbCondition_Signal(theStartConditionPtr);
......@@ -247,6 +252,78 @@ AsyncFile::run()
static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
#endif
int
AsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode)
{
assert(new_flags & (O_CREAT | O_TRUNC));
#ifdef O_DIRECT
int ret;
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
(errno == EINTR));
if (ret == -1)
{
new_flags &= ~O_DIRECT;
ndbout_c("%s Failed to write using O_DIRECT, disabling",
theFileName.c_str());
}
close(theFd);
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
return errno;
#endif
return 0;
}
int
AsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode)
{
#ifdef O_DIRECT
int ret;
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
(errno == EINTR));
if (ret == -1)
{
ndbout_c("%s Failed to read using O_DIRECT, disabling",
theFileName.c_str());
goto reopen;
}
if(lseek(theFd, 0, SEEK_SET) != 0)
{
return errno;
}
if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
{
struct stat buf;
if ((fstat(theFd, &buf) == -1))
{
return errno;
}
else if ((buf.st_size % GLOBAL_PAGE_SIZE) != 0)
{
ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
theFileName.c_str(), GLOBAL_PAGE_SIZE);
goto reopen;
}
}
return 0;
reopen:
close(theFd);
new_flags &= ~O_DIRECT;
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
return errno;
#endif
return 0;
}
void AsyncFile::openReq(Request* request)
{
m_auto_sync_freq = 0;
......@@ -312,7 +389,7 @@ void AsyncFile::openReq(Request* request)
}
#else
Uint32 flags = request->par.open.flags;
Uint32 new_flags = 0;
int new_flags = 0;
// Convert file open flags from Solaris to Liux
if (flags & FsOpenReq::OM_CREATE)
......@@ -343,10 +420,6 @@ void AsyncFile::openReq(Request* request)
{
new_flags |= O_DIRECT;
}
#elif defined O_SYNC
{
flags |= FsOpenReq::OM_SYNC;
}
#endif
if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
......@@ -355,15 +428,19 @@ void AsyncFile::openReq(Request* request)
new_flags |= O_SYNC;
#endif
}
const char * rw = "";
switch(flags & 0x3){
case FsOpenReq::OM_READONLY:
rw = "r";
new_flags |= O_RDONLY;
break;
case FsOpenReq::OM_WRITEONLY:
rw = "w";
new_flags |= O_WRONLY;
break;
case FsOpenReq::OM_READWRITE:
rw = "rw";
new_flags |= O_RDWR;
break;
default:
......@@ -404,11 +481,6 @@ no_odirect:
if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
#ifdef O_SYNC
if (! (flags & FsOpenReq::OM_INIT))
new_flags |= O_SYNC;
#endif
goto no_odirect;
}
#endif
......@@ -421,11 +493,6 @@ no_odirect:
else if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
#ifdef O_SYNC
if (! (flags & FsOpenReq::OM_INIT))
new_flags |= O_SYNC;
#endif
goto no_odirect;
}
#endif
......@@ -512,7 +579,6 @@ no_odirect:
{
ndbout_c("error on first write(%d), disable O_DIRECT", err);
new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
close(theFd);
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd != -1)
......@@ -532,26 +598,32 @@ no_odirect:
else if (flags & FsOpenReq::OM_DIRECT)
{
#ifdef O_DIRECT
do {
int ret;
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) && (errno == EINTR));
if (ret == -1)
{
ndbout_c("%s Failed to read using O_DIRECT, disabling", theFileName.c_str());
flags |= FsOpenReq::OM_SYNC;
flags |= FsOpenReq::OM_INIT;
break;
}
if(lseek(theFd, 0, SEEK_SET) != 0)
{
request->error = errno;
return;
}
} while (0);
if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
{
request->error = check_odirect_write(flags, new_flags, mode);
}
else
{
request->error = check_odirect_read(flags, new_flags, mode);
}
if (request->error)
return;
#endif
}
#ifdef VM_TRACE
if (flags & FsOpenReq::OM_DIRECT)
{
#ifdef O_DIRECT
ndbout_c("%s %s O_DIRECT: %d",
theFileName.c_str(), rw,
!!(new_flags & O_DIRECT));
#else
ndbout_c("%s %s O_DIRECT: 0",
theFileName.c_str(), rw);
#endif
}
#endif
if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
{
#ifdef O_SYNC
......@@ -562,6 +634,10 @@ no_odirect:
new_flags &= ~(O_CREAT | O_TRUNC);
new_flags |= O_SYNC;
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
{
request->error = errno;
}
#endif
}
#endif
......@@ -1079,7 +1155,8 @@ AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
void AsyncFile::endReq()
{
// Thread is ended with return
if (theWriteBuffer) ndbd_free(theWriteBuffer, theWriteBufferSize);
if (theWriteBufferUnaligned)
ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
}
......
......@@ -232,9 +232,13 @@ private:
bool theStartFlag;
int theWriteBufferSize;
char* theWriteBuffer;
void* theWriteBufferUnaligned;
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
int check_odirect_read(Uint32 flags, int&new_flags, int mode);
int check_odirect_write(Uint32 flags, int&new_flags, int mode);
public:
SimulatedBlock& m_fs;
Ptr<GlobalPage> m_page_ptr;
......
......@@ -559,6 +559,9 @@ Restore::restore_next(Signal* signal, FilePtr file_ptr)
case BackupFormat::GCP_ENTRY:
parse_gcp_entry(signal, file_ptr, data, len);
break;
case BackupFormat::EMPTY_ENTRY:
// skip
break;
case 0x4e444242: // 'NDBB'
if (check_file_version(signal, ntohl(* (data+2))) == 0)
{
......
......@@ -39,6 +39,9 @@
#include <AttributeDescriptor.hpp>
#include <NdbSqlUtil.hpp>
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
#define ljamEntry() jamEntryLine(30000 + __LINE__)
#define ljam() jamLine(30000 + __LINE__)
......@@ -656,13 +659,19 @@ SimulatedBlock::getBatSize(Uint16 blockNo){
return sb->theBATSize;
}
void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
{
return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
}
void*
SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
{
void * p = NULL;
size_t size = n*s;
Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s);
Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
size_t size = n*s + over_alloc;
Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
refresh_watch_dog(9);
if (real_size > 0){
#ifdef VM_TRACE_MEM
......@@ -705,6 +714,16 @@ SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, U
refresh_watch_dog(9);
memset(ptr, 0, size);
}
if (unaligned_buffer)
{
*unaligned_buffer = p;
p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
#ifdef VM_TRACE
g_eventLogger.info("'%s' (%u) %llu %llu, alignment correction %u bytes",
type, align, (Uint64)p, (Uint64)p+n*s,
(Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
#endif
}
}
return p;
}
......
......@@ -378,6 +378,7 @@ protected:
*
*/
void* allocRecord(const char * type, size_t s, size_t n, bool clear = true, Uint32 paramId = 0);
void* allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align = NDB_O_DIRECT_WRITE_ALIGNMENT, bool clear = true, Uint32 paramId = 0);
/**
* Deallocate record
......
......@@ -1313,6 +1313,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_O_DIRECT,
"ODirect",
DB_TOKEN,
"Use O_DIRECT file write/read when possible",
ConfigInfo::CI_USED,
true,
ConfigInfo::CI_BOOL,
"false",
"false",
"true"},
/***************************************************************************
* API
***************************************************************************/
......
......@@ -867,13 +867,32 @@ bool RestoreDataIterator::readFragmentHeader(int & ret, Uint32 *fragmentId)
debug << "RestoreDataIterator::getNextFragment" << endl;
if (buffer_read(&Header, sizeof(Header), 1) != 1){
while (1)
{
/* read first part of header */
if (buffer_read(&Header, 8, 1) != 1)
{
ret = 0;
return false;
} // if
/* skip if EMPTY_ENTRY */
Header.SectionType = ntohl(Header.SectionType);
Header.SectionLength = ntohl(Header.SectionLength);
if (Header.SectionType == BackupFormat::EMPTY_ENTRY)
{
void *tmp;
buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
continue;
}
break;
}
/* read rest of header */
if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1)
{
ret = 0;
return false;
} // if
Header.SectionType = ntohl(Header.SectionType);
Header.SectionLength = ntohl(Header.SectionLength);
}
Header.TableId = ntohl(Header.TableId);
Header.FragmentNo = ntohl(Header.FragmentNo);
Header.ChecksumType = ntohl(Header.ChecksumType);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment