Commit 06c001ec authored by unknown's avatar unknown
Browse files

BUG#9749 - ndb lock upgrade - more fixes...

1) Make getNoParall into function instead of a procedure
2) Check for multiple transactions in "upgrade's" parallell queue
3) Set lock mode according to lock_owner's lockMode

NOTE: Does still not handle lock upgrade in case of aborts correctly


ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  Make getNoParall into function instead of a procedure
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  1) Make getNoParall into function instead of a procedure
  2) Check for multiple transactions in "upgrade's" parallell queue
  3) Set lock mode according to lock_owner's lockMode
parent 59acb039
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -1022,7 +1022,7 @@ private:
  Uint32 placeReadInLockQueue(Signal* signal);
  void placeSerialQueueRead(Signal* signal);
  void checkOnlyReadEntry(Signal* signal);
  void getNoParallelTransaction(Signal* signal);
  Uint32 getNoParallelTransaction(const Operationrec*);
  void moveLastParallelQueue(Signal* signal);
  void moveLastParallelQueueWrite(Signal* signal);
  Uint32 placeWriteInLockQueue(Signal* signal);
@@ -1265,7 +1265,6 @@ private:
  OperationrecPtr mlpqOperPtr;
  OperationrecPtr queOperPtr;
  OperationrecPtr readWriteOpPtr;
  OperationrecPtr tgnptMainOpPtr;
  Uint32 cfreeopRec;
  Uint32 coprecsize;
/* --------------------------------------------------------------------------------- */
@@ -1516,7 +1515,6 @@ private:
  Uint32 turlIndex;
  Uint32 tlfrTmp1;
  Uint32 tlfrTmp2;
  Uint32 tgnptNrTransaction;
  Uint32 tudqeIndex;
  Uint32 tscanTrid1;
  Uint32 tscanTrid2;
+33 −36
Original line number Diff line number Diff line
@@ -1936,9 +1936,7 @@ void Dbacc::insertelementLab(Signal* signal)
/* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeReadInLockQueue(Signal* signal) 
{
  tgnptMainOpPtr = queOperPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(queOperPtr.p) == 1) {
    if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) && 
        (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
      /* --------------------------------------------------------------------------------- */
@@ -2021,9 +2019,7 @@ void Dbacc::placeSerialQueueRead(Signal* signal)
    checkOnlyReadEntry(signal);
    return;
  }//if
  tgnptMainOpPtr = readWriteOpPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
    jam();
    /* --------------------------------------------------------------------------------- */
    /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR     */
@@ -2104,24 +2100,23 @@ void Dbacc::checkOnlyReadEntry(Signal* signal)
/* --------------------------------------------------------------------------------- */
/* GET_NO_PARALLEL_TRANSACTION                                                       */
/* --------------------------------------------------------------------------------- */
void Dbacc::getNoParallelTransaction(Signal* signal) 
Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op) 
{
  OperationrecPtr tnptOpPtr;
  OperationrecPtr tmp;
  
  tgnptNrTransaction = 1;
  tnptOpPtr.i = tgnptMainOpPtr.p->nextParallelQue;
  while ((tnptOpPtr.i != RNIL) &&
         (tgnptNrTransaction == 1)) {
    jam();
    ptrCheckGuard(tnptOpPtr, coprecsize, operationrec);
    if ((tnptOpPtr.p->transId1 == tgnptMainOpPtr.p->transId1) &&
        (tnptOpPtr.p->transId2 == tgnptMainOpPtr.p->transId2)) {
      tnptOpPtr.i = tnptOpPtr.p->nextParallelQue;
    } else {
  tmp.i= op->nextParallelQue;
  Uint32 transId[2] = { op->transId1, op->transId2 };
  while (tmp.i != RNIL) 
  {
    jam();
      tgnptNrTransaction++;
    }//if
  }//while
    ptrCheckGuard(tmp, coprecsize, operationrec);
    if (tmp.p->transId1 == transId[0] && tmp.p->transId2 == transId[1])
      tmp.i = tmp.p->nextParallelQue;
    else
      return 2;
  }
  return 1;
}//Dbacc::getNoParallelTransaction()
void Dbacc::moveLastParallelQueue(Signal* signal) 
@@ -2162,9 +2157,7 @@ void Dbacc::moveLastParallelQueueWrite(Signal* signal)
/* --------------------------------------------------------------------------------- */
Uint32 Dbacc::placeWriteInLockQueue(Signal* signal) 
{
  tgnptMainOpPtr = queOperPtr;
  getNoParallelTransaction(signal);
  if (!((tgnptNrTransaction == 1) &&
  if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
	(queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
	(queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
    jam();
@@ -2215,9 +2208,7 @@ void Dbacc::placeSerialQueueWrite(Signal* signal)
  }//if
  readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
  ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
  tgnptMainOpPtr = readWriteOpPtr;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction == 1) {
  if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
    /* --------------------------------------------------------------------------------- */
    /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR     */
    /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK.                                    */
@@ -5878,9 +5869,7 @@ Dbacc::check_lock_upgrade(Signal* signal,
    return;
  }
  
  tgnptMainOpPtr = lock_owner;
  getNoParallelTransaction(signal);
  if (tgnptNrTransaction > 1)
  if (getNoParallelTransaction(lock_owner.p) > 1)
  {
    jam();
    /**
@@ -5889,6 +5878,15 @@ Dbacc::check_lock_upgrade(Signal* signal,
    return;
  }
  if (getNoParallelTransaction(next.p) > 1)
  {
    jam();
    /**
     * No lock upgrade if more than 1 transaction in next's parallell queue
     */
    return;
  }
  
  OperationrecPtr tmp;
  tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
  if(tmp.i != RNIL)
@@ -5901,19 +5899,18 @@ Dbacc::check_lock_upgrade(Signal* signal,
  
  // Find end of parallell que
  tmp = lock_owner;
  tmp.p->lockMode= 1;
  while(tmp.p->nextParallelQue != RNIL)
  {
    jam();
    tmp.i = tmp.p->nextParallelQue;
    ptrCheckGuard(tmp, coprecsize, operationrec);
    tmp.p->lockMode= 1;
  }
  
  next.p->prevParallelQue = tmp.i;
  tmp.p->nextParallelQue = next.i;
  
  OperationrecPtr save = operationRecPtr;
  Uint32 lockMode = lock_owner.p->lockMode;
  Uint32 TelementIsDisappeared = 0; // lock upgrade = all reads
  Uint32 ThashValue = lock_owner.p->hashValue;
@@ -5927,7 +5924,7 @@ Dbacc::check_lock_upgrade(Signal* signal,
    next.p->localdata[1] = localdata[1];
    
    operationRecPtr = next;
    ndbassert(next.p->lockMode);
    next.p->lockMode = lockMode;
    TelementIsDisappeared = executeNextOperation(signal);
    if (next.p->nextParallelQue != RNIL) 
    {