Commit e053924a authored by unknown's avatar unknown

ha_ndb blobs

parent e1309e18
......@@ -311,7 +311,7 @@ public:
ExtDatetime = NdbSqlUtil::Type::Datetime,
ExtTimespec = NdbSqlUtil::Type::Timespec,
ExtBlob = NdbSqlUtil::Type::Blob,
ExtClob = NdbSqlUtil::Type::Clob
ExtText = NdbSqlUtil::Type::Text
// Attribute data interpretation
......@@ -435,7 +435,7 @@ public:
AttributeArraySize = 12 * AttributeExtLength;
return true;
case DictTabInfo::ExtBlob:
case DictTabInfo::ExtClob:
case DictTabInfo::ExtText:
AttributeType = DictTabInfo::StringType;
AttributeSize = DictTabInfo::an8Bit;
// head + inline part [ attr precision ]
......@@ -50,24 +50,33 @@ class NdbColumnImpl;
* - closed: after transaction commit
* - invalid: after rollback or transaction close
* NdbBlob supports 2 styles of data access:
* NdbBlob supports 3 styles of data access:
* - in prepare phase, NdbBlob methods getValue and setValue are used to
* prepare a read or write of a single blob value of known size
* prepare a read or write of a blob value of known size
* - in active phase, NdbBlob methods readData and writeData are used to
* read or write blob data of undetermined size
* - in prepare phase, setActiveHook is used to define a routine which
* is invoked as soon as the handle becomes active
* - in active phase, readData and writeData are used to read or write
* blob data of arbitrary size
* The styles can be applied in combination (in above order).
* Blob operations take effect at next transaction execute. In some
* cases NdbBlob is forced to do implicit executes. To avoid this,
* operate on complete blob parts.
* Use NdbConnection::executePendingBlobOps to flush your reads and
* writes. It avoids execute penalty if nothing is pending. It is not
* needed after execute (obviously) or after next scan result.
* NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary.
* Notes:
* - table and its blob part tables are not created atomically
* - blob data operations take effect at next transaction execute
* - NdbBlob may need to do implicit executes on the transaction
* - read and write of complete parts is much more efficient
* - scan must use the "new" interface NdbScanOperation
* - scan with blobs applies hold-read-lock (at minimum)
* - to update a blob in a read op requires exclusive tuple lock
* - update op in scan must do its own getBlobHandle
* - delete creates implicit, not-accessible blob handles
......@@ -78,12 +87,16 @@ class NdbColumnImpl;
* - scan must use exclusive locking for now
* Todo:
* - add scan method hold-read-lock-until-next + return-keyinfo
* - better check of keyinfo length when setting keys
* - better check of allowed blob op vs locking mode
* - add scan method hold-read-lock + return-keyinfo
* - check keyinfo length when setting keys
* - check allowed blob ops vs locking mode
* - overload control (too many pending ops)
class NdbBlob {
* State.
enum State {
Idle = 0,
Prepared = 1,
......@@ -92,9 +105,15 @@ public:
Invalid = 9
State getState();
* Inline blob header.
struct Head {
Uint64 length;
* Prepare to read blob value. The value is available after execute.
* Use isNull to check for NULL and getLength to get the real length
* Use getNull to check for NULL and getLength to get the real length
* and to check for truncation. Sets current read/write position to
* after the data read.
......@@ -106,6 +125,20 @@ public:
* data to null pointer (0) to create a NULL value.
int setValue(const void* data, Uint32 bytes);
* Callback for setActiveHook. Invoked immediately when the prepared
* operation has been executed (but not committed). Any getValue or
* setValue is done first. The blob handle is active so readData or
* writeData etc can be used to manipulate blob value. A user-defined
* argument is passed along. Returns non-zero on error.
typedef int ActiveHook(NdbBlob* me, void* arg);
* Define callback for blob handle activation. The queue of prepared
* operations will be executed in no commit mode up to this point and
* then the callback is invoked.
int setActiveHook(ActiveHook* activeHook, void* arg);
* Check if blob is null.
......@@ -115,7 +148,7 @@ public:
int setNull();
* Get current length in bytes. Use isNull to distinguish between
* Get current length in bytes. Use getNull to distinguish between
* length 0 blob and NULL blob.
int getLength(Uint64& length);
......@@ -180,6 +213,13 @@ public:
static const int ErrAbort = 4268;
// "Unknown blob error"
static const int ErrUnknown = 4269;
* Return info about all blobs in this operation.
// Get first blob in list
NdbBlob* blobsFirstBlob();
// Get next blob in list after this one
NdbBlob* blobsNextBlob();
friend class Ndb;
......@@ -214,10 +254,11 @@ private:
bool theSetFlag;
const char* theSetBuf;
Uint32 theGetSetBytes;
// head
struct Head {
Uint64 length;
// pending ops
Uint8 thePendingBlobOps;
// activation callback
ActiveHook* theActiveHook;
void* theActiveHookArg;
// buffers
struct Buf {
char* data;
......@@ -235,7 +276,6 @@ private:
char* theInlineData;
NdbRecAttr* theHeadInlineRecAttr;
bool theHeadInlineUpdateFlag;
bool theNewPartFlag;
// length and read/write position
int theNullFlag;
Uint64 theLength;
......@@ -276,6 +316,11 @@ private:
int insertParts(const char* buf, Uint32 part, Uint32 count);
int updateParts(const char* buf, Uint32 part, Uint32 count);
int deleteParts(Uint32 part, Uint32 count);
// pending ops
int executePendingBlobReads();
int executePendingBlobWrites();
// callbacks
int invokeActiveHook();
// blob handle maintenance
int atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
int preExecute(ExecType anExecType, bool& batch);
......@@ -287,6 +332,7 @@ private:
void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
void setErrorCode(NdbConnection* aCon, bool invalidFlag = true);
#ifdef VM_TRACE
int getOperationType() const;
friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
......@@ -431,6 +431,15 @@ public:
/** @} *********************************************************************/
* Execute the transaction in NoCommit mode if there are any not-yet
* executed blob part operations of given types. Otherwise do
* nothing. The flags argument is bitwise OR of (1 << optype) where
* optype comes from NdbOperation::OperationType. Only the basic PK
* ops are used (read, insert, update, delete).
int executePendingBlobOps(Uint8 flags = 0xFF);
* Release completed operations
......@@ -642,6 +651,7 @@ private:
Uint32 theBuddyConPtr;
// optim: any blobs
bool theBlobFlag;
Uint8 thePendingBlobOps;
static void sendTC_COMMIT_ACK(NdbApiSignal *,
Uint32 transId1, Uint32 transId2,
......@@ -869,6 +879,21 @@ NdbConnection::OpSent()
void executePendingBlobOps();
#include <stdlib.h>
NdbConnection::executePendingBlobOps(Uint8 flags)
if (thePendingBlobOps & flags) {
// not executeNoBlobs because there can be new ops with blobs
return execute(NoCommit);
return 0;
......@@ -876,5 +901,3 @@ NdbConnection::ptr2int(){
......@@ -183,7 +183,7 @@ public:
Datetime, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes )
Timespec, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes )
Blob, ///< Binary large object (see NdbBlob)
Clob ///< Text blob
Text ///< Text blob
......@@ -309,7 +309,8 @@ public:
* For blob, set or get "part size" i.e. number of bytes to store in
* each tuple of the "blob table". Must be less than 64k.
* each tuple of the "blob table". Can be set to zero to omit parts
* and to allow only inline bytes ("tinyblob").
void setPartSize(int size) { setScale(size); }
int getPartSize() const { return getScale(); }
......@@ -1060,6 +1061,6 @@ public:
class NdbOut& operator <<(class NdbOut& ndbout, const NdbDictionary::Column::Type type);
class NdbOut& operator <<(class NdbOut& out, const NdbDictionary::Column& col);
......@@ -80,7 +80,7 @@ public:
Datetime, // Precision down to 1 sec (size 8 bytes)
Timespec, // Precision down to 1 nsec (size 12 bytes)
Blob, // Blob
Clob // Text blob
Text // Text blob
Enum m_typeId;
Cmp* m_cmp; // set to NULL if cmp not implemented
......@@ -125,7 +125,7 @@ private:
static Cmp cmpDatetime;
static Cmp cmpTimespec;
static Cmp cmpBlob;
static Cmp cmpClob;
static Cmp cmpText;
inline int
......@@ -344,17 +344,15 @@ NdbSqlUtil::cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full,
case Type::Blob: // XXX fix
case Type::Clob:
case Type::Text:
// skip blob head, the rest is varchar
// skip blob head, the rest is char
const unsigned skip = NDB_BLOB_HEAD_SIZE;
if (size >= skip + 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1 + skip;
u2.p = p2 + skip;
// length in first 2 bytes
int k = strncmp(u1.v + 2, u2.v + 2, ((size - skip) << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
return CmpUnknown;
......@@ -161,8 +161,8 @@ NdbSqlUtil::m_typeList[] = {
NULL // cmpDatetime
......@@ -299,9 +299,9 @@ NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size
NdbSqlUtil::cmpClob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
return cmp(Type::Clob, p1, p2, full, size);
return cmp(Type::Text, p1, p2, full, size);
This diff is collapsed.
......@@ -89,7 +89,8 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
// Scan operations
theListState = NotInList;
theError.code = 0;
......@@ -150,6 +151,7 @@ NdbConnection::init()
theBuddyConPtr = 0xFFFFFFFF;
theBlobFlag = false;
thePendingBlobOps = 0;
......@@ -269,20 +271,27 @@ NdbConnection::execute(ExecType aTypeOfExec,
if (! theBlobFlag)
return executeNoBlobs(aTypeOfExec, abortOption, forceSend);
// execute prepared ops in batches, as requested by blobs
* execute prepared ops in batches, as requested by blobs
* - blob error does not terminate execution
* - blob error sets error on operation
* - if error on operation skip blob calls
ExecType tExecType;
NdbOperation* tPrepOp;
int ret = 0;
do {
tExecType = aTypeOfExec;
tPrepOp = theFirstOpInList;
while (tPrepOp != NULL) {
if (tPrepOp->theError.code == 0) {
bool batch = false;
NdbBlob* tBlob = tPrepOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preExecute(tExecType, batch) == -1)
return -1;
ret = -1;
tBlob = tBlob->theNext;
if (batch) {
......@@ -290,6 +299,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
tExecType = NoCommit;
tPrepOp = tPrepOp->next();
// save rest of prepared ops if batch
......@@ -304,27 +314,31 @@ NdbConnection::execute(ExecType aTypeOfExec,
if (tExecType == Commit) {
NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) {
if (tOp->theError.code == 0) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preCommit() == -1)
return -1;
ret = -1;
tBlob = tBlob->theNext;
tOp = tOp->next();
if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
return -1;
ret = -1;
NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) {
if (tOp->theError.code == 0) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
// may add new operations if batch
if (tBlob->postExecute(tExecType) == -1)
return -1;
ret = -1;
tBlob = tBlob->theNext;
tOp = tOp->next();
......@@ -338,7 +352,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
} while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
return 0;
return ret;
......@@ -397,6 +411,7 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec,
thePendingBlobOps = 0;
return 0;
......@@ -806,73 +806,90 @@ NdbDictionary::Dictionary::getNdbError() const {
return m_impl.getNdbError();
NdbOut& operator <<(NdbOut& ndbout, const NdbDictionary::Column::Type type)
// printers
operator<<(NdbOut& out, const NdbDictionary::Column& col)
case NdbDictionary::Column::Bigunsigned:
ndbout << "Bigunsigned";
out << col.getName() << " ";
switch (col.getType()) {
case NdbDictionary::Column::Tinyint:
out << "Tinyint";
case NdbDictionary::Column::Unsigned:
ndbout << "Unsigned";
case NdbDictionary::Column::Tinyunsigned:
out << "Tinyunsigned";
case NdbDictionary::Column::Smallint:
out << "Smallint";
case NdbDictionary::Column::Smallunsigned:
ndbout << "Smallunsigned";
out << "Smallunsigned";
case NdbDictionary::Column::Tinyunsigned:
ndbout << "Tinyunsigned";
case NdbDictionary::Column::Mediumint:
out << "Mediumint";
case NdbDictionary::Column::Bigint:
ndbout << "Bigint";
case NdbDictionary::Column::Mediumunsigned:
out << "Mediumunsigned";
case NdbDictionary::Column::Int:
ndbout << "Int";
out << "Int";
case NdbDictionary::Column::Smallint:
ndbout << "Smallint";
case NdbDictionary::Column::Tinyint:
ndbout << "Tinyint";
case NdbDictionary::Column::Unsigned:
out << "Unsigned";
case NdbDictionary::Column::Char:
ndbout << "Char";
case NdbDictionary::Column::Bigint:
out << "Bigint";
case NdbDictionary::Column::Varchar:
ndbout << "Varchar";
case NdbDictionary::Column::Bigunsigned:
out << "Bigunsigned";
case NdbDictionary::Column::Float:
ndbout << "Float";
out << "Float";
case NdbDictionary::Column::Double:
ndbout << "Double";
out << "Double";
case NdbDictionary::Column::Mediumint:
ndbout << "Mediumint";
case NdbDictionary::Column::Decimal:
out << "Decimal(" << col.getScale() << "," << col.getPrecision() << ")";
case NdbDictionary::Column::Mediumunsigned:
ndbout << "Mediumunsigend";
case NdbDictionary::Column::Char:
out << "Char(" << col.getLength() << ")";
case NdbDictionary::Column::Varchar:
out << "Varchar(" << col.getLength() << ")";
case NdbDictionary::Column::Binary:
ndbout << "Binary";
out << "Binary(" << col.getLength() << ")";
case NdbDictionary::Column::Varbinary:
ndbout << "Varbinary";
out << "Varbinary(" << col.getLength() << ")";
case NdbDictionary::Column::Decimal:
ndbout << "Decimal";
case NdbDictionary::Column::Datetime:
out << "Datetime";
case NdbDictionary::Column::Timespec:
ndbout << "Timespec";
out << "Timespec";
case NdbDictionary::Column::Blob:
ndbout << "Blob";
out << "Blob(" << col.getInlineSize() << "," << col.getPartSize()
<< ";" << col.getStripeSize() << ")";
case NdbDictionary::Column::Text:
out << "Text(" << col.getInlineSize() << "," << col.getPartSize()
<< ";" << col.getStripeSize() << ")";
case NdbDictionary::Column::Undefined:
ndbout << "Undefined";
out << "Undefined";
ndbout << "Unknown type=" << (Uint32)type;
out << "Type" << (Uint32)col.getType();
return ndbout;
if (col.getPrimaryKey())
out << " PRIMARY KEY";
else if (! col.getNullable())
out << " NOT NULL";
out << " NULL";
return out;
......@@ -181,7 +181,7 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const
case NdbDictionary::Column::Timespec:
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Text:
if (m_precision != col.m_precision ||
m_scale != col.m_scale ||
m_length != col.m_length) {
......@@ -1088,7 +1088,7 @@ columnTypeMapping[] = {
{ DictTabInfo::ExtDatetime, NdbDictionary::Column::Datetime },
{ DictTabInfo::ExtTimespec, NdbDictionary::Column::Timespec },
{ DictTabInfo::ExtBlob, NdbDictionary::Column::Blob },
{ DictTabInfo::ExtClob, NdbDictionary::Column::Clob },
{ DictTabInfo::ExtText, NdbDictionary::Column::Text },
{ -1, -1 }
......@@ -1253,7 +1253,7 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
for (unsigned i = 0; i < t.m_columns.size(); i++) {
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType())
if (! c.getBlobType() || c.getPartSize() == 0)
NdbTableImpl bt;
NdbBlob::getBlobTable(bt, &t, &c);
......@@ -1622,7 +1622,7 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
for (unsigned i = 0; i < t.m_columns.size(); i++) {
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType())
if (! c.getBlobType() || c.getPartSize() == 0)
char btname[NdbBlob::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
......@@ -441,7 +441,7 @@ inline
NdbColumnImpl::getBlobType() const {
return (m_type == NdbDictionary::Column::Blob ||
m_type == NdbDictionary::Column::Clob);
m_type == NdbDictionary::Column::Text);
......@@ -29,6 +29,7 @@ Adjust: 971206 UABRONM First version
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <NdbRecAttr.hpp>
#include <NdbBlob.hpp>
#include "NdbDictionaryImpl.hpp"
#include <NdbTCP.h>
......@@ -147,78 +148,100 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){
return false;
NdbOut& operator<<(NdbOut& ndbout, const NdbRecAttr &r)
NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
if (r.isNULL())
ndbout << "[NULL]";
return ndbout;
out << "[NULL]";
return out;
if (r.arraySize() > 1)
ndbout << "[";
out << "[";
for (Uint32 j = 0; j < r.arraySize(); j++)
if (j > 0)
ndbout << " ";
out << " ";
case NdbDictionary::Column::Bigunsigned:
ndbout << r.u_64_value();
out << r.u_64_value();
case NdbDictionary::Column::Unsigned:
ndbout << r.u_32_value();
out << r.u_32_value();
case NdbDictionary::Column::Smallunsigned:
ndbout << r.u_short_value();
out << r.u_short_value();
case NdbDictionary::Column::Tinyunsigned:
ndbout << (unsigned) r.u_char_value();
out << (unsigned) r.u_char_value();
case NdbDictionary::Column::Bigint:
ndbout << r.int64_value();
out << r.int64_value();
case NdbDictionary::Column::Int:
ndbout << r.int32_value();
out << r.int32_value();
case NdbDictionary::Column::Smallint:
ndbout << r.short_value();
out << r.short_value();
case NdbDictionary::Column::Tinyint:
ndbout << (int) r.char_value();
out << (int) r.char_value();
case NdbDictionary::Column::Char:
ndbout.print("%.*s", r.arraySize(), r.aRef());
out.print("%.*s", r.arraySize(), r.aRef());
j = r.arraySize();
case NdbDictionary::Column::Varchar:
short len = ntohs(r.u_short_value());
ndbout.print("%.*s", len, r.aRef()+2);
out.print("%.*s", len, r.aRef()+2);
j = r.arraySize();
case NdbDictionary::Column::Float:
ndbout << r.float_value();
out << r.float_value();
case NdbDictionary::Column::Double:
ndbout << r.double_value();
out << r.double_value();
case NdbDictionary::Column::Blob:
const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef();
out << h->length << ":";
const unsigned char* p = (const unsigned char*)(h + 1);
unsigned n = r.arraySize() - sizeof(*h);
for (unsigned k = 0; k < n && k < h->length; k++)
out.print("%02X", (int)p[k]);
j = r.arraySize();
case NdbDictionary::Column::Text:
const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef();
out << h->length << ":";
const unsigned char* p = (const unsigned char*)(h + 1);
unsigned n = r.arraySize() - sizeof(*h);
for (unsigned k = 0; k < n && k < h->length; k++)
out.print("%c", (int)p[k]);
j = r.arraySize();
default: /* no print functions for the rest, just print type */
ndbout << r.getType();
out << r.getType();
j = r.arraySize();
if (j > 1)
ndbout << " %u times" << j;
out << " " << j << " times";
if (r.arraySize() > 1)
ndbout << "]";
out << "]";
return ndbout;
return out;
......@@ -55,6 +55,13 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return -1;
tBlob = tBlob->theNext;
* Flush blob part ops on behalf of user because
* - nextResult is analogous to execute(NoCommit)
* - user is likely to want blob value before next execute
if (m_operation->m_transConnection->executePendingBlobOps() == -1)
return -1;
return 0;
return res;
......@@ -23,7 +23,6 @@
#include <NdbOut.hpp>
class NDBT_Attribute : public NdbDictionary::Column {
friend class NdbOut& operator <<(class NdbOut&, const NDBT_Attribute &);
NDBT_Attribute(const char* _name,
Column::Type _type,
This diff is collapsed.
......@@ -18,35 +18,6 @@
#include <NdbTimer.hpp>
#include <NDBT.hpp>
class NdbOut&
operator <<(class NdbOut& ndbout, const NDBT_Attribute & attr){
NdbDictionary::Column::Type type = attr.getType();
ndbout << attr.getName() << " " << type;
case NdbDictionary::Column::Decimal:
ndbout << "(" << attr.getScale() << ", " << attr.getPrecision() << ")";
if(attr.getLength() != 1)
ndbout << "[" << attr.getLength() << "]";
ndbout << " NULL";
ndbout << " NOT NULL";
ndbout << " PRIMARY KEY";
return ndbout;
class NdbOut&
operator <<(class NdbOut& ndbout, const NDBT_Table & tab)
......@@ -830,7 +830,8 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab,
if(pTab2 == 0 && pDict->createTable(* pTab) != 0){
g_err << "ERROR1: Failed to create table " << pTab->getName() << endl;
g_err << "ERROR1: Failed to create table " << pTab->getName()
<< pDict->getNdbError() << endl;
tests[t]->saveTestResult(pTab, FAILED_TO_CREATE);
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment