Commit a7917b1e authored by unknown's avatar unknown
Browse files

bug#11166 - ndb

  Fix potential inconsistency when running ndb_restore due to faulty parsing
    of backup log wrt inserts


ndb/src/kernel/blocks/backup/Backup.cpp:
  Make sure that entire stopGCP is in log
ndb/tools/restore/Restore.cpp:
  
    Notice: this will not group and commit the deltas listed below
    into a ChangeSet, because there are no ChangeSet comments.
    Click [Checkin] again to check in only the commented deltas,
    or type Control-l to go back and provide ChangeSet comments.
ndb/tools/restore/Restore.hpp:
  Keep track of last gci to next iteration
ndb/tools/restore/consumer_restore.cpp:
  Handle insert in log
  Only allow certain errors
parent d4e921d6
Loading
Loading
Loading
Loading
+21 −4
Original line number Diff line number Diff line
@@ -1676,13 +1676,30 @@ Backup::execWAIT_GCP_CONF(Signal* signal){
    ptr.p->masterData.sendCounter= 0;
    ptr.p->masterData.gsn = GSN_BACKUP_FRAGMENT_REQ;
    nextFragment(signal, ptr);
    return;
  } else {
    jam();
    if(gcp >= ptr.p->startGCP + 3)
    {
      CRASH_INSERTION((10009));
      ptr.p->stopGCP = gcp;
      sendDropTrig(signal, ptr); // regular dropping of triggers
      return;
    }//if
    
    /**
     * Make sure that we got entire stopGCP 
     */
    WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
    req->senderRef = reference();
    req->senderData = ptr.i;
    req->requestType = WaitGCPReq::CompleteForceStart;
    sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal, 
	       WaitGCPReq::SignalLength,JBB);
    return;
  }
}

/*****************************************************************************
 * 
 * Master functionallity - Backup fragment
+4 −4
Original line number Diff line number Diff line
@@ -765,6 +765,7 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md)
  setLogFile(md, 0);

  m_count = 0;
  m_last_gci = 0;
}

const LogEntry *
@@ -772,7 +773,6 @@ RestoreLogIterator::getNextLogEntry(int & res) {
  // Read record length
  typedef BackupFormat::LogFile::LogEntry LogE;

  Uint32 gcp= 0;
  LogE * logE= 0;
  Uint32 len= ~0;
  const Uint32 stopGCP = m_metaData.getStopGCP();
@@ -802,9 +802,9 @@ RestoreLogIterator::getNextLogEntry(int & res) {
    
    if(hasGcp){
      len--;
      gcp = ntohl(logE->Data[len-2]);
      m_last_gci = ntohl(logE->Data[len-2]);
    }
  } while(gcp > stopGCP + 1);
  } while(m_last_gci > stopGCP + 1);
  
  m_logEntry.m_table = m_metaData.getTable(logE->TableId);
  switch(logE->TriggerEvent){
+1 −0
Original line number Diff line number Diff line
@@ -361,6 +361,7 @@ private:
  const RestoreMetaData & m_metaData;

  Uint32 m_count;  
  Uint32 m_last_gci;
  LogEntry m_logEntry;
public:
  RestoreLogIterator(const RestoreMetaData &);
+31 −5
Original line number Diff line number Diff line
@@ -513,6 +513,13 @@ BackupRestore::logEntry(const LogEntry & tup)
    exit(-1);
  }

  if (check != 0) 
  {
    err << "Error defining op: " << trans->getNdbError() << endl;
    exit(-1);
  } // if
  
  Bitmask<4096> keys;
  for (Uint32 i= 0; i < tup.size(); i++) 
  {
    const AttributeS * attr = tup[i];
@@ -525,9 +532,21 @@ BackupRestore::logEntry(const LogEntry & tup)

    const Uint32 length = (size / 8) * arraySize;
    if (attr->Desc->m_column->getPrimaryKey())
      op->equal(attr->Desc->attrId, dataPtr, length);
    {
      if(!keys.get(attr->Desc->attrId))
      {
	keys.set(attr->Desc->attrId);
	check= op->equal(attr->Desc->attrId, dataPtr, length);
      }
    }
    else
      op->setValue(attr->Desc->attrId, dataPtr, length);
      check= op->setValue(attr->Desc->attrId, dataPtr, length);
    
    if (check != 0) 
    {
      err << "Error defining op: " << trans->getNdbError() << endl;
      exit(-1);
    } // if
  }
  
  const int ret = trans->execute(Commit);
@@ -536,18 +555,25 @@ BackupRestore::logEntry(const LogEntry & tup)
    // Both insert update and delete can fail during log running
    // and it's ok
    // TODO: check that the error is either tuple exists or tuple does not exist?
    bool ok= false;
    NdbError errobj= trans->getNdbError();
    switch(tup.m_type)
    {
    case LogEntry::LE_INSERT:
      if(errobj.status == NdbError::PermanentError &&
	 errobj.classification == NdbError::ConstraintViolation)
	ok= true;
      break;
    case LogEntry::LE_UPDATE:
      break;
    case LogEntry::LE_DELETE:
      if(errobj.status == NdbError::PermanentError &&
	 errobj.classification == NdbError::NoDataFound)
	ok= true;
      break;
    }
    if (false)
    if (!ok)
    {
      err << "execute failed: " << trans->getNdbError() << endl;
      err << "execute failed: " << errobj << endl;
      exit(-1);
    }
  }