Commit 5285b530 authored by unknown's avatar unknown
Browse files

bug#11133 - ndb write handling

  fix handling of write's in lock queue
  add test case
  add support for pkWrite+async exec in HugoOperations


ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  Handle operation type wrt ZWRITE when restarting operations
ndb/test/include/HugoOperations.hpp:
  Add support for 
  1) pkWriteRecord 
  2) async execute
ndb/test/ndbapi/testNdbApi.cpp:
  Extend test case for bug#11133 with multi transaction tests aswell...
ndb/test/src/HugoOperations.cpp:
  Add support for 
  1) pkWriteRecord 
  2) async execute
parent 8cf4e0c8
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -6198,7 +6198,24 @@ Uint32 Dbacc::executeNextOperation(Signal* signal)
      sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
      return operationRecPtr.p->elementIsDisappeared;
    }//if
  }//if
  } 
  else if(operationRecPtr.p->operation == ZWRITE)
  {
    jam();
    operationRecPtr.p->operation = ZINSERT;
    if (operationRecPtr.p->prevParallelQue != RNIL) {
      OperationrecPtr prevOpPtr;
      jam();
      prevOpPtr.i = operationRecPtr.p->prevParallelQue;
      ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
      if (prevOpPtr.p->operation != ZDELETE) 
      {
        jam();
        operationRecPtr.p->operation = ZUPDATE;
      }
    }
  }
  if (operationRecPtr.p->operation == ZSCAN_OP &&
      ! operationRecPtr.p->isAccLockReq) {
    jam();
+14 −0
Original line number Diff line number Diff line
@@ -38,6 +38,11 @@ public:
		     int numRecords = 1,
		     int updatesValue = 0);
  
  int pkWriteRecord(Ndb*,
		    int recordNo,
		    int numRecords = 1,
		    int updatesValue = 0);
  
  int pkReadRecord(Ndb*,
		   int recordNo,
		   int numRecords = 1,
@@ -88,6 +93,10 @@ public:
		      NdbScanOperation::LM_CommittedRead, 
		      int numRecords = 1);


  int execute_async(Ndb*, ExecType, AbortOption = AbortOnError);
  int wait_async(Ndb*, int timeout = -1);

protected:
  void allocRows(int rows);
  void deallocRows();
@@ -102,6 +111,11 @@ protected:
  Vector<RsPair> m_executed_result_sets;

  NdbConnection* pTrans;

  int m_async_reply;
  int m_async_return;
  friend void HugoOperations_async_callback(int, NdbConnection*, void*);
  void callback(int res, NdbConnection*);
};

#endif
+70 −221
Original line number Diff line number Diff line
@@ -1049,6 +1049,8 @@ int runCheckGetNdbErrorOperation(NDBT_Context* ctx, NDBT_Step* step){
  return result;
}

#define C2(x) { int _x= (x); if(_x == 0) return NDBT_FAILED; }

int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){
  int result = NDBT_OK;
  const NdbDictionary::Table* pTab = ctx->getTab();
@@ -1056,227 +1058,75 @@ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){
  HugoOperations hugoOps(*pTab);

  Ndb* pNdb = GETNDB(step);
  Uint32 lm;

  NdbConnection* pCon = pNdb->startTransaction();
  if (pCon == NULL){
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }
  
  NdbOperation* pOp = pCon->getNdbOperation(pTab->getName());
  if (pOp == NULL){
    ERR(pCon->getNdbError());
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }
  
  if (pOp->readTuple(NdbOperation::LM_Exclusive) != 0){
    pNdb->closeTransaction(pCon);
    ERR(pOp->getNdbError());
    return NDBT_FAILED;
  }
      
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() == true){
      if(hugoOps.equalForAttr(pOp, a, 1) != 0){
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }

  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() != true){
      if (pOp->getValue(pTab->getColumn(a)->getName()) == NULL) {
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  int check = pCon->execute(NoCommit);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }
  
  pOp = pCon->getNdbOperation(pTab->getName());
  if (pOp == NULL){
    ERR(pCon->getNdbError());
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }
  
  if (pOp->deleteTuple() != 0){
    pNdb->closeTransaction(pCon);
    ERR(pOp->getNdbError());
    return NDBT_FAILED;
  }
      
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() == true){
      if(hugoOps.equalForAttr(pOp, a, 1) != 0){
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }

  check = pCon->execute(NoCommit);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }

  pOp = pCon->getNdbOperation(pTab->getName());
  if (pOp == NULL){
    ERR(pCon->getNdbError());
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }

  if (pOp->writeTuple() != 0){
    pNdb->closeTransaction(pCon);
    ERR(pOp->getNdbError());
    return NDBT_FAILED;
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() == true){
      if(hugoOps.equalForAttr(pOp, a, 1) != 0){
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() != true){
      if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0)
      {
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  check = pCon->execute(NoCommit);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }

  pOp = pCon->getNdbOperation(pTab->getName());
  if (pOp == NULL){
    ERR(pCon->getNdbError());
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }
  
  if (pOp->writeTuple() != 0){
    pNdb->closeTransaction(pCon);
    ERR(pOp->getNdbError());
    return NDBT_FAILED;
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() == true){
      if(hugoOps.equalForAttr(pOp, a, 1) != 0){
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() != true){
      if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0)
      {
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  check = pCon->execute(NoCommit);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }
  
  check = pCon->execute(Rollback);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }
  
  pCon->close();

  pCon = pNdb->startTransaction();
  if (pCon == NULL){
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }

  pOp = pCon->getNdbOperation(pTab->getName());
  if (pOp == NULL){
    ERR(pCon->getNdbError());
    pNdb->closeTransaction(pCon);  
    return NDBT_FAILED;
  }
  
  if (pOp->writeTuple() != 0){
    pNdb->closeTransaction(pCon);
    ERR(pOp->getNdbError());
    return NDBT_FAILED;
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() == true){
      if(hugoOps.equalForAttr(pOp, a, 1) != 0){
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  for(int a = 0; a<pTab->getNoOfColumns(); a++){
    if (pTab->getColumn(a)->getPrimaryKey() != true){
      if(hugoOps.setValueForAttr(pOp, a, 1, 1) != 0)
      {
	ERR(pCon->getNdbError());
	pNdb->closeTransaction(pCon);
	return NDBT_FAILED;
      }
    }
  }
  
  check = pCon->execute(Commit);
  if (check == 0){
    ndbout << "execute worked" << endl;
  } else {
    ERR(pCon->getNdbError());
    result = NDBT_FAILED;
  }
  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);

  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);

  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);

  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Exclusive) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkWriteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);

  Ndb ndb2("TEST_DB");
  C2(ndb2.init() == 0);
  C2(ndb2.waitUntilReady() == 0);
  HugoOperations hugoOps2(*pTab);  

  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkInsertRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps2.startTransaction(&ndb2) == 0);
  C2(hugoOps2.pkWriteRecord(&ndb2, 0, 1) == 0);
  C2(hugoOps2.execute_async(&ndb2, NoCommit) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps2.wait_async(&ndb2) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);
  C2(hugoOps2.closeTransaction(&ndb2) == 0);  

  C2(hugoOps.startTransaction(pNdb) == 0);
  C2(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
  C2(hugoOps.execute_NoCommit(pNdb) == 0);
  C2(hugoOps2.startTransaction(&ndb2) == 0);
  C2(hugoOps2.pkWriteRecord(&ndb2, 0, 1) == 0);
  C2(hugoOps2.execute_async(&ndb2, NoCommit) == 0);
  C2(hugoOps.execute_Commit(pNdb) == 0);
  C2(hugoOps2.wait_async(&ndb2) == 0);
  C2(hugoOps.closeTransaction(pNdb) == 0);
  C2(hugoOps2.closeTransaction(&ndb2) == 0);  

  return result;
}
@@ -1359,7 +1209,6 @@ TESTCASE("ReadWithoutGetValue",
}
TESTCASE("Bug_11133", 
	 "Test ReadEx-Delete-Write\n"){ 
  INITIALIZER(runLoadTable);
  INITIALIZER(runBug_11133);
  FINALIZER(runClearTable);
}
+87 −4
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@

#include <HugoOperations.hpp>


int HugoOperations::startTransaction(Ndb* pNdb){
  
  if (pTrans != NULL){
@@ -199,6 +198,48 @@ int HugoOperations::pkInsertRecord(Ndb* pNdb,
  return NDBT_OK;
}

int HugoOperations::pkWriteRecord(Ndb* pNdb,
				  int recordNo,
				  int numRecords,
				  int updatesValue){
  
  int a, check;
  for(int r=0; r < numRecords; r++){
    NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());	
    if (pOp == NULL) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }
    
    check = pOp->writeTuple();
    if( check == -1 ) {
      ERR(pTrans->getNdbError());
      return NDBT_FAILED;
    }
    
    // Define primary keys
    for(a = 0; a<tab.getNoOfColumns(); a++){
      if (tab.getColumn(a)->getPrimaryKey() == true){
	if(equalForAttr(pOp, a, r+recordNo) != 0){
	  ERR(pTrans->getNdbError());
	  return NDBT_FAILED;
	}
      }
    }
    
    // Define attributes to update
    for(a = 0; a<tab.getNoOfColumns(); a++){
      if (tab.getColumn(a)->getPrimaryKey() == false){
	if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){ 
	  ERR(pTrans->getNdbError());
	  return NDBT_FAILED;
	}
      }
    } 
  }
  return NDBT_OK;
}

int HugoOperations::pkDeleteRecord(Ndb* pNdb,
				   int recordNo,
				   int numRecords){
@@ -399,16 +440,58 @@ int HugoOperations::execute_Rollback(Ndb* pNdb){
  return NDBT_OK;
}

void
HugoOperations_async_callback(int res, NdbConnection* pCon, void* ho)
{
  ((HugoOperations*)ho)->callback(res, pCon);
}

void
HugoOperations::callback(int res, NdbConnection* pCon)
{
  assert(pCon == pTrans);
  m_async_reply= 1;
  m_async_return= res;
}

int 
HugoOperations::execute_async(Ndb* pNdb, ExecType et, AbortOption eao){
  
  m_async_reply= 0;
  pTrans->executeAsynchPrepare(et,
			       HugoOperations_async_callback,
			       this,
			       eao);
  
  pNdb->sendPreparedTransactions();
  
  return NDBT_OK;
}

int
HugoOperations::wait_async(Ndb* pNdb, int timeout)
{
  pNdb->pollNdb(1000);

  if(m_async_reply)
  {
    return m_async_return;
  }
  ndbout_c("wait returned nothing...");
  return -1;
}

HugoOperations::HugoOperations(const NdbDictionary::Table& _tab):
  UtilTransactions(_tab),
  calc(_tab),
  pTrans(NULL){

  pTrans(NULL)
{
}

HugoOperations::~HugoOperations(){
  deallocRows();
  if (pTrans != NULL){
  if (pTrans != NULL)
  {
    pTrans->close();
    pTrans = NULL;
  }