Commit 096ace92 authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - bug#17761 blob tables patch 3a [requires next patch 3b]

parent 53ea1584
......@@ -196,6 +196,10 @@ public:
Datafile = 22, ///< Datafile
Undofile = 23 ///< Undofile
};
// used 1) until type BlobTable added 2) in upgrade code
static bool
isBlobTableName(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
static inline bool
isTable(int tableType) {
......
......@@ -260,3 +260,33 @@ DictFilegroupInfo::File::init(){
FileSizeLo = 0;
FileFreeExtents = 0;
}
// blob table name hack
bool
DictTabInfo::isBlobTableName(const char* name, Uint32* ptab_id, Uint32* pcol_no)
{
const char* const prefix = "NDB$BLOB_";
const char* s = strrchr(name, table_name_separator);
s = (s == NULL ? name : s + 1);
if (strncmp(s, prefix, strlen(prefix)) != 0)
return false;
s += strlen(prefix);
uint i, n;
for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
n += 10 * n + (s[i] - '0');
if (i == 0 || s[i] != '_')
return false;
const uint tab_id = n;
s = &s[i + 1];
for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
n += 10 * n + (s[i] - '0');
if (i == 0 || s[i] != 0)
return false;
const uint col_no = n;
if (ptab_id)
*ptab_id = tab_id;
if (pcol_no)
*pcol_no = col_no;
return true;
}
......@@ -27,6 +27,7 @@ static NdbTableImpl f_altered_table;
Ndb_local_table_info *
Ndb_local_table_info::create(NdbTableImpl *table_impl, Uint32 sz)
{
assert(! is_ndb_blob_table(table_impl));
Uint32 tot_size= sizeof(Ndb_local_table_info) - sizeof(Uint64)
+ ((sz+7) & ~7); // round to Uint64
void *data= malloc(tot_size);
......@@ -44,6 +45,7 @@ void Ndb_local_table_info::destroy(Ndb_local_table_info *info)
Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
assert(! is_ndb_blob_table(table_impl));
m_table_impl= table_impl;
}
......@@ -61,18 +63,21 @@ LocalDictCache::~LocalDictCache(){
Ndb_local_table_info *
LocalDictCache::get(const char * name){
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
return m_tableHash.getData(name, len);
}
void
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
assert(! is_ndb_blob_table(name));
const Uint32 id = tab_info->m_table_impl->m_id;
m_tableHash.insertKey(name, strlen(name), id, tab_info);
}
void
LocalDictCache::drop(const char * name){
assert(! is_ndb_blob_table(name));
Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
DBUG_ASSERT(info != 0);
Ndb_local_table_info::destroy(info);
......@@ -142,6 +147,7 @@ GlobalDictCache::get(const char * name)
{
DBUG_ENTER("GlobalDictCache::get");
DBUG_PRINT("enter", ("name: %s", name));
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * versions = 0;
......@@ -196,6 +202,7 @@ GlobalDictCache::put(const char * name, NdbTableImpl * tab)
tab ? tab->m_internalName.c_str() : "tab NULL",
tab ? tab->m_version & 0xFFFFFF : 0,
tab ? tab->m_version >> 24 : 0));
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers = m_tableHash.getData(name, len);
......@@ -261,6 +268,7 @@ GlobalDictCache::drop(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::drop");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
......@@ -316,6 +324,7 @@ GlobalDictCache::release(NdbTableImpl * tab)
{
DBUG_ENTER("GlobalDictCache::release");
DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
assert(! is_ndb_blob_table(tab));
unsigned i;
const Uint32 len = strlen(tab->m_internalName.c_str());
......@@ -365,6 +374,7 @@ GlobalDictCache::alter_table_rep(const char * name,
Uint32 tableVersion,
bool altered)
{
assert(! is_ndb_blob_table(name));
const Uint32 len = strlen(name);
Vector<TableVersion> * vers =
m_tableHash.getData(name, len);
......
......@@ -754,7 +754,7 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
theDictionary->get_local_table_info(internal_tabname);
if (info == 0)
DBUG_RETURN(~(Uint64)0);
const NdbTableImpl *table= info->m_table_impl;
......@@ -846,7 +846,7 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
BaseString internal_tabname(internalize_table_name(aTableName));
Ndb_local_table_info *info=
theDictionary->get_local_table_info(internal_tabname, false);
theDictionary->get_local_table_info(internal_tabname);
if (info == 0) {
theError= theDictionary->getNdbError();
DBUG_RETURN(false);
......
......@@ -49,6 +49,18 @@
extern Uint64 g_latest_trans_gci;
bool
is_ndb_blob_table(const char* name)
{
return DictTabInfo::isBlobTableName(name);
}
bool
is_ndb_blob_table(const NdbTableImpl* t)
{
return is_ndb_blob_table(t->m_internalName.c_str());
}
//#define EVENT_DEBUG
/**
......@@ -239,6 +251,9 @@ NdbColumnImpl::init(Type t)
NdbColumnImpl::~NdbColumnImpl()
{
if (m_blobTable != NULL)
delete m_blobTable;
m_blobTable = NULL;
}
bool
......@@ -1282,6 +1297,14 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
if (impl == 0){
impl = m_receiver.getTable(internalTableName,
m_ndb.usingFullyQualifiedNames());
if (impl != 0) {
int ret = getBlobTables(*impl);
if (ret != 0) {
delete impl;
return 0;
}
}
m_globalHash->lock();
m_globalHash->put(internalTableName.c_str(), impl);
m_globalHash->unlock();
......@@ -1307,6 +1330,9 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
{
NdbTableImpl *old;
int ret = getBlobTables(*impl);
assert(ret == 0);
m_globalHash->lock();
if ((old= m_globalHash->get(impl->m_internalName.c_str())))
{
......@@ -1321,13 +1347,42 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
Ndb_local_table_info::create(impl, m_local_table_data_size);
m_localHash.put(impl->m_internalName.c_str(), info);
addBlobTables(*impl);
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
}
int
NdbDictionaryImpl::getBlobTables(NdbTableImpl &t)
{
unsigned n= t.m_noOfBlobs;
DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
// optimized for blob column being the last one
// and not looking for more than one if not neccessary
for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
i--;
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
// retrieve blob table def from DICT - by-pass cache
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
BaseString btname_internal = m_ndb.internalize_table_name(btname);
NdbTableImpl* bt =
m_receiver.getTable(btname_internal, m_ndb.usingFullyQualifiedNames());
if (bt == NULL)
DBUG_RETURN(-1);
// TODO check primary id/version when returned by DICT
// the blob column owns the blob table
assert(c.m_blobTable == NULL);
c.m_blobTable = bt;
}
DBUG_RETURN(0);
}
#if 0
bool
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
......@@ -2148,31 +2203,6 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
DBUG_RETURN(0);
}
int
NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
{
unsigned n= t.m_noOfBlobs;
DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
// optimized for blob column being the last one
// and not looking for more than one if not neccessary
for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
i--;
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
// Save BLOB table handle
NdbTableImpl * cachedBlobTable = getTable(btname);
if (cachedBlobTable == 0) {
DBUG_RETURN(-1);
}
c.m_blobTable = cachedBlobTable;
}
DBUG_RETURN(0);
}
int
NdbDictInterface::createTable(Ndb & ndb,
NdbTableImpl & impl)
......@@ -2188,7 +2218,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
DBUG_ENTER("NdbDictionaryImpl::alterTable");
Ndb_local_table_info * local = 0;
if((local= get_local_table_info(originalInternalName, false)) == 0)
if((local= get_local_table_info(originalInternalName)) == 0)
{
m_error.code = 709;
DBUG_RETURN(-1);
......@@ -2727,15 +2757,21 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
char btname[NdbBlobImpl::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
if (dropTable(btname) != 0) {
if (m_error.code != 709 && m_error.code != 723){
DBUG_PRINT("exit",("error %u - exiting",m_error.code));
NdbTableImpl* bt = c.m_blobTable;
if (bt == NULL) {
DBUG_PRINT("info", ("col %s: blob table pointer is NULL",
c.m_name.c_str()));
continue; // "force" mode on
}
// drop directly - by-pass cache
int ret = m_receiver.dropTable(*c.m_blobTable);
if (ret != 0) {
DBUG_PRINT("info", ("col %s: blob table %s: error %d",
c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
if (! (ret == 709 || ret == 723)) // "force" mode on
DBUG_RETURN(-1);
}
DBUG_PRINT("info",("error %u - continuing",m_error.code));
}
// leave c.m_blobTable defined
}
DBUG_RETURN(0);
}
......@@ -2794,53 +2830,31 @@ NdbDictInterface::execDROP_TABLE_REF(NdbApiSignal * signal,
}
int
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl)
{
const char * internalTableName = impl.m_internalName.c_str();
DBUG_ENTER("NdbDictionaryImpl::invalidateObject");
DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
if (lock)
m_globalHash->lock();
if (impl.m_noOfBlobs != 0) {
for (uint i = 0; i < impl.m_columns.size(); i++) {
NdbColumnImpl& c = *impl.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
assert(c.m_blobTable != NULL);
invalidateObject(*c.m_blobTable, false);
}
}
m_localHash.drop(internalTableName);
m_globalHash->lock();
impl.m_status = NdbDictionary::Object::Invalid;
m_globalHash->drop(&impl);
if (lock)
m_globalHash->unlock();
m_globalHash->unlock();
DBUG_RETURN(0);
}
int
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
{
const char * internalTableName = impl.m_internalName.c_str();
DBUG_ENTER("NdbDictionaryImpl::removeCachedObject");
DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
if (lock)
m_globalHash->lock();
if (impl.m_noOfBlobs != 0) {
for (uint i = 0; i < impl.m_columns.size(); i++) {
NdbColumnImpl& c = *impl.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
assert(c.m_blobTable != NULL);
removeCachedObject(*c.m_blobTable, false);
}
}
m_localHash.drop(internalTableName);
m_globalHash->lock();
m_globalHash->release(&impl);
if (lock)
m_globalHash->unlock();
m_globalHash->unlock();
DBUG_RETURN(0);
}
......@@ -2851,8 +2865,7 @@ NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
const BaseString& internalName)
{
Ndb_local_table_info * info = get_local_table_info(internalName,
false);
Ndb_local_table_info * info = get_local_table_info(internalName);
if(info == 0){
m_error.code = 4243;
return 0;
......@@ -3503,7 +3516,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
// We only have the table name with internal name
DBUG_PRINT("info",("table %s", ev->getTableName()));
Ndb_local_table_info *info;
info= get_local_table_info(ev->getTableName(), true);
info= get_local_table_info(ev->getTableName());
if (info == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
......@@ -3513,7 +3526,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
if (info->m_table_impl->m_status != NdbDictionary::Object::Retrieved)
{
removeCachedObject(*info->m_table_impl);
info= get_local_table_info(ev->getTableName(), true);
info= get_local_table_info(ev->getTableName());
if (info == 0)
{
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
......
......@@ -30,6 +30,9 @@
#include "NdbWaiter.hpp"
#include "DictCache.hpp"
bool is_ndb_blob_table(const char* name);
bool is_ndb_blob_table(const class NdbTableImpl* t);
class NdbDictObjectImpl {
public:
int m_id;
......@@ -542,13 +545,12 @@ public:
int createTable(NdbTableImpl &t);
int createBlobTables(NdbTableImpl& t);
int addBlobTables(NdbTableImpl &);
int alterTable(NdbTableImpl &t);
int dropTable(const char * name);
int dropTable(NdbTableImpl &);
int dropBlobTables(NdbTableImpl &);
int invalidateObject(NdbTableImpl &, bool lock = true);
int removeCachedObject(NdbTableImpl &, bool lock = true);
int invalidateObject(NdbTableImpl &);
int removeCachedObject(NdbTableImpl &);
int createIndex(NdbIndexImpl &ix);
int dropIndex(const char * indexName,
......@@ -573,8 +575,9 @@ public:
NdbTableImpl * getTable(const char * tableName, void **data= 0);
void putTable(NdbTableImpl *impl);
Ndb_local_table_info* get_local_table_info(
const BaseString& internalTableName, bool do_add_blob_tables);
int getBlobTables(NdbTableImpl &);
Ndb_local_table_info*
get_local_table_info(const BaseString& internalTableName);
NdbIndexImpl * getIndex(const char * indexName,
const char * tableName);
NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
......@@ -848,7 +851,7 @@ NdbDictionaryImpl::getTable(const char * table_name, void **data)
{
const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
Ndb_local_table_info *info=
get_local_table_info(internal_tabname, true);
get_local_table_info(internal_tabname);
if (info == 0)
return 0;
if (data)
......@@ -858,8 +861,7 @@ NdbDictionaryImpl::getTable(const char * table_name, void **data)
inline
Ndb_local_table_info *
NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
bool do_add_blob_tables)
NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
{
Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
if (info == 0) {
......@@ -868,9 +870,6 @@ NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
return 0;
}
}
if (do_add_blob_tables && info->m_table_impl->m_noOfBlobs)
addBlobTables(*(info->m_table_impl));
return info; // autoincrement already initialized
}
......@@ -891,7 +890,7 @@ NdbDictionaryImpl::getIndex(const char * index_name,
if (internal_indexname.length())
{
Ndb_local_table_info * info=
get_local_table_info(internal_indexname, false);
get_local_table_info(internal_indexname);
if (info)
{
NdbTableImpl * tab= info->m_table_impl;
......@@ -956,6 +955,4 @@ NdbUndofileImpl::getImpl(const NdbDictionary::Undofile & t){
return t.m_impl;
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment