Loading ndb/test/ndbapi/test_event.cpp +282 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,29 @@ int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runCreateShadowTable(NDBT_Context* ctx, NDBT_Step* step) { const NdbDictionary::Table *table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); GETNDB(step)->getDictionary()->dropTable(buf); if (GETNDB(step)->getDictionary()->getTable(buf)) { g_err << "unsucessful drop of " << buf << endl; return NDBT_FAILED; } NdbDictionary::Table table_shadow(*table); table_shadow.setName(buf); GETNDB(step)->getDictionary()->createTable(table_shadow); if (GETNDB(step)->getDictionary()->getTable(buf)) return NDBT_OK; g_err << "unsucessful create of " << buf << endl; return NDBT_FAILED; } int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step) { int loops = ctx->getNumLoops(); Loading Loading @@ -135,6 +158,36 @@ int runEventLoad(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runEventMixedLoad(NDBT_Context* ctx, NDBT_Step* step) { int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); HugoTransactions hugoTrans(*ctx->getTab()); sleep(5); if (hugoTrans.loadTable(GETNDB(step), 3*records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkDelRecords(GETNDB(step), 3*records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.loadTable(GETNDB(step), records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } return NDBT_OK; } int runDropEvent(NDBT_Context* ctx, NDBT_Step* step) { HugoTransactions hugoTrans(*ctx->getTab()); Loading @@ -146,6 +199,224 @@ int runDropEvent(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runVerify(NDBT_Context* ctx, NDBT_Step* step) { int records = ctx->getNumRecords(); const NdbDictionary::Table * table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); const NdbDictionary::Table * table_shadow; if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0) { g_err << "Unable to get table " << buf << endl; return NDBT_FAILED; } HugoTransactions hugoTrans(*table_shadow); if (hugoTrans.pkReadRecords(GETNDB(step), records) != 0){ return NDBT_FAILED; } return NDBT_OK; } int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) { DBUG_ENTER("runEventApplier"); int records = ctx->getNumRecords(); int loops = ctx->getNumLoops(); const NdbDictionary::Table * table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); const NdbDictionary::Table * table_shadow; if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0) { g_err << "Unable to get table " << buf << endl; DBUG_RETURN(NDBT_FAILED); } sprintf(buf, "%s_EVENT", table->getName()); NdbEventOperation *pOp; pOp = GETNDB(step)->createEventOperation(buf, 10*records); if ( pOp == NULL ) { g_err << "Event operation creation failed on %s" << buf << endl; DBUG_RETURN(NDBT_FAILED); } int i; int n_columns= table->getNoOfColumns(); NdbRecAttr* recAttr[1024]; NdbRecAttr* recAttrPre[1024]; for (i = 0; i < n_columns; i++) { recAttr[i] = pOp->getValue(table->getColumn(i)->getName()); recAttrPre[i] = pOp->getPreValue(table->getColumn(i)->getName()); } if (pOp->execute()) { // This starts changes to "start flowing" g_err << "execute operation execution failed: \n"; g_err << pOp->getNdbError().code << " " << pOp->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } int r= 0; int res; while (r < 10*records){ //printf("now waiting for event...\n"); res= GETNDB(step)->pollEvents(1000); // wait for event or 1000 ms if (res <= 0) continue; //printf("got data! %d\n", r); int overrun= 0; while (pOp->next(&overrun) > 0) { if (overrun) { g_err << "buffer overrun\n"; DBUG_RETURN(NDBT_FAILED); } r++; Uint32 gci= pOp->getGCI(); if (!pOp->isConsistent()) { g_err << "A node failure has occured and events might be missing\n"; DBUG_RETURN(NDBT_FAILED); } int noRetries= 0; do { NdbTransaction *trans= GETNDB(step)->startTransaction(); if (trans == 0) { g_err << "startTransaction failed " << GETNDB(step)->getNdbError().code << " " << GETNDB(step)->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } NdbOperation *op= trans->getNdbOperation(table_shadow); if (op == 0) { g_err << "getNdbOperation failed " << trans->getNdbError().code << " " << trans->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: if (op->insertTuple()) { g_err << "insertTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_DELETE: if (op->deleteTuple()) { g_err << "deleteTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_UPDATE: if (op->updateTuple()) { g_err << "updateTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_ALL: abort(); } for (i= 0; i < n_columns; i++) { if (table->getColumn(i)->getPrimaryKey() && op->equal(i,recAttr[i]->aRef())) { g_err << "equal " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: for (i= 0; i < n_columns; i++) { if (!table->getColumn(i)->getPrimaryKey() && op->setValue(i,recAttr[i]->aRef())) { g_err << "setValue(insert) " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } break; case NdbDictionary::Event::TE_DELETE: break; case NdbDictionary::Event::TE_UPDATE: for (i= 0; i < n_columns; i++) { if (!table->getColumn(i)->getPrimaryKey() && recAttr[i]->isNULL() >= 0 && op->setValue(i,recAttr[i]->aRef())) { g_err << "setValue(update) " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } break; case NdbDictionary::Event::TE_ALL: abort(); } if (trans->execute(Commit) == 0) { trans->close(); // everything ok break; } if (noRetries++ == 10 || trans->getNdbError().status != NdbError::TemporaryError) { g_err << "execute " << r << " failed " << trans->getNdbError().code << " " << trans->getNdbError().message << endl; trans->close(); DBUG_RETURN(NDBT_FAILED); } trans->close(); NdbSleep_MilliSleep(100); // sleep before retying } while(1); } } if (GETNDB(step)->dropEventOperation(pOp)) { g_err << "dropEventOperation execution failed " << GETNDB(step)->getNdbError().code << " " << GETNDB(step)->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } DBUG_RETURN(NDBT_OK); } // INITIALIZER(runInsert); // STEP(runPkRead); // VERIFIER(runVerifyInsert); Loading Loading @@ -176,6 +447,17 @@ TESTCASE("ParallellEventOperation", STEP(runEventLoad); FINALIZER(runDropEvent); } TESTCASE("EventOperationApplier", "Verify that if we apply the data we get from event " "operation is the same as the original table" "NOTE! No errors are allowed!" ){ INITIALIZER(runCreateEvent); INITIALIZER(runCreateShadowTable); STEP(runEventApplier); STEP(runEventMixedLoad); FINALIZER(runDropEvent); FINALIZER(runVerify); } NDBT_TESTSUITE_END(test_event); int main(int argc, const char** argv){ Loading Loading
ndb/test/ndbapi/test_event.cpp +282 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,29 @@ int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runCreateShadowTable(NDBT_Context* ctx, NDBT_Step* step) { const NdbDictionary::Table *table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); GETNDB(step)->getDictionary()->dropTable(buf); if (GETNDB(step)->getDictionary()->getTable(buf)) { g_err << "unsucessful drop of " << buf << endl; return NDBT_FAILED; } NdbDictionary::Table table_shadow(*table); table_shadow.setName(buf); GETNDB(step)->getDictionary()->createTable(table_shadow); if (GETNDB(step)->getDictionary()->getTable(buf)) return NDBT_OK; g_err << "unsucessful create of " << buf << endl; return NDBT_FAILED; } int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step) { int loops = ctx->getNumLoops(); Loading Loading @@ -135,6 +158,36 @@ int runEventLoad(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runEventMixedLoad(NDBT_Context* ctx, NDBT_Step* step) { int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); HugoTransactions hugoTrans(*ctx->getTab()); sleep(5); if (hugoTrans.loadTable(GETNDB(step), 3*records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkDelRecords(GETNDB(step), 3*records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.loadTable(GETNDB(step), records, 1, true, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){ return NDBT_FAILED; } return NDBT_OK; } int runDropEvent(NDBT_Context* ctx, NDBT_Step* step) { HugoTransactions hugoTrans(*ctx->getTab()); Loading @@ -146,6 +199,224 @@ int runDropEvent(NDBT_Context* ctx, NDBT_Step* step) return NDBT_OK; } int runVerify(NDBT_Context* ctx, NDBT_Step* step) { int records = ctx->getNumRecords(); const NdbDictionary::Table * table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); const NdbDictionary::Table * table_shadow; if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0) { g_err << "Unable to get table " << buf << endl; return NDBT_FAILED; } HugoTransactions hugoTrans(*table_shadow); if (hugoTrans.pkReadRecords(GETNDB(step), records) != 0){ return NDBT_FAILED; } return NDBT_OK; } int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) { DBUG_ENTER("runEventApplier"); int records = ctx->getNumRecords(); int loops = ctx->getNumLoops(); const NdbDictionary::Table * table= ctx->getTab(); char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); const NdbDictionary::Table * table_shadow; if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0) { g_err << "Unable to get table " << buf << endl; DBUG_RETURN(NDBT_FAILED); } sprintf(buf, "%s_EVENT", table->getName()); NdbEventOperation *pOp; pOp = GETNDB(step)->createEventOperation(buf, 10*records); if ( pOp == NULL ) { g_err << "Event operation creation failed on %s" << buf << endl; DBUG_RETURN(NDBT_FAILED); } int i; int n_columns= table->getNoOfColumns(); NdbRecAttr* recAttr[1024]; NdbRecAttr* recAttrPre[1024]; for (i = 0; i < n_columns; i++) { recAttr[i] = pOp->getValue(table->getColumn(i)->getName()); recAttrPre[i] = pOp->getPreValue(table->getColumn(i)->getName()); } if (pOp->execute()) { // This starts changes to "start flowing" g_err << "execute operation execution failed: \n"; g_err << pOp->getNdbError().code << " " << pOp->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } int r= 0; int res; while (r < 10*records){ //printf("now waiting for event...\n"); res= GETNDB(step)->pollEvents(1000); // wait for event or 1000 ms if (res <= 0) continue; //printf("got data! %d\n", r); int overrun= 0; while (pOp->next(&overrun) > 0) { if (overrun) { g_err << "buffer overrun\n"; DBUG_RETURN(NDBT_FAILED); } r++; Uint32 gci= pOp->getGCI(); if (!pOp->isConsistent()) { g_err << "A node failure has occured and events might be missing\n"; DBUG_RETURN(NDBT_FAILED); } int noRetries= 0; do { NdbTransaction *trans= GETNDB(step)->startTransaction(); if (trans == 0) { g_err << "startTransaction failed " << GETNDB(step)->getNdbError().code << " " << GETNDB(step)->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } NdbOperation *op= trans->getNdbOperation(table_shadow); if (op == 0) { g_err << "getNdbOperation failed " << trans->getNdbError().code << " " << trans->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: if (op->insertTuple()) { g_err << "insertTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_DELETE: if (op->deleteTuple()) { g_err << "deleteTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_UPDATE: if (op->updateTuple()) { g_err << "updateTuple " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } break; case NdbDictionary::Event::TE_ALL: abort(); } for (i= 0; i < n_columns; i++) { if (table->getColumn(i)->getPrimaryKey() && op->equal(i,recAttr[i]->aRef())) { g_err << "equal " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } switch (pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: for (i= 0; i < n_columns; i++) { if (!table->getColumn(i)->getPrimaryKey() && op->setValue(i,recAttr[i]->aRef())) { g_err << "setValue(insert) " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } break; case NdbDictionary::Event::TE_DELETE: break; case NdbDictionary::Event::TE_UPDATE: for (i= 0; i < n_columns; i++) { if (!table->getColumn(i)->getPrimaryKey() && recAttr[i]->isNULL() >= 0 && op->setValue(i,recAttr[i]->aRef())) { g_err << "setValue(update) " << i << " " << op->getNdbError().code << " " << op->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } } break; case NdbDictionary::Event::TE_ALL: abort(); } if (trans->execute(Commit) == 0) { trans->close(); // everything ok break; } if (noRetries++ == 10 || trans->getNdbError().status != NdbError::TemporaryError) { g_err << "execute " << r << " failed " << trans->getNdbError().code << " " << trans->getNdbError().message << endl; trans->close(); DBUG_RETURN(NDBT_FAILED); } trans->close(); NdbSleep_MilliSleep(100); // sleep before retying } while(1); } } if (GETNDB(step)->dropEventOperation(pOp)) { g_err << "dropEventOperation execution failed " << GETNDB(step)->getNdbError().code << " " << GETNDB(step)->getNdbError().message << endl; DBUG_RETURN(NDBT_FAILED); } DBUG_RETURN(NDBT_OK); } // INITIALIZER(runInsert); // STEP(runPkRead); // VERIFIER(runVerifyInsert); Loading Loading @@ -176,6 +447,17 @@ TESTCASE("ParallellEventOperation", STEP(runEventLoad); FINALIZER(runDropEvent); } TESTCASE("EventOperationApplier", "Verify that if we apply the data we get from event " "operation is the same as the original table" "NOTE! No errors are allowed!" ){ INITIALIZER(runCreateEvent); INITIALIZER(runCreateShadowTable); STEP(runEventApplier); STEP(runEventMixedLoad); FINALIZER(runDropEvent); FINALIZER(runVerify); } NDBT_TESTSUITE_END(test_event); int main(int argc, const char** argv){ Loading