Commit 6be6b4ab authored by Olivier Bertrand's avatar Olivier Bertrand

- All the processing of creating, dropping, modifying indexes was redesigned.

  The code was a legacy from the first versions of the XDB engine dating 2004
  and was not working anymore with recent versions of MySQL/MariaDB. A patch
  in create had been added but is was unsatisfying, recreating all indexes on
  any alter statement and sometimes doing nothing when it should have.
  This is a major update to be tested for stability. It was in most important
  cases et all current tests pass with this new version

modified:
  storage/connect/global.h
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/mycat.cc
  storage/connect/plugutil.c
  storage/connect/tabdos.cpp
  storage/connect/user_connect.cc
  storage/connect/user_connect.h
  storage/connect/xindex.h
parent b3b02104
......@@ -220,6 +220,7 @@ typedef struct _global { /* Global structure */
char Message[MAX_STR];
short Trace;
int Createas; /* To pass info to created table */
void *Xchk; /* indexes in create/alter */
int jump_level;
jmp_buf jumper[MAX_JUMP + 2];
} GLOBAL;
......
......@@ -881,6 +881,25 @@ bool ha_connect::GetBooleanOption(char *opname, bool bdef)
return opval;
} // end of GetBooleanOption
/****************************************************************************/
/* Set the value of the opname option (does not work for oplist options) */
/* Currently used only to set the Sepindex value. */
/****************************************************************************/
bool ha_connect::SetBooleanOption(char *opname, bool b)
{
PTOS options= GetTableOptionStruct(table);
if (!options)
return true;
if (!stricmp(opname, "SepIndex"))
options->sepindex= b;
else
return true;
return false;
} // end of SetBooleanOption
/****************************************************************************/
/* Return the value of an integer option or NO_IVAL if not specified. */
/****************************************************************************/
......@@ -1167,40 +1186,41 @@ void *ha_connect::GetColumnOption(void *field, PCOLINFO pcf)
/****************************************************************************/
/* Returns the index description structure used to make the index. */
/****************************************************************************/
PIXDEF ha_connect::GetIndexInfo(int n)
PIXDEF ha_connect::GetIndexInfo(void)
{
char *name, *pn;
bool unique;
PIXDEF xdp= NULL;
PKPDEF kpp, pkp= NULL;
PIXDEF xdp, pxd=NULL, toidx= NULL;
PKPDEF kpp, pkp;
PGLOBAL& g= xp->g;
KEY kp;
// Find the index to describe
if ((unsigned)n < table->s->keynames.count)
// kp= table->key_info[n]; which one ???
kp= table->s->key_info[n];
else
return NULL;
// Now get index information
pn= (char*)table->s->keynames.type_names[n];
name= (char*)PlugSubAlloc(g, NULL, strlen(pn) + 1);
strcpy(name, pn); // This is probably unuseful
unique= (kp.flags & 1) != 0;
for (int n= 0; (unsigned)n < table->s->keynames.count; n++) {
if (xtrace)
printf("Getting created index %d info\n", n + 1);
// Allocate the index description block
xdp= new(g) INDEXDEF(name, unique, n);
// Find the index to describe
kp= table->s->key_info[n];
// Get the the key parts info
for (int k= 0; (unsigned)k < kp.key_parts; k++) {
pn= (char*)kp.key_part[k].field->field_name;
// Now get index information
pn= (char*)table->s->keynames.type_names[n];
name= (char*)PlugSubAlloc(g, NULL, strlen(pn) + 1);
strcpy(name, pn); // This is probably unuseful
unique= (kp.flags & 1) != 0;
pkp= NULL;
// Allocate the index description block
xdp= new(g) INDEXDEF(name, unique, n);
// Get the the key parts info
for (int k= 0; (unsigned)k < kp.key_parts; k++) {
pn= (char*)kp.key_part[k].field->field_name;
name= (char*)PlugSubAlloc(g, NULL, strlen(pn) + 1);
strcpy(name, pn); // This is probably unuseful
// Allocate the key part description block
kpp= new(g) KPARTDEF(name, k + 1);
kpp->SetKlen(kp.key_part[k].length);
// Allocate the key part description block
kpp= new(g) KPARTDEF(name, k + 1);
kpp->SetKlen(kp.key_part[k].length);
#if 0 // NIY
// Index on auto increment column can be an XXROW index
......@@ -1213,16 +1233,25 @@ PIXDEF ha_connect::GetIndexInfo(int n)
} // endif AUTO_INCREMENT
#endif // 0
if (pkp)
pkp->SetNext(kpp);
if (pkp)
pkp->SetNext(kpp);
else
xdp->SetToKeyParts(kpp);
pkp= kpp;
} // endfor k
xdp->SetNParts(kp.key_parts);
if (pxd)
pxd->SetNext(xdp);
else
xdp->SetToKeyParts(kpp);
toidx= xdp;
pkp= kpp;
} // endfor k
pxd= xdp;
} // endfor n
xdp->SetNParts(kp.key_parts);
return xdp;
return toidx;
} // end of GetIndexInfo
const char *ha_connect::GetDBName(const char* name)
......@@ -1387,10 +1416,11 @@ bool ha_connect::OpenTable(PGLOBAL g, bool del)
istable= true;
// strmake(tname, table_name, sizeof(tname)-1);
if (xmod == MODE_ANY && *tdbp->GetName() != '#')
// We may be in a create index query
if (xp) // xp can be null when called from create
xp->tabp= (PTDBDOS)tdbp; // The table on which the index is created
// We may be in a create index query
if (xmod == MODE_ANY && *tdbp->GetName() != '#') {
// The current indexes
PIXDEF oldpix= GetIndexInfo();
} // endif xmod
// tdbp->SetOrig((PTBX)table); // used by CheckCond
} else
......@@ -2984,7 +3014,7 @@ err:
int ha_connect::external_lock(THD *thd, int lock_type)
{
int rc= 0;
bool del= false;
bool del= false, xcheck=false, cras= false;
MODE newmode;
PTOS options= GetTableOptionStruct(table);
PGLOBAL g= GetPlug(thd);
......@@ -3019,51 +3049,88 @@ int ha_connect::external_lock(THD *thd, int lock_type)
// This is unlocking, do it by closing the table
if (xp->CheckQueryID())
rc= 2; // Logical error ???
else if (tdbp) {
if (tdbp->GetMode() == MODE_ANY && *tdbp->GetName() == '#' &&
xp->tabp && ((PTDBASE)tdbp)->GetDef()->Indexable()) {
PDOSDEF defp1= (PDOSDEF)((PTDBASE)tdbp)->GetDef();
PDOSDEF defp2= (PDOSDEF)xp->tabp->GetDef();
PIXDEF xp1, xp2, sxp;
// Look for new created indexes
for (xp1= defp1->GetIndx(); xp1; xp1= xp1->GetNext()) {
for (xp2= defp2->GetIndx(); xp2; xp2= xp2->GetNext())
if (!stricmp(xp1->GetName(), xp2->GetName()))
else if (g->Xchk) {
if (!tdbp || *tdbp->GetName() == '#') {
bool oldsep= ((PCHK)g->Xchk)->oldsep;
bool newsep= ((PCHK)g->Xchk)->newsep;
PTDBDOS tdp= (PTDBDOS)(tdbp ? tdbp : GetTDB(g));
PDOSDEF ddp= (PDOSDEF)tdp->GetDef();
PIXDEF xp, xp1, xp2, drp=NULL, adp= NULL;
PIXDEF oldpix= ((PCHK)g->Xchk)->oldpix;
PIXDEF newpix= ((PCHK)g->Xchk)->newpix;
PIXDEF *xlst, *xprc;
ddp->SetIndx(oldpix);
if (oldsep != newsep) {
// All indexes have to be remade
ddp->DeleteIndexFile(g, NULL);
oldpix= NULL;
ddp->SetIndx(NULL);
SetBooleanOption("Sepindex", newsep);
} else if (newsep) {
// Make the list of dropped indexes
xlst= &drp; xprc= &oldpix;
for (xp2= oldpix; xp2; xp2= xp) {
for (xp1= newpix; xp1; xp1= xp1->Next)
if (!stricmp(xp1->Name, xp2->Name))
break; // Index not to drop
xp= xp2->GetNext();
if (!xp1) {
*xlst= xp2;
*xprc= xp;
*(xlst= &xp2->Next)= NULL;
} else
xprc= &xp2->Next;
} // endfor xp2
if (drp) {
// Here we erase the index files
ddp->DeleteIndexFile(g, drp);
} // endif xp1
} else if (oldpix) {
// TODO: optimize the case of just adding new indexes
if (!newpix)
ddp->DeleteIndexFile(g, NULL);
oldpix= NULL; // To remake all indexes
ddp->SetIndx(NULL);
} // endif sepindex
// Make the list of new created indexes
xlst= &adp; xprc= &newpix;
for (xp1= newpix; xp1; xp1= xp) {
for (xp2= oldpix; xp2; xp2= xp2->Next)
if (!stricmp(xp1->Name, xp2->Name))
break; // Index already made
xp= xp1->Next;
if (!xp2) {
// Here we do make the index on tabp
sxp= xp1->GetNext();
xp1->SetNext(NULL);
xp->tabp->MakeIndex(g, xp1, true);
xp1->SetNext(sxp);
} // endif xp2
*xlst= xp1;
*xprc= xp;
*(xlst= &xp1->Next)= NULL;
} else
xprc= &xp1->Next;
} // endfor xp1
// Look for dropped indexes
for (xp2= defp2->GetIndx(); xp2; xp2= xp2->GetNext()) {
for (xp1= defp1->GetIndx(); xp1; xp1= xp1->GetNext())
if (!stricmp(xp1->GetName(), xp2->GetName()))
break; // Index not to drop
if (!xp1) {
// Here we erase the index file
sxp= xp2->GetNext();
xp2->SetNext(NULL);
defp2->DeleteIndexFile(g, xp2);
xp2->SetNext(sxp);
} // endif xp1
} // endfor xp2
if (adp)
// Here we do make the new indexes
tdp->MakeIndex(g, adp, true);
} // endif Mode
if (CloseTable(g))
rc= HA_ERR_INTERNAL_ERROR;
} // endelse Xchk
} // endif tdbp
if (CloseTable(g))
rc= HA_ERR_INTERNAL_ERROR;
DBUG_RETURN(rc);
} // endif MODE_ANY
......@@ -3126,7 +3193,8 @@ int ha_connect::external_lock(THD *thd, int lock_type)
} else if (newmode == MODE_READ) {
switch (thd->lex->sql_command) {
case SQLCOM_CREATE_TABLE:
g->Createas= 1; // To tell created table to ignore FLAG
xcheck= true;
cras= true;
case SQLCOM_INSERT:
case SQLCOM_LOAD:
case SQLCOM_INSERT_SELECT:
......@@ -3142,10 +3210,11 @@ int ha_connect::external_lock(THD *thd, int lock_type)
break;
case SQLCOM_DROP_INDEX:
case SQLCOM_CREATE_INDEX:
case SQLCOM_ALTER_TABLE:
xcheck= true;
// stop= true;
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
case SQLCOM_ALTER_TABLE:
newmode= MODE_ANY;
break;
case SQLCOM_CREATE_VIEW:
......@@ -3170,6 +3239,16 @@ int ha_connect::external_lock(THD *thd, int lock_type)
valid_info= false;
} // endif CheckCleanup
if (xcheck) {
// This must occur after CheckCleanup
g->Xchk= new(g) XCHK;
((PCHK)g->Xchk)->oldsep= GetBooleanOption("Sepindex", false);
((PCHK)g->Xchk)->oldpix= GetIndexInfo();
} // endif xcheck
if (cras)
g->Createas= 1; // To tell created table to ignore FLAG
if (xtrace)
printf("Calling CntCheckDB db=%s\n", GetDBName(NULL));
......@@ -3691,9 +3770,9 @@ bool ha_connect::pre_create(THD *thd, HA_CREATE_INFO *create_info,
ok= true;
if ((dsn= create_info->connect_string.str)) {
PMYDEF mydef= new(g) MYSQLDEF();
PDBUSER dup= PlgGetUser(g);
PCATLG cat= (dup) ? dup->Catalog : NULL;
PMYDEF mydef= new(g) MYSQLDEF();
dsn= (char*)PlugSubAlloc(g, NULL, strlen(dsn) + 1);
strncpy(dsn, create_info->connect_string.str,
......@@ -3959,7 +4038,6 @@ int ha_connect::create(const char *name, TABLE *table_arg,
Field *fp;
TABTYPE type;
TABLE *st= table; // Probably unuseful
PIXDEF xdp, pxd= NULL, toidx= NULL;
PGLOBAL g= GetPlug(table_arg->in_use);
DBUG_ENTER("ha_connect::create");
......@@ -4189,43 +4267,40 @@ int ha_connect::create(const char *name, TABLE *table_arg,
} // endif buf
} else {
// Check whether indexes were specified
} // endif filename
// Get the index definitions
for (int n= 0; (unsigned)n < table->s->keynames.count; n++) {
if (xtrace)
printf("Getting created index %d info\n", n + 1);
// To check whether indexes have to be made or remade
if (!g->Xchk) {
PIXDEF xdp;
xdp= GetIndexInfo(n);
if (pxd)
pxd->SetNext(xdp);
else
toidx= xdp;
// We should be in CREATE TABLE
if (table->in_use->lex->sql_command != SQLCOM_CREATE_TABLE)
push_warning(table->in_use, MYSQL_ERROR::WARN_LEVEL_WARN, 0,
"Wrong command in create, please contact CONNECT team");
pxd= xdp;
} // endfor n
if (toidx) {
// Get the index definitions
if (xdp= GetIndexInfo()) {
PDBUSER dup= PlgGetUser(g);
PCATLG cat= (dup) ? dup->Catalog : NULL;
DBUG_ASSERT(cat);
if (cat)
if (cat) {
cat->SetDataPath(g, table_arg->in_use->db);
if ((rc= optimize(NULL, NULL))) {
printf("Create rc=%d %s\n", rc, g->Message);
my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
rc= HA_ERR_INTERNAL_ERROR;
} else
CloseTable(g);
if ((rc= optimize(NULL, NULL))) {
printf("Create rc=%d %s\n", rc, g->Message);
my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0));
rc= HA_ERR_INTERNAL_ERROR;
} else
CloseTable(g);
} // endif toidx
} // endif cat
} // endif filename
} // endif xdp
} else {
((PCHK)g->Xchk)->newsep= GetBooleanOption("Sepindex", false);
((PCHK)g->Xchk)->newpix= GetIndexInfo();
} // endif Xchk
table= st;
} // endif type
......@@ -4250,17 +4325,35 @@ bool ha_connect::check_if_incompatible_data(HA_CREATE_INFO *info,
{
//ha_table_option_struct *param_old, *param_new;
DBUG_ENTER("ha_connect::check_if_incompatible_data");
// TO DO: implement and check it.
// TO DO: really implement and check it.
THD *thd= current_thd;
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 0,
"The current version of CONNECT did not check what you changed in ALTER. Use at your own risk");
if (table) {
PTOS options= info->option_struct;
PTOS newopt= info->option_struct;
PTOS oldopt= table->s->option_struct;
#if 0
if (newopt->sepindex != oldopt->sepindex) {
// All indexes to be remade
PGLOBAL g= GetPlug(thd);
if (!g)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, 0,
"Execute OPTIMIZE TABLE to remake the indexes");
else
g->Xchk= new(g) XCHK;
} // endif sepindex
#endif // 0
if (options->filename)
if (newopt->filename)
DBUG_RETURN(COMPATIBLE_DATA_NO);
} // endif table
DBUG_RETURN(COMPATIBLE_DATA_YES);
} // end of check_if_incompatible_data
......
......@@ -31,23 +31,33 @@
/****************************************************************************/
typedef struct _create_xinfo {
char *Type; /* Retrieved from table comment */
char *Filename; /* Set if not standard */
char *Filename; /* Set if not standard */
char *IndexFN; /* Set if not standard */
ulonglong Maxrows; /* Estimated max nb of rows */
ulong Lrecl; /* Set if not default */
ulong Elements; /* Number of lines in blocks */
ulong Elements; /* Number of lines in blocks */
bool Fixed; /* False for DOS type */
void *Pcf; /* To list of columns */
void *Pxdf; /* To list of indexes */
} CRXINFO, *PCXF;
typedef struct _xinfo {
ulonglong data_file_length; /* Length of data file */
ulonglong data_file_length; /* Length of data file */
ha_rows records; /* Records in table */
ulong mean_rec_length; /* Physical record length */
char *data_file_name; /* Physical file name */
char *data_file_name; /* Physical file name */
} XINFO, *PXF;
class XCHK : public BLOCK {
public:
XCHK(void) {oldsep= newsep= false; oldpix= newpix= NULL;}
bool oldsep; // Sepindex before create/alter
bool newsep; // Sepindex after create/alter
PIXDEF oldpix; // The indexes before create/alter
PIXDEF newpix; // The indexes after create/alter
}; // end of class XCHK
typedef class XCHK *PCHK;
typedef class user_connect *PCONNECT;
typedef struct ha_table_option_struct TOS, *PTOS;
typedef struct ha_field_option_struct FOS, *PFOS;
......@@ -87,11 +97,12 @@ public:
char *GetStringOption(char *opname, char *sdef= NULL);
PTOS GetTableOptionStruct(TABLE *table_arg);
bool GetBooleanOption(char *opname, bool bdef);
bool SetBooleanOption(char *opname, bool b);
int GetIntegerOption(char *opname);
bool SetIntegerOption(char *opname, int n);
PFOS GetFieldOptionStruct(Field *fp);
void *GetColumnOption(void *field, PCOLINFO pcf);
PIXDEF GetIndexInfo(int n);
PIXDEF GetIndexInfo(void);
const char *GetDBName(const char *name);
const char *GetTableName(void);
int GetColNameLen(Field *fp);
......
......@@ -556,26 +556,8 @@ int MYCAT::GetColCatInfo(PGLOBAL g, PTABDEF defp)
/***********************************************************************/
bool MYCAT::GetIndexInfo(PGLOBAL g, PTABDEF defp)
{
PIXDEF xdp, pxd= NULL, toidx= NULL;
// Now take care of the index definitions
for (int n= 0; ; n++) {
if (xtrace)
printf("Getting index %d info\n", n + 1);
if (!(xdp= Hc->GetIndexInfo(n)))
break;
if (pxd)
pxd->SetNext(xdp);
else
toidx= xdp;
pxd= xdp;
} // endfor n
// All is correct, attach new index(es)
defp->SetIndx(toidx);
// Attach new index(es)
defp->SetIndx(Hc->GetIndexInfo());
return false;
} // end of GetIndexInfo
......
......@@ -152,6 +152,7 @@ PGLOBAL PlugInit(LPCSTR Language, uint worksize)
g->Trace = 0;
g->Createas = 0;
g->Activityp = g->ActivityStart = NULL;
g->Xchk = NULL;
strcpy(g->Message, "");
/*******************************************************************/
......
......@@ -157,13 +157,13 @@ bool DOSDEF::DeleteIndexFile(PGLOBAL g, PIXDEF pxdf)
char filename[_MAX_PATH];
bool sep, rc = false;
if (!pxdf)
if (!To_Indx)
return false; // No index
// If true indexes are in separate files
sep = Cat->GetBoolCatInfo("SepIndex", false);
if (!sep && To_Indx) {
if (!sep && pxdf) {
strcpy(g->Message, MSG(NO_RECOV_SPACE));
return true;
} // endif sep
......@@ -204,7 +204,7 @@ bool DOSDEF::DeleteIndexFile(PGLOBAL g, PIXDEF pxdf)
#endif // UNIX
} // endfor pxdf
} else { // !sep, no more indexes or name is NULL
} else { // !sep
// Drop all indexes, delete the common file
PlugSetPath(filename, Ofn, GetPath());
strcat(PlugRemoveType(filename, filename), ftype);
......
......@@ -70,7 +70,6 @@ user_connect::user_connect(THD *thd, const char *dbn)
next= NULL;
previous= NULL;
g= NULL;
tabp= NULL;
last_query_id= 0;
count= 0;
......@@ -141,6 +140,8 @@ bool user_connect::CheckCleanup(void)
if (thdp->query_id > last_query_id) {
PlugCleanup(g, true);
PlugSubSet(g, g->Sarea, g->Sarea_Size);
g->Xchk = NULL;
g->Createas = 0;
last_query_id= thdp->query_id;
if (xtrace)
......
......@@ -70,7 +70,6 @@ protected:
PCONNECT previous; // Previous user in chain
PGLOBAL g; // The common handle to CONNECT
//char dbname[32]; // The DBCONNECT database
PTDBDOS tabp; // The table used on create
query_id_t last_query_id; // the latest user query id
int count; // if used by several handlers
// Statistics
......
......@@ -76,6 +76,7 @@ typedef struct index_off {
class DllExport INDEXDEF : public BLOCK { /* Index description block */
friend class PLUGCAT;
friend class DOSDEF;
friend class ha_connect;
friend int PlgMakeIndex(PGLOBAL g, PSZ name, PIXDEF pxdf, bool add);
public:
// Constructor
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment