Commit f79075e9 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - wl#2972 fix tinyblob

parent ddc419af
......@@ -1269,6 +1269,8 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
// prepare blob column and table
if (prepareColumn() == -1)
DBUG_RETURN(-1);
// tinyblob sanity
assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
// extra buffers
theBlobEventDataBuf.alloc(thePartSize);
// prepare receive of head+inline
......@@ -1278,20 +1280,22 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
DBUG_RETURN(-1);
}
// prepare receive of blob part
if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL ||
(theBlobEventPartRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
(char*)&thePartNumber, version)) == NULL ||
(theBlobEventDataRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
theBlobEventDataBuf.data, version)) == NULL) {
setErrorCode(theBlobEventOp);
DBUG_RETURN(-1);
if (theBlobEventOp != NULL) {
if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL ||
(theBlobEventPartRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
(char*)&thePartNumber, version)) == NULL ||
(theBlobEventDataRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
theBlobEventDataBuf.data, version)) == NULL) {
setErrorCode(theBlobEventOp);
DBUG_RETURN(-1);
}
}
setState(Prepared);
DBUG_RETURN(0);
......
......@@ -318,42 +318,48 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
tBlob = tBlob->theNext;
}
// blob event name
char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
// find blob event op if any (it serves both post and pre handles)
assert(tAttrInfo->m_blobTable != NULL);
NdbEventOperationImpl* tBlobOp = theBlobOpList;
NdbEventOperationImpl* tLastBlopOp = NULL;
while (tBlobOp != NULL) {
if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
break;
NdbEventOperationImpl* tBlobOp = NULL;
const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
if (! is_tinyblob) {
// blob event name
char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
// find blob event op if any (it serves both post and pre handles)
tBlobOp = theBlobOpList;
NdbEventOperationImpl* tLastBlopOp = NULL;
while (tBlobOp != NULL) {
if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
break;
}
tLastBlopOp = tBlobOp;
tBlobOp = tBlobOp->theNextBlobOp;
}
tLastBlopOp = tBlobOp;
tBlobOp = tBlobOp->theNextBlobOp;
}
DBUG_PRINT("info", ("%s op %s", tBlobOp ? " reuse" : " create", bename));
DBUG_PRINT("info", ("%s op %s", tBlobOp ? " reuse" : " create", bename));
// create blob event op if not found
if (tBlobOp == NULL) {
NdbEventOperation* tmp = m_ndb->createEventOperation(bename);
if (tmp == NULL)
DBUG_RETURN(NULL);
tBlobOp = &tmp->m_impl;
// create blob event op if not found
if (tBlobOp == NULL) {
NdbEventOperation* tmp = m_ndb->createEventOperation(bename);
if (tmp == NULL)
DBUG_RETURN(NULL);
tBlobOp = &tmp->m_impl;
// pointer to main table op
tBlobOp->theMainOp = this;
tBlobOp->m_mergeEvents = m_mergeEvents;
// pointer to main table op
tBlobOp->theMainOp = this;
tBlobOp->m_mergeEvents = m_mergeEvents;
// add to list end
if (tLastBlopOp == NULL)
theBlobOpList = tBlobOp;
else
tLastBlopOp->theNextBlobOp = tBlobOp;
tBlobOp->theNextBlobOp = NULL;
// add to list end
if (tLastBlopOp == NULL)
theBlobOpList = tBlobOp;
else
tLastBlopOp->theNextBlobOp = tBlobOp;
tBlobOp->theNextBlobOp = NULL;
}
}
tBlob = m_ndb->getNdbBlob();
......
......@@ -203,7 +203,9 @@ struct Col {
uint length;
uint size;
bool isblob() const {
return type == NdbDictionary::Column::Text;
return
type == NdbDictionary::Column::Text ||
type == NdbDictionary::Column::Blob;
}
};
......@@ -213,19 +215,21 @@ static Col g_col[] = {
{ 2, "seq", NdbDictionary::Column::Unsigned, false, false, 1, 4 },
{ 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen },
{ 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 },
{ 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 }
{ 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 },
{ 6, "bl1", NdbDictionary::Column::Blob, false, true, 0, 0 } // tinyblob
};
static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]);
static const uint g_blobcols = 3;
static uint
ncol()
{
uint n = g_maxcol;
if (g_opts.no_blobs)
n -= 2;
n -= g_blobcols;
else if (g_opts.one_blob)
n -= 1;
n -= (g_blobcols - 1);
return n;
}
......@@ -278,6 +282,11 @@ createtable()
col.setStripeSize(g_blobstripesize);
col.setCharset(cs);
break;
case NdbDictionary::Column::Blob:
col.setInlineSize(g_blobinlinesize);
col.setPartSize(0);
col.setStripeSize(0);
break;
default:
assert(false);
break;
......@@ -332,6 +341,7 @@ struct Data {
char cc1[g_charlen + 1];
Txt tx1;
Txt tx2;
Txt bl1;
Ptr ptr[g_maxcol];
int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL
uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD)
......@@ -342,14 +352,15 @@ struct Data {
memset(pk2, 0, sizeof(pk2));
seq = 0;
memset(cc1, 0, sizeof(cc1));
tx1.val = tx2.val = 0;
tx1.len = tx2.len = 0;
tx1.val = tx2.val = bl1.val = 0;
tx1.len = tx2.len = bl1.len = 0;
ptr[0].u32 = &pk1;
ptr[1].ch = pk2;
ptr[2].u32 = &seq;
ptr[3].ch = cc1;
ptr[4].txt = &tx1;
ptr[5].txt = &tx2;
ptr[6].txt = &bl1;
for (i = 0; i < g_maxcol; i++)
ind[i] = -1;
noop = 0;
......@@ -358,6 +369,7 @@ struct Data {
void free() {
delete [] tx1.val;
delete [] tx2.val;
delete [] bl1.val;
init();
}
};
......@@ -379,6 +391,7 @@ cmpcol(const Col& c, const Data& d1, const Data& d2)
return 1;
break;
case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{
const Data::Txt& t1 = *d1.ptr[i].txt;
const Data::Txt& t2 = *d2.ptr[i].txt;
......@@ -429,6 +442,7 @@ operator<<(NdbOut& out, const Data& d)
}
break;
case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{
Data::Txt& t = *d.ptr[i].txt;
bool first = true;
......@@ -1071,19 +1085,21 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
}
break;
case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{
const bool tinyblob = (c.type == NdbDictionary::Column::Blob);
Data::Txt& t = *d.ptr[i].txt;
delete [] t.val;
t.val = 0;
if (g_opts.tweak & 1) {
uint u = 256 + 2000;
uint u = g_blobinlinesize + (tinyblob ? 0 : g_blobpartsize);
uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
t.val = new char [u];
t.len = u;
memset(t.val, g_charval[v], u);
break;
}
uint u = urandom(g_maxblobsize);
uint u = urandom(tinyblob ? g_blobinlinesize : g_maxblobsize);
u = urandom(u); // 4x bias for smaller blobs
u = urandom(u);
t.val = new char [u];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment