Commit a2263a1f authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - bug#17813 schema.query => blob

parent 235c5d35
......@@ -746,10 +746,10 @@ static int ndbcluster_create_schema_table(THD *thd)
*/
end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
NDB_REP_DB "." NDB_SCHEMA_TABLE
" ( db VARCHAR(63) NOT NULL,"
" name VARCHAR(63) NOT NULL,"
" ( db VARBINARY(63) NOT NULL,"
" name VARBINARY(63) NOT NULL,"
" slock BINARY(32) NOT NULL,"
" query VARCHAR(4094) NOT NULL,"
" query BLOB NOT NULL,"
" node_id INT UNSIGNED NOT NULL,"
" epoch BIGINT UNSIGNED NOT NULL,"
" id INT UNSIGNED NOT NULL,"
......@@ -802,7 +802,6 @@ void ndbcluster_setup_binlog_table_shares(THD *thd)
#define SCHEMA_TYPE_I 8u
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u
#define SCHEMA_QUERY_SIZE 4096u
struct Cluster_schema
{
......@@ -813,7 +812,7 @@ struct Cluster_schema
unsigned char slock_length;
uint32 slock[SCHEMA_SLOCK_SIZE/4];
unsigned short query_length;
char query[SCHEMA_QUERY_SIZE];
char *query;
Uint64 epoch;
uint32 node_id;
uint32 id;
......@@ -824,10 +823,26 @@ struct Cluster_schema
/*
Transfer schema table data into corresponding struct
*/
static void ndbcluster_get_schema(TABLE *table,
static void ndbcluster_get_schema(NDB_SHARE *share,
Cluster_schema *s)
{
TABLE *table= share->table;
Field **field;
/* unpack blob values */
byte* blobs_buffer= 0;
uint blobs_buffer_size= 0;
{
ptrdiff_t ptrdiff= 0;
int ret= get_ndb_blobs_value(table, share->ndb_value[0],
blobs_buffer, blobs_buffer_size,
ptrdiff);
if (ret != 0)
{
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
DBUG_PRINT("info", ("blob read error"));
DBUG_ASSERT(false);
}
}
/* db varchar 1 length byte */
field= table->field;
s->db_length= *(uint8*)(*field)->ptr;
......@@ -847,13 +862,19 @@ static void ndbcluster_get_schema(TABLE *table,
s->slock_length= (*field)->field_length;
DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
memcpy(s->slock, (*field)->ptr, s->slock_length);
/* query varchar 2 length bytes */
/* query blob */
field++;
s->query_length= uint2korr((*field)->ptr);
DBUG_ASSERT(s->query_length <= (*field)->field_length);
DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query));
memcpy(s->query, (*field)->ptr + 2, s->query_length);
s->query[s->query_length]= 0;
{
Field_blob *field_blob= (Field_blob*)(*field);
uint blob_len= field_blob->get_length((*field)->ptr);
char *blob_ptr= 0;
field_blob->get_ptr(&blob_ptr);
assert(blob_len == 0 || blob_ptr != 0);
s->query_length= blob_len;
s->query= sql_alloc(blob_len+1);
memcpy(s->query, blob_ptr, blob_len);
s->query[blob_len]= 0;
}
/* node_id */
field++;
s->node_id= ((Field_long *)*field)->val_int();
......@@ -869,6 +890,8 @@ static void ndbcluster_get_schema(TABLE *table,
/* type */
field++;
s->type= ((Field_long *)*field)->val_int();
/* free blobs buffer */
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
}
/*
......@@ -1013,7 +1036,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
char save_db[FN_REFLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[SCHEMA_QUERY_SIZE];
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
......@@ -1037,10 +1060,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
......@@ -1068,9 +1094,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
DBUG_ASSERT(r == 0);
/* query */
ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length);
r|= op->setValue(SCHEMA_QUERY_I, tmp_buf);
{
NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
DBUG_ASSERT(ndb_blob != 0);
uint blob_len= query_length;
const char* blob_ptr= query;
r|= ndb_blob->setValue(blob_ptr, blob_len);
DBUG_ASSERT(r == 0);
}
/* node_id */
r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
DBUG_ASSERT(r == 0);
......@@ -1203,7 +1234,7 @@ ndbcluster_update_slock(THD *thd,
char save_db[FN_HEADLEN];
strcpy(save_db, ndb->getDatabaseName());
char tmp_buf[SCHEMA_QUERY_SIZE];
char tmp_buf[FN_REFLEN];
NDBDICT *dict= ndb->getDictionary();
ndb->setDatabaseName(NDB_REP_DB);
const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
......@@ -1227,10 +1258,13 @@ ndbcluster_update_slock(THD *thd,
for (i= 0; i < SCHEMA_SIZE; i++)
{
col[i]= ndbtab->getColumn(i);
if (i != SCHEMA_QUERY_I)
{
sz[i]= col[i]->getLength();
DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
}
}
}
while (1)
{
......@@ -1506,7 +1540,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
MY_BITMAP slock;
bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
uint node_id= g_ndb_cluster_connection->node_id();
ndbcluster_get_schema(share->table, schema);
ndbcluster_get_schema(share, schema);
if (schema->node_id != node_id)
{
int log_query= 0, post_epoch_unlock= 0;
......@@ -2265,6 +2299,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/
DBUG_ENTER("ndbcluster_create_event_ops");
DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
DBUG_ASSERT(share != 0);
......@@ -2374,6 +2409,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
else
{
DBUG_PRINT("info", ("%s blob", col_name));
DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
attr0.blob= op->getBlobHandle(col_name);
attr1.blob= op->getPreBlobHandle(col_name);
if (attr0.blob == NULL || attr1.blob == NULL)
......
......@@ -1164,7 +1164,7 @@ public:
*/
enum EventReport {
ER_UPDATED = 0,
ER_ALL = 1,
ER_ALL = 1, // except not-updated blob inlines
ER_SUBSCRIBE = 2
};
......
......@@ -878,6 +878,7 @@ ArrayPool<TupTriggerData> c_triggerPool;
{}
Bitmask<MAXNROFATTRIBUTESINWORDS> notNullAttributeMask;
Bitmask<MAXNROFATTRIBUTESINWORDS> blobAttributeMask;
ReadFunction* readFunctionArray;
UpdateFunction* updateFunctionArray;
......
......@@ -201,6 +201,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->m_no_of_attributes= noOfAttributes;
regTabPtr.p->notNullAttributeMask.clear();
regTabPtr.p->blobAttributeMask.clear();
Uint32 offset[10];
Uint32 tableDescriptorRef= allocTabDescr(regTabPtr.p, offset);
......@@ -286,6 +287,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
Uint32 attrId = signal->theData[2];
Uint32 attrDescriptor = signal->theData[3];
Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
// DICT sends charset number in upper half
Uint32 csNumber = (signal->theData[4] >> 16);
......@@ -353,6 +355,10 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
regTabPtr.p->notNullAttributeMask.set(attrId);
}
if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT) {
regTabPtr.p->blobAttributeMask.set(attrId);
}
switch (AttributeDescriptor::getArrayType(attrDescriptor)) {
case NDB_ARRAYTYPE_FIXED:
{
......
......@@ -867,10 +867,19 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
} else {
ljam();
//--------------------------------------------------------------------
// All others send all attributes that are monitored
// All others send all attributes that are monitored, except:
// Omit unchanged blob inlines on update i.e.
// attributeMask & ~ (blobAttributeMask & ~ changeMask)
//--------------------------------------------------------------------
numAttrsToRead = setAttrIds(trigPtr->attributeMask,
regTabPtr->m_no_of_attributes, &readBuffer[0]);
Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
attributeMask = trigPtr->attributeMask;
if (regOperPtr->op_struct.op_type == ZUPDATE) {
Bitmask<MAXNROFATTRIBUTESINWORDS> tmpMask = regTabPtr->blobAttributeMask;
tmpMask.bitANDC(req_struct->changeMask);
attributeMask.bitANDC(tmpMask);
}
numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes,
&readBuffer[0]);
}
ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE);
//--------------------------------------------------------------------
......
......@@ -598,6 +598,8 @@ NdbBlob::getHeadFromRecAttr()
theNullFlag = theHeadInlineRecAttr->isNULL();
assert(theEventBlobVersion >= 0 || theNullFlag != -1);
theLength = ! theNullFlag ? theHead->length : 0;
DBUG_PRINT("info", ("theNullFlag=%d theLength=%llu",
theNullFlag, theLength));
DBUG_VOID_RETURN;
}
......@@ -1835,10 +1837,13 @@ int
NdbBlob::atNextEvent()
{
DBUG_ENTER("NdbBlob::atNextEvent");
DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d", this, theEventOp, theBlobEventOp, theEventBlobVersion));
Uint32 optype = theEventOp->m_data_item->sdata->operation;
DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d optype=%u", this, theEventOp, theBlobEventOp, theEventBlobVersion, optype));
if (theState == Invalid)
DBUG_RETURN(-1);
assert(theEventBlobVersion >= 0);
if (optype >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)
DBUG_RETURN(0);
getHeadFromRecAttr();
if (theNullFlag == -1) // value not defined
DBUG_RETURN(0);
......
......@@ -26,6 +26,8 @@
#include <signaldata/DictTabInfo.hpp>
#include <ndb_limits.h>
#include "../../../../sql/ha_ndbcluster_tables.h"
Uint16 Twiddle16(Uint16 in); // Byte shift 16-bit data
Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data
Uint64 Twiddle64(Uint64 in); // Byte shift 64-bit data
......@@ -111,6 +113,8 @@ RestoreMetaData::loadContent()
return 0;
}
}
if (! markSysTables())
return 0;
if(!readGCPEntry())
return 0;
return 1;
......@@ -275,6 +279,49 @@ end:
return true;
}
bool
RestoreMetaData::markSysTables()
{
Uint32 i;
for (i = 0; i < getNoOfTables(); i++) {
TableS* table = allTables[i];
const char* tableName = table->getTableName();
if ( // XXX should use type
strcmp(tableName, "SYSTAB_0") == 0 ||
strcmp(tableName, "NDB$EVENTS_0") == 0 ||
strcmp(tableName, "sys/def/SYSTAB_0") == 0 ||
strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0 ||
strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) == 0 ||
strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE)== 0 )
table->isSysTable = true;
}
for (i = 0; i < getNoOfTables(); i++) {
TableS* blobTable = allTables[i];
const char* blobTableName = blobTable->getTableName();
// yet another match blob
int cnt, id1, id2;
char buf[256];
cnt = sscanf(blobTableName, "%[^/]/%[^/]/NDB$BLOB_%d_%d",
buf, buf, &id1, &id2);
if (cnt == 4) {
Uint32 j;
for (j = 0; j < getNoOfTables(); j++) {
TableS* table = allTables[j];
if (table->getTableId() == id1) {
if (table->isSysTable)
blobTable->isSysTable = true;
break;
}
}
if (j == getNoOfTables()) {
err << "Restore: Bad primary table id in " << blobTableName << endl;
return false;
}
}
}
return true;
}
bool
RestoreMetaData::readGCPEntry() {
......@@ -312,6 +359,7 @@ TableS::TableS(Uint32 version, NdbTableImpl* tableImpl)
m_auto_val_id= ~(Uint32)0;
m_max_auto_val= 0;
backupVersion = version;
isSysTable = false;
for (int i = 0; i < tableImpl->getNoOfColumns(); i++)
createAttr(tableImpl->getColumn(i));
......
......@@ -134,7 +134,7 @@ class TableS {
Uint32 m_auto_val_id;
Uint64 m_max_auto_val;
int pos;
bool isSysTable;
void createAttr(NdbDictionary::Column *column);
......@@ -222,6 +222,10 @@ public:
return allAttributesDesc[attributeId];
}
bool getSysTable() const {
return isSysTable;
}
TableS& operator=(TableS& org) ;
}; // TableS;
......@@ -279,6 +283,7 @@ class RestoreMetaData : public BackupFile {
Vector<TableS *> allTables;
bool readMetaFileHeader();
bool readMetaTableDesc();
bool markSysTables();
bool readGCPEntry();
Uint32 readMetaTableList();
......
......@@ -411,16 +411,17 @@ clearConsumers()
g_consumers.clear();
}
static bool
checkSysTable(const char *tableName)
static inline bool
checkSysTable(const TableS* table)
{
return ga_dont_ignore_systab_0 ||
(strcmp(tableName, "SYSTAB_0") != 0 &&
strcmp(tableName, "NDB$EVENTS_0") != 0 &&
strcmp(tableName, "sys/def/SYSTAB_0") != 0 &&
strcmp(tableName, "sys/def/NDB$EVENTS_0") != 0 &&
strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) != 0 &&
strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE) != 0);
return ga_dont_ignore_systab_0 || ! table->getSysTable();
}
static inline bool
checkSysTable(const RestoreMetaData& metaData, uint i)
{
assert(i < metaData.getNoOfTables());
return checkSysTable(metaData[i]);
}
static void
......@@ -534,7 +535,7 @@ main(int argc, char** argv)
debug << "Restoring tables" << endl;
for(i = 0; i<metaData.getNoOfTables(); i++)
{
if (checkSysTable(metaData[i]->getTableName()))
if (checkSysTable(metaData, i))
{
for(Uint32 j= 0; j < g_consumers.size(); j++)
if (!g_consumers[j]->table(* metaData[i]))
......@@ -572,7 +573,7 @@ main(int argc, char** argv)
const TupleS* tuple;
while ((tuple = dataIter.getNextTuple(res= 1)) != 0)
{
if (checkSysTable(tuple->getTable()->getTableName()))
if (checkSysTable(tuple->getTable()))
for(Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->tuple(* tuple, fragmentId);
} // while (tuple != NULL);
......@@ -617,7 +618,7 @@ main(int argc, char** argv)
bool alloc_flag = false;
while ((logEntry = logIter.getNextLogEntry(res= 0, &alloc_flag)) != 0)
{
if (checkSysTable(logEntry->m_table->getTableName()))
if (checkSysTable(logEntry->m_table))
for(Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->logEntry(* logEntry);
if (alloc_flag)
......@@ -638,7 +639,7 @@ main(int argc, char** argv)
{
for(i = 0; i<metaData.getNoOfTables(); i++)
{
if (checkSysTable(metaData[i]->getTableName()))
if (checkSysTable(metaData, i))
{
for(Uint32 j= 0; j < g_consumers.size(); j++)
if (!g_consumers[j]->finalize_table(* metaData[i]))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment