Commit e308ad70 authored by unknown's avatar unknown

Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.0-ndb

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.0-ndb-merge


sql/ha_ndbcluster.cc:
  Auto merged
parents 820a2b8c 8890b295
...@@ -1888,5 +1888,27 @@ set engine_condition_pushdown = 1; ...@@ -1888,5 +1888,27 @@ set engine_condition_pushdown = 1;
SELECT fname, lname FROM t1 WHERE (fname like 'Y%') or (lname like 'F%'); SELECT fname, lname FROM t1 WHERE (fname like 'Y%') or (lname like 'F%');
fname lname fname lname
Young Foo Young Foo
drop table t1;
create table t1 (a int, b int, c int, d int, primary key using hash(a))
engine=ndbcluster;
insert into t1 values (10,1,100,0+0x1111);
insert into t1 values (20,2,200,0+0x2222);
insert into t1 values (30,3,300,0+0x3333);
insert into t1 values (40,4,400,0+0x4444);
insert into t1 values (50,5,500,0+0x5555);
set engine_condition_pushdown = on;
select a,b,d from t1
where b in (0,1,2,5)
order by b;
a b d
10 1 4369
20 2 8738
50 5 21845
a b d
10 1 4369
20 2 8738
50 5 21845
Warnings:
Warning 4294 Scan filter is too large, discarded
set engine_condition_pushdown = @old_ecpd; set engine_condition_pushdown = @old_ecpd;
DROP TABLE t1,t2,t3,t4,t5; DROP TABLE t1,t2,t3,t4,t5;
...@@ -39,4 +39,12 @@ pk1 b c ...@@ -39,4 +39,12 @@ pk1 b c
10 0 0 10 0 0
12 2 2 12 2 2
14 1 1 14 1 1
create unique index ib on t1(b);
update t1 set c = 4 where pk1 = 12;
update ignore t1 set b = 55 where pk1 = 14;
select * from t1 order by pk1;
pk1 b c
10 0 0
12 2 4
14 55 1
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
This diff is collapsed.
...@@ -33,6 +33,11 @@ UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4; ...@@ -33,6 +33,11 @@ UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4;
select * from t1 order by pk1; select * from t1 order by pk1;
UPDATE t1 set pk1 = pk1 + 10; UPDATE t1 set pk1 = pk1 + 10;
select * from t1 order by pk1; select * from t1 order by pk1;
# bug#25817
create unique index ib on t1(b);
update t1 set c = 4 where pk1 = 12;
update ignore t1 set b = 55 where pk1 = 14;
select * from t1 order by pk1;
--disable_warnings --disable_warnings
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t1;
......
...@@ -41,8 +41,7 @@ public: ...@@ -41,8 +41,7 @@ public:
STATIC_CONST( FRAGMENT_MEMORY= 0xFFF9 ); STATIC_CONST( FRAGMENT_MEMORY= 0xFFF9 );
/** Initialize AttributeHeader at location aHeaderPtr */ /** Initialize AttributeHeader at location aHeaderPtr */
static AttributeHeader& init(void* aHeaderPtr, Uint32 anAttributeId, static void init(Uint32* aHeaderPtr, Uint32 anAttributeId, Uint32 aDataSize);
Uint32 aDataSize);
/** Returns size of AttributeHeader (usually one or two words) */ /** Returns size of AttributeHeader (usually one or two words) */
Uint32 getHeaderSize() const; // In 32-bit words Uint32 getHeaderSize() const; // In 32-bit words
...@@ -100,10 +99,11 @@ public: ...@@ -100,10 +99,11 @@ public:
*/ */
inline inline
AttributeHeader& AttributeHeader::init(void* aHeaderPtr, Uint32 anAttributeId, void AttributeHeader::init(Uint32* aHeaderPtr, Uint32 anAttributeId,
Uint32 aDataSize) Uint32 aDataSize)
{ {
return * new (aHeaderPtr) AttributeHeader(anAttributeId, aDataSize); AttributeHeader ah(anAttributeId, aDataSize);
*aHeaderPtr = ah.m_value;
} }
inline inline
......
...@@ -46,6 +46,7 @@ public: ...@@ -46,6 +46,7 @@ public:
* Length of signal * Length of signal
*/ */
STATIC_CONST( StaticLength = 11 ); STATIC_CONST( StaticLength = 11 );
STATIC_CONST( MaxTotalAttrInfo = 0xFFFF );
private: private:
......
...@@ -1052,6 +1052,7 @@ class Ndb ...@@ -1052,6 +1052,7 @@ class Ndb
friend class NdbDictInterface; friend class NdbDictInterface;
friend class NdbBlob; friend class NdbBlob;
friend class NdbImpl; friend class NdbImpl;
friend class NdbScanFilterImpl;
#endif #endif
public: public:
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define NDB_SCAN_FILTER_HPP #define NDB_SCAN_FILTER_HPP
#include <ndb_types.h> #include <ndb_types.h>
#include <ndbapi_limits.h>
/** /**
* @class NdbScanFilter * @class NdbScanFilter
...@@ -31,8 +32,13 @@ public: ...@@ -31,8 +32,13 @@ public:
/** /**
* Constructor * Constructor
* @param op The NdbOperation that the filter belongs to (is applied to). * @param op The NdbOperation that the filter belongs to (is applied to).
* @param abort_on_too_large abort transaction on filter too large
* default: true
* @param max_size Maximum size of generated filter in words
*/ */
NdbScanFilter(class NdbOperation * op); NdbScanFilter(class NdbOperation * op,
bool abort_on_too_large = true,
Uint32 max_size = NDB_MAX_SCANFILTER_SIZE_IN_WORDS);
~NdbScanFilter(); ~NdbScanFilter();
/** /**
...@@ -166,6 +172,25 @@ public: ...@@ -166,6 +172,25 @@ public:
/** @} *********************************************************************/ /** @} *********************************************************************/
#endif #endif
enum Error {
FilterTooLarge = 4294
};
/**
* Get filter level error.
*
* Most errors are set only on operation level, and they abort the
* transaction. The error FilterTooLarge is set on filter level and
* by default it propagates to operation level and also aborts the
* transaction.
*
* If option abort_on_too_large is set to false, then FilterTooLarge
* does not propagate. One can then either ignore this error (in
* which case no filtering is done) or try to define a new filter
* immediately.
*/
const class NdbError & getNdbError() const;
private: private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class NdbScanFilterImpl; friend class NdbScanFilterImpl;
......
...@@ -26,4 +26,6 @@ ...@@ -26,4 +26,6 @@
#define NDB_MAX_TUPLE_SIZE (NDB_MAX_TUPLE_SIZE_IN_WORDS*4) #define NDB_MAX_TUPLE_SIZE (NDB_MAX_TUPLE_SIZE_IN_WORDS*4)
#define NDB_MAX_ACTIVE_EVENTS 100 #define NDB_MAX_ACTIVE_EVENTS 100
#define NDB_MAX_SCANFILTER_SIZE_IN_WORDS 50000
#endif #endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef NDB_RAND_H
#define NDB_RAND_H
#define NDB_RAND_MAX 32767
#ifdef __cplusplus
extern "C" {
#endif
int ndb_rand(void);
void ndb_srand(unsigned seed);
#ifdef __cplusplus
}
#endif
#endif
...@@ -24,7 +24,8 @@ libgeneral_la_SOURCES = \ ...@@ -24,7 +24,8 @@ libgeneral_la_SOURCES = \
uucode.c random.c version.c \ uucode.c random.c version.c \
strdup.c \ strdup.c \
ConfigValues.cpp ndb_init.c basestring_vsnprintf.c \ ConfigValues.cpp ndb_init.c basestring_vsnprintf.c \
Bitmask.cpp Bitmask.cpp \
ndb_rand.c
EXTRA_PROGRAMS = testBitmask EXTRA_PROGRAMS = testBitmask
testBitmask_SOURCES = testBitmask.cpp testBitmask_SOURCES = testBitmask.cpp
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_rand.h>
static unsigned long next= 1;
/**
* ndb_rand
*
* constant time, cheap, pseudo-random number generator.
*
* NDB_RAND_MAX assumed to be 32767
*
* This is the POSIX example for "generating the same sequence on
* different machines". Although that is not one of our requirements.
*/
int ndb_rand(void)
{
next= next * 1103515245 + 12345;
return((unsigned)(next/65536) % 32768);
}
void ndb_srand(unsigned seed)
{
next= seed;
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <RefConvert.hpp> #include <RefConvert.hpp>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <my_sys.h> #include <my_sys.h>
#include <ndb_rand.h>
#include <signaldata/EventReport.hpp> #include <signaldata/EventReport.hpp>
#include <signaldata/TcKeyReq.hpp> #include <signaldata/TcKeyReq.hpp>
...@@ -6278,7 +6279,7 @@ void Dbtc::timeOutLoopStartLab(Signal* signal, Uint32 api_con_ptr) ...@@ -6278,7 +6279,7 @@ void Dbtc::timeOutLoopStartLab(Signal* signal, Uint32 api_con_ptr)
jam(); jam();
if (api_timer != 0) { if (api_timer != 0) {
Uint32 error= ZTIME_OUT_ERROR; Uint32 error= ZTIME_OUT_ERROR;
time_out_value= time_out_param + (api_con_ptr & mask_value); time_out_value= time_out_param + (ndb_rand() & mask_value);
if (unlikely(old_mask_value)) // abort during single user mode if (unlikely(old_mask_value)) // abort during single user mode
{ {
apiConnectptr.i = api_con_ptr; apiConnectptr.i = api_con_ptr;
......
...@@ -1138,7 +1138,8 @@ Dbtup::updateStartLab(Signal* signal, ...@@ -1138,7 +1138,8 @@ Dbtup::updateStartLab(Signal* signal,
regOperPtr->attrinbufLen); regOperPtr->attrinbufLen);
} else { } else {
jam(); jam();
if (interpreterStartLab(signal, pagePtr, regOperPtr->pageOffset) == -1) retValue = interpreterStartLab(signal, pagePtr, regOperPtr->pageOffset);
if (retValue == -1)
{ {
jam(); jam();
return -1; return -1;
...@@ -1577,8 +1578,8 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1577,8 +1578,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Uint32 TdataForUpdate[3]; Uint32 TdataForUpdate[3];
Uint32 Tlen; Uint32 Tlen;
AttributeHeader& ah = AttributeHeader::init(&TdataForUpdate[0], AttributeHeader ah(TattrId, TattrNoOfWords);
TattrId, TattrNoOfWords); TdataForUpdate[0] = ah.m_value;
TdataForUpdate[1] = TregMemBuffer[theRegister + 2]; TdataForUpdate[1] = TregMemBuffer[theRegister + 2];
TdataForUpdate[2] = TregMemBuffer[theRegister + 3]; TdataForUpdate[2] = TregMemBuffer[theRegister + 3];
Tlen = TattrNoOfWords + 1; Tlen = TattrNoOfWords + 1;
...@@ -1594,6 +1595,7 @@ int Dbtup::interpreterNextLab(Signal* signal, ...@@ -1594,6 +1595,7 @@ int Dbtup::interpreterNextLab(Signal* signal,
// Write a NULL value into the attribute // Write a NULL value into the attribute
/* --------------------------------------------------------- */ /* --------------------------------------------------------- */
ah.setNULL(); ah.setNULL();
TdataForUpdate[0] = ah.m_value;
Tlen = 1; Tlen = 1;
}//if }//if
int TnoDataRW= updateAttributes(pagePtr, int TnoDataRW= updateAttributes(pagePtr,
......
...@@ -676,8 +676,6 @@ bool ...@@ -676,8 +676,6 @@ bool
Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr) Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr)
{ {
Uint32 keyReadBuffer[MAX_KEY_SIZE_IN_WORDS]; Uint32 keyReadBuffer[MAX_KEY_SIZE_IN_WORDS];
Uint32 attributeHeader;
AttributeHeader* ahOut = (AttributeHeader*)&attributeHeader;
AttributeHeader ahIn(*updateBuffer); AttributeHeader ahIn(*updateBuffer);
Uint32 attributeId = ahIn.getAttributeId(); Uint32 attributeId = ahIn.getAttributeId();
Uint32 attrDescriptorIndex = regTabPtr->tabDescriptor + (attributeId << ZAD_LOG_SIZE); Uint32 attrDescriptorIndex = regTabPtr->tabDescriptor + (attributeId << ZAD_LOG_SIZE);
...@@ -700,16 +698,17 @@ Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr) ...@@ -700,16 +698,17 @@ Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr)
ReadFunction f = regTabPtr->readFunctionArray[attributeId]; ReadFunction f = regTabPtr->readFunctionArray[attributeId];
AttributeHeader::init(&attributeHeader, attributeId, 0); AttributeHeader attributeHeader(attributeId, 0);
tOutBufIndex = 0; tOutBufIndex = 0;
tMaxRead = MAX_KEY_SIZE_IN_WORDS; tMaxRead = MAX_KEY_SIZE_IN_WORDS;
bool tmp = tXfrmFlag; bool tmp = tXfrmFlag;
tXfrmFlag = true; tXfrmFlag = true;
ndbrequire((this->*f)(&keyReadBuffer[0], ahOut, attrDescriptor, attributeOffset)); ndbrequire((this->*f)(&keyReadBuffer[0], &attributeHeader, attrDescriptor,
attributeOffset));
tXfrmFlag = tmp; tXfrmFlag = tmp;
ndbrequire(tOutBufIndex == ahOut->getDataSize()); ndbrequire(tOutBufIndex == attributeHeader.getDataSize());
if (ahIn.getDataSize() != ahOut->getDataSize()) { if (ahIn.getDataSize() != attributeHeader.getDataSize()) {
ljam(); ljam();
return true; return true;
}//if }//if
......
...@@ -1168,9 +1168,7 @@ DbUtil::prepareOperation(Signal* signal, PreparePtr prepPtr) ...@@ -1168,9 +1168,7 @@ DbUtil::prepareOperation(Signal* signal, PreparePtr prepPtr)
/************************************************************** /**************************************************************
* Attribute found - store in mapping (AttributeId, Position) * Attribute found - store in mapping (AttributeId, Position)
**************************************************************/ **************************************************************/
AttributeHeader & attrMap = AttributeHeader attrMap(attrDesc.AttributeId, // 1. Store AttrId
AttributeHeader::init(attrMappingIt.data,
attrDesc.AttributeId, // 1. Store AttrId
0); 0);
if (attrDesc.AttributeKeyFlag) { if (attrDesc.AttributeKeyFlag) {
...@@ -1199,6 +1197,7 @@ DbUtil::prepareOperation(Signal* signal, PreparePtr prepPtr) ...@@ -1199,6 +1197,7 @@ DbUtil::prepareOperation(Signal* signal, PreparePtr prepPtr)
return; return;
} }
} }
*(attrMappingIt.data) = attrMap.m_value;
#if 0 #if 0
ndbout << "BEFORE: attrLength: " << attrLength << endl; ndbout << "BEFORE: attrLength: " << attrLength << endl;
#endif #endif
......
...@@ -227,10 +227,10 @@ MgmtSrvr::startEventLog() ...@@ -227,10 +227,10 @@ MgmtSrvr::startEventLog()
} }
} }
void void
MgmtSrvr::stopEventLog() MgmtSrvr::stopEventLog()
{ {
// Nothing yet g_eventLogger.close();
} }
class ErrorItem class ErrorItem
......
...@@ -392,9 +392,8 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) ...@@ -392,9 +392,8 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
return NULL; return NULL;
}//if }//if
}//if }//if
Uint32 ah; AttributeHeader ah(tAttrInfo->m_attrId, 0);
AttributeHeader::init(&ah, tAttrInfo->m_attrId, 0); if (insertATTRINFO(ah.m_value) != -1) {
if (insertATTRINFO(ah) != -1) {
// Insert Attribute Id into ATTRINFO part. // Insert Attribute Id into ATTRINFO part.
/************************************************************************ /************************************************************************
...@@ -525,12 +524,11 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, ...@@ -525,12 +524,11 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
tAttrId = tAttrInfo->m_attrId; tAttrId = tAttrInfo->m_attrId;
const char *aValue = aValuePassed; const char *aValue = aValuePassed;
Uint32 ahValue;
if (aValue == NULL) { if (aValue == NULL) {
if (tAttrInfo->m_nullable) { if (tAttrInfo->m_nullable) {
AttributeHeader& ah = AttributeHeader::init(&ahValue, tAttrId, 0); AttributeHeader ah(tAttrId, 0);
ah.setNULL(); ah.setNULL();
insertATTRINFO(ahValue); insertATTRINFO(ah.m_value);
// Insert Attribute Id with the value // Insert Attribute Id with the value
// NULL into ATTRINFO part. // NULL into ATTRINFO part.
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -563,8 +561,8 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, ...@@ -563,8 +561,8 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
}//if }//if
const Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Including bits in last word const Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Including bits in last word
const Uint32 sizeInWords = sizeInBytes / 4; // Excluding bits in last word const Uint32 sizeInWords = sizeInBytes / 4; // Excluding bits in last word
(void) AttributeHeader::init(&ahValue, tAttrId, totalSizeInWords); AttributeHeader ah(tAttrId, totalSizeInWords);
insertATTRINFO( ahValue ); insertATTRINFO( ah.m_value );
/*********************************************************************** /***********************************************************************
* Check if the pointer of the value passed is aligned on a 4 byte boundary. * Check if the pointer of the value passed is aligned on a 4 byte boundary.
......
...@@ -14,11 +14,15 @@ ...@@ -14,11 +14,15 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NdbScanFilter.hpp> #include <NdbScanFilter.hpp>
#include <Ndb.hpp>
#include <NdbOperation.hpp> #include <NdbOperation.hpp>
#include "NdbDictionaryImpl.hpp" #include "NdbDictionaryImpl.hpp"
#include <Vector.hpp> #include <Vector.hpp>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <Interpreter.hpp> #include <Interpreter.hpp>
#include <signaldata/AttrInfo.hpp>
#include "NdbApiSignal.hpp"
#include "NdbUtil.hpp"
#ifdef VM_TRACE #ifdef VM_TRACE
#include <NdbEnv.h> #include <NdbEnv.h>
...@@ -52,14 +56,37 @@ public: ...@@ -52,14 +56,37 @@ public:
int cond_col_const(Interpreter::BinaryCondition, Uint32 attrId, int cond_col_const(Interpreter::BinaryCondition, Uint32 attrId,
const void * value, Uint32 len); const void * value, Uint32 len);
bool m_abort_on_too_large;
NdbOperation::OperationStatus m_initial_op_status;
Uint32 m_initial_AI_size;
Uint32 m_max_size;
Uint32 get_size() {
assert(m_operation->theTotalCurrAI_Len >= m_initial_AI_size);
return m_operation->theTotalCurrAI_Len - m_initial_AI_size;
}
bool check_size() {
if (get_size() <= m_max_size)
return true;
handle_filter_too_large();
return false;
}
void handle_filter_too_large();
NdbError m_error;
}; };
const Uint32 LabelExit = ~0; const Uint32 LabelExit = ~0;
NdbScanFilter::NdbScanFilter(class NdbOperation * op) NdbScanFilter::NdbScanFilter(class NdbOperation * op,
bool abort_on_too_large,
Uint32 max_size)
: m_impl(* new NdbScanFilterImpl()) : m_impl(* new NdbScanFilterImpl())
{ {
DBUG_ENTER("NdbScanFilter::NdbScanFilter");
m_impl.m_current.m_group = (NdbScanFilter::Group)0; m_impl.m_current.m_group = (NdbScanFilter::Group)0;
m_impl.m_current.m_popCount = 0; m_impl.m_current.m_popCount = 0;
m_impl.m_current.m_ownLabel = 0; m_impl.m_current.m_ownLabel = 0;
...@@ -69,6 +96,21 @@ NdbScanFilter::NdbScanFilter(class NdbOperation * op) ...@@ -69,6 +96,21 @@ NdbScanFilter::NdbScanFilter(class NdbOperation * op)
m_impl.m_latestAttrib = ~0; m_impl.m_latestAttrib = ~0;
m_impl.m_operation = op; m_impl.m_operation = op;
m_impl.m_negative = 0; m_impl.m_negative = 0;
DBUG_PRINT("info", ("op status: %d tot AI: %u in curr: %u",
op->theStatus,
op->theTotalCurrAI_Len, op->theAI_LenInCurrAI));
m_impl.m_abort_on_too_large = abort_on_too_large;
m_impl.m_initial_op_status = op->theStatus;
m_impl.m_initial_AI_size = op->theTotalCurrAI_Len;
if (max_size > NDB_MAX_SCANFILTER_SIZE_IN_WORDS)
max_size = NDB_MAX_SCANFILTER_SIZE_IN_WORDS;
m_impl.m_max_size = max_size;
m_impl.m_error.code = 0;
DBUG_VOID_RETURN;
} }
NdbScanFilter::~NdbScanFilter(){ NdbScanFilter::~NdbScanFilter(){
...@@ -200,30 +242,38 @@ NdbScanFilter::end(){ ...@@ -200,30 +242,38 @@ NdbScanFilter::end(){
switch(tmp.m_group){ switch(tmp.m_group){
case NdbScanFilter::AND: case NdbScanFilter::AND:
if(tmp.m_trueLabel == (Uint32)~0){ if(tmp.m_trueLabel == (Uint32)~0){
m_impl.m_operation->interpret_exit_ok(); if (m_impl.m_operation->interpret_exit_ok() == -1)
return -1;
} else { } else {
m_impl.m_operation->branch_label(tmp.m_trueLabel); if (m_impl.m_operation->branch_label(tmp.m_trueLabel) == -1)
return -1;
} }
break; break;
case NdbScanFilter::NAND: case NdbScanFilter::NAND:
if(tmp.m_trueLabel == (Uint32)~0){ if(tmp.m_trueLabel == (Uint32)~0){
m_impl.m_operation->interpret_exit_nok(); if (m_impl.m_operation->interpret_exit_nok() == -1)
return -1;
} else { } else {
m_impl.m_operation->branch_label(tmp.m_falseLabel); if (m_impl.m_operation->branch_label(tmp.m_falseLabel) == -1)
return -1;
} }
break; break;
case NdbScanFilter::OR: case NdbScanFilter::OR:
if(tmp.m_falseLabel == (Uint32)~0){ if(tmp.m_falseLabel == (Uint32)~0){
m_impl.m_operation->interpret_exit_nok(); if (m_impl.m_operation->interpret_exit_nok() == -1)
return -1;
} else { } else {
m_impl.m_operation->branch_label(tmp.m_falseLabel); if (m_impl.m_operation->branch_label(tmp.m_falseLabel) == -1)
return -1;
} }
break; break;
case NdbScanFilter::NOR: case NdbScanFilter::NOR:
if(tmp.m_falseLabel == (Uint32)~0){ if(tmp.m_falseLabel == (Uint32)~0){
m_impl.m_operation->interpret_exit_ok(); if (m_impl.m_operation->interpret_exit_ok() == -1)
return -1;
} else { } else {
m_impl.m_operation->branch_label(tmp.m_trueLabel); if (m_impl.m_operation->branch_label(tmp.m_trueLabel) == -1)
return -1;
} }
break; break;
default: default:
...@@ -231,24 +281,29 @@ NdbScanFilter::end(){ ...@@ -231,24 +281,29 @@ NdbScanFilter::end(){
return -1; return -1;
} }
m_impl.m_operation->def_label(tmp.m_ownLabel); if (m_impl.m_operation->def_label(tmp.m_ownLabel) == -1)
return -1;
if(m_impl.m_stack.size() == 0){ if(m_impl.m_stack.size() == 0){
switch(tmp.m_group){ switch(tmp.m_group){
case NdbScanFilter::AND: case NdbScanFilter::AND:
case NdbScanFilter::NOR: case NdbScanFilter::NOR:
m_impl.m_operation->interpret_exit_nok(); if (m_impl.m_operation->interpret_exit_nok() == -1)
return -1;
break; break;
case NdbScanFilter::OR: case NdbScanFilter::OR:
case NdbScanFilter::NAND: case NdbScanFilter::NAND:
m_impl.m_operation->interpret_exit_ok(); if (m_impl.m_operation->interpret_exit_ok() == -1)
return -1;
break; break;
default: default:
m_impl.m_operation->setErrorCodeAbort(4260); m_impl.m_operation->setErrorCodeAbort(4260);
return -1; return -1;
} }
} }
if (!m_impl.check_size())
return -1;
return 0; return 0;
} }
...@@ -261,10 +316,16 @@ NdbScanFilter::istrue(){ ...@@ -261,10 +316,16 @@ NdbScanFilter::istrue(){
} }
if(m_impl.m_current.m_trueLabel == (Uint32)~0){ if(m_impl.m_current.m_trueLabel == (Uint32)~0){
return m_impl.m_operation->interpret_exit_ok(); if (m_impl.m_operation->interpret_exit_ok() == -1)
return -1;
} else { } else {
return m_impl.m_operation->branch_label(m_impl.m_current.m_trueLabel); if (m_impl.m_operation->branch_label(m_impl.m_current.m_trueLabel) == -1)
return -1;
} }
if (!m_impl.check_size())
return -1;
return 0;
} }
int int
...@@ -276,10 +337,16 @@ NdbScanFilter::isfalse(){ ...@@ -276,10 +337,16 @@ NdbScanFilter::isfalse(){
} }
if(m_impl.m_current.m_falseLabel == (Uint32)~0){ if(m_impl.m_current.m_falseLabel == (Uint32)~0){
return m_impl.m_operation->interpret_exit_nok(); if (m_impl.m_operation->interpret_exit_nok() == -1)
return -1;
} else { } else {
return m_impl.m_operation->branch_label(m_impl.m_current.m_falseLabel); if (m_impl.m_operation->branch_label(m_impl.m_current.m_falseLabel) == -1)
return -1;
} }
if (!m_impl.check_size())
return -1;
return 0;
} }
...@@ -330,7 +397,11 @@ NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){ ...@@ -330,7 +397,11 @@ NdbScanFilterImpl::cond_col(Interpreter::UnaryCondition op, Uint32 AttrId){
} }
Branch1 branch = table2[op].m_branches[m_current.m_group]; Branch1 branch = table2[op].m_branches[m_current.m_group];
(m_operation->* branch)(AttrId, m_current.m_ownLabel); if ((m_operation->* branch)(AttrId, m_current.m_ownLabel) == -1)
return -1;
if (!check_size())
return -1;
return 0; return 0;
} }
...@@ -463,8 +534,12 @@ NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op, ...@@ -463,8 +534,12 @@ NdbScanFilterImpl::cond_col_const(Interpreter::BinaryCondition op,
return -1; return -1;
} }
int ret = (m_operation->* branch)(AttrId, value, len, false, m_current.m_ownLabel); if ((m_operation->* branch)(AttrId, value, len, false, m_current.m_ownLabel) == -1)
return ret; return -1;
if (!check_size())
return -1;
return 0;
} }
int int
...@@ -490,7 +565,130 @@ NdbScanFilter::cmp(BinaryCondition cond, int ColId, ...@@ -490,7 +565,130 @@ NdbScanFilter::cmp(BinaryCondition cond, int ColId,
return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len); return m_impl.cond_col_const(Interpreter::NOT_LIKE, ColId, val, len);
} }
return -1; return -1;
} }
void
NdbScanFilterImpl::handle_filter_too_large()
{
DBUG_ENTER("NdbScanFilterImpl::handle_filter_too_large");
NdbOperation* const op = m_operation;
m_error.code = NdbScanFilter::FilterTooLarge;
if (m_abort_on_too_large)
op->setErrorCodeAbort(m_error.code);
/*
* Possible interpreted parts at this point are:
*
* 1. initial read
* 2. interpreted program
*
* It is assumed that NdbScanFilter has created all of 2
* so that we don't have to save interpreter state.
*/
const Uint32 size = get_size();
assert(size != 0);
// new ATTRINFO size
const Uint32 new_size = m_initial_AI_size;
// find last signal for new size
assert(op->theFirstATTRINFO != NULL);
NdbApiSignal* lastSignal = op->theFirstATTRINFO;
Uint32 n = 0;
while (n + AttrInfo::DataLength < new_size) {
lastSignal = lastSignal->next();
assert(lastSignal != NULL);
n += AttrInfo::DataLength;
}
assert(n < size);
// release remaining signals
NdbApiSignal* tSignal = lastSignal->next();
op->theNdb->releaseSignalsInList(&tSignal);
lastSignal->next(NULL);
// length of lastSignal
const Uint32 new_curr = AttrInfo::HeaderLength + new_size - n;
assert(new_curr <= 25);
DBUG_PRINT("info", ("op status: %d->%d tot AI: %u->%u in curr: %u->%u",
op->theStatus, m_initial_op_status,
op->theTotalCurrAI_Len, new_size,
op->theAI_LenInCurrAI, new_curr));
// reset op state
op->theStatus = m_initial_op_status;
// reset interpreter state to initial
NdbBranch* tBranch = op->theFirstBranch;
while (tBranch != NULL) {
NdbBranch* tmp = tBranch;
tBranch = tBranch->theNext;
op->theNdb->releaseNdbBranch(tmp);
}
op->theFirstBranch = NULL;
op->theLastBranch = NULL;
NdbLabel* tLabel = op->theFirstLabel;
while (tLabel != NULL) {
NdbLabel* tmp = tLabel;
tLabel = tLabel->theNext;
op->theNdb->releaseNdbLabel(tmp);
}
op->theFirstLabel = NULL;
op->theLastLabel = NULL;
NdbCall* tCall = op->theFirstCall;
while (tCall != NULL) {
NdbCall* tmp = tCall;
tCall = tCall->theNext;
op->theNdb->releaseNdbCall(tmp);
}
op->theFirstCall = NULL;
op->theLastCall = NULL;
NdbSubroutine* tSubroutine = op->theFirstSubroutine;
while (tSubroutine != NULL) {
NdbSubroutine* tmp = tSubroutine;
tSubroutine = tSubroutine->theNext;
op->theNdb->releaseNdbSubroutine(tmp);
}
op->theFirstSubroutine = NULL;
op->theLastSubroutine = NULL;
op->theNoOfLabels = 0;
op->theNoOfSubroutines = 0;
// reset AI size
op->theTotalCurrAI_Len = new_size;
op->theAI_LenInCurrAI = new_curr;
// reset signal pointers
op->theCurrentATTRINFO = lastSignal;
op->theATTRINFOptr = &lastSignal->getDataPtrSend()[new_curr];
// interpreter sizes are set later somewhere
DBUG_VOID_RETURN;
}
static void
update(const NdbError & _err){
NdbError & error = (NdbError &) _err;
ndberror_struct ndberror = (ndberror_struct)error;
ndberror_update(&ndberror);
error = NdbError(ndberror);
}
const NdbError &
NdbScanFilter::getNdbError() const
{
update(m_impl.m_error);
return m_impl.m_error;
}
#if 0 #if 0
......
...@@ -849,6 +849,10 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -849,6 +849,10 @@ NdbScanOperation::doSendScan(int aProcessorId)
// sending it. This could not be done in openScan because // sending it. This could not be done in openScan because
// we created the ATTRINFO signals after the SCAN_TABREQ signal. // we created the ATTRINFO signals after the SCAN_TABREQ signal.
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
if (unlikely(theTotalCurrAI_Len > ScanTabReq::MaxTotalAttrInfo)) {
setErrorCode(4257);
return -1;
}
req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
Uint32 tmp = req->requestInfo; Uint32 tmp = req->requestInfo;
ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_); ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_);
......
...@@ -527,7 +527,8 @@ ErrorBundle ErrorCodes[] = { ...@@ -527,7 +527,8 @@ ErrorBundle ErrorCodes[] = {
{ 4270, IE, "Unknown blob error" }, { 4270, IE, "Unknown blob error" },
{ 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" }, { 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" },
{ 4271, AE, "Invalid index object, not retrieved via getIndex()" }, { 4271, AE, "Invalid index object, not retrieved via getIndex()" },
{ 4275, AE, "The blob method is incompatible with operation type or lock mode" } { 4275, AE, "The blob method is incompatible with operation type or lock mode" },
{ 4294, AE, "Scan filter is too large, discarded" }
}; };
static static
......
...@@ -1356,6 +1356,30 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec ...@@ -1356,6 +1356,30 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec
DBUG_RETURN(0); DBUG_RETURN(0);
} }
bool ha_ndbcluster::check_index_fields_in_write_set(uint keyno)
{
KEY* key_info= table->key_info + keyno;
KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts;
uint i;
DBUG_ENTER("check_index_fields_in_write_set");
if (m_retrieve_all_fields)
{
DBUG_RETURN(true);
}
for (i= 0; key_part != end; key_part++, i++)
{
Field* field= key_part->field;
if (field->query_id != current_thd->query_id)
{
DBUG_RETURN(false);
}
}
DBUG_RETURN(true);
}
int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno) int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno)
{ {
KEY* key_info= table->key_info + keyno; KEY* key_info= table->key_info + keyno;
...@@ -1643,7 +1667,8 @@ check_null_in_record(const KEY* key_info, const byte *record) ...@@ -1643,7 +1667,8 @@ check_null_in_record(const KEY* key_info, const byte *record)
* primary key or unique index values * primary key or unique index values
*/ */
int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) int ha_ndbcluster::peek_indexed_rows(const byte *record,
NDB_WRITE_OP write_op)
{ {
NdbTransaction *trans= m_active_trans; NdbTransaction *trans= m_active_trans;
NdbOperation *op; NdbOperation *op;
...@@ -1656,7 +1681,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) ...@@ -1656,7 +1681,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
first= NULL; first= NULL;
if (check_pk && table->s->primary_key != MAX_KEY) if (write_op != NDB_UPDATE && table->s->primary_key != MAX_KEY)
{ {
/* /*
* Fetch any row with colliding primary key * Fetch any row with colliding primary key
...@@ -1687,9 +1712,15 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk) ...@@ -1687,9 +1712,15 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
*/ */
if (check_null_in_record(key_info, record)) if (check_null_in_record(key_info, record))
{ {
DBUG_PRINT("info", ("skipping check for key with NULL")); DBUG_PRINT("info", ("skipping check for key with NULL"));
continue; continue;
} }
if (write_op != NDB_INSERT && !check_index_fields_in_write_set(i))
{
DBUG_PRINT("info", ("skipping check for key %u not in write_set", i));
continue;
}
NdbIndexOperation *iop; NdbIndexOperation *iop;
NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index; NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
key_part= key_info->key_part; key_part= key_info->key_part;
...@@ -2268,7 +2299,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2268,7 +2299,7 @@ int ha_ndbcluster::write_row(byte *record)
start_bulk_insert will set parameters to ensure that each start_bulk_insert will set parameters to ensure that each
write_row is committed individually write_row is committed individually
*/ */
int peek_res= peek_indexed_rows(record, true); int peek_res= peek_indexed_rows(record, NDB_INSERT);
if (!peek_res) if (!peek_res)
{ {
...@@ -2302,7 +2333,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2302,7 +2333,7 @@ int ha_ndbcluster::write_row(byte *record)
auto_value, 1) == -1) auto_value, 1) == -1)
{ {
if (--retries && if (--retries &&
ndb->getNdbError().status == NdbError::TemporaryError); ndb->getNdbError().status == NdbError::TemporaryError)
{ {
my_sleep(retry_sleep); my_sleep(retry_sleep);
continue; continue;
...@@ -2456,7 +2487,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2456,7 +2487,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE || if (m_ignore_dup_key && (thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI)) thd->lex->sql_command == SQLCOM_UPDATE_MULTI))
{ {
int peek_res= peek_indexed_rows(new_data, pk_update); NDB_WRITE_OP write_op= (pk_update) ? NDB_PK_UPDATE : NDB_UPDATE;
int peek_res= peek_indexed_rows(new_data, write_op);
if (!peek_res) if (!peek_res)
{ {
...@@ -4862,7 +4894,7 @@ ulonglong ha_ndbcluster::get_auto_increment() ...@@ -4862,7 +4894,7 @@ ulonglong ha_ndbcluster::get_auto_increment()
auto_value, cache_size, step, start)) auto_value, cache_size, step, start))
{ {
if (--retries && if (--retries &&
ndb->getNdbError().status == NdbError::TemporaryError); ndb->getNdbError().status == NdbError::TemporaryError)
{ {
my_sleep(retry_sleep); my_sleep(retry_sleep);
continue; continue;
......
...@@ -59,6 +59,12 @@ typedef struct ndb_index_data { ...@@ -59,6 +59,12 @@ typedef struct ndb_index_data {
bool null_in_unique_index; bool null_in_unique_index;
} NDB_INDEX_DATA; } NDB_INDEX_DATA;
typedef enum ndb_write_op {
NDB_INSERT = 0,
NDB_UPDATE = 1,
NDB_PK_UPDATE = 2
} NDB_WRITE_OP;
typedef struct st_ndbcluster_share { typedef struct st_ndbcluster_share {
THR_LOCK lock; THR_LOCK lock;
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -251,7 +257,7 @@ private: ...@@ -251,7 +257,7 @@ private:
const NdbOperation *first, const NdbOperation *first,
const NdbOperation *last, const NdbOperation *last,
uint errcode); uint errcode);
int peek_indexed_rows(const byte *record, bool check_pk); int peek_indexed_rows(const byte *record, NDB_WRITE_OP write_op);
int unique_index_read(const byte *key, uint key_len, int unique_index_read(const byte *key, uint key_len,
byte *buf); byte *buf);
int ordered_index_scan(const key_range *start_key, int ordered_index_scan(const key_range *start_key,
...@@ -286,6 +292,7 @@ private: ...@@ -286,6 +292,7 @@ private:
int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff); int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff);
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_primary_key_from_record(NdbOperation *op, const byte *record);
bool check_index_fields_in_write_set(uint keyno);
int set_index_key_from_record(NdbOperation *op, const byte *record, int set_index_key_from_record(NdbOperation *op, const byte *record,
uint keyno); uint keyno);
int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0); int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0);
......
...@@ -1338,9 +1338,23 @@ ha_ndbcluster_cond::generate_scan_filter(NdbScanOperation *op) ...@@ -1338,9 +1338,23 @@ ha_ndbcluster_cond::generate_scan_filter(NdbScanOperation *op)
if (m_cond_stack) if (m_cond_stack)
{ {
NdbScanFilter filter(op); NdbScanFilter filter(op, false); // don't abort on too large
DBUG_RETURN(generate_scan_filter_from_cond(filter)); int ret=generate_scan_filter_from_cond(filter);
if (ret != 0)
{
const NdbError& err=filter.getNdbError();
if (err.code == NdbScanFilter::FilterTooLarge)
{
// err.message has static storage
DBUG_PRINT("info", ("%s", err.message));
push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
err.code, err.message);
ret=0;
}
}
if (ret != 0)
DBUG_RETURN(ret);
} }
else else
{ {
...@@ -1391,7 +1405,7 @@ int ha_ndbcluster_cond::generate_scan_filter_from_key(NdbScanOperation *op, ...@@ -1391,7 +1405,7 @@ int ha_ndbcluster_cond::generate_scan_filter_from_key(NdbScanOperation *op,
{ {
KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts; KEY_PART_INFO* end= key_part+key_info->key_parts;
NdbScanFilter filter(op); NdbScanFilter filter(op, true); // abort on too large
int res; int res;
DBUG_ENTER("generate_scan_filter_from_key"); DBUG_ENTER("generate_scan_filter_from_key");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment