Commit 37865bc9 authored by unknown's avatar unknown
Browse files

bug#6995 - ndb

don't store keys in normalized form
instead save everythings as normal attributes               


ndb/src/kernel/blocks/backup/Backup.cpp:
  Remove special handling of keys, that was build to support
   tables where keys was only stored in ACC
ndb/src/kernel/blocks/backup/Backup.hpp:
  Remove special handling of keys, that was build to support
   tables where keys was only stored in ACC
ndb/src/kernel/blocks/backup/BackupInit.cpp:
  Remove special handling of keys, that was build to support
   tables where keys was only stored in ACC
ndb/tools/restore/Restore.cpp:
  Remove special handling of keys, that was build to support
   tables where keys was only stored in ACC
ndb/tools/restore/Restore.hpp:
  Remove special handling of keys, that was build to support
   tables where keys was only stored in ACC
parent b1f4a482
Loading
Loading
Loading
Loading
+21 −102
Original line number Diff line number Diff line
@@ -1265,10 +1265,6 @@ Backup::createAttributeMask(TablePtr tabPtr,
    jam();
    AttributePtr attr;
    table.attributes.getPtr(attr, i);
    if(attr.p->data.key != 0){
      jam();
      continue;
    }
    mask.set(i);
  }
}
@@ -2954,12 +2950,9 @@ Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)

  tabPtr.p->schemaVersion = tmpTab.TableVersion;
  tabPtr.p->noOfAttributes = tmpTab.NoOfAttributes;
  tabPtr.p->noOfKeys = tmpTab.NoOfKeyAttr;
  tabPtr.p->noOfNull = 0;
  tabPtr.p->noOfVariable = 0; // Computed while iterating over attribs
  tabPtr.p->sz_FixedKeys = 0; // Computed while iterating over attribs
  tabPtr.p->sz_FixedAttributes = 0; // Computed while iterating over attribs
  tabPtr.p->variableKeyId = RNIL;   // Computed while iterating over attribs
  tabPtr.p->triggerIds[0] = ILLEGAL_TRIGGER_ID;
  tabPtr.p->triggerIds[1] = ILLEGAL_TRIGGER_ID;
  tabPtr.p->triggerIds[2] = ILLEGAL_TRIGGER_ID;
@@ -2994,7 +2987,6 @@ Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
    
    attrPtr.p->data.nullable = tmp.AttributeNullableFlag;
    attrPtr.p->data.fixed = (tmp.AttributeArraySize != 0);
    attrPtr.p->data.key = tmp.AttributeKeyFlag;
    attrPtr.p->data.sz32 = sz32;

    /**
@@ -3002,12 +2994,7 @@ Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
     * 1) Fixed
     * 2) Nullable
     * 3) Variable
     * 4) Fixed key
     * 5) Variable key
     */
    if(attrPtr.p->data.key == false) {
      jam();
      
    if(attrPtr.p->data.fixed == true && attrPtr.p->data.nullable == false) {
      jam();
      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
@@ -3029,25 +3016,6 @@ Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
      ndbrequire(0);
    }//if
    
    } else if(attrPtr.p->data.key == true) {
      jam();
      ndbrequire(attrPtr.p->data.nullable == false);
      
      if(attrPtr.p->data.fixed == true) { // Fixed key
	jam();
	tabPtr.p->sz_FixedKeys += sz32;
      }//if
      
      if(attrPtr.p->data.fixed == false) { // Variable key
	jam();
	attrPtr.p->data.offset = 0;
	tabPtr.p->noOfVariable++;
	ndbrequire(tabPtr.p->variableKeyId == RNIL); // Only one variable key
	tabPtr.p->variableKeyId = attrPtr.i;
	ndbrequire(0);
      }//if
    }//if
    
    it.next(); // Move Past EndOfAttribute
  }//for
  return tabPtr;
@@ -3355,7 +3323,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
    Table & table = * tabPtr.p;
    ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
    const Uint32 parallelism = 16;
    const Uint32 attrLen = 5 + table.noOfAttributes - table.noOfKeys;
    const Uint32 attrLen = 5 + table.noOfAttributes;

    req->senderData = filePtr.i;
    req->resultRef = reference();
@@ -3366,7 +3334,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
    req->tableId = table.tableId;
    ScanFragReq::setLockMode(req->requestInfo, 0);
    ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
    ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
    ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
    ScanFragReq::setAttrLen(req->requestInfo,attrLen); 
    req->transId1 = 0;
    req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
@@ -3381,7 +3349,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
    signal->theData[2] = (BACKUP << 20) + (getOwnNodeId() << 8);
    
    // Return all
    signal->theData[3] = table.noOfAttributes - table.noOfKeys;
    signal->theData[3] = table.noOfAttributes;
    signal->theData[4] = 0;
    signal->theData[5] = 0;
    signal->theData[6] = 0;
@@ -3393,10 +3361,6 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
      jam();
      AttributePtr attr;
      table.attributes.getPtr(attr, i);
      if(attr.p->data.key != 0) {
	jam();
	continue;
      }//if
      
      AttributeHeader::init(&signal->theData[dataPos], i, 0);
      dataPos++;
@@ -3506,64 +3470,19 @@ Backup::execTRANSID_AI(Signal* signal)
  }
}

void
Backup::execKEYINFO20(Signal* signal)
{
  jamEntry();
  
  const Uint32 filePtrI = signal->theData[0];
  const Uint32 keyLen   = signal->theData[1];
  //const Uint32 scanInfo = signal->theData[2];
  //const Uint32 transId1 = signal->theData[3];
  //const Uint32 transId2 = signal->theData[4];
  const Uint32 dataLen  = signal->length() - 5;

  BackupFilePtr filePtr;
  c_backupFilePool.getPtr(filePtr, filePtrI);
  
  OperationRecord & op = filePtr.p->operation;
  
  /**
   * Unpack data
   */
  ndbrequire(keyLen == dataLen);
  const Uint32 * src = &signal->theData[5];
  const Uint32 klFixed = op.getFixedKeySize();
  ndbrequire(keyLen >= klFixed);
  
  Uint32 * dst = op.newKey();
  memcpy(dst, src, klFixed << 2);
  
  const Uint32 szLeft = (keyLen - klFixed);
  if(szLeft > 0) {
    jam();
    src += klFixed;
    dst = op.newVariableKey(szLeft);
    memcpy(dst, src, (szLeft << 2));
    ndbrequire(0);
  }//if
  
  if(op.finished()){
    jam();
    op.newRecord(op.dst);
  }
}

void 
Backup::OperationRecord::init(const TablePtr & ptr)
{
  
  tablePtr = ptr.i;
  noOfAttributes = (ptr.p->noOfAttributes - ptr.p->noOfKeys) + 1;
  variableKeyId = ptr.p->variableKeyId;
  noOfAttributes = ptr.p->noOfAttributes;
  
  sz_Bitmask = (ptr.p->noOfNull + 31) >> 5;
  sz_FixedKeys = ptr.p->sz_FixedKeys;
  sz_FixedAttribs = ptr.p->sz_FixedAttributes;

  if(ptr.p->noOfVariable == 0) {
    jam();
    maxRecordSize = 1 + sz_Bitmask + sz_FixedKeys + sz_FixedAttribs;
    maxRecordSize = 1 + sz_Bitmask + sz_FixedAttribs;
  } else {
    jam();
    maxRecordSize = 
+3 −37
Original line number Diff line number Diff line
@@ -76,7 +76,6 @@ protected:
   */
  void execSCAN_HBREP(Signal* signal);
  void execTRANSID_AI(Signal* signal);
  void execKEYINFO20(Signal* signal);
  void execSCAN_FRAGREF(Signal* signal);
  void execSCAN_FRAGCONF(Signal* signal);

@@ -172,8 +171,8 @@ public:
    struct Data {
      Uint8 nullable;
      Uint8 fixed;
      Uint8 key; 
      Uint8 unused; 
      Uint8 unused2;
      Uint32 sz32;       // No of 32 bit words
      Uint32 offset;     // Relative DataFixedAttributes/DataFixedKeys
      Uint32 offsetNull; // In NullBitmask
@@ -199,12 +198,9 @@ public:
    Uint32 frag_mask;
    Uint32 tableType;
    Uint32 noOfNull;
    Uint32 noOfKeys;
    Uint32 noOfAttributes;
    Uint32 noOfVariable;
    Uint32 sz_FixedKeys;
    Uint32 sz_FixedAttributes;
    Uint32 variableKeyId;
    Uint32 triggerIds[3];
    bool   triggerAllocated[3];
    
@@ -224,7 +220,6 @@ public:
     * Once per table
     */
    void init(const TablePtr & ptr);
    inline Uint32 getFixedKeySize() const { return sz_FixedKeys; }
    
    /**
     * Once per fragment
@@ -247,23 +242,19 @@ public:
    /**
     * Per attribute
     */
    Uint32 * newKey();
    void     nullAttribute(Uint32 nullOffset);
    Uint32 * newNullable(Uint32 attrId, Uint32 sz);
    Uint32 * newAttrib(Uint32 offset, Uint32 sz);
    Uint32 * newVariable(Uint32 id, Uint32 sz);
    Uint32 * newVariableKey(Uint32 sz);
    
  private:
    Uint32* base; 
    Uint32* dst_Length;
    Uint32* dst_Bitmask;
    Uint32* dst_FixedKeys;
    Uint32* dst_FixedAttribs;
    BackupFormat::DataFile::VariableData* dst_VariableData;
    
    Uint32 noOfAttributes; // No of Attributes
    Uint32 variableKeyId;  // Id of variable key
    Uint32 attrLeft;       // No of attributes left

    Uint32 opNoDone;
@@ -289,7 +280,6 @@ public:
     * sizes of part
     */
    Uint32 sz_Bitmask;
    Uint32 sz_FixedKeys;
    Uint32 sz_FixedAttribs;

  public:
@@ -628,7 +618,6 @@ Backup::OperationRecord::newRecord(Uint32 * p){
  base = p;
  dst_Length       = p; p += 1;
  dst_Bitmask      = p; p += sz_Bitmask;
  dst_FixedKeys    = p; p += sz_FixedKeys;
  dst_FixedAttribs = p; p += sz_FixedAttribs;
  dst_VariableData = (BackupFormat::DataFile::VariableData*)p;
  BitmaskImpl::clear(sz_Bitmask, dst_Bitmask);
@@ -645,14 +634,6 @@ Backup::OperationRecord::newAttrib(Uint32 offset, Uint32 sz){
  return dst;
}

inline
Uint32 *
Backup::OperationRecord::newKey(){
  attrLeft --;
  attrSzLeft = 0;
  return dst_FixedKeys;
}

inline
void
Backup::OperationRecord::nullAttribute(Uint32 offsetNull){
@@ -691,21 +672,6 @@ Backup::OperationRecord::newVariable(Uint32 id, Uint32 sz){
  return dst;
}

inline
Uint32 *
Backup::OperationRecord::newVariableKey(Uint32 sz){
  attrLeft--;
  attrSzLeft = 0;
  attrSzTotal += sz;
  
  dst = &dst_VariableData->Data[0];
  dst_VariableData->Sz = htonl(sz);
  dst_VariableData->Id = htonl(variableKeyId);
  
  dst_VariableData = (BackupFormat::DataFile::VariableData *)(dst + sz);
  return dst;
}

inline
bool
Backup::OperationRecord::finished(){
@@ -713,7 +679,7 @@ Backup::OperationRecord::finished(){
    return false;
  }
  
  opLen += attrSzTotal + sz_FixedKeys;
  opLen += attrSzTotal;
  opNoDone++;
  
  scanStop = dst = (Uint32 *)dst_VariableData;
+0 −1
Original line number Diff line number Diff line
@@ -126,7 +126,6 @@ Backup::Backup(const Configuration & conf) :
  
  addRecSignal(GSN_SCAN_HBREP, &Backup::execSCAN_HBREP);
  addRecSignal(GSN_TRANSID_AI, &Backup::execTRANSID_AI);
  addRecSignal(GSN_KEYINFO20, &Backup::execKEYINFO20);
  addRecSignal(GSN_SCAN_FRAGREF, &Backup::execSCAN_FRAGREF);
  addRecSignal(GSN_SCAN_FRAGCONF, &Backup::execSCAN_FRAGCONF);

+0 −27
Original line number Diff line number Diff line
@@ -334,27 +334,6 @@ RestoreDataIterator::getNextTuple(int & res)
  Uint32 *buf_ptr = (Uint32*)_buf_ptr, *ptr = buf_ptr;
  ptr += m_currentTable->m_nullBitmaskSize;
  Uint32 i;
  for(i= 0; i < m_currentTable->m_fixedKeys.size(); i++){
    assert(ptr < buf_ptr + dataLength);
 
    const Uint32 attrId = m_currentTable->m_fixedKeys[i]->attrId;

    AttributeData * attr_data = m_tuple.getData(attrId);
    const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);

    const Uint32 sz = attr_desc->getSizeInWords();

    attr_data->null = false;
    attr_data->void_value = ptr;

    if(!Twiddle(attr_desc, attr_data))
      {
	res = -1;
	return NULL;
      }
    ptr += sz;
  }

  for(i = 0; i < m_currentTable->m_fixedAttribs.size(); i++){
    assert(ptr < buf_ptr + dataLength);

@@ -699,12 +678,6 @@ void TableS::createAttr(NdbDictionary::Column *column)
  if (d->m_column->getAutoIncrement())
    m_auto_val_id= d->attrId;

  if(d->m_column->getPrimaryKey() /* && not variable */)
  {
    m_fixedKeys.push_back(d);
    return;
  }

  if(!d->m_column->getNullable())
  {
    m_fixedAttribs.push_back(d);
+0 −2
Original line number Diff line number Diff line
@@ -123,8 +123,6 @@ class TableS {
  Uint32 schemaVersion;
  Uint32 backupVersion;
  Vector<AttributeDesc *> allAttributesDesc;
  Vector<AttributeDesc *> m_fixedKeys;
  //Vector<AttributeDesc *> m_variableKey; 
  Vector<AttributeDesc *> m_fixedAttribs;
  Vector<AttributeDesc *> m_variableAttribs;