Commit 260c0de9 authored by Olivier Bertrand's avatar Olivier Bertrand

- Add (limited) UPDATE/DELETE support to MYSQL type CONNECT tables

modified:
  storage/connect/ha_connect.cc
  storage/connect/tabmysql.cpp
  storage/connect/tabmysql.h
  storage/connect/tabodbc.cpp
parent eca84a9b
......@@ -4718,6 +4718,6 @@ maria_declare_plugin(connect)
NULL, /* status variables */
NULL, /* system variables */
"0.1", /* string version */
MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */
MariaDB_PLUGIN_MATURITY_ALPHA /* maturity */
}
maria_declare_plugin_end;
......@@ -321,7 +321,7 @@ bool MYSQLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
Portnumber = Cat->GetIntCatInfo("Port", GetDefaultPort());
Server = Hostname;
} else if (ParseURL(g, url))
return TRUE;
return true;
Bind = !!Cat->GetIntCatInfo("Bind", 0);
Delayed = !!Cat->GetIntCatInfo("Delayed", 0);
......@@ -344,7 +344,7 @@ bool MYSQLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
char *locdb = Database;
if (ParseURL(g, url))
return TRUE;
return true;
Database = locdb;
} // endif url
......@@ -353,10 +353,14 @@ bool MYSQLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
} // endif am
if ((Srcdef = Cat->GetStringCatInfo(g, "Srcdef", NULL)))
Isview = TRUE;
Isview = true;
// Used for Update and Delete
Qrystr = Cat->GetStringCatInfo(g, "Query_String", "?");
Quoted = Cat->GetIntCatInfo("Quoted", 0);
// Specific for command executing tables
Xsrc = Cat->GetBoolCatInfo("Execsrc", FALSE);
Xsrc = Cat->GetBoolCatInfo("Execsrc", false);
Mxr = Cat->GetIntCatInfo("Maxerr", 0);
return FALSE;
} // end of DefineAM
......@@ -390,6 +394,8 @@ TDBMYSQL::TDBMYSQL(PMYDEF tdp) : TDBASE(tdp)
User = tdp->Username;
Pwd = tdp->Password;
Server = tdp->Server;
Qrystr = tdp->Qrystr;
Quoted = max(0, tdp->Quoted);
Port = tdp->Portnumber;
Isview = tdp->Isview;
Prep = tdp->Bind;
......@@ -402,6 +408,8 @@ TDBMYSQL::TDBMYSQL(PMYDEF tdp) : TDBASE(tdp)
User = NULL;
Pwd = NULL;
Server = NULL;
Qrystr = NULL;
Quoted = 0;
Port = 0;
Isview = FALSE;
Prep = FALSE;
......@@ -426,6 +434,8 @@ TDBMYSQL::TDBMYSQL(PGLOBAL g, PTDBMY tdbp) : TDBASE(tdbp)
Srcdef = tdbp->Srcdef;
User = tdbp->User;
Pwd = tdbp->Pwd;
Qrystr = tdbp->Qrystr;
Quoted = tdbp->Quoted;
Port = tdbp->Port;
Isview = tdbp->Isview;
Prep = tdbp->Prep;
......@@ -612,120 +622,62 @@ bool TDBMYSQL::MakeInsert(PGLOBAL g)
return FALSE;
} // end of MakeInsert
#if 0
/***********************************************************************/
/* MakeUpdate: make the Update statement use with MySQL connection. */
/* Note: currently limited to local values and filtering. */
/* Limited to remote values and filtering. */
/***********************************************************************/
bool TDBMYSQL::MakeUpdate(PGLOBAL g, PSELECT selist)
int TDBMYSQL::MakeUpdate(PGLOBAL g)
{
char *setlist, *colname, *where = NULL, *tk = "`";
int len = 0, nset = 0;
bool b = FALSE;
PXOB xp;
PSELECT selp;
if (Query)
return FALSE; // already done
if (To_Filter)
if (To_Filter->CheckLocal(this)) {
where = (char*)PlugSubAlloc(g, NULL, 512); // Should be enough
*where = '\0';
if (!PlugRephraseSQL(g, where, To_Filter, TYPE_FILTER, tk))
return TRUE;
To_Filter = NULL;
len = strlen(where);
} else {
strcpy(g->Message, MSG(NO_REF_UPDATE));
return TRUE;
} // endif Local
for (selp = selist; selp; selp = selp->GetNext_Proj())
nset++;
assert(nset);
// Allocate a pretty big buffer
setlist = (char*)PlugSubAlloc(g, NULL, 256 * nset);
*setlist = '\0';
char *qc, cmd[8], tab[96], end[1024];
for (selp = selist; selp; selp = selp->GetNext_Proj()) {
if (selp->GetSetType() == TYPE_COLBLK) {
colname = selp->GetSetCol()->GetName();
} else if (selp->GetSetType() == TYPE_COLUMN) {
colname = (char*)((PCOLUMN)selp->GetSetCol())->GetName();
} else {
sprintf(g->Message, MSG(BAD_SET_TYPE), selp->GetSetType());
return TRUE;
} // endif Type
if (b)
strcat(setlist, ", ");
else
b = TRUE;
strcat(strcat(strcat(strcat(setlist, tk), colname), tk), " = ");
xp = selp->GetObject();
if (!xp->CheckLocal(this)) {
strcpy(g->Message, MSG(NO_REF_UPDATE));
return TRUE;
} else if (xp->GetType() == TYPE_SUBQ)
// Cannot be correlated because CheckLocal would have failed
xp = new(g) CONSTANT(xp->GetValue());
Query = (char*)PlugSubAlloc(g, NULL, strlen(Qrystr) + 64);
memset(end, 0, sizeof(end));
if (!PlugRephraseSQL(g, setlist + strlen(setlist),
xp, TYPE_XOBJECT, tk))
return TRUE;
} // endfor selp
// Below 16 is enough to take care of the fixed part of the query
len += (strlen(setlist) + strlen(Tabname) + 16);
Query = (char*)PlugSubAlloc(g, NULL, len);
strcat(strcat(strcat(strcpy(Query, "UPDATE "), tk), Tabname), tk);
strcat(strcat(Query, " SET "), setlist);
if (where)
strcat(Query, where);
if (sscanf(Qrystr, "%s `%[^`]`%1023c", cmd, tab, end) > 2 ||
sscanf(Qrystr, "%s \"%[^\"]\"%1023c", cmd, tab, end) > 2)
qc = "`";
else if (sscanf(Qrystr, "%s %s%1023c", cmd, tab, end) > 2)
qc = (Quoted) ? "`" : "";
else {
strcpy(g->Message, "Cannot use this UPDATE command");
return RC_FX;
} // endif sscanf
return FALSE;
assert(!stricmp(cmd, "update"));
strcat(strcat(strcat(strcpy(Query, "UPDATE "), qc), Tabname), qc);
strcat(Query, end);
return RC_OK;
} // end of MakeUpdate
/***********************************************************************/
/* MakeDelete: make the Delete statement use with MySQL connection. */
/* If no filtering Truncate is used because it is faster than Delete. */
/* However, the number of deleted lines is not returned by MySQL. */
/* Note: currently limited to local filtering. */
/* MakeDelete: make the Delete statement used with MySQL connection. */
/* Limited to remote filtering. */
/***********************************************************************/
bool TDBMYSQL::MakeDelete(PGLOBAL g)
int TDBMYSQL::MakeDelete(PGLOBAL g)
{
char *tk = "`";
int len = 0;
char *qc, cmd[8], from[8], tab[96], end[512];
if (Query)
return FALSE; // already done
Query = (char*)PlugSubAlloc(g, NULL, strlen(Qrystr) + 64);
memset(end, 0, sizeof(end));
if (!To_Filter)
AftRows = -1; // Means "all lines deleted"
if (sscanf(Qrystr, "%s %s `%[^`]`%511c", cmd, from, tab, end) > 2 ||
sscanf(Qrystr, "%s %s \"%[^\"]\"%511c", cmd, from, tab, end) > 2)
qc = "`";
else if (sscanf(Qrystr, "%s %s %s%511c", cmd, from, tab, end) > 2)
qc = (Quoted) ? "`" : "";
else {
strcpy(g->Message, "Cannot use this DELETE command");
return RC_FX;
} // endif sscanf
// Below 16 is more than length of 'delete from ' + 3
len += (strlen(Tabname) + 16);
len += (To_Filter ? strlen(To_Filter) + 7 : 0);
Query = (char*)PlugSubAlloc(g, NULL, len);
strcpy(Query, (To_Filter) ? "DELETE FROM " : "TRUNCATE ");
strcat(strcat(strcat(Query, tk), Tabname), tk);
assert(!stricmp(cmd, "delete") && !stricmp(from, "from"));
strcat(strcat(strcat(strcpy(Query, "DELETE FROM "), qc), Tabname), qc);
if (To_Filter)
strcat(strcat(Query, " WHERE "), To_Filter);
if (*end)
strcat(Query, end);
return FALSE;
return RC_OK;
} // end of MakeDelete
#endif // 0
/***********************************************************************/
/* XCV GetMaxSize: returns the maximum number of rows in the table. */
......@@ -753,7 +705,8 @@ int TDBMYSQL::GetMaxSize(PGLOBAL g)
Query = NULL; // Must be remade when columns are known
#endif // 0
MaxSize = 10; // To make MySQL happy
// Return 0 in mode DELETE in case of delete all.
MaxSize = (Mode == MODE_DELETE) ? 0 : 10; // To make MySQL happy
} // endif MaxSize
return MaxSize;
......@@ -768,12 +721,11 @@ int TDBMYSQL::RowNumber(PGLOBAL g, bool b)
} // end of RowNumber
/***********************************************************************/
/* Return 0 in mode DELETE to tell that the delete is done. */
/* Return 0 in mode UPDATE to tell that the update is done. */
/***********************************************************************/
int TDBMYSQL::GetProgMax(PGLOBAL g)
{
return (Mode == MODE_DELETE || Mode == MODE_UPDATE) ? 0
: GetMaxSize(g);
return (Mode == MODE_UPDATE) ? 0 : GetMaxSize(g);
} // end of GetProgMax
/***********************************************************************/
......@@ -876,33 +828,16 @@ bool TDBMYSQL::OpenDB(PGLOBAL g)
m_Rc = Myc.ExecSQL(g, cmd, &w);
} // endif m_Rc
#if 0
} else if (Next) {
strcpy(g->Message, MSG(NO_JOIN_UPDEL));
} else if (Mode == MODE_DELETE) {
strcpy(g->Message, "MySQL table delete not implemented yet\n");
bool rc = MakeDelete(g);
if (!rc && Myc.ExecSQL(g, Query) == RC_NF) {
if (!AftRows)
AftRows = Myc.GetRows();
m_Rc = RC_OK;
} // endif ExecSQL
#endif // 0
} else {
// bool rc = MakeUpdate(g, sqlp->GetProj());
strcpy(g->Message, "MySQL table delete/update not implemented yet\n");
} // endelse
} else
m_Rc = (Mode == MODE_DELETE) ? MakeDelete(g) : MakeUpdate(g);
if (m_Rc == RC_FX) {
Myc.Close();
return TRUE;
return true;
} // endif rc
Use = USE_OPEN; // Do it now in case we are recursively called
return FALSE;
Use = USE_OPEN;
return false;
} // end of OpenDB
/***********************************************************************/
......@@ -977,6 +912,38 @@ char *TDBMYSQL::FindFieldColumn(char *name)
return cp;
} // end of FindFieldColumn
/***********************************************************************/
/* Send an UPDATE or DELETE command to the remote server. */
/***********************************************************************/
int TDBMYSQL::SendCommand(PGLOBAL g)
{
int w;
if (Myc.ExecSQLcmd(g, Query, &w) == RC_NF) {
AftRows = Myc.m_Afrw;
sprintf(g->Message, "%s: %d affected rows", Tabname, AftRows);
PushWarning(g, this, 0); // 0 means a Note
if (trace)
htrc("%s\n", g->Message);
if (w && Myc.ExecSQL(g, "SHOW WARNINGS") == RC_OK) {
// We got warnings from the remote server
while (Myc.Fetch(g, -1) == RC_OK) {
sprintf(g->Message, "%s: (%s) %s", Tabname,
Myc.GetCharField(1), Myc.GetCharField(2));
PushWarning(g, this);
} // endwhile Fetch
Myc.FreeResult();
} // endif w
return RC_EF; // Nothing else to do
} else
return RC_FX; // Error
} // end of SendCommand
/***********************************************************************/
/* Data Base read routine for MYSQL access method. */
/***********************************************************************/
......@@ -988,6 +955,9 @@ int TDBMYSQL::ReadDB(PGLOBAL g)
htrc("MySQL ReadDB: R%d Mode=%d key=%p link=%p Kindex=%p\n",
GetTdb_No(), Mode, To_Key_Col, To_Link, To_Kindex);
if (Mode == MODE_UPDATE || Mode == MODE_DELETE)
return SendCommand(g);
/*********************************************************************/
/* Now start the reading process. */
/* Here is the place to fetch the line. */
......@@ -1043,12 +1013,16 @@ int TDBMYSQL::WriteDB(PGLOBAL g)
} // end of WriteDB
/***********************************************************************/
/* Data Base delete line routine for MYSQL access methods. */
/* Data Base delete all routine for MYSQL access methods. */
/***********************************************************************/
int TDBMYSQL::DeleteDB(PGLOBAL g, int irc)
{
strcpy(g->Message, MSG(NO_MYSQL_DELETE));
return RC_FX;
if (irc == RC_FX)
// Send the DELETE (all) command to the remote table
return (SendCommand(g) == RC_FX) ? RC_FX : RC_OK;
else
return RC_OK; // Ignore
} // end of DeleteDB
/***********************************************************************/
......
......@@ -53,8 +53,10 @@ class MYSQLDEF : public TABDEF {/* Logical table description */
PSZ Username; /* User logon name */
PSZ Password; /* Password logon info */
PSZ Server; /* PServerID */
PSZ Qrystr; /* The original query */
int Portnumber; /* MySQL port number (0 = default) */
int Mxr; /* Maxerr for an Exec table */
int Quoted; /* Identifier quoting level */
bool Isview; /* TRUE if this table is a MySQL view */
bool Bind; /* Use prepared statement on insert */
bool Delayed; /* Delayed insert */
......@@ -104,9 +106,10 @@ class TDBMYSQL : public TDBASE {
// Internal functions
bool MakeSelect(PGLOBAL g);
bool MakeInsert(PGLOBAL g);
//bool MakeUpdate(PGLOBAL g);
//bool MakeDelete(PGLOBAL g);
int MakeUpdate(PGLOBAL g);
int MakeDelete(PGLOBAL g);
int BindColumns(PGLOBAL g);
int SendCommand(PGLOBAL g);
// Members
MYSQLC Myc; // MySQL connection class
......@@ -120,6 +123,7 @@ class TDBMYSQL : public TDBASE {
char *Server; // The server ID
char *Query; // Points to SQL query
char *Qbuf; // Used for not prepared insert
char *Qrystr; // The original query
bool Fetched; // True when fetch was done
bool Isview; // True if this table is a MySQL view
bool Prep; // Use prepared statement on insert
......@@ -129,6 +133,7 @@ class TDBMYSQL : public TDBASE {
int N; // The current table index
int Port; // MySQL port number (0 = default)
int Nparm; // The number of statement parameters
int Quoted; // The identifier quoting level
}; // end of class TDBMYSQL
/***********************************************************************/
......
......@@ -602,7 +602,7 @@ int TDBODBC::GetMaxSize(PGLOBAL g)
{
if (MaxSize < 0) {
// Make MariaDB happy
MaxSize = (Mode == MODE_READ) ? 100 : 0;
MaxSize = (Mode == MODE_DELETE) ? 0 : 10;
#if 0
// This is unuseful and takes time
if (Srcdef) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment