Commit 780bd17c authored by unknown's avatar unknown
Browse files

Merge mysql.com:/home/jonas/src/mysql-4.1

into mysql.com:/home/jonas/src/mysql-5.0


ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  Auto merged
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  Auto merged
parents c657ba2a 06c001ec
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -984,7 +984,7 @@ private:
  Uint32 placeReadInLockQueue(Signal* signal);
  void placeSerialQueueRead(Signal* signal);
  void checkOnlyReadEntry(Signal* signal);
  void getNoParallelTransaction(Signal* signal);
  Uint32 getNoParallelTransaction(const Operationrec*);
  void moveLastParallelQueue(Signal* signal);
  void moveLastParallelQueueWrite(Signal* signal);
  Uint32 placeWriteInLockQueue(Signal* signal);
@@ -1045,6 +1045,8 @@ private:
  Uint32 executeNextOperation(Signal* signal);
  void releaselock(Signal* signal);
  void takeOutFragWaitQue(Signal* signal);
  void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
			  OperationrecPtr release_op);
  void allocOverflowPage(Signal* signal);
  bool getrootfragmentrec(Signal* signal, RootfragmentrecPtr&, Uint32 fragId);
  void insertLockOwnersList(Signal* signal, const OperationrecPtr&);
@@ -1206,7 +1208,6 @@ private:
  OperationrecPtr mlpqOperPtr;
  OperationrecPtr queOperPtr;
  OperationrecPtr readWriteOpPtr;
  OperationrecPtr tgnptMainOpPtr;
  Uint32 cfreeopRec;
  Uint32 coprecsize;
/* --------------------------------------------------------------------------------- */
@@ -1412,7 +1413,6 @@ private:
  Uint32 turlIndex;
  Uint32 tlfrTmp1;
  Uint32 tlfrTmp2;
  Uint32 tgnptNrTransaction;
  Uint32 tscanTrid1;
  Uint32 tscanTrid2;

+167 −29
Original line number Diff line number Diff line
@@ -2033,9 +2033,7 @@ void Dbacc::insertelementLab(Signal* signal)
/* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeReadInLockQueue(Signal* signal) 
{
  tgnptMainOpPtr = queOperPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(queOperPtr.p) == 1) {
    if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && 
        (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
      /* --------------------------------------------------------------------------------- */
@@ -2118,9 +2116,7 @@ void Dbacc::placeSerialQueueRead(Signal* signal)
    checkOnlyReadEntry(signal);
    return;
  }//if
  tgnptMainOpPtr = readWriteOpPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
    jam();
    /* --------------------------------------------------------------------------------- */
    /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR     */
@@ -2201,24 +2197,23 @@ void Dbacc::checkOnlyReadEntry(Signal* signal)
/* --------------------------------------------------------------------------------- */
/* GET_NO_PARALLEL_TRANSACTION                                                       */
/* --------------------------------------------------------------------------------- */
void Dbacc::getNoParallelTransaction(Signal* signal) 
Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op) 
{
  OperationrecPtr tnptOpPtr;
  OperationrecPtr tmp;
  
  tgnptNrTransaction = 1;
  tnptOpPtr.i = tgnptMainOpPtr.p->nextParallelQue;
  while ((tnptOpPtr.i != RNIL) &&
         (tgnptNrTransaction == 1)) {
    jam();
    ptrCheckGuard(tnptOpPtr, coprecsize, operationrec);
    if ((tnptOpPtr.p->transId1 == tgnptMainOpPtr.p->transId1) &&
        (tnptOpPtr.p->transId2 == tgnptMainOpPtr.p->transId2)) {
      tnptOpPtr.i = tnptOpPtr.p->nextParallelQue;
    } else {
  tmp.i= op->nextParallelQue;
  Uint32 transId[2] = { op->transId1, op->transId2 };
  while (tmp.i != RNIL) 
  {
    jam();
      tgnptNrTransaction++;
    }//if
  }//while
    ptrCheckGuard(tmp, coprecsize, operationrec);
    if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1])
      tmp.i = tmp.p->nextParallelQue;
    else
      return 2;
  }
  return 1;
}//Dbacc::getNoParallelTransaction()

void Dbacc::moveLastParallelQueue(Signal* signal) 
@@ -2259,9 +2254,7 @@ void Dbacc::moveLastParallelQueueWrite(Signal* signal)
/* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) 
{
  tgnptMainOpPtr = queOperPtr;
  getNoParallelTransaction(signal);
  if (!((tgnptNrTransaction == 1) &&
  if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
	(queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
	(queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
    jam();
@@ -2312,9 +2305,7 @@ void Dbacc::placeSerialQueueWrite(Signal* signal)
  }//if
  readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
  ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
  tgnptMainOpPtr = readWriteOpPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
    /* --------------------------------------------------------------------------------- */
    /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR     */
    /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK.                                    */
@@ -4449,9 +4440,154 @@ void Dbacc::commitOperation(Signal* signal)
      ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
      tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
    }//if
  }//if

    /**
     * Check possible lock upgrade
     * 1) Find lock owner
     * 2) Count transactions in parallel que
     * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
     *      upgrade next serial
     */
    if(operationRecPtr.p->lockMode)
    {
      jam();
      /**
       * Committing a non shared operation can't lead to lock upgrade
       */
      return;
    }
    
    OperationrecPtr lock_owner;
    lock_owner.i = operationRecPtr.p->prevParallelQue;
    ptrCheckGuard(lock_owner, coprecsize, operationrec);
    Uint32 transid[2] = { lock_owner.p->transId1, 
			  lock_owner.p->transId2 };
    
    
    while(lock_owner.p->prevParallelQue != RNIL)
    {
      lock_owner.i = lock_owner.p->prevParallelQue;
      ptrCheckGuard(lock_owner, coprecsize, operationrec);
      
      if(lock_owner.p->transId1 != transid[0] || 
	 lock_owner.p->transId2 != transid[1])
      {
	jam();
	/**
	 * If more than 1 trans in lock queue -> no lock upgrade
	 */
	return;
      }
    }
    
    check_lock_upgrade(signal, lock_owner, operationRecPtr);
  }
}//Dbacc::commitOperation()

void
Dbacc::check_lock_upgrade(Signal* signal, 
			  OperationrecPtr lock_owner,
			  OperationrecPtr release_op)
{
  if((lock_owner.p->transId1 == release_op.p->transId1 &&
      lock_owner.p->transId2 == release_op.p->transId2) ||
     release_op.p->lockMode ||
     lock_owner.p->nextSerialQue == RNIL)
  {
    jam();
    /**
     * No lock upgrade if same trans or lock owner has no serial queue
     *                 or releasing non shared op
     */
    return;
  }

  OperationrecPtr next;
  next.i = lock_owner.p->nextSerialQue;
  ptrCheckGuard(next, coprecsize, operationrec);
  
  if(lock_owner.p->transId1 != next.p->transId1 ||
     lock_owner.p->transId2 != next.p->transId2)
  {
    jam();
    /**
     * No lock upgrad if !same trans in serial queue
     */
    return;
  }
  
  if (getNoParallelTransaction(lock_owner.p) > 1)
  {
    jam();
    /**
     * No lock upgrade if more than 1 transaction in parallell queue
     */
    return;
  }

  if (getNoParallelTransaction(next.p) > 1)
  {
    jam();
    /**
     * No lock upgrade if more than 1 transaction in next's parallell queue
     */
    return;
  }
  
  OperationrecPtr tmp;
  tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
  if(tmp.i != RNIL)
  {
    ptrCheckGuard(tmp, coprecsize, operationrec);
    ndbassert(tmp.p->prevSerialQue == next.i);
    tmp.p->prevSerialQue = lock_owner.i;
  }
  next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
  
  // Find end of parallell que
  tmp = lock_owner;
  while(tmp.p->nextParallelQue != RNIL)
  {
    jam();
    tmp.i = tmp.p->nextParallelQue;
    ptrCheckGuard(tmp, coprecsize, operationrec);
  }
  
  next.p->prevParallelQue = tmp.i;
  tmp.p->nextParallelQue = next.i;
  
  OperationrecPtr save = operationRecPtr;
  Uint32 lockMode = lock_owner.p->lockMode;

  Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads
  Uint32 ThashValue = lock_owner.p->hashValue;
  Uint32 localdata[2];
  localdata[0] = lock_owner.p->localdata[0];
  localdata[1] = lock_owner.p->localdata[1];
  do {
    next.p->elementIsDisappeared = TelementIsDisappeared;
    next.p->hashValue = ThashValue;
    next.p->localdata[0] = localdata[0];
    next.p->localdata[1] = localdata[1];
    
    operationRecPtr = next;
    next.p->lockMode = lockMode;
    TelementIsDisappeared = executeNextOperation(signal);
    if (next.p->nextParallelQue != RNIL) 
    {
      jam();
      next.i = next.p->nextParallelQue;
      ptrCheckGuard(next, coprecsize, operationrec);
    } else {
      jam();
      break;
    }//if
  } while (1);
  
  operationRecPtr = save;
  
}

/* ------------------------------------------------------------------------- */
/* RELEASELOCK                                                               */
/*          RESETS LOCK OF AN ELEMENT.                                       */
@@ -4488,6 +4624,8 @@ void Dbacc::releaselock(Signal* signal)
      ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
      trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
    }//if

    check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
    /* --------------------------------------------------------------------------------- */
    /*       SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT   */
    /*       STARTING QUEUED OPERATIONS. THUS WE CAN END HERE.                           */
+60 −12
Original line number Diff line number Diff line
@@ -547,6 +547,8 @@ runLockUpgrade1(NDBT_Context* ctx, NDBT_Step* step){
  do
  {
    CHECK(hugoOps.startTransaction(pNdb) == 0);  
    if(ctx->getProperty("LOCK_UPGRADE", 1) == 1)
    {
      CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0);
      CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
    
@@ -555,13 +557,54 @@ runLockUpgrade1(NDBT_Context* ctx, NDBT_Step* step){
      ndbout_c("wait 2");
      ctx->getPropertyWait("READ_DONE", 2);
      ndbout_c("wait 2 - done");
    }
    else
    {
      ctx->setProperty("READ_DONE", 1);
      ctx->broadcast();
      ctx->getPropertyWait("READ_DONE", 2);
      ndbout_c("wait 2 - done");
      CHECK(hugoOps.pkReadRecord(pNdb, 0, 1, NdbOperation::LM_Read) == 0);
      CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
    }
    if(ctx->getProperty("LU_OP", o_INS) == o_INS)
    {
      CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
      CHECK(hugoOps.pkInsertRecord(pNdb, 0, 1, 2) == 0);
    }
    else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD)
    {
      CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0);
    } 
    else
    {
      CHECK(hugoOps.pkDeleteRecord(pNdb, 0, 1) == 0);
    }
    ctx->setProperty("READ_DONE", 3);
    ctx->broadcast();
    ndbout_c("before update");
    CHECK(hugoOps.pkUpdateRecord(pNdb, 0, 1, 2) == 0);
    ndbout_c("wait update");
    CHECK(hugoOps.execute_NoCommit(pNdb) == 0);
    CHECK(hugoOps.closeTransaction(pNdb));
    CHECK(hugoOps.execute_Commit(pNdb) == 0);
    CHECK(hugoOps.closeTransaction(pNdb) == 0);

    CHECK(hugoOps.startTransaction(pNdb) == 0);
    CHECK(hugoOps.pkReadRecord(pNdb, 0, 1) == 0);
    int res= hugoOps.execute_Commit(pNdb);
    if(ctx->getProperty("LU_OP", o_INS) == o_INS)
    {
      CHECK(res == 0);
      CHECK(hugoOps.verifyUpdatesValue(2) == 0);
    }
    else if(ctx->getProperty("LU_OP", o_UPD) == o_UPD)
    {
      CHECK(res == 0);
      CHECK(hugoOps.verifyUpdatesValue(2) == 0);
    } 
    else
    {
      CHECK(res == 626);
    }
    
  } while(0);
  
  return result;
@@ -595,7 +638,7 @@ runLockUpgrade2(NDBT_Context* ctx, NDBT_Step* step){
    CHECK(hugoOps.execute_Commit(pNdb) == 0);      
  } while(0);

  return NDBT_FAILED;
  return result;
}

int
@@ -607,11 +650,16 @@ main(int argc, const char** argv){

  NDBT_TestSuite ts("testOperations");

  for(Uint32 i = 0; i <6; i++)
  {
    BaseString name("bug_9749");
    name.appfmt("_%d", i);
    NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts, 
						    name.c_str(), "");    
    
    pt->setProperty("LOCK_UPGRADE", 1 + (i & 1));
    pt->setProperty("LU_OP", 1 + (i >> 1));
    
    pt->addInitializer(new NDBT_Initializer(pt,
					    "runClearTable", 
					    runClearTable));