Commit bce59293 authored by Olivier Bertrand's avatar Olivier Bertrand

-Fix MDEV-4878. Table locking is now supported.

modified:
  storage/connect/ha_connect.cc
  storage/connect/ha_connect.h
  storage/connect/plgdbsem.h

- Fiw a bug making records_in_range sometimes return a negative
  value.

modified:
  storage/connect/xindex.cpp
parent 8b3e07e1
...@@ -497,6 +497,7 @@ ha_connect::ha_connect(handlerton *hton, TABLE_SHARE *table_arg) ...@@ -497,6 +497,7 @@ ha_connect::ha_connect(handlerton *hton, TABLE_SHARE *table_arg)
creat_query_id= (table && table->in_use) ? table->in_use->query_id : 0; creat_query_id= (table && table->in_use) ? table->in_use->query_id : 0;
stop= false; stop= false;
indexing= -1; indexing= -1;
locked= 0;
data_file_name= NULL; data_file_name= NULL;
index_file_name= NULL; index_file_name= NULL;
enable_activate_all_index= 0; enable_activate_all_index= 0;
...@@ -1865,7 +1866,7 @@ int ha_connect::open(const char *name, int mode, uint test_if_locked) ...@@ -1865,7 +1866,7 @@ int ha_connect::open(const char *name, int mode, uint test_if_locked)
DBUG_ENTER("ha_connect::open"); DBUG_ENTER("ha_connect::open");
if (xtrace) if (xtrace)
printf("open: name=%s mode=%d test=%ud\n", name, mode, test_if_locked); printf("open: name=%s mode=%d test=%u\n", name, mode, test_if_locked);
if (!(share= get_share(name, table))) if (!(share= get_share(name, table)))
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -1983,8 +1984,11 @@ int ha_connect::write_row(uchar *buf) ...@@ -1983,8 +1984,11 @@ int ha_connect::write_row(uchar *buf)
PGLOBAL& g= xp->g; PGLOBAL& g= xp->g;
DBUG_ENTER("ha_connect::write_row"); DBUG_ENTER("ha_connect::write_row");
// Open the table if it was not opened yet (possible ???) // Open the table if it was not opened yet (locked)
if (!IsOpened()) if (!IsOpened() || xmod != tdbp->GetMode()) {
if (IsOpened())
CloseTable(g);
if (OpenTable(g)) { if (OpenTable(g)) {
if (strstr(g->Message, "read only")) if (strstr(g->Message, "read only"))
rc= HA_ERR_TABLE_READONLY; rc= HA_ERR_TABLE_READONLY;
...@@ -1994,6 +1998,8 @@ int ha_connect::write_row(uchar *buf) ...@@ -1994,6 +1998,8 @@ int ha_connect::write_row(uchar *buf)
DBUG_RETURN(rc); DBUG_RETURN(rc);
} // endif tdbp } // endif tdbp
} // endif isopened
if (tdbp->GetMode() == MODE_ANY) if (tdbp->GetMode() == MODE_ANY)
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -2112,6 +2118,13 @@ int ha_connect::index_init(uint idx, bool sorted) ...@@ -2112,6 +2118,13 @@ int ha_connect::index_init(uint idx, bool sorted)
if ((rc= rnd_init(0))) if ((rc= rnd_init(0)))
return rc; return rc;
if (locked == 2) {
// Indexes are not updated in lock write mode
active_index= MAX_KEY;
indexing= 0;
DBUG_RETURN(0);
} // endif locked
indexing= CntIndexInit(g, tdbp, (signed)idx); indexing= CntIndexInit(g, tdbp, (signed)idx);
if (indexing <= 0) { if (indexing <= 0) {
...@@ -2356,16 +2369,15 @@ int ha_connect::rnd_init(bool scan) ...@@ -2356,16 +2369,15 @@ int ha_connect::rnd_init(bool scan)
printf("%p in rnd_init: scan=%d\n", this, scan); printf("%p in rnd_init: scan=%d\n", this, scan);
if (g) { if (g) {
// Open the table if it was not opened yet (possible ???) if (!table || xmod == MODE_INSERT)
if (!IsOpened()) { DBUG_RETURN(HA_ERR_INITIALIZATION);
if (!table || xmod == MODE_INSERT)
DBUG_RETURN(HA_ERR_INITIALIZATION);
if (OpenTable(g, xmod == MODE_DELETE)) // Close the table if it was opened yet (locked?)
DBUG_RETURN(HA_ERR_INITIALIZATION); if (IsOpened())
CloseTable(g);
} else if (OpenTable(g, xmod == MODE_DELETE))
void(CntRewindTable(g, tdbp)); // Read from beginning DBUG_RETURN(HA_ERR_INITIALIZATION);
} // endif g } // endif g
...@@ -2780,6 +2792,153 @@ bool ha_connect::IsSameIndex(PIXDEF xp1, PIXDEF xp2) ...@@ -2780,6 +2792,153 @@ bool ha_connect::IsSameIndex(PIXDEF xp1, PIXDEF xp2)
return b; return b;
} // end of IsSameIndex } // end of IsSameIndex
MODE ha_connect::CheckMode(PGLOBAL g, THD *thd,
MODE newmode, bool *chk, bool *cras)
{
if (xtrace) {
LEX_STRING *query_string= thd_query_string(thd);
printf("%p check_mode: cmdtype=%d\n", this, thd_sql_command(thd));
printf("Cmd=%.*s\n", (int) query_string->length, query_string->str);
} // endif xtrace
// Next code is temporarily replaced until sql_command is set
stop= false;
if (newmode == MODE_WRITE) {
switch (thd_sql_command(thd)) {
case SQLCOM_LOCK_TABLES:
locked= 2;
case SQLCOM_CREATE_TABLE:
case SQLCOM_INSERT:
case SQLCOM_LOAD:
case SQLCOM_INSERT_SELECT:
newmode= MODE_INSERT;
break;
// case SQLCOM_REPLACE:
// case SQLCOM_REPLACE_SELECT:
// newmode= MODE_UPDATE; // To be checked
// break;
case SQLCOM_DELETE:
case SQLCOM_DELETE_MULTI:
case SQLCOM_TRUNCATE:
newmode= MODE_DELETE;
break;
case SQLCOM_UPDATE:
case SQLCOM_UPDATE_MULTI:
newmode= MODE_UPDATE;
break;
case SQLCOM_SELECT:
case SQLCOM_OPTIMIZE:
newmode= MODE_READ;
break;
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
case SQLCOM_ALTER_TABLE:
newmode= MODE_ANY;
break;
case SQLCOM_DROP_INDEX:
case SQLCOM_CREATE_INDEX:
newmode= MODE_ANY;
// stop= true;
break;
case SQLCOM_CREATE_VIEW:
case SQLCOM_DROP_VIEW:
newmode= MODE_ANY;
break;
default:
printf("Unsupported sql_command=%d", thd_sql_command(thd));
strcpy(g->Message, "CONNECT Unsupported command");
my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
newmode= MODE_ERROR;
break;
} // endswitch newmode
} else if (newmode == MODE_READ) {
switch (thd_sql_command(thd)) {
case SQLCOM_CREATE_TABLE:
*chk= true;
*cras= true;
case SQLCOM_INSERT:
case SQLCOM_LOAD:
case SQLCOM_INSERT_SELECT:
// case SQLCOM_REPLACE:
// case SQLCOM_REPLACE_SELECT:
case SQLCOM_DELETE:
case SQLCOM_DELETE_MULTI:
case SQLCOM_TRUNCATE:
case SQLCOM_UPDATE:
case SQLCOM_UPDATE_MULTI:
case SQLCOM_SELECT:
case SQLCOM_OPTIMIZE:
break;
case SQLCOM_LOCK_TABLES:
locked= 1;
break;
case SQLCOM_DROP_INDEX:
case SQLCOM_CREATE_INDEX:
case SQLCOM_ALTER_TABLE:
*chk= true;
// stop= true;
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
newmode= MODE_ANY;
break;
case SQLCOM_CREATE_VIEW:
case SQLCOM_DROP_VIEW:
newmode= MODE_ANY;
break;
default:
printf("Unsupported sql_command=%d", thd_sql_command(thd));
strcpy(g->Message, "CONNECT Unsupported command");
my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
newmode= MODE_ERROR;
break;
} // endswitch newmode
} // endif's newmode
if (xtrace)
printf("New mode=%d\n", newmode);
return newmode;
} // end of check_mode
int ha_connect::start_stmt(THD *thd, thr_lock_type lock_type)
{
int rc= 0;
bool chk=false, cras= false;
MODE newmode;
PGLOBAL g= GetPlug(thd, xp);
DBUG_ENTER("ha_connect::start_stmt");
// Action will depend on lock_type
switch (lock_type) {
case TL_WRITE_ALLOW_WRITE:
case TL_WRITE_CONCURRENT_INSERT:
case TL_WRITE_DELAYED:
case TL_WRITE_DEFAULT:
case TL_WRITE_LOW_PRIORITY:
case TL_WRITE:
case TL_WRITE_ONLY:
newmode= MODE_WRITE;
break;
case TL_READ:
case TL_READ_WITH_SHARED_LOCKS:
case TL_READ_HIGH_PRIORITY:
case TL_READ_NO_INSERT:
case TL_READ_DEFAULT:
newmode= MODE_READ;
break;
case TL_UNLOCK:
default:
newmode= MODE_ANY;
break;
} // endswitch mode
xmod= CheckMode(g, thd, newmode, &chk, &cras);
DBUG_RETURN((xmod == MODE_ERROR) ? HA_ERR_INTERNAL_ERROR : 0);
} // end of start_stmt
/** /**
@brief @brief
This create a lock on the table. If you are implementing a storage engine This create a lock on the table. If you are implementing a storage engine
...@@ -2841,7 +3000,8 @@ int ha_connect::external_lock(THD *thd, int lock_type) ...@@ -2841,7 +3000,8 @@ int ha_connect::external_lock(THD *thd, int lock_type)
if (newmode == MODE_ANY) { if (newmode == MODE_ANY) {
// This is unlocking, do it by closing the table // This is unlocking, do it by closing the table
if (xp->CheckQueryID()) if (xp->CheckQueryID() && thd_sql_command(thd) != SQLCOM_UNLOCK_TABLES
&& thd_sql_command(thd) != SQLCOM_LOCK_TABLES)
rc= 2; // Logical error ??? rc= 2; // Logical error ???
else if (g->Xchk) { else if (g->Xchk) {
if (!tdbp || *tdbp->GetName() == '#') { if (!tdbp || *tdbp->GetName() == '#') {
...@@ -2939,109 +3099,15 @@ int ha_connect::external_lock(THD *thd, int lock_type) ...@@ -2939,109 +3099,15 @@ int ha_connect::external_lock(THD *thd, int lock_type)
//#endif // !DEBUG //#endif // !DEBUG
} // endif Close } // endif Close
locked= 0;
DBUG_RETURN(rc); DBUG_RETURN(rc);
} // endif MODE_ANY } // endif MODE_ANY
if (xtrace) { // Table mode depends on the query type
LEX_STRING *query_string= thd_query_string(thd); newmode= CheckMode(g, thd, newmode, &xcheck, &cras);
printf("%p external_lock: cmdtype=%d\n", this, thd_sql_command(thd));
printf("Cmd=%.*s\n", (int) query_string->length, query_string->str);
} // endif xtrace
// Next code is temporarily replaced until sql_command is set if (newmode == MODE_ERROR)
stop= false; DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
if (newmode == MODE_WRITE) {
switch (thd_sql_command(thd)) {
case SQLCOM_CREATE_TABLE:
case SQLCOM_INSERT:
case SQLCOM_LOAD:
case SQLCOM_INSERT_SELECT:
newmode= MODE_INSERT;
break;
// case SQLCOM_REPLACE:
// case SQLCOM_REPLACE_SELECT:
// newmode= MODE_UPDATE; // To be checked
// break;
case SQLCOM_DELETE:
case SQLCOM_DELETE_MULTI:
case SQLCOM_TRUNCATE:
newmode= MODE_DELETE;
break;
case SQLCOM_UPDATE:
case SQLCOM_UPDATE_MULTI:
newmode= MODE_UPDATE;
break;
case SQLCOM_SELECT:
case SQLCOM_OPTIMIZE:
newmode= MODE_READ;
break;
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
case SQLCOM_ALTER_TABLE:
newmode= MODE_ANY;
break;
case SQLCOM_DROP_INDEX:
case SQLCOM_CREATE_INDEX:
newmode= MODE_ANY;
// stop= true;
break;
case SQLCOM_CREATE_VIEW:
case SQLCOM_DROP_VIEW:
newmode= MODE_ANY;
break;
default:
printf("Unsupported sql_command=%d", thd_sql_command(thd));
strcpy(g->Message, "CONNECT Unsupported command");
my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
break;
} // endswitch newmode
} else if (newmode == MODE_READ) {
switch (thd_sql_command(thd)) {
case SQLCOM_CREATE_TABLE:
xcheck= true;
cras= true;
case SQLCOM_INSERT:
case SQLCOM_LOAD:
case SQLCOM_INSERT_SELECT:
// case SQLCOM_REPLACE:
// case SQLCOM_REPLACE_SELECT:
case SQLCOM_DELETE:
case SQLCOM_DELETE_MULTI:
case SQLCOM_TRUNCATE:
case SQLCOM_UPDATE:
case SQLCOM_UPDATE_MULTI:
case SQLCOM_SELECT:
case SQLCOM_OPTIMIZE:
break;
case SQLCOM_DROP_INDEX:
case SQLCOM_CREATE_INDEX:
case SQLCOM_ALTER_TABLE:
xcheck= true;
// stop= true;
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
newmode= MODE_ANY;
break;
case SQLCOM_CREATE_VIEW:
case SQLCOM_DROP_VIEW:
newmode= MODE_ANY;
break;
default:
printf("Unsupported sql_command=%d", thd_sql_command(thd));
strcpy(g->Message, "CONNECT Unsupported command");
my_message(ER_NOT_ALLOWED_COMMAND, g->Message, MYF(0));
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
break;
} // endswitch newmode
} // endif's newmode
if (xtrace)
printf("New mode=%d\n", newmode);
// If this is the start of a new query, cleanup the previous one // If this is the start of a new query, cleanup the previous one
if (xp->CheckCleanup()) { if (xp->CheckCleanup()) {
......
...@@ -406,6 +406,7 @@ const char *GetValStr(OPVAL vop, bool neg); ...@@ -406,6 +406,7 @@ const char *GetValStr(OPVAL vop, bool neg);
void position(const uchar *record); ///< required void position(const uchar *record); ///< required
int info(uint); ///< required int info(uint); ///< required
int extra(enum ha_extra_function operation); int extra(enum ha_extra_function operation);
int start_stmt(THD *thd, thr_lock_type lock_type);
int external_lock(THD *thd, int lock_type); ///< required int external_lock(THD *thd, int lock_type); ///< required
int delete_all_rows(void); int delete_all_rows(void);
ha_rows records_in_range(uint inx, key_range *min_key, ha_rows records_in_range(uint inx, key_range *min_key,
...@@ -435,6 +436,7 @@ const char *GetValStr(OPVAL vop, bool neg); ...@@ -435,6 +436,7 @@ const char *GetValStr(OPVAL vop, bool neg);
protected: protected:
bool check_privileges(THD *thd, PTOS options); bool check_privileges(THD *thd, PTOS options);
MODE CheckMode(PGLOBAL g, THD *thd, MODE newmode, bool *chk, bool *cras);
// Members // Members
static ulong num; // Tracable handler number static ulong num; // Tracable handler number
...@@ -452,6 +454,7 @@ protected: ...@@ -452,6 +454,7 @@ protected:
bool valid_info; // True if xinfo is valid bool valid_info; // True if xinfo is valid
bool stop; // Used when creating index bool stop; // Used when creating index
int indexing; // Type of indexing for CONNECT int indexing; // Type of indexing for CONNECT
int locked; // Table lock
THR_LOCK_DATA lock_data; THR_LOCK_DATA lock_data;
public: public:
......
...@@ -151,7 +151,8 @@ enum ALGMOD {AMOD_AUTO = 0, /* PLG chooses best algorithm */ ...@@ -151,7 +151,8 @@ enum ALGMOD {AMOD_AUTO = 0, /* PLG chooses best algorithm */
#define NAM_LEN 128 #define NAM_LEN 128
#endif // !0 #endif // !0
enum MODE {MODE_ANY = 0, /* Unspecified mode */ enum MODE {MODE_ERROR = -1, /* Invalid mode */
MODE_ANY = 0, /* Unspecified mode */
MODE_READ = 10, /* Input/Output mode */ MODE_READ = 10, /* Input/Output mode */
MODE_WRITE = 20, /* Input/Output mode */ MODE_WRITE = 20, /* Input/Output mode */
MODE_UPDATE = 30, /* Input/Output mode */ MODE_UPDATE = 30, /* Input/Output mode */
......
...@@ -1832,8 +1832,9 @@ int XINDXS::Range(PGLOBAL g, int limit, bool incl) ...@@ -1832,8 +1832,9 @@ int XINDXS::Range(PGLOBAL g, int limit, bool incl)
/*********************************************************************/ /*********************************************************************/
if (xp->GetType() == TYPE_CONST) { if (xp->GetType() == TYPE_CONST) {
kp->Valp->SetValue_pval(xp->GetValue(), !kp->Prefix); kp->Valp->SetValue_pval(xp->GetValue(), !kp->Prefix);
k = FastFind(Nval);
if ((k = FastFind(Nval)) < Num_K) if (k < Num_K || Op != OP_EQ)
if (limit) if (limit)
n = (Mul) ? k : kp->Val_K; n = (Mul) ? k : kp->Val_K;
else else
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment