Commit 6ad50384 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new
parents ef12f5c4 e8423ff0
...@@ -30,6 +30,7 @@ struct SubCreateReq { ...@@ -30,6 +30,7 @@ struct SubCreateReq {
friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16); friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
STATIC_CONST( SignalLength = 6 ); STATIC_CONST( SignalLength = 6 );
STATIC_CONST( SignalLength2 = 7 );
enum SubscriptionType { enum SubscriptionType {
SingleTableScan = 1, // SingleTableScan = 1, //
...@@ -50,6 +51,7 @@ struct SubCreateReq { ...@@ -50,6 +51,7 @@ struct SubCreateReq {
Uint32 subscriptionKey; Uint32 subscriptionKey;
Uint32 subscriptionType; Uint32 subscriptionType;
Uint32 tableId; Uint32 tableId;
Uint32 state;
}; };
struct SubCreateRef { struct SubCreateRef {
......
...@@ -1040,6 +1040,15 @@ Suma::execSUB_CREATE_REQ(Signal* signal) ...@@ -1040,6 +1040,15 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ? const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
Subscription::REPORT_SUBSCRIBE : 0; Subscription::REPORT_SUBSCRIBE : 0;
const Uint32 tableId = req.tableId; const Uint32 tableId = req.tableId;
Subscription::State state = (Subscription::State) req.state;
if (signal->getLength() != SubCreateReq::SignalLength2)
{
/*
api or restarted by older version
if restarted by old version, do the best we can
*/
state = Subscription::DEFINED;
}
Subscription key; Subscription key;
key.m_subscriptionId = subId; key.m_subscriptionId = subId;
...@@ -1067,6 +1076,17 @@ Suma::execSUB_CREATE_REQ(Signal* signal) ...@@ -1067,6 +1076,17 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
addTableId(req.tableId, subPtr, 0); addTableId(req.tableId, subPtr, 0);
} }
} else { } else {
if (c_startup.m_restart_server_node_id &&
refToNode(subRef) != c_startup.m_restart_server_node_id)
{
/**
* only allow "restart_server" Suma's to come through
* for restart purposes
*/
jam();
sendSubStartRef(signal, 1405);
DBUG_VOID_RETURN;
}
// Check that id/key is unique // Check that id/key is unique
if(c_subscriptions.find(subPtr, key)) { if(c_subscriptions.find(subPtr, key)) {
jam(); jam();
...@@ -1090,7 +1110,7 @@ Suma::execSUB_CREATE_REQ(Signal* signal) ...@@ -1090,7 +1110,7 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
subPtr.p->m_options = reportSubscribe | reportAll; subPtr.p->m_options = reportSubscribe | reportAll;
subPtr.p->m_tableId = tableId; subPtr.p->m_tableId = tableId;
subPtr.p->m_table_ptrI = RNIL; subPtr.p->m_table_ptrI = RNIL;
subPtr.p->m_state = Subscription::DEFINED; subPtr.p->m_state = state;
subPtr.p->n_subscribers = 0; subPtr.p->n_subscribers = 0;
subPtr.p->m_current_sync_ptrI = RNIL; subPtr.p->m_current_sync_ptrI = RNIL;
...@@ -1446,7 +1466,9 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP ...@@ -1446,7 +1466,9 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP
jam(); jam();
DBUG_ENTER("Suma::completeOneSubscriber"); DBUG_ENTER("Suma::completeOneSubscriber");
if (tabPtr.p->m_error) if (tabPtr.p->m_error &&
(c_startup.m_restart_server_node_id == 0 ||
tabPtr.p->m_state != Table::DROPPED))
{ {
sendSubStartRef(signal,subbPtr,tabPtr.p->m_error, sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
SubscriptionData::TableData); SubscriptionData::TableData);
...@@ -1531,8 +1553,44 @@ Suma::completeInitTable(Signal *signal, TablePtr tabPtr) ...@@ -1531,8 +1553,44 @@ Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
void void
Suma::execGET_TABINFOREF(Signal* signal){ Suma::execGET_TABINFOREF(Signal* signal){
jamEntry(); jamEntry();
/* ToDo handle this */ GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
Uint32 tableId = ref->tableId;
Uint32 senderData = ref->senderData;
GetTabInfoRef::ErrorCode errorCode =
(GetTabInfoRef::ErrorCode) ref->errorCode;
int do_resend_request = 0;
TablePtr tabPtr;
c_tablePool.getPtr(tabPtr, senderData);
switch (errorCode)
{
case GetTabInfoRef::TableNotDefined:
// wrong state
break;
case GetTabInfoRef::InvalidTableId:
// no such table
break;
case GetTabInfoRef::Busy:
do_resend_request = 1;
break;
case GetTabInfoRef::TableNameTooLong:
ndbrequire(false); ndbrequire(false);
}
if (do_resend_request)
{
GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = senderData;
req->requestType =
GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
req->tableId = tableId;
sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
30, GetTabInfoReq::SignalLength);
return;
}
tabPtr.p->m_state = Table::DROPPED;
tabPtr.p->m_error = errorCode;
completeAllSubscribers(signal, tabPtr);
completeInitTable(signal, tabPtr);
} }
void void
...@@ -2173,13 +2231,24 @@ Suma::execSUB_START_REQ(Signal* signal){ ...@@ -2173,13 +2231,24 @@ Suma::execSUB_START_REQ(Signal* signal){
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (subPtr.p->m_state != Subscription::DEFINED) { if (subPtr.p->m_state == Subscription::LOCKED) {
jam(); jam();
DBUG_PRINT("info",("Locked")); DBUG_PRINT("info",("Locked"));
sendSubStartRef(signal, 1411); sendSubStartRef(signal, 1411);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id == 0) {
jam();
DBUG_PRINT("info",("Dropped"));
sendSubStartRef(signal, 1418);
DBUG_VOID_RETURN;
}
ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
c_startup.m_restart_server_node_id);
SubscriberPtr subbPtr; SubscriberPtr subbPtr;
if(!c_subscriberPool.seize(subbPtr)){ if(!c_subscriberPool.seize(subbPtr)){
jam(); jam();
...@@ -2193,6 +2262,7 @@ Suma::execSUB_START_REQ(Signal* signal){ ...@@ -2193,6 +2262,7 @@ Suma::execSUB_START_REQ(Signal* signal){
c_subscriber_nodes.set(refToNode(subscriberRef)); c_subscriber_nodes.set(refToNode(subscriberRef));
// setup subscription record // setup subscription record
if (subPtr.p->m_state == Subscription::DEFINED)
subPtr.p->m_state = Subscription::LOCKED; subPtr.p->m_state = Subscription::LOCKED;
// store these here for later use // store these here for later use
subPtr.p->m_senderRef = senderRef; subPtr.p->m_senderRef = senderRef;
...@@ -2241,8 +2311,14 @@ Suma::sendSubStartComplete(Signal* signal, ...@@ -2241,8 +2311,14 @@ Suma::sendSubStartComplete(Signal* signal,
SubscriptionPtr subPtr; SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
ndbrequire( subPtr.p->m_state == Subscription::LOCKED ) ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
(subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id));
if (subPtr.p->m_state == Subscription::LOCKED)
{
jam();
subPtr.p->m_state = Subscription::DEFINED; subPtr.p->m_state = Subscription::DEFINED;
}
subPtr.p->n_subscribers++; subPtr.p->n_subscribers++;
DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] " DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
...@@ -2293,8 +2369,14 @@ Suma::sendSubStartRef(Signal* signal, ...@@ -2293,8 +2369,14 @@ Suma::sendSubStartRef(Signal* signal,
SubscriptionPtr subPtr; SubscriptionPtr subPtr;
c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
ndbrequire( subPtr.p->m_state == Subscription::LOCKED ); ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
(subPtr.p->m_state == Subscription::DROPPED &&
c_startup.m_restart_server_node_id));
if (subPtr.p->m_state == Subscription::LOCKED)
{
jam();
subPtr.p->m_state = Subscription::DEFINED; subPtr.p->m_state = Subscription::DEFINED;
}
SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend(); SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
ref->senderRef = reference(); ref->senderRef = reference();
...@@ -2360,6 +2442,18 @@ Suma::execSUB_STOP_REQ(Signal* signal){ ...@@ -2360,6 +2442,18 @@ Suma::execSUB_STOP_REQ(Signal* signal){
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (c_startup.m_restart_server_node_id &&
refToNode(senderRef) != c_startup.m_restart_server_node_id)
{
/**
* only allow "restart_server" Suma's to come through
* for restart purposes
*/
jam();
sendSubStopRef(signal, 1405);
DBUG_VOID_RETURN;
}
if (subPtr.p->m_state == Subscription::LOCKED) { if (subPtr.p->m_state == Subscription::LOCKED) {
jam(); jam();
DBUG_PRINT("error", ("locked")); DBUG_PRINT("error", ("locked"));
...@@ -3668,7 +3762,17 @@ Suma::execSUB_REMOVE_REQ(Signal* signal) ...@@ -3668,7 +3762,17 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
sendSubRemoveRef(signal, req, 1413); sendSubRemoveRef(signal, req, 1413);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (subPtr.p->m_state == Subscription::DROPPED)
{
/**
* already dropped
*/
jam();
sendSubRemoveRef(signal, req, 1419);
DBUG_VOID_RETURN;
}
ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers)); DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
if (subPtr.p->n_subscribers == 0) if (subPtr.p->n_subscribers == 0)
...@@ -3981,8 +4085,9 @@ Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) ...@@ -3981,8 +4085,9 @@ Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
case SubCreateReq::TableEvent: case SubCreateReq::TableEvent:
jam(); jam();
req->tableId = subPtr.p->m_tableId; req->tableId = subPtr.p->m_tableId;
req->state = subPtr.p->m_state;
suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal, suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
SubCreateReq::SignalLength, JBB); SubCreateReq::SignalLength2, JBB);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
case SubCreateReq::SingleTableScan: case SubCreateReq::SingleTableScan:
jam(); jam();
......
...@@ -475,6 +475,8 @@ ErrorBundle ErrorCodes[] = { ...@@ -475,6 +475,8 @@ ErrorBundle ErrorCodes[] = {
{ 1415, DMEC, SE, "Subscription not unique in subscriber manager" }, { 1415, DMEC, SE, "Subscription not unique in subscriber manager" },
{ 1416, DMEC, IS, "Can't accept more subscriptions, out of space in pool" }, { 1416, DMEC, IS, "Can't accept more subscriptions, out of space in pool" },
{ 1417, DMEC, SE, "Table in suscription not defined, probably dropped" }, { 1417, DMEC, SE, "Table in suscription not defined, probably dropped" },
{ 1418, DMEC, SE, "Subscription dropped, no new subscribers allowed" },
{ 1419, DMEC, SE, "Subscription already dropped" },
{ 4004, DMEC, AE, "Attribute name not found in the Table" }, { 4004, DMEC, AE, "Attribute name not found in the Table" },
......
...@@ -101,6 +101,40 @@ static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab) ...@@ -101,6 +101,40 @@ static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
return NDBT_OK; return NDBT_OK;
} }
static
NdbEventOperation *createEventOperation(Ndb *ndb,
const NdbDictionary::Table &tab,
int do_report_error = 1)
{
char buf[1024];
sprintf(buf, "%s_EVENT", tab.getName());
NdbEventOperation *pOp= ndb->createEventOperation(buf);
if (pOp == 0)
{
if (do_report_error)
g_err << "createEventOperation: "
<< ndb->getNdbError().code << " "
<< ndb->getNdbError().message << endl;
return 0;
}
int n_columns= tab.getNoOfColumns();
for (int j = 0; j < n_columns; j++)
{
pOp->getValue(tab.getColumn(j)->getName());
pOp->getPreValue(tab.getColumn(j)->getName());
}
if ( pOp->execute() )
{
if (do_report_error)
g_err << "pOp->execute(): "
<< pOp->getNdbError().code << " "
<< pOp->getNdbError().message << endl;
ndb->dropEventOperation(pOp);
return 0;
}
return pOp;
}
static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step) static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step)
{ {
if (createEvent(GETNDB(step),* ctx->getTab()) != 0){ if (createEvent(GETNDB(step),* ctx->getTab()) != 0){
...@@ -870,7 +904,7 @@ static int createAllEvents(NDBT_Context* ctx, NDBT_Step* step) ...@@ -870,7 +904,7 @@ static int createAllEvents(NDBT_Context* ctx, NDBT_Step* step)
static int dropAllEvents(NDBT_Context* ctx, NDBT_Step* step) static int dropAllEvents(NDBT_Context* ctx, NDBT_Step* step)
{ {
DBUG_ENTER("createAllEvents"); DBUG_ENTER("dropAllEvents");
Ndb * ndb= GETNDB(step); Ndb * ndb= GETNDB(step);
int i; int i;
...@@ -1212,6 +1246,18 @@ static int createEventOperations(Ndb * ndb) ...@@ -1212,6 +1246,18 @@ static int createEventOperations(Ndb * ndb)
DBUG_RETURN(NDBT_OK); DBUG_RETURN(NDBT_OK);
} }
static int createAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("createAllEventOperations");
Ndb * ndb= GETNDB(step);
int r= createEventOperations(ndb);
if (r != NDBT_OK)
{
DBUG_RETURN(NDBT_FAILED);
}
DBUG_RETURN(NDBT_OK);
}
static int dropEventOperations(Ndb * ndb) static int dropEventOperations(Ndb * ndb)
{ {
DBUG_ENTER("dropEventOperations"); DBUG_ENTER("dropEventOperations");
...@@ -1228,6 +1274,18 @@ static int dropEventOperations(Ndb * ndb) ...@@ -1228,6 +1274,18 @@ static int dropEventOperations(Ndb * ndb)
DBUG_RETURN(NDBT_OK); DBUG_RETURN(NDBT_OK);
} }
static int dropAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("dropAllEventOperations");
Ndb * ndb= GETNDB(step);
int r= dropEventOperations(ndb);
if (r != NDBT_OK)
{
DBUG_RETURN(NDBT_FAILED);
}
DBUG_RETURN(NDBT_OK);
}
static int runMulti(NDBT_Context* ctx, NDBT_Step* step) static int runMulti(NDBT_Context* ctx, NDBT_Step* step)
{ {
DBUG_ENTER("runMulti"); DBUG_ENTER("runMulti");
...@@ -1409,6 +1467,87 @@ static int runMulti_NR(NDBT_Context* ctx, NDBT_Step* step) ...@@ -1409,6 +1467,87 @@ static int runMulti_NR(NDBT_Context* ctx, NDBT_Step* step)
DBUG_RETURN(NDBT_OK); DBUG_RETURN(NDBT_OK);
} }
static int restartAllNodes()
{
NdbRestarter restarter;
int id = 0;
do {
int nodeId = restarter.getDbNodeId(id++);
ndbout << "Restart node " << nodeId << endl;
if(restarter.restartOneDbNode(nodeId, false, false, true) != 0){
g_err << "Failed to restartNextDbNode" << endl;
break;
}
if(restarter.waitClusterStarted(60) != 0){
g_err << "Cluster failed to start" << endl;
break;
}
id = id % restarter.getNumDbNodes();
} while (id);
return id != 0;
}
static int runCreateDropNR(NDBT_Context* ctx, NDBT_Step* step)
{
DBUG_ENTER("runCreateDropNR");
Ndb * ndb= GETNDB(step);
int result = NDBT_OK;
NdbRestarter restarter;
int loops = ctx->getNumLoops();
if (restarter.getNumDbNodes() < 2)
{
ctx->stopTest();
return NDBT_OK;
}
do
{
result = NDBT_FAILED;
const NdbDictionary::Table* pTab = ctx->getTab();
if (createEvent(ndb, *pTab))
{
g_err << "createEvent failed" << endl;
break;
}
NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
if (pOp == 0)
{
g_err << "Failed to createEventOperation" << endl;
break;
}
if (dropEvent(ndb, *pTab))
{
g_err << "Failed to dropEvent()" << endl;
break;
}
ndbout << "Restarting with dropped events with subscribers" << endl;
if (restartAllNodes())
break;
if (ndb->getDictionary()->dropTable(pTab->getName()) != 0){
g_err << "Failed to drop " << pTab->getName() <<" in db" << endl;
break;
}
ndbout << "Restarting with dropped events and dropped "
<< "table with subscribers" << endl;
if (restartAllNodes())
break;
if (ndb->dropEventOperation(pOp))
{
g_err << "Failed dropEventOperation" << endl;
break;
}
NdbDictionary::Table tmp(*pTab);
tmp.setNodeGroupIds(0, 0);
if (ndb->getDictionary()->createTable(tmp) != 0){
g_err << "createTable failed: "
<< ndb->getDictionary()->getNdbError() << endl;
break;
}
result = NDBT_OK;
} while (--loops);
DBUG_RETURN(result);
}
NDBT_TESTSUITE(test_event); NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation", TESTCASE("BasicEventOperation",
...@@ -1492,6 +1631,11 @@ TESTCASE("Multi_NR", ...@@ -1492,6 +1631,11 @@ TESTCASE("Multi_NR",
FINALIZER(dropAllShadows); FINALIZER(dropAllShadows);
FINALIZER(dropAllEvents); FINALIZER(dropAllEvents);
} }
TESTCASE("CreateDropNR",
"Verify that we can Create and Drop in any order"
"NOTE! No errors are allowed!" ){
FINALIZER(runCreateDropNR);
}
NDBT_TESTSUITE_END(test_event); NDBT_TESTSUITE_END(test_event);
int main(int argc, const char** argv){ int main(int argc, const char** argv){
......
...@@ -218,6 +218,11 @@ max-time: 2500 ...@@ -218,6 +218,11 @@ max-time: 2500
cmd: test_event cmd: test_event
args: -n Multi args: -n Multi
#
max-time: 2500
cmd: test_event
args: -n CreateDropNR -l 2
max-time: 600 max-time: 600
cmd: testBasic cmd: testBasic
args: -n PkRead T1 args: -n PkRead T1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment